Introduction to Project Reactor Bus

1. Overview

In this quick article, we’ll introduce the reactor-bus by setting up a real-life scenario for a reactive, event-driven application.

NOTE: The reactor-bus project has been removed in Reactor 3.x: Archived reactor-bus repository

2. The Basics of Project Reactor

2.1. Why Reactor?

Modern applications need to deal with a huge number of concurrent requests and process a significant amount of data. Standard, blocking code is no longer sufficient to fulfill these requirements.

The reactive design pattern is an event-based architectural approach for asynchronous handling of a large volume of concurrent service requests coming from single or multiple service handlers.

The Project Reactor is based on this pattern and has a clear and ambitious goal of building non-blocking, reactive applications on the JVM.

2.2. Example Scenarios

Before we get started, here are a few interesting scenarios where leveraging the reactive architectural style would make sense, just to get an idea of where we might apply it:

  • Notification services for a large online shopping platform like Amazon
  • Huge transaction processing services for the banking sector
  • Stocks trading businesses where stocks’ prices change simultaneously

3. Maven Dependencies

Let’s start to use Project Reactor Bus by adding the following dependency into our pom.xml:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bus</artifactId>
    <version>2.0.8.RELEASE</version>
</dependency>

We can check the latest version of reactor-bus in Maven Central.

4. Building a Demo Application

To better understand the benefits of the reactor-based approach, let’s look at a practical example.

We’ll build a simple application responsible for sending notifications to the users of an online shopping platform. For example, if a user places a new order, then the app sends an order confirmation via email or SMS.

A typical synchronous implementation would naturally be limited by the email or SMS service’s throughput. Therefore, traffic spikes, such as holidays would generally be problematic.

With a reactive approach, we can design our system to be more flexible and to adapt better to failures or timeouts that may occur in the external systems, such as gateway servers.

Let’s have a look at the application – starting with the more traditional aspects and moving on to the more reactive constructs.

4.1. Simple POJO

First, let’s create a POJO class to represent the notification data:

public class NotificationData {
	
    private long id;
    private String name;
    private String email;
    private String mobile;
    
    // getter and setter methods
}

4.2. The Service Layer

Let’s now define a simple service layer:

public interface NotificationService {

    void initiateNotification(NotificationData notificationData) 
      throws InterruptedException;

}

And the implementation, simulating a long-running operation:

@Service
public class NotificationServiceimpl implements NotificationService {
	
    @Override
    public void initiateNotification(NotificationData notificationData) 
      throws InterruptedException {

      System.out.println("Notification service started for "
        + "Notification ID: " + notificationData.getId());
		
      Thread.sleep(5000);
		
      System.out.println("Notification service ended for "
        + "Notification ID: " + notificationData.getId());
    }
}

Notice that to illustrate a real-life scenario of sending messages via an SMS or email gateway, we’re intentionally introducing a five seconds delay in the initiateNotification method with Thread.sleep(5000).

Consequently, when a thread hits the service, it’ll be blocked for five seconds.

4.3. The Consumer

Let’s now jump into the more reactive aspects of our application and implement a consumer – which we’ll then map to the reactor event bus:

@Service
public class NotificationConsumer implements 
  Consumer<Event<NotificationData>> {

    @Autowired
    private NotificationService notificationService;
	
    @Override
    public void accept(Event<NotificationData> notificationDataEvent) {
        NotificationData notificationData = notificationDataEvent.getData();
        
        try {
            notificationService.initiateNotification(notificationData);
        } catch (InterruptedException e) {
            // ignore        
        }	
    }
}

As we can see, the consumer we created implements the Consumer<T> interface. The main logic resides in the accept method.

This is a similar approach we can meet in a typical Spring listener implementation.

4.4. The Controller

Finally, now that we’re able to consume the events, let’s also generate them.

We’re going to do that in a simple controller:

@Controller
public class NotificationController {

    @Autowired
    private EventBus eventBus;

    @GetMapping("/startNotification/{param}")
    public void startNotification(@PathVariable Integer param) {
        for (int i = 0; i < param; i++) {
            NotificationData data = new NotificationData();
            data.setId(i);

            eventBus.notify("notificationConsumer", Event.wrap(data));

            System.out.println(
              "Notification " + i + ": notification task submitted successfully");
        }
    }
}

This is quite self-explanatory – we’re emitting events through the EventBus here.

For example, if a client hits the URL with a param value of ten, then ten events will be sent through the event bus.

4.5. The Java Config

Let’s now put everything together and create a simple Spring Boot application.

First, we need to configure EventBus and Environment beans:

@Configuration
public class Config {

    @Bean
    public Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }

    @Bean
    public EventBus createEventBus(Environment env) {
        return EventBus.create(env, Environment.THREAD_POOL);
    }
}

In our case, we’re instantiating the EventBus with a default thread pool available in the environment.

Alternatively, we can use a customized Dispatcher instance:

EventBus evBus = EventBus.create(
  env, 
  Environment.newDispatcher(
    REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT,   
    DispatcherType.THREAD_POOL_EXECUTOR));

Now, we’re ready to create a main application code:

import static reactor.bus.selector.Selectors.$;

@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {

    @Autowired
    private EventBus eventBus;

    @Autowired
    private NotificationConsumer notificationConsumer;

    @Override
    public void run(String... args) throws Exception {
        eventBus.on($("notificationConsumer"), notificationConsumer);
    }

    public static void main(String[] args) {
        SpringApplication.run(NotificationApplication.class, args);
    }
}

In our run method we’re registering the notificationConsumer to be triggered when the notification matches a given selector.

Notice how we’re using the static import of the $ attribute to create a Selector object.

5. Test the Application

Let’s now create a test to see our NotificationApplication in action:

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {

    @LocalServerPort
    private int port;

    @Test
    public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
    }
}

As we can see, as soon as the request is executed, all ten tasks get submitted instantly without creating any blocking. And once submitted, the notification events get processed in parallel.

Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9

It’s important to keep in mind that in our scenario there’s no need to process these events in any particular order.

6. Conclusion

In this quick tutorial, we’ve created a simple event-driven application. We’ve also seen how to start writing a more reactive and non-blocking code.

However, this scenario just scratches the surface of the subject and represents just a good base to start experimenting with the reactive paradigm.

As always, the source code is available over on GitHub.

Related posts:

Java Program to Implement Floyd-Warshall Algorithm
Java Program to Find Nearest Neighbor for Static Data Set
A Quick Guide to Spring Cloud Consul
Apache Commons Collections SetUtils
Java Program to Implement Min Hash
Java Program to Find Strongly Connected Components in Graphs
Java Program to Perform Inorder Recursive Traversal of a Given Binary Tree
Java Program to Implement Flood Fill Algorithm
Java Program to Construct an Expression Tree for an Infix Expression
A Custom Media Type for a Spring REST API
Xử lý ngoại lệ trong Java (Exception Handling)
Merging Streams in Java
Java – Reader to InputStream
Java Program to Test Using DFS Whether a Directed Graph is Strongly Connected or Not
Introduction to the Functional Web Framework in Spring 5
Java Program to Implement Affine Cipher
Beans and Dependency Injection
@Lookup Annotation in Spring
Split a String in Java
Java Program to Implement Dijkstra’s Algorithm using Set
Java Program to Find the Number of Ways to Write a Number as the Sum of Numbers Smaller than Itself
Từ khóa throw và throws trong Java
XML Serialization and Deserialization with Jackson
Java Program to do a Breadth First Search/Traversal on a graph non-recursively
Java Program to Perform Partial Key Search in a K-D Tree
New Features in Java 11
Một số từ khóa trong Java
Inject Parameters into JUnit Jupiter Unit Tests
Java – Random Long, Float, Integer and Double
Automatic Property Expansion with Spring Boot
Java Program to Implement VList
Comparing getPath(), getAbsolutePath(), and getCanonicalPath() in Java