Queue Consumer Scaling with AMQP

Queue Consumer Scaling with AMQP

Oracle OpenWorld is right around the corner, and we couldn’t be more excited. This year, for the first time, there will be a Modern CX Hackathon. How cool!

Being that we’re marketing geeks, the idea of a hackathon during the conference is something we think is a fantastic idea. As the largest producer of apps on the Oracle Cloud Marketplace we spend a TON of time figuring out creative solutions to problems we’re trying to solve as we work to extend the functionality and connectivity of the core platforms.

We thought we would highlight one of the technical challenges we recently solved for. Let me set-up the use case for some context.

When you’re building an app, one of the considerations you need to take into account is how many API calls you can make/receive in a given timeframe. Sometimes you run into situations where a flood of new jobs need to get processed. Simple math leads to “T” (time to complete a single unit of work) X “N” (total number of jobs) X “C” (concurrent number of jobs) = Total processing time. Easy enough. When you are looking at high throughput needs you either need to adjust “T”, “C” or “N” in order to keep the total time to process to a reasonable amount. For the purposes of this post, I will focus on solving for the “C” (and can address ways to reduce the “T” and “N” in a different post).

Let me give you a real world example in case that is too abstract. Let’s say you have NetSuite CRM and are integrating it with Oracle Eloqua. On a normal day, you process the normal field changes and everything is fine and dandy. But at the end of your fiscal year, or with a new territory re-alignment, you might need to push a large volume of changes through the integration and you want them to process quickly. How do you deal with these sorts of temporary high volume spikes so you don’t have to wait for days for the records to process and you don’t bog down the hardware supporting the software?

In the big data world we live in, the way to scale things is to add more hardware but, this is costly and not very practical for solving for temporary spikes. What I’m going to show you allows you to to use the same processing power you have in your existing hardware stack by shifting which applications you’re devoting processing power to. This is a new way to trigger an event to scale up and down to meet the needs of the spike time.

***At this point, this is going to get real geeky! Be forewarned.***

Assume that you are using a RabbitMQ server and that you have a single Java application to consume messages off a number of queues. To handle a load spike, you can change the number of thread consumers without shutting down your application (and without opening any new ports). Here’s how.

First you need some way to dynamically change the number of concurrent consumers without shutting down the existing consumers. Start by wrapping a SimpleMessageListenerContainer in a custom wrapper. This will give us a starting point for storing some details about the listener.

public class ScaleableMessageListener {
   private ConnectionFactory connectionFactory;
   private ChannelAwareMessageListener messageListener;
   private SimpleMessageListenerContainer listenerContainer;
   private String name;
   private Queue queue;
   private int concurrentConsumers;
}

Changing the number of concurrent consumers is as easy as changing the value on the listener container. The catch here is that listeners can’t have zero concurrent consumers. Scaling down to zero is important because we might want to stop processing of a single queue without affecting others. In that case we need to shut down the listener instead. Likewise, if the listener is currently not running, a non-zero number of concurrent consumers needs to trigger a startup. With a few simple checks, you can decide whether to start a new listener container, stop the existing listener container, or update the number of concurrent consumers for the existing listener container.

public void scaleConcurrentConsumers(int concurrentConsumers) {
   if (this.concurrentConsumers != concurrentConsumers) {
        this.concurrentConsumers = concurrentConsumers
        if (concurrentConsumers <= 0) {
              if (listenerContainer != null) {
                    stopListener();
                    listenerContainer = null;
              }
       } else {
             if (listenerContainer == null) {
                   start();
              } else {
                    listenerContainer.setConcurrentConsumers(concurrentConsumers);
              }
         }
     }
 }

Next you’ll need to implement start and stop functionality. Note that you need to call afterPropertiesSet on the listener container before starting. This will initialize the listener container so that you can start it.

public void start() {
         if (concurrentConsumers > 0 && listenerContainer == null) {
               listenerContainer = makeListenerContainer();
               listenerContainer.afterPropertiesSet();
               listenerContainer.start();
          }
      }

Stopping the listener will occupy the current thread until the listener is done processing all of its messages. This could take a while, and tie up processing. To get around that, we can spawn a new thread to wait for the shutdown to complete. Here you can see that we need both a stop and a shutdown call.

private void stop() {
      SimpleMessageListenerContainer oldListenerContainer = listenerContainer;
      Executors.newSingleThreadExecutor().submit(() -> {
               oldListenerContainer.stop();
               oldListenerContainer.shutdown(); 
       });
 }

You’ll also need a registry of queue listeners so that we can programmatically find and modify the listener we want to change. Instead of defining each listener as a Spring bean, define a single bean that is a map of listeners.

@Configuration
public class AmqpConfiguration {
       @Autowired private ConnectionFactory connectionFactory;
       @Autowired private MessageConverter messageConverter;
       @Autowired private Queue queue1;
       @Autowired private ChannelAwareMessageListener listener1;
       @Value("${CONCURRENT_CONSUMERS:0}") private int concurrentConsumers;

       @Bean public Map<String, ScaleableMessageListener> messageListeners() throws URISyntaxException {
                               Map<String, ScaleableMessageListener> rabbitmqListeners = new HashMap<>();
                               ScaleableMessageListener listener1 = makeListener1();
                               rabbitmqListeners.put(listener1.getName(), listener1);
                               return rabbitmqListeners;
                     }

                     private ScaleableMessageListener makeListener1() throws URISyntaxException {
                              String name = "listener1";
                             return new ScaleableMessageListener(name, queue1, concurrentConsumers,         
connectionFactory, listener1);
          }
      }

Taking away Spring’s control of the message listener containers means that listener containers won’t automatically start when the application starts. Fortunately Spring isn’t doing anything too magical here. We can create a small method to loop over all the listeners in the map, and then start them all. To make sure they start up when the application starts, we can leverage the @PostConstruct annotation.

   @PostConstruct public void startListeners() throws URISyntaxException {
             for (ScaleableMessageListener listenerContainer : messageListeners().values()) {
                       listenerContainer.start();
              }
      }

On to the final trick: triggering the change. Since your application is already consuming messages off a queue, then leverage the queue. There is no need to open up REST endpoints, set file listeners, or have scheduled polling jobs.

You already have access to the queue server, so make a new queue just for this purpose. You’ll need a new message listener to handle messages from this queue. Simply serialize some JSON into whatever object is convenient. Then call the method which you created to change the number of concurrent consumers.

public class ScaleEventMessageListener implements MessageListener {
       @Resource private Map<String, ScaleableMessageListener> messageListeners;
        private static final Gson GSON = new Gson();

        public void onMessage(Message message) {
                  try {
                         String body = String.valueOf(message.getBody());
                         ScaleEvent scaleEvent = GSON.fromJson(body, ScaleEvent.class);
                         ScaleableMessageListener listener = messageListeners.get(scaleEvent.getListenerName());
                         if (listener != null) {

listener.scaleConcurrentConsumers(scaleEvent.getConcurrentConsumers());
                                 }
                       } catch (Exception e) {
                              e.printStackTrace();
                       }
                }
         }

Now you can publish a message onto a queue and have another queue dynamically change the number of consumers, starting and stopping if necessary. How your application publishes to the queue is up to you. You can even use RabbitMQ’s admin console and control it only as needed.

Granted, scaling could be done any number of ways, and the use of a queue is not required, but this very well might be a good fit for certain small-to-medium scale applications. Note that the examples here are stripped down and simplified versions of a more complicated implementation, which accounts for more than one JVM running simultaneously.

tl;dr Read messages off a queue to change consumers. Complete examples:

ScaleableMessageListener.java

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import java.util.concurrent.Executors;
public class ScaleableMessageListener {
        private ConnectionFactory connectionFactory;
        private ChannelAwareMessageListener messageListener;
        private SimpleMessageListenerContainer listenerContainer;
        private String name;
        private Queue queue;
        private int concurrentConsumers = 0;

        public ScaleableMessageListener(String name, Queue queue, int concurrentConsumers,
                                        ConnectionFactory connectionFactory, 
ChannelAwareMessageListener messageListener) {
       this.name = name;
       this.queue = queue;
       this.concurrentConsumers = concurrentConsumers;
       this.connectionFactory = connectionFactory;
       this.messageListener = messageListener;
   }
   
   public void start() {
       if (concurrentConsumers > 0 && listenerContainer == null) {
           listenerContainer = makeListenerContainer();
           listenerContainer.afterPropertiesSet();
           listenerContainer.start();
       }
   }
   public void scaleConcurrentConsumers(int concurrentConsumers) {
       if (this.concurrentConsumers != concurrentConsumers) {
           this.concurrentConsumers = concurrentConsumers;
           if (concurrentConsumers <= 0) {
               if (listenerContainer != null) {
                   stop();
                   listenerContainer = null;
               }
           } else {
               if (listenerContainer == null) {
                   start();
              } else {
                   listenerContainer.setConcurrentConsumers(concurrentConsumers);
              }
          }
       }
   }

   public String getName() {
       return name;
   }

   private void stop() {
       SimpleMessageListenerContainer oldListenerContainer = listenerContainer;
       Executors.newSingleThreadExecutor().submit(() -> {
           oldListenerContainer.stop();
           oldListenerContainer.shutdown();
       });
   }

   private SimpleMessageListenerContainer makeListenerContainer() {
       SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
              listenerContainer.setConnectionFactory(connectionFactory);
              listenerContainer.setQueues(queue);
              listenerContainer.setConcurrentConsumers(concurrentConsumers);
              listenerContainer.setMessageListener(messageListener);
              return listenerContainer;
           }
       }

ScaleEventMessageListener.java

import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import javax.annotation.Resource;
import java.util.Map;

public class ScaleEventMessageListener implements MessageListener {
    @Resource private Map<String, ScaleableMessageListener> messageListeners;
    private static final Gson GSON = new Gson();

    public void onMessage(Message message) {
        try {
            String body = String.valueOf(message.getBody());
            ScaleEvent scaleEvent = GSON.fromJson(body, ScaleEvent.class);
            ScaleableMessageListener listener = 
messageListeners.get(scaleEvent.getListenerName());
           if (listener != null) {
               listener.scaleConcurrentConsumers(scaleEvent.getConcurrentConsumers());
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}

AmqpConfiguration.java

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import javax.annotation.PostConstruct;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

@Configuration
@Import(SomeOtherConfiguration.class)
public class AmqpConfiguration {
   @Autowired private ConnectionFactory connectionFactory;
   @Autowired private MessageConverter messageConverter;
   @Autowired private Queue queue1;
   @Autowired private ChannelAwareMessageListener listener1;
   @Autowired private Queue queue2;
   @Autowired private ChannelAwareMessageListener listener2;
   @Value("${CONCURRENT_CONSUMERS:0}") private int concurrentConsumers;

   @Bean public Map<String, ScaleableMessageListener> messageListeners() throws URISyntaxException {
       Map<String, ScaleableMessageListener> rabbitmqListeners = new HashMap<>();
       ScaleableMessageListener listener1 = makeListener1();
       ScaleableMessageListener listener2 = makeListener2();
       rabbitmqListeners.put(listener1.getName(), listener1);
       rabbitmqListeners.put(listener2.getName(), listener2);
       return rabbitmqListeners;
   }

   @PostConstruct public void startListeners() throws URISyntaxException {
       for (ScaleableMessageListener listenerContainer : messageListeners().values()) {
           listenerContainer.start();
       }
   }

   private ScaleableMessageListener makeListener1() throws URISyntaxException {
       String name = "listener1";
       return new ScaleableMessageListener(name, queue1, concurrentConsumers, connectionFactory, listener1);
   }

   private ScaleableMessageListener makeListener2() throws URISyntaxException {
       String name = "listener2";
       return new ScaleableMessageListener(name, queue2, concurrentConsumers, connectionFactory, listener2);
   }
}

Well, that’s how we solved for this problem. If it went over your head, hopefully you can pass this along to your dev team. They will be able to implement this within applications they’re building or give it to them to jump start the upcoming hackathon.

We look forward to seeing some of the cool stuff that comes out of San Francisco during OpenWorld. As the new creations come to life, we hope this blog helps fuel some creativity around scaling for large volume spikes.

Don’t forget, if you are looking for an app to extend the functionality of any of the applications in the Oracle Marketing Cloud, please contact us. We would love to help you out.

By | 2017-10-25T02:36:48+00:00 September 13th, 2017|Let's Get Geeky (Development)|0 Comments

About the Author:

Benjamin joined Relationship One's product development team in early 2017. He adheres the philosophy of "best idea wins" and prefers tabs over spaces. Away from software he enjoys writing music, working on his novel, growing edible plants, telling dad jokes, and playing games of all types--and being interrupted by cats, apparently.


Thank you for subscribing!
Subscribe to our Thought Leadership Today