Table of Contents
1. Overview
In this quick article, we’ll introduce the reactor-bus by setting up a real-life scenario for a reactive, event-driven application.
NOTE: The reactor-bus project has been removed in Reactor 3.x: Archived reactor-bus repository
2. The Basics of Project Reactor
2.1. Why Reactor?
Modern applications need to deal with a huge number of concurrent requests and process a significant amount of data. Standard, blocking code is no longer sufficient to fulfill these requirements.
The reactive design pattern is an event-based architectural approach for asynchronous handling of a large volume of concurrent service requests coming from single or multiple service handlers.
The Project Reactor is based on this pattern and has a clear and ambitious goal of building non-blocking, reactive applications on the JVM.
2.2. Example Scenarios
Before we get started, here are a few interesting scenarios where leveraging the reactive architectural style would make sense, just to get an idea of where we might apply it:
- Notification services for a large online shopping platform like Amazon
- Huge transaction processing services for the banking sector
- Stocks trading businesses where stocks’ prices change simultaneously
3. Maven Dependencies
Let’s start to use Project Reactor Bus by adding the following dependency into our pom.xml:
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bus</artifactId> <version>2.0.8.RELEASE</version> </dependency>
We can check the latest version of reactor-bus in Maven Central.
4. Building a Demo Application
To better understand the benefits of the reactor-based approach, let’s look at a practical example.
We’ll build a simple application responsible for sending notifications to the users of an online shopping platform. For example, if a user places a new order, then the app sends an order confirmation via email or SMS.
A typical synchronous implementation would naturally be limited by the email or SMS service’s throughput. Therefore, traffic spikes, such as holidays would generally be problematic.
With a reactive approach, we can design our system to be more flexible and to adapt better to failures or timeouts that may occur in the external systems, such as gateway servers.
Let’s have a look at the application – starting with the more traditional aspects and moving on to the more reactive constructs.
4.1. Simple POJO
First, let’s create a POJO class to represent the notification data:
public class NotificationData { private long id; private String name; private String email; private String mobile; // getter and setter methods }
4.2. The Service Layer
Let’s now define a simple service layer:
public interface NotificationService { void initiateNotification(NotificationData notificationData) throws InterruptedException; }
And the implementation, simulating a long-running operation:
@Service public class NotificationServiceimpl implements NotificationService { @Override public void initiateNotification(NotificationData notificationData) throws InterruptedException { System.out.println("Notification service started for " + "Notification ID: " + notificationData.getId()); Thread.sleep(5000); System.out.println("Notification service ended for " + "Notification ID: " + notificationData.getId()); } }
Notice that to illustrate a real-life scenario of sending messages via an SMS or email gateway, we’re intentionally introducing a five seconds delay in the initiateNotification method with Thread.sleep(5000).
Consequently, when a thread hits the service, it’ll be blocked for five seconds.
4.3. The Consumer
Let’s now jump into the more reactive aspects of our application and implement a consumer – which we’ll then map to the reactor event bus:
@Service public class NotificationConsumer implements Consumer<Event<NotificationData>> { @Autowired private NotificationService notificationService; @Override public void accept(Event<NotificationData> notificationDataEvent) { NotificationData notificationData = notificationDataEvent.getData(); try { notificationService.initiateNotification(notificationData); } catch (InterruptedException e) { // ignore } } }
As we can see, the consumer we created implements the Consumer<T> interface. The main logic resides in the accept method.
This is a similar approach we can meet in a typical Spring listener implementation.
4.4. The Controller
Finally, now that we’re able to consume the events, let’s also generate them.
We’re going to do that in a simple controller:
@Controller public class NotificationController { @Autowired private EventBus eventBus; @GetMapping("/startNotification/{param}") public void startNotification(@PathVariable Integer param) { for (int i = 0; i < param; i++) { NotificationData data = new NotificationData(); data.setId(i); eventBus.notify("notificationConsumer", Event.wrap(data)); System.out.println( "Notification " + i + ": notification task submitted successfully"); } } }
This is quite self-explanatory – we’re emitting events through the EventBus here.
For example, if a client hits the URL with a param value of ten, then ten events will be sent through the event bus.
4.5. The Java Config
Let’s now put everything together and create a simple Spring Boot application.
First, we need to configure EventBus and Environment beans:
@Configuration public class Config { @Bean public Environment env() { return Environment.initializeIfEmpty().assignErrorJournal(); } @Bean public EventBus createEventBus(Environment env) { return EventBus.create(env, Environment.THREAD_POOL); } }
In our case, we’re instantiating the EventBus with a default thread pool available in the environment.
Alternatively, we can use a customized Dispatcher instance:
EventBus evBus = EventBus.create( env, Environment.newDispatcher( REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT, DispatcherType.THREAD_POOL_EXECUTOR));
Now, we’re ready to create a main application code:
import static reactor.bus.selector.Selectors.$; @SpringBootApplication public class NotificationApplication implements CommandLineRunner { @Autowired private EventBus eventBus; @Autowired private NotificationConsumer notificationConsumer; @Override public void run(String... args) throws Exception { eventBus.on($("notificationConsumer"), notificationConsumer); } public static void main(String[] args) { SpringApplication.run(NotificationApplication.class, args); } }
In our run method we’re registering the notificationConsumer to be triggered when the notification matches a given selector.
Notice how we’re using the static import of the $ attribute to create a Selector object.
5. Test the Application
Let’s now create a test to see our NotificationApplication in action:
@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class NotificationApplicationIntegrationTest { @LocalServerPort private int port; @Test public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() { RestTemplate restTemplate = new RestTemplate(); restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class); } }
As we can see, as soon as the request is executed, all ten tasks get submitted instantly without creating any blocking. And once submitted, the notification events get processed in parallel.
Notification 0: notification task submitted successfully Notification 1: notification task submitted successfully Notification 2: notification task submitted successfully Notification 3: notification task submitted successfully Notification 4: notification task submitted successfully Notification 5: notification task submitted successfully Notification 6: notification task submitted successfully Notification 7: notification task submitted successfully Notification 8: notification task submitted successfully Notification 9: notification task submitted successfully Notification service started for Notification ID: 1 Notification service started for Notification ID: 2 Notification service started for Notification ID: 3 Notification service started for Notification ID: 0 Notification service ended for Notification ID: 1 Notification service ended for Notification ID: 0 Notification service started for Notification ID: 4 Notification service ended for Notification ID: 3 Notification service ended for Notification ID: 2 Notification service started for Notification ID: 6 Notification service started for Notification ID: 5 Notification service started for Notification ID: 7 Notification service ended for Notification ID: 4 Notification service started for Notification ID: 8 Notification service ended for Notification ID: 6 Notification service ended for Notification ID: 5 Notification service started for Notification ID: 9 Notification service ended for Notification ID: 7 Notification service ended for Notification ID: 8 Notification service ended for Notification ID: 9
It’s important to keep in mind that in our scenario there’s no need to process these events in any particular order.
6. Conclusion
In this quick tutorial, we’ve created a simple event-driven application. We’ve also seen how to start writing a more reactive and non-blocking code.
However, this scenario just scratches the surface of the subject and represents just a good base to start experimenting with the reactive paradigm.
As always, the source code is available over on GitHub.