A Guide to Java SynchronousQueue

1. Overview

In this article, we’ll be looking at the SynchronousQueue from the java.util.concurrent package.

Simply put, this implementation allows us to exchange information between threads in a thread-safe manner.

2. API Overview

The SynchronousQueue only has two supported operations: take() and put(), and both of them are blocking.

For example, when we want to add an element to the queue, we need to call the put() method. That method will block until some other thread calls the take() method, signaling that it is ready to take an element.

Although the SynchronousQueue has an interface of a queue, we should think about it as an exchange point for a single element between two threads, in which one thread is handing off an element, and another thread is taking that element.

3. Implementing Handoffs Using a Shared Variable

To see why the SynchronousQueue can be so useful, we will implement a logic using a shared variable between two threads and next, we will rewrite that logic using SynchronousQueue making our code a lot simpler and more readable.

Let’s say that we have two threads – a producer and a consumer – and when the producer is setting a value of a shared variable, we want to signal that fact to the consumer thread. Next, the consumer thread will fetch a value from a shared variable.

We will use the CountDownLatch to coordinate those two threads, to prevent a situation when the consumer accesses a value of a shared variable that was not set yet.

We will define a sharedState variable and a CountDownLatch that will be used for coordinating processing:

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);

The producer will save a random integer to the sharedState variable, and execute the countDown() method on the countDownLatch, signaling to the consumer that it can fetch a value from the sharedState:

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    sharedState.set(producedElement);
    countDownLatch.countDown();
};

The consumer will wait on the countDownLatch using the await() method. When the producer signals that the variable was set, the consumer will fetch it from the sharedState:

Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

Last but not least, let’s start our program:

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(countDownLatch.getCount(), 0);

It will produce the following output:

Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point

We can see that this is a lot of code to implement such a simple functionality as exchanging an element between two threads. In the next section, we will try to make it better.

4. Implementing Handoffs Using the SynchronousQueue

Let’s now implement the same functionality as in the previous section, but with a SynchronousQueue. It has a double effect because we can use it for exchanging state between threads and for coordinating that action so that we don’t need to use anything besides SynchronousQueue.

Firstly, we will define a queue:

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

The producer will call a put() method that will block until some other thread takes an element from the queue:

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

The consumer will simply retrieve that element using the take() method:

Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
};

Next, we will start our program:

executor.execute(producer);
executor.execute(consumer);

executor.awaitTermination(500, TimeUnit.MILLISECONDS);
executor.shutdown();
assertEquals(queue.size(), 0);

It will produce the following output:

Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point

We can see that a SynchronousQueue is used as an exchange point between the threads, which is a lot better and more understandable than the previous example which used the shared state together with a CountDownLatch.

5. Conclusion

In this quick tutorial, we looked at the SynchronousQueue construct. We created a program that exchanges data between two threads using shared state, and then rewrote that program to leverage the SynchronousQueue construct. This serves as an exchange point that coordinates the producer and the consumer thread.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.

Related posts:

Java Program to Implement JobStateReasons API
Java Program to Implement the Program Used in grep/egrep/fgrep
Spring WebClient Requests with Parameters
Java Program to Find the Minimum Element of a Rotated Sorted Array using Binary Search approach
Java Program to Implement Branch and Bound Method to Perform a Combinatorial Search
Java Program to Implement the String Search Algorithm for Short Text Sizes
Tránh lỗi ConcurrentModificationException trong Java như thế nào?
Java Program to Implement LinkedHashSet API
Java Program to Implement First Fit Decreasing for 1-D Objects and M Bins
Java Program to Implement a Binary Search Tree using Linked Lists
Lập trình đa luồng trong Java (Java Multi-threading)
Java Program to Construct an Expression Tree for an Prefix Expression
Lớp Properties trong java
Encode/Decode to/from Base64
Using Java Assertions
Circular Dependencies in Spring
Custom Error Pages with Spring MVC
An Intro to Spring Cloud Zookeeper
Java Program to Implement Pagoda
Login For a Spring Web App – Error Handling and Localization
Java Program to Solve the 0-1 Knapsack Problem
Spring RequestMapping
Quick Guide on Loading Initial Data with Spring Boot
Control Structures in Java
Semaphore trong Java
The Thread.join() Method in Java
Remove HTML tags from a file to extract only the TEXT
Java Program to Check if a Point d lies Inside or Outside a Circle Defined by Points a, b, c in a Pl...
Java Program to Implement Vector API
Thực thi nhiều tác vụ cùng lúc như thế nào trong Java?
Getting Started with Forms in Spring MVC
Java Program to Implement Graph Structured Stack