Spring AMQP in Reactive Applications

1. Overview

This tutorial shows how to create a simple Spring Boot Reactive Application that integrates with the RabbitMQ messaging server, a popular implementation of the AMQP messaging standard.

We'll cover both point-to-point and publish-subscribe scenarios, using a distributed setup that highlights the differences between the two patterns.

Note that we assume basic knowledge of AMQP, RabbitMQ, and Spring Boot, in particular key concepts such as Exchanges, Queues, and Topics. More information about those concepts can be found in the links below:

  • Messaging With Spring AMQP
  • Introduction to RabbitMQ

2. RabbitMQ Server Setup

Although we could set up RabbitMQ locally, in practice we're more likely to use a dedicated installation with additional features such as high availability, monitoring, and security.

In order to simulate such an environment on our development machine, we'll use Docker to create a server that our application will use.

The following command will start a standalone RabbitMQ server:

$ docker run -d --name rabbitmq -p 5672:5672 rabbitmq:3

We don’t declare any persistent volume, so unread messages will be lost between restarts. The service will be available at port 5672 on the host.

We can check server logs with the docker logs command, which should produce an output such as this:

$ docker logs rabbitmq
2018-06-09 13:42:29.718 [info] <0.33.0>
  Application lager started on node rabbit@rabbit
// ... some lines omitted
2018-06-09 13:42:33.491 [info] <0.226.0>
 Starting RabbitMQ 3.7.5 on Erlang 20.3.5
 Copyright (C) 2007-2018 Pivotal Software, Inc.
 Licensed under the MPL.  See http://www.rabbitmq.com/

  ##  ##
  ##  ##      RabbitMQ 3.7.5. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: <stdout>

              Starting broker...
2018-06-09 13:42:33.494 [info] <0.226.0>
 node           : rabbit@rabbit
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : CY9rzUYh03PK3k6DJie09g==
 log(s)         : <stdout>
 database dir   : /var/lib/rabbitmq/mnesia/rabbit@rabbit

// ... more log lines

Since the image includes the rabbitmqctl utility, we can use it to execute administrative tasks against our running container.

For instance, we can get server status information with the following command:

$ docker exec rabbitmq rabbitmqctl status
Status of node rabbit@rabbit ...
[{pid,299},
 {running_applications,
     [{rabbit,"RabbitMQ","3.7.5"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.7.5"},
// ... other info omitted for brevity

Other useful commands include:

  • list_exchanges:  List all declared Exchanges
  • list_queues:  List all declared Queues, including the number of unread messages
  • list_bindings:  List all defined Bindings between exchanges and queues, including their routing keys

3. Spring AMQP Project Setup

Once we have our RabbitMQ server up and running, we can move on to creating our Spring project. This sample project will allow any REST client to post messages to and/or receive messages from the messaging server, using the Spring AMQP module and the corresponding Spring Boot starter to communicate with it.

The main dependencies we need to add to our pom.xml project file are:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>2.0.2.RELEASE</version> 
</dependency>

The spring-boot-starter-amqp dependency brings in all AMQP-related classes, whereas spring-boot-starter-webflux is the core dependency used to implement our reactive REST server.

Note: you can check the latest versions of the Spring Boot Starter AMQP and WebFlux modules on Maven Central.

4. Scenario 1: Point-to-Point Messaging

In this first scenario, we'll use a Direct Exchange, which is the logical entity in the broker that receives messages from clients.

A Direct Exchange routes all incoming messages to a single queue, where they become available for consumption by clients. Multiple clients can subscribe to the same queue, but only one of them will receive any given message.
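To make the "only one consumer" semantics concrete, here is a plain-Java sketch (not Spring AMQP code, all names are illustrative) that models a direct exchange as a map from routing key to a single queue, with competing consumers draining it:

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DirectExchangeModel {
    // A direct exchange routes each message, by exact routing key,
    // to the queue bound with that key; here the "broker" is just a Map.
    private final Map<String, BlockingQueue<String>> bindings;

    public DirectExchangeModel(Map<String, BlockingQueue<String>> bindings) {
        this.bindings = bindings;
    }

    public void publish(String routingKey, String payload) {
        BlockingQueue<String> q = bindings.get(routingKey);
        if (q != null) {
            q.add(payload);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> nyse = new LinkedBlockingQueue<>();
        DirectExchangeModel ex = new DirectExchangeModel(Map.of("NYSE", nyse));

        ex.publish("NYSE", "Test message");

        // Two competing consumers poll the same queue:
        // the first gets the message, the second gets nothing.
        System.out.println(nyse.poll()); // Test message
        System.out.println(nyse.poll()); // null
    }
}
```

A real broker also load-balances across consumers that stay connected, but the core property is the same: each message is delivered exactly once.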

4.1. Exchange and Queues Setup

In our scenario, we use a DestinationInfo object that encapsulates the exchange name and routing key. A map keyed by destination name will be used to store all available destinations.

The following @PostConstruct method will be responsible for this initial setup:

@Autowired
private AmqpAdmin amqpAdmin;
    
@Autowired
private DestinationsConfig destinationsConfig;

@PostConstruct
public void setupQueueDestinations() {
    destinationsConfig.getQueues()
        .forEach((key, destination) -> {
            Exchange ex = ExchangeBuilder.directExchange(
              destination.getExchange())
              .durable(true)
              .build();
            amqpAdmin.declareExchange(ex);
            Queue q = QueueBuilder.durable(
              destination.getRoutingKey())
              .build();
            amqpAdmin.declareQueue(q);
            Binding b = BindingBuilder.bind(q)
              .to(ex)
              .with(destination.getRoutingKey())
              .noargs();
            amqpAdmin.declareBinding(b);
        });
}

This method uses the amqpAdmin bean created by Spring to declare Exchanges and Queues and bind them together using a given routing key.

All destinations come from a DestinationsConfig bean, which is a @ConfigurationProperties class used in our example.

This class has a property that is populated with DestinationInfo objects built from mappings read from the application.yml configuration file.
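The article references these classes without listing them; a minimal sketch of what they might look like follows. For brevity, DestinationInfo is nested here (in the actual project the classes are separate), and the binding prefix "destinations" is assumed from the application.yml structure shown later:

```java
import java.util.HashMap;
import java.util.Map;

// In the real application this class would carry
// @ConfigurationProperties(prefix = "destinations"), so Spring Boot binds
// the destinations.queues.* and destinations.topics.* entries from application.yml.
public class DestinationsConfig {

    public static class DestinationInfo {
        private String exchange;
        private String routingKey;

        public String getExchange() { return exchange; }
        public void setExchange(String exchange) { this.exchange = exchange; }
        public String getRoutingKey() { return routingKey; }
        public void setRoutingKey(String routingKey) { this.routingKey = routingKey; }
    }

    // keyed by destination name, e.g. "NYSE"
    private Map<String, DestinationInfo> queues = new HashMap<>();
    private Map<String, DestinationInfo> topics = new HashMap<>();

    public Map<String, DestinationInfo> getQueues() { return queues; }
    public void setQueues(Map<String, DestinationInfo> queues) { this.queues = queues; }
    public Map<String, DestinationInfo> getTopics() { return topics; }
    public void setTopics(Map<String, DestinationInfo> topics) { this.topics = topics; }
}
```

The getter/setter pairs are what allow Spring Boot's relaxed property binding to populate the maps from YAML.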

4.2. Producer Endpoint

Producers will send messages by sending an HTTP POST to the /queue/{name} location.

This is a reactive endpoint, so we use a Mono to return a simple acknowledgment:

@SpringBootApplication
@EnableConfigurationProperties(DestinationsConfig.class)
@RestController
public class SpringWebfluxAmqpApplication {
 
    // ... other members omitted
 
    @Autowired
    private AmqpTemplate amqpTemplate;

    @PostMapping(value = "/queue/{name}")
    public Mono<ResponseEntity<?>> sendMessageToQueue(
      @PathVariable String name, @RequestBody String payload) {

        DestinationInfo d = destinationsConfig
          .getQueues().get(name);
        if (d == null) {
            return Mono.just(
              ResponseEntity.notFound().build());
        }
    
        return Mono.fromCallable(() -> {
            amqpTemplate.convertAndSend(
              d.getExchange(), 
              d.getRoutingKey(), 
              payload);  
            return ResponseEntity.accepted().build();
        });
    }
}

We first check if the name parameter corresponds to a valid destination and if so, we use the autowired amqpTemplate instance to actually send out the payload – a simple String message – to RabbitMQ.

4.3. MessageListenerContainer Factory

In order to receive messages asynchronously, Spring AMQP uses the MessageListenerContainer abstraction, which mediates the flow of information between AMQP Queues and the listeners provided by an application.

Since we need a concrete implementation of this class in order to attach our message listeners, we define a factory that isolates the controller code from its actual implementation.

In our case, the factory method returns a new SimpleMessageListenerContainer every time we call its createMessageListenerContainer method:

@Component
public class MessageListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    public MessageListenerContainerFactory() {}

    public MessageListenerContainer createMessageListenerContainer(String queueName) {
        SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(connectionFactory);
        mlc.addQueueNames(queueName);
        return mlc;
    }
}

4.4. Consumer Endpoint

Consumers will access the same endpoint address used by producers (/queue/{name}) to get messages.

This endpoint returns a Flux of events, where each event corresponds to a received message:

@Autowired
private MessageListenerContainerFactory messageListenerContainerFactory;

@GetMapping(
  value = "/queue/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromQueue(@PathVariable String name) {

    DestinationInfo d = destinationsConfig
      .getQueues()
      .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
          .build());
    }

    MessageListenerContainer mlc = messageListenerContainerFactory
      .createMessageListenerContainer(d.getRoutingKey());

    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            mlc.stop();
        });
      });

    return Flux.interval(Duration.ofSeconds(5))
      .map(v -> "No news is good news")
      .mergeWith(f);
}

After the initial check on the destination name, the consumer endpoint creates a MessageListenerContainer using the MessageListenerContainerFactory and the queue name recovered from our registry.

Once we have our MessageListenerContainer, we create the message Flux using one of its create() factory methods.

In our particular case, we use the one that takes a lambda accepting a FluxSink argument, which we then use to bridge Spring AMQP's listener-based async API to our reactive application.

We also attach two additional lambdas to the emitter's onRequest() and onDispose() callbacks so our MessageListenerContainer can allocate and release its internal resources following the Flux's lifecycle.

Finally, we merge the resulting Flux with another one created with interval(), which generates a new event every five seconds. Those dummy messages serve an important purpose: without them, we'd only detect a client disconnection upon receiving a message and failing to send it, which might take a long time depending on the use case.
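Reactor aside, the effect of that merge can be sketched in plain Java as a poll with a timeout: each read yields either a real message or, once the interval elapses with nothing to deliver, the dummy heartbeat. The names below are illustrative, not part of the article's code:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class HeartbeatPoll {

    // Return the next event for the client: a real message if one arrives
    // within the window, otherwise the dummy heartbeat that the article
    // merges in via Flux.interval()
    public static String nextEvent(BlockingQueue<String> queue, long millis) {
        try {
            String msg = queue.poll(millis, TimeUnit.MILLISECONDS);
            return msg != null ? msg : "No news is good news";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "No news is good news";
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Test message");
        System.out.println(nextEvent(queue, 100)); // the stored message
        System.out.println(nextEvent(queue, 100)); // heartbeat after the wait
    }
}
```

Because something is always written to the response, a broken connection surfaces as a failed write within at most one interval.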

4.5. Testing

With both our consumer and producer endpoints set up, we can now run some tests with our sample application.

We need to define RabbitMQ's connection details and at least one destination in our application.yml, which should look like this:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    
destinations:
  queues:
    NYSE:
      exchange: nyse
      routing-key: NYSE

The spring.rabbitmq.* properties define the basic properties required to connect to our RabbitMQ server running in a local Docker container. Note that the host shown above is just an example and may differ in a particular setup.

Queues are defined using destinations.queues.<name>.*, where <name> is used as the destination name. Here we declared a single destination named “NYSE” that will send messages to the “nyse” exchange on RabbitMQ with an “NYSE” routing key.
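Additional queue destinations simply extend the map. For instance, the bindings listed later in the article include an “IBOV” queue bound to an “ibov” exchange, which would correspond to a configuration such as this (illustrative):

```yaml
destinations:
  queues:
    NYSE:
      exchange: nyse
      routing-key: NYSE
    IBOV:
      exchange: ibov
      routing-key: IBOV
```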

Once we start the server from the command line or from our IDE, we can start sending and receiving messages. We'll use curl, a command-line utility available for Windows, Mac, and Linux.

The following listing shows how to send a message to our destination and the expected response from the server:

$ curl -v -d "Test message" http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 12
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 12 out of 12 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

After executing this command, we can verify that the message was received by RabbitMQ and is ready for consumption by issuing the following command:

$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    1

Now we can read messages with curl with the following command:

$ curl -v http://localhost:8080/queue/NYSE
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /queue/NYSE HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:Test message

data:No news is good news...

... same message repeating every 5 secs

As we can see, first we get the previously stored message and then we start to receive our dummy message every 5 seconds.

If we run the queue-listing command again, we can see that there are no messages stored:

$ docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
NYSE    0

5. Scenario 2: Publish-Subscribe

Another common scenario for messaging applications is the Publish-Subscribe pattern, where a single message must be sent to multiple consumers.

RabbitMQ offers two types of exchanges that support these kinds of applications: Fanout and Topic.

The main difference between the two is that the latter allows us to filter which messages to receive based on a routing key pattern (e.g. “alarm.mailserver.*”) supplied at registration time, whereas the former simply replicates incoming messages to all bound queues.
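The wildcard semantics are easy to demonstrate: “*” matches exactly one dot-separated word, while “#” matches zero or more. A quick plain-Java approximation (not RabbitMQ code) that translates a binding pattern into a regex:

```java
public class TopicMatcher {

    // Translate an AMQP topic binding pattern into a regex:
    // '*' matches exactly one word, '#' matches zero or more words.
    // Simplification: this version of '#' still requires the preceding dot,
    // so "alarm.#" won't match the bare key "alarm" as a real broker would.
    public static boolean matches(String pattern, String routingKey) {
        String regex = pattern
            .replace(".", "\\.")   // escape literal dots first
            .replace("*", "[^.]+") // one word: no dots allowed
            .replace("#", ".*");   // zero or more words
        return routingKey.matches(regex);
    }

    public static void main(String[] args) {
        System.out.println(matches("alarm.mailserver.*", "alarm.mailserver.disk")); // true
        System.out.println(matches("alarm.mailserver.*", "alarm.db.disk"));         // false
    }
}
```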

RabbitMQ also supports Header Exchanges, which allow for more complex message filtering, but their use is outside the scope of this article.

5.1. Destinations Setup

We define Pub/Sub destinations at startup time with another @PostConstruct method, as we did in the point-to-point scenario.

The only difference is that we only create the Exchanges, but no Queues; those will be created on demand and bound to the Exchange later, as we want an exclusive Queue for each client:

@PostConstruct
public void setupTopicDestinations() {
    destinationsConfig.getTopics()
      .forEach((key, destination) -> {
          Exchange ex = ExchangeBuilder
            .topicExchange(destination.getExchange())
            .durable(true)
            .build();
          amqpAdmin.declareExchange(ex);
      });
}

5.2. Publisher Endpoint

Clients will use the publisher endpoint available at the /topic/{name} location in order to post messages that will be sent to all connected clients.

As in the previous scenario, we use a @PostMapping that returns a Mono with the status after sending the message:

@PostMapping(value = "/topic/{name}")
public Mono<ResponseEntity<?>> sendMessageToTopic(
  @PathVariable String name, @RequestBody String payload) {

    DestinationInfo d = destinationsConfig
      .getTopics()
      .get(name);

    if (d == null) {
        return Mono.just(ResponseEntity.notFound().build());
    }

    return Mono.fromCallable(() -> {
        amqpTemplate.convertAndSend(
          d.getExchange(), d.getRoutingKey(), payload);
        return ResponseEntity.accepted().build();
    });
}

5.3. Subscriber Endpoint

Our subscriber endpoint will be located at /topic/{name}, producing a Flux of messages for connected clients.

Those messages include both the received messages and dummy messages generated every 5 seconds:

@GetMapping(
  value = "/topic/{name}",
  produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> receiveMessagesFromTopic(@PathVariable String name) {
    DestinationInfo d = destinationsConfig.getTopics()
        .get(name);
    if (d == null) {
        return Flux.just(ResponseEntity.notFound()
            .build());
    }
    Queue topicQueue = createTopicQueue(d);
    String qname = topicQueue.getName();
    MessageListenerContainer mlc = messageListenerContainerFactory
        .createMessageListenerContainer(qname);
    Flux<String> f = Flux.<String> create(emitter -> {
        mlc.setupMessageListener((MessageListener) m -> {
            String payload = new String(m.getBody());
            emitter.next(payload);
        });
        emitter.onRequest(v -> {
            mlc.start();
        });
        emitter.onDispose(() -> {
            amqpAdmin.deleteQueue(qname);
            mlc.stop();
        });            
      });
    
    return Flux.interval(Duration.ofSeconds(5))
        .map(v -> "No news is good news")
        .mergeWith(f);
}

This code is basically the same as we’ve seen in the previous case, with only the following differences: first, we create a new Queue for every new subscriber.

We do that with a call to the createTopicQueue() method, which uses information from the DestinationInfo instance to create an exclusive, non-durable Queue, which we then bind to the Exchange using the configured routing key:

private Queue createTopicQueue(DestinationInfo destination) {

    Exchange ex = ExchangeBuilder
      .topicExchange(destination.getExchange())
      .durable(true)
      .build();
    amqpAdmin.declareExchange(ex);
    Queue q = QueueBuilder
      .nonDurable()
      .build();     
    amqpAdmin.declareQueue(q);
    Binding b = BindingBuilder.bind(q)
      .to(ex)
      .with(destination.getRoutingKey())
      .noargs();        
    amqpAdmin.declareBinding(b);
    return q;
}

Note that, even though we declare the Exchange again, RabbitMQ won't create a new one, since we already declared it at startup time.

The second difference is in the lambda that we pass to the onDispose() method, which this time will also delete the Queue when the subscriber disconnects.

5.4. Testing

In order to test the Pub/Sub scenario, we must first define a topic destination in our application.yml like this:

destinations:
## ... queue destinations omitted      
  topics:
    weather:
      exchange: alerts
      routing-key: WEATHER

Here, we’ve defined a topic endpoint that will be available at the /topic/weather location. This endpoint will be used to post messages to the “alerts” exchange on RabbitMQ with a “WEATHER” routing key.

After starting our server, we can verify that the exchange has been created using the rabbitmqctl command:

$ docker exec rabbitmq rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.topic       topic
amq.fanout      fanout
amq.match       headers
amq.headers     headers
        direct
amq.rabbitmq.trace      topic
amq.direct      direct
alerts  topic

Now, if we issue the list_bindings command, we can see that there are no queues related to the “alerts” exchange:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        NYSE    queue   NYSE    []
nyse    exchange        NYSE    queue   NYSE    []

Let's start a couple of subscribers by opening two command shells and issuing the following command in each one:

$ curl -v http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
<
data:No news is good news...

# ... same message repeating indefinitely

Finally, we use curl once again to send some alerts to our subscribers:

$ curl -v -d "Hurricane approaching!" http://localhost:8080/topic/weather
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> POST /topic/weather HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.49.1
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 22 out of 22 bytes
< HTTP/1.1 202 Accepted
< content-length: 0
<
* Connection #0 to host localhost left intact

Once we send the message, we can almost instantly see the message “Hurricane approaching!” in each subscriber's shell.

If we now check the available bindings, we can see that there is one queue for each subscriber:

$ docker exec rabbitmq rabbitmqctl list_bindings
Listing bindings for vhost /...
        exchange        IBOV    queue   IBOV    []
        exchange        NYSE    queue   NYSE    []
        exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g       
  queue   spring.gen-i0m0pbyKQMqpz2_KFZCd0g       []
        exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ       
  queue   spring.gen-wCHALTsIS1q11PQbARJ7eQ       []
alerts  exchange        spring.gen-i0m0pbyKQMqpz2_KFZCd0g     
  queue   WEATHER []
alerts  exchange        spring.gen-wCHALTsIS1q11PQbARJ7eQ     
  queue   WEATHER []
ibov    exchange        IBOV    queue   IBOV    []
nyse    exchange        NYSE    queue   NYSE    []
quotes  exchange        NYSE    queue   NYSE    []

Once we hit Ctrl-C in the subscriber's shell, our gateway will eventually detect that the client has disconnected and will remove those bindings.

6. Conclusion

In this article, we’ve demonstrated how to create a simple reactive application that interacts with a RabbitMQ server using the spring-amqp module.

With just a few lines of code, we were able to create a functional HTTP-to-AMQP gateway that supports both Point-to-Point and Publish-Subscribe integration patterns, and which we can easily extend with additional features such as security using standard Spring mechanisms.

The code shown in this article is available over on GitHub.
