Logging a Reactive Sequence

1. Overview

Spring WebFlux gives us another powerful tool for writing reactive, non-blocking applications. While the technology is now much easier to use than before, debugging reactive sequences in Spring WebFlux can still be quite cumbersome.

In this quick tutorial, we’ll see how to easily log events in asynchronous sequences and how to avoid some simple mistakes.

2. Maven Dependency

Let’s add the Spring WebFlux dependency to our project so we can create reactive streams:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

We can get the latest spring-boot-starter-webflux dependency from Maven Central.

3. Creating a Reactive Stream

To begin, let’s create a reactive stream using Flux and call the log() method to enable logging:

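import reactor.core.publisher.Flux;

// Emit the values 1 to 5 and log every signal that passes this point
Flux<Integer> reactiveStream = Flux.range(1, 5).log();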

Next, we’ll subscribe to it to consume the generated values:

reactiveStream.subscribe();

4. Logging a Reactive Stream

After running the above application, we see our logger in action:

2018-11-11 22:37:04 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:37:04 INFO | request(unbounded)
2018-11-11 22:37:04 INFO | onNext(1)
2018-11-11 22:37:04 INFO | onNext(2)
2018-11-11 22:37:04 INFO | onNext(3)
2018-11-11 22:37:04 INFO | onNext(4)
2018-11-11 22:37:04 INFO | onNext(5)
2018-11-11 22:37:04 INFO | onComplete()

We see every event that occurred on our stream: the subscription, the unbounded request, five emitted values, and finally the onComplete() event that closed the stream.
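
To make the order of these signals concrete, here is a minimal sketch that reproduces it with the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces that Reactor implements. RangePublisher, RecordingSubscriber, and SignalOrderDemo are hypothetical names invented for this illustration; this is not how Reactor implements Flux.range() or log(), and the recorded labels are simplified (the real log line also prints the subscription details).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical stand-in for Flux.range(start, count); not Reactor's implementation.
class RangePublisher implements Flow.Publisher<Integer> {
    private final int start;
    private final int count;

    RangePublisher(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private boolean finished;

            @Override
            public void request(long n) {
                // Emit synchronously while there is demand and values remain.
                for (long i = 0; i < n && !finished && next < start + count; i++) {
                    subscriber.onNext(next++);
                }
                // Signal completion once, after the last value.
                if (!finished && next == start + count) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }
}

// Records each signal in the order it arrives, using labels similar to the log output.
class RecordingSubscriber implements Flow.Subscriber<Integer> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        events.add("onSubscribe");
        events.add("request(unbounded)");
        subscription.request(Long.MAX_VALUE); // unbounded demand, as a bare subscribe() issues
    }

    @Override
    public void onNext(Integer item) {
        events.add("onNext(" + item + ")");
    }

    @Override
    public void onError(Throwable throwable) {
        events.add("onError(" + throwable + ")");
    }

    @Override
    public void onComplete() {
        events.add("onComplete()");
    }
}

class SignalOrderDemo {
    static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        new RangePublisher(1, 5).subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running SignalOrderDemo prints the signals in the same order as the log: the subscription, the unbounded request, the five values, and the completion.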

5. Advanced Logging Scenario

We can modify our application to see a more interesting scenario. Let’s add take() to our Flux, which tells the stream to emit only a specific number of events:

Flux<Integer> reactiveStream = Flux.range(1, 5).log().take(3);

After executing the code, we’ll see the following output:

2018-11-11 22:45:35 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:45:35 INFO | request(unbounded)
2018-11-11 22:45:35 INFO | onNext(1)
2018-11-11 22:45:35 INFO | onNext(2)
2018-11-11 22:45:35 INFO | onNext(3)
2018-11-11 22:45:35 INFO | cancel()

As we can see, take() cancelled the upstream subscription after three events were emitted.

The placement of log() in our stream is crucial. Let’s see how placing log() after take() produces different output:

Flux<Integer> reactiveStream = Flux.range(1, 5).take(3).log();

And the output:

2018-11-11 22:49:23 INFO | onSubscribe([Fuseable] FluxTake.TakeFuseableSubscriber)
2018-11-11 22:49:23 INFO | request(unbounded)
2018-11-11 22:49:23 INFO | onNext(1)
2018-11-11 22:49:23 INFO | onNext(2)
2018-11-11 22:49:23 INFO | onNext(3)
2018-11-11 22:49:23 INFO | onComplete()

As we can see, changing the point of observation changed the output. The stream still produced three events, but instead of cancel(), we see onComplete(). This is because log() now observes what take() emits downstream, rather than the signals take() sends upstream to its source.
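
The following sketch models why the observation point matters, again using only the JDK's java.util.concurrent.Flow interfaces. The range(), log(), and take() helpers are hypothetical stand-ins written for this illustration, not Reactor's operators. The sketch assumes the behavior shown in the logs above, where take() forwards the unbounded request unchanged; a Reactor version that caps upstream demand may log a different request line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustrative model only: these helpers are hypothetical, not Reactor's operators.
class LogPlacementDemo {

    // Stand-in for Flux.range(1, count): emits synchronously until exhausted or cancelled.
    static Flow.Publisher<Integer> range(int count) {
        return sub -> sub.onSubscribe(new Flow.Subscription() {
            int next = 1;
            boolean stopped;

            public void request(long n) {
                for (long i = 0; i < n && !stopped && next <= count; i++) sub.onNext(next++);
                if (!stopped && next > count) { stopped = true; sub.onComplete(); }
            }

            public void cancel() { stopped = true; }
        });
    }

    // Stand-in for log(): records every signal that crosses this point, in both directions.
    static Flow.Publisher<Integer> log(Flow.Publisher<Integer> source, List<String> events) {
        return down -> source.subscribe(new Flow.Subscriber<Integer>() {
            public void onSubscribe(Flow.Subscription up) {
                events.add("onSubscribe");
                down.onSubscribe(new Flow.Subscription() {
                    public void request(long n) {
                        events.add(n == Long.MAX_VALUE ? "request(unbounded)" : "request(" + n + ")");
                        up.request(n);
                    }

                    public void cancel() { events.add("cancel()"); up.cancel(); }
                });
            }

            public void onNext(Integer item) { events.add("onNext(" + item + ")"); down.onNext(item); }
            public void onError(Throwable t) { events.add("onError(" + t + ")"); down.onError(t); }
            public void onComplete() { events.add("onComplete()"); down.onComplete(); }
        });
    }

    // Stand-in for take(limit): after the limit-th value, cancels upstream and completes downstream.
    static Flow.Publisher<Integer> take(Flow.Publisher<Integer> source, int limit) {
        return down -> source.subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription up;
            int seen;
            boolean done;

            public void onSubscribe(Flow.Subscription s) { up = s; down.onSubscribe(s); }

            public void onNext(Integer item) {
                if (done) return;
                down.onNext(item);
                if (++seen == limit) { done = true; up.cancel(); down.onComplete(); }
            }

            public void onError(Throwable t) { if (!done) { done = true; down.onError(t); } }
            public void onComplete() { if (!done) { done = true; down.onComplete(); } }
        });
    }

    // Subscribes with unbounded demand, as a bare subscribe() does, and returns the recorded signals.
    static List<String> run(boolean logBeforeTake) {
        List<String> events = new ArrayList<>();
        Flow.Publisher<Integer> chain = logBeforeTake
                ? take(log(range(5), events), 3)   // like Flux.range(1, 5).log().take(3)
                : log(take(range(5), 3), events);  // like Flux.range(1, 5).take(3).log()
        chain.subscribe(new Flow.Subscriber<Integer>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(Integer item) { }
            public void onError(Throwable t) { }
            public void onComplete() { }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

In this model, the cancel() call from take() travels upstream, so only a logger placed between the source and take() records it. The onComplete() signal that take() emits travels downstream, so only a logger placed after take() records it.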

6. Conclusion

In this quick article, we saw how to log reactive streams using the built-in log() method, and how its position in the chain determines which signals it records.

And as always, the source code for the above example can be found over on GitHub.
