Logging a Reactive Sequence

1. Overview

With the introduction of Spring WebFlux, we got another powerful tool to write reactive, non-blocking applications. While using this technology is now way easier than before, debugging reactive sequences in Spring WebFlux can be quite cumbersome.

In this quick tutorial, we’ll see how to easily log events in asynchronous sequences and how to avoid some simple mistakes.

2. Maven Dependency

Let’s add the Spring WebFlux dependency to our project so we can create reactive streams:

1
2
3
4
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

We can get the latest spring-boot-starter-webflux dependency from Maven Central.

3. Creating a Reactive Stream

To begin let’s create a reactive stream using Flux and use the log() method to enable logging:

1
Flux<Integer> reactiveStream = Flux.range(1, 5).log();

Next, we will subscribe to it to consume generated values:

1
reactiveStream.subscribe();

4. Logging Reactive Stream

After running the above application we see our logger in action:

1
2
3
4
5
6
7
8
2018-11-11 22:37:04 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:37:04 INFO | request(unbounded)
2018-11-11 22:37:04 INFO | onNext(1)
2018-11-11 22:37:04 INFO | onNext(2)
2018-11-11 22:37:04 INFO | onNext(3)
2018-11-11 22:37:04 INFO | onNext(4)
2018-11-11 22:37:04 INFO | onNext(5)
2018-11-11 22:37:04 INFO | onComplete()

We see every event that occurred on our stream. Five values were emitted and then stream closed with an onComplete() event.

5. Advanced Logging Scenario

We can modify our application to see a more interesting scenario. Let’s add take() to Flux which will instruct the stream to provide only a specific number of events:

1
Flux<Integer> reactiveStream = Flux.range(1, 5).log().take(3);

After executing the code, we’ll see the following output:

1
2
3
4
5
6
2018-11-11 22:45:35 INFO | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2018-11-11 22:45:35 INFO | request(unbounded)
2018-11-11 22:45:35 INFO | onNext(1)
2018-11-11 22:45:35 INFO | onNext(2)
2018-11-11 22:45:35 INFO | onNext(3)
2018-11-11 22:45:35 INFO | cancel()

As we can see, take() caused the stream to cancel after emitting three events.

The placement of log() in your stream is crucial. Let’s see how placing log() after take() will produce different output:

1
Flux<Integer> reactiveStream = Flux.range(1, 5).take(3).log();

And the output:

1
2
3
4
5
6
2018-11-11 22:49:23 INFO | onSubscribe([Fuseable] FluxTake.TakeFuseableSubscriber)
2018-11-11 22:49:23 INFO | request(unbounded)
2018-11-11 22:49:23 INFO | onNext(1)
2018-11-11 22:49:23 INFO | onNext(2)
2018-11-11 22:49:23 INFO | onNext(3)
2018-11-11 22:49:23 INFO | onComplete()

As we can see changing the point of observation changed the output. Now the stream produced three events, but instead of cancel(), we see onComplete()This is because we observe the output of using take() instead of what was requested by this method.

6. Conclusion

In this quick article, we saw how to log reactive streams using built-in log() method.

And as always, the source code for the above example can be found over on GitHub.

Related posts:

Using the Not Operator in If Conditions in Java
Introduction to the Java NIO Selector
Java Program to Perform Inorder Non-Recursive Traversal of a Given Binary Tree
Spring Security and OpenID Connect
Java Program to Perform Cryptography Using Transposition Technique
Java Program to Find Nearest Neighbor Using Linear Search
Hướng dẫn Java Design Pattern – Object Pool
Call Methods at Runtime Using Java Reflection
Java Program to Find Number of Articulation points in a Graph
Java Program to Implement Meldable Heap
Java Program to Perform Postorder Recursive Traversal of a Given Binary Tree
Java Program to Sort an Array of 10 Elements Using Heap Sort Algorithm
Jackson JSON Views
Spring Security Basic Authentication
Request Method Not Supported (405) in Spring
Unsatisfied Dependency in Spring
Adding a Newline Character to a String in Java
Setting a Request Timeout for a Spring REST API
Hướng dẫn sử dụng biểu thức chính quy (Regular Expression) trong Java
Java Program to Implement Disjoint Sets
Java Program to Implement Miller Rabin Primality Test Algorithm
Remove All Occurrences of a Specific Value from a List
JWT – Token-based Authentication trong Jersey 2.x
Getting Started with Custom Deserialization in Jackson
Guide to the Fork/Join Framework in Java
Java Program to Implement Sorted Array
New Features in Java 15
Hướng dẫn Java Design Pattern – Composite
Spring Boot - Introduction
Converting a List to String in Java
Kết hợp Java Reflection và Java Annotations
Java Program to Represent Graph Using Incidence Matrix