Guide to the ConcurrentSkipListMap

1. Overview

In this quick article, we’ll be looking at the ConcurrentSkipListMap class from the java.util.concurrent package.

This construct allows us to create thread-safe logic in a lock-free way. It’s well suited to problems where we want to read a sorted range of the data while other threads are still inserting data into the map.

We’ll use it to sort a stream of events and to retrieve the events that arrived in the last 60 seconds.

2. Stream Sorting Logic

Let’s say that we have a stream of events that are continually coming from multiple threads. We need to be able to take events from the last 60 seconds, and also events that are older than 60 seconds.

First, let’s define the structure of our event data:

public class Event {
    private ZonedDateTime eventTime;
    private String content;

    // standard constructors/getters
}

We want to keep our events sorted using the eventTime field. ZonedDateTime already has a natural ordering, but here we pass an explicit Comparator to the ConcurrentSkipListMap constructor so that keys are ordered purely by their instant, at millisecond precision:

ConcurrentSkipListMap<ZonedDateTime, String> events
 = new ConcurrentSkipListMap<>(
 Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));

We’ll be comparing all arrived events using their timestamps. We are using the comparingLong() method and passing it a key-extractor function that takes a long timestamp from the ZonedDateTime.
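To make the effect of this Comparator concrete, here is a minimal sketch. The class name EventOrderingSketch and the fixed dates are made up for illustration. It inserts keys out of order and reads them back sorted, and it also shows a consequence worth knowing: two keys that resolve to the same millisecond are treated as the same key, so the second put() replaces the first value.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical demo class, not part of the article's project.
class EventOrderingSketch {

    // Same ordering as in the article: compare keys by epoch milliseconds only.
    static ConcurrentSkipListMap<ZonedDateTime, String> newEventMap() {
        return new ConcurrentSkipListMap<>(
          Comparator.comparingLong((ZonedDateTime v) -> v.toInstant().toEpochMilli()));
    }

    // Inserts three events out of order and returns the contents in key order.
    static List<String> contentsInKeyOrder() {
        ConcurrentSkipListMap<ZonedDateTime, String> events = newEventMap();
        ZonedDateTime base = ZonedDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.UTC);
        events.put(base.plusSeconds(30), "third");
        events.put(base, "first");
        events.put(base.plusSeconds(10), "second");
        return new ArrayList<>(events.values());
    }

    // Two keys for the same instant (expressed in different offsets) count as
    // one key under this comparator, so the second put() overwrites the first.
    static int sizeAfterSameInstantPuts() {
        ConcurrentSkipListMap<ZonedDateTime, String> events = newEventMap();
        ZonedDateTime utc = ZonedDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.UTC);
        events.put(utc, "from UTC");
        events.put(utc.withZoneSameInstant(ZoneOffset.ofHours(2)), "from UTC+2");
        return events.size();
    }

    public static void main(String[] args) {
        System.out.println(contentsInKeyOrder());       // [first, second, third]
        System.out.println(sizeAfterSameInstantPuts()); // 1
    }
}
```

If events that share a timestamp must all be kept, the key would need an extra tie-breaker; that is outside the scope of this example.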

As our events arrive, we only need to add them to the map using the put() method. Note that this method does not require any explicit synchronization:

public void acceptEvent(Event event) {
    events.put(event.getEventTime(), event.getContent());
}

The ConcurrentSkipListMap will handle the sorting of those events underneath using the Comparator that was passed to it in the constructor.
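As a rough illustration of that point, the sketch below lets two plain threads call put() on the same map with no locks or synchronized blocks. The class name ConcurrentPutSketch, the fixed base date, and the number of events are assumptions chosen so that every key is distinct and the final size is predictable.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical demo class, not part of the article's project.
class ConcurrentPutSketch {

    static final int EVENTS_PER_WRITER = 1_000;

    // Two writers insert disjoint key ranges concurrently; returns the final size.
    static int insertFromTwoThreads() {
        ConcurrentSkipListMap<ZonedDateTime, String> events = new ConcurrentSkipListMap<>(
          Comparator.comparingLong((ZonedDateTime v) -> v.toInstant().toEpochMilli()));
        ZonedDateTime base = ZonedDateTime.of(2024, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);

        Thread[] writers = new Thread[2];
        for (int t = 0; t < writers.length; t++) {
            int offset = t * EVENTS_PER_WRITER; // each writer gets its own key range
            writers[t] = new Thread(() -> {
                for (int i = 0; i < EVENTS_PER_WRITER; i++) {
                    // No explicit synchronization around put()
                    events.put(base.plusSeconds(offset + i), "event-" + (offset + i));
                }
            });
            writers[t].start();
        }

        try {
            for (Thread writer : writers) {
                writer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events.size();
    }

    public static void main(String[] args) {
        System.out.println(insertFromTwoThreads()); // 2000
    }
}
```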

One of the most notable advantages of the ConcurrentSkipListMap is that its range methods return sorted views of the data in a lock-free way. To get all events that arrived within the past minute, we can use the tailMap() method and pass the time from which we want to get elements:

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
    return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}

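It will return all events from the past minute. Note that the returned map is a view backed by the original map rather than an immutable copy: events in that range that are added later will show up in it too. What is most important is that other writing threads can keep adding new events to the ConcurrentSkipListMap without any explicit locking.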
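The JDK documentation describes the map returned by tailMap() as a view backed by the original map. The sketch below (the class name TailMapViewSketch and the fixed cutoff are assumptions for illustration) contrasts that live view with a copy taken through the ConcurrentSkipListMap(SortedMap) constructor, which is one possible way to freeze the contents when a real snapshot is needed.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical demo class, not part of the article's project.
class TailMapViewSketch {

    // Returns { size of the live view, size of the copy } after a late insert.
    static int[] viewVersusCopy() {
        ConcurrentSkipListMap<ZonedDateTime, String> events = new ConcurrentSkipListMap<>(
          Comparator.comparingLong((ZonedDateTime v) -> v.toInstant().toEpochMilli()));
        ZonedDateTime cutoff = ZonedDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.UTC);

        events.put(cutoff.minusSeconds(90), "old");
        events.put(cutoff.plusSeconds(1), "recent");

        // Live view of all keys >= cutoff
        ConcurrentNavigableMap<ZonedDateTime, String> view = events.tailMap(cutoff);
        // Independent copy of the view's current contents (keeps the comparator)
        ConcurrentSkipListMap<ZonedDateTime, String> copy = new ConcurrentSkipListMap<>(view);

        // An event added after the view and the copy were created
        events.put(cutoff.plusSeconds(2), "later");

        return new int[] { view.size(), copy.size() };
    }

    public static void main(String[] args) {
        int[] sizes = viewVersusCopy();
        System.out.println("view=" + sizes[0] + ", copy=" + sizes[1]); // view=2, copy=1
    }
}
```

Keep in mind that such a copy is built by iterating the view, so under concurrent writes it is not an atomic point-in-time snapshot either; it simply stops changing once it has been created.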

We can also get all events that arrived more than one minute ago by using the headMap() method:

public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
    return events.headMap(ZonedDateTime.now().minusMinutes(1));
}

This will return a view of all events that are older than one minute. All of the above methods belong to the EventWindowSort class, which we’ll use in the next section.
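Putting the pieces together, the class might look roughly like the following. This is a sketch assembled from the snippets above, not the project's actual source: the Event constructor and getters are the "standard" ones the article leaves out, and EventWindowSortDemo is a made-up driver.

```java
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical driver: one recent event and one old event.
class EventWindowSortDemo {

    // Returns { events from the last minute, events older than one minute }.
    static int[] windowSizes() {
        EventWindowSort sort = new EventWindowSort();
        ZonedDateTime now = ZonedDateTime.now();
        sort.acceptEvent(new Event(now.minusSeconds(10), "recent"));
        sort.acceptEvent(new Event(now.minusSeconds(90), "old"));
        return new int[] {
            sort.getEventsFromLastMinute().size(),
            sort.getEventsOlderThatOneMinute().size()
        };
    }

    public static void main(String[] args) {
        int[] sizes = windowSizes();
        System.out.println("last minute: " + sizes[0] + ", older: " + sizes[1]);
    }
}

// Event with the constructor and getters the article omits (assumed shape).
class Event {
    private final ZonedDateTime eventTime;
    private final String content;

    Event(ZonedDateTime eventTime, String content) {
        this.eventTime = eventTime;
        this.content = content;
    }

    ZonedDateTime getEventTime() { return eventTime; }
    String getContent() { return content; }
}

class EventWindowSort {
    // Keys are ordered by epoch milliseconds, as in the article.
    private final ConcurrentSkipListMap<ZonedDateTime, String> events
      = new ConcurrentSkipListMap<>(
        Comparator.comparingLong((ZonedDateTime v) -> v.toInstant().toEpochMilli()));

    void acceptEvent(Event event) {
        events.put(event.getEventTime(), event.getContent());
    }

    // View of keys >= (now - 1 minute)
    ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
        return events.tailMap(ZonedDateTime.now().minusMinutes(1));
    }

    // View of keys < (now - 1 minute)
    ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
        return events.headMap(ZonedDateTime.now().minusMinutes(1));
    }
}
```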

3. Testing the Sorting Stream Logic

Now that we’ve implemented our sorting logic using the ConcurrentSkipListMap, we can test it by creating two writer threads that will send 101 events each:

ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;

Runnable producer = () -> IntStream
  .rangeClosed(0, 100)
  .forEach(index -> eventWindowSort.acceptEvent(
      new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
  );

for (int i = 0; i < numberOfThreads; i++) {
    executorService.execute(producer);
}

Each thread is invoking the acceptEvent() method, sending the events that have eventTime from now to “now minus one hundred seconds”.

In the meantime, we can invoke the getEventsFromLastMinute() method, which will return the events that fall within the one-minute window:

ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute 
  = eventWindowSort.getEventsFromLastMinute();

The number of events in eventsFromLastMinute will vary in each test run, depending on the speed at which the producer threads send events to the EventWindowSort. We can assert that there is not a single event in the returned map that is older than one minute:

long eventsOlderThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertEquals(0, eventsOlderThanOneMinute);

We can also assert that the returned map contains at least one event from within the one-minute window:

long eventYoungerThanOneMinute = eventsFromLastMinute
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();
 
assertTrue(eventYoungerThanOneMinute > 0);
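The checks above call ZonedDateTime.now() again inside each filter and run while the producers may still be working, so their outcome can depend on timing. As a possible, more deterministic variant, the sketch below waits for the producers to finish and then compares every key against a single cutoff captured once. The class name FixedCutoffCheckSketch, the ten-second timeout, and the truncation of the cutoff to milliseconds (to match the millisecond-based Comparator) are all assumptions of this example.

```java
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the article's project.
class FixedCutoffCheckSketch {

    // Returns { keys before the cutoff, keys at or after the cutoff } in the tail view.
    static long[] countAroundFixedCutoff() {
        ConcurrentSkipListMap<ZonedDateTime, String> events = new ConcurrentSkipListMap<>(
          Comparator.comparingLong((ZonedDateTime v) -> v.toInstant().toEpochMilli()));

        // Two producers, each sending events dated from now back to now minus 100 seconds
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Runnable producer = () -> IntStream
          .rangeClosed(0, 100)
          .forEach(index -> events.put(
            ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()));
        for (int i = 0; i < 2; i++) {
            executorService.execute(producer);
        }

        // Wait until both producers are done before inspecting the map
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // Capture the cutoff once and reuse it for the view and for both checks
        ZonedDateTime cutoff = ZonedDateTime.now().minusMinutes(1).truncatedTo(ChronoUnit.MILLIS);
        ConcurrentNavigableMap<ZonedDateTime, String> fromLastMinute = events.tailMap(cutoff);

        long older = fromLastMinute.keySet().stream().filter(k -> k.isBefore(cutoff)).count();
        long recent = fromLastMinute.keySet().stream().filter(k -> !k.isBefore(cutoff)).count();
        return new long[] { older, recent };
    }

    public static void main(String[] args) {
        long[] counts = countAroundFixedCutoff();
        System.out.println("older=" + counts[0] + ", recent=" + counts[1]);
    }
}
```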

Our getEventsFromLastMinute() uses the tailMap() underneath.

Let’s now test the getEventsOlderThatOneMinute() method, which uses the headMap() method of the ConcurrentSkipListMap:

ConcurrentNavigableMap<ZonedDateTime, String> olderEvents
  = eventWindowSort.getEventsOlderThatOneMinute();

This time we get the events that are older than one minute. We can assert that there is at least one such event:

long eventsOlderThanOneMinute = olderEvents
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
  .count();

assertTrue(eventsOlderThanOneMinute > 0);

And next, that there is not a single event from within the last minute:

long eventYoungerThanOneMinute = olderEvents
  .entrySet()
  .stream()
  .filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
  .count();

assertEquals(0, eventYoungerThanOneMinute);

The most important thing to note is that we can read this data while other threads are still adding new values to the ConcurrentSkipListMap.
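A small sketch of reading during writes follows. The iterators of ConcurrentSkipListMap and of its views are documented as weakly consistent, so walking the view while a writer is active does not throw a ConcurrentModificationException. The class name ReadWhileWritingSketch, the fixed base date, and the event count are illustrative assumptions.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical demo class, not part of the article's project.
class ReadWhileWritingSketch {

    static final int TOTAL = 10_000;

    // Iterates a tail view while a writer thread is inserting into the map;
    // returns the size of the view once the writer has finished.
    static int readWhileWriting() {
        ConcurrentSkipListMap<ZonedDateTime, String> events = new ConcurrentSkipListMap<>(
          Comparator.comparingLong((ZonedDateTime v) -> v.toInstant().toEpochMilli()));
        ZonedDateTime base = ZonedDateTime.of(2024, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
        ConcurrentNavigableMap<ZonedDateTime, String> view = events.tailMap(base);

        Thread writer = new Thread(() -> {
            for (int i = 0; i < TOTAL; i++) {
                events.put(base.plusSeconds(i), "event-" + i);
            }
        });
        writer.start();

        // Keep walking the view while the writer is running. Each pass may see
        // a different number of entries, but iteration itself stays safe.
        while (writer.isAlive()) {
            long seenInThisPass = view.keySet().stream().count();
            if (seenInThisPass > TOTAL) {
                throw new IllegalStateException("saw more entries than were written");
            }
        }

        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return view.size();
    }

    public static void main(String[] args) {
        System.out.println(readWhileWriting()); // 10000
    }
}
```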

4. Conclusion

In this quick tutorial, we had a look at the basics of the ConcurrentSkipListMap, along with some practical examples.

We leveraged the high performance of the ConcurrentSkipListMap to implement a non-blocking algorithm that can serve us a sorted view of the data even while multiple threads are updating the map.

The implementation of all these examples and code snippets can be found in the GitHub project; this is a Maven project, so it should be easy to import and run as it is.
