Custom Thread Pools In Java 8 Parallel Streams

1. Overview

Java 8 introduced Streams as an efficient way of carrying out bulk operations on data, and parallel Streams can be obtained in environments that support concurrency.

These streams can improve performance, at the cost of multi-threading overhead.

In this quick tutorial, we’ll look at one of the biggest limitations of the Stream API: it offers no way to choose the thread pool a parallel stream runs on. We’ll see how to make a parallel stream work with a custom ForkJoinPool instance instead; alternatively, there’s a library that handles this.

2. Parallel Stream

Let’s start with a simple example: calling the parallelStream method on any of the Collection types returns a possibly parallel Stream:

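import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import org.junit.Test;

@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream() {
    List<Long> aList = new ArrayList<>();
    // parallelStream() returns a stream flagged as parallel
    Stream<Long> parallelStream = aList.parallelStream();

    assertTrue(parallelStream.isParallel());
}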

By default, such a Stream is processed by ForkJoinPool.commonPool(), a thread pool shared by the entire application.
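To see the common pool at work, the following sketch records which threads process the elements of a parallel stream started from an ordinary thread. The class and method names are illustrative, not from the original article. The names it expects rely on the JDK’s default worker naming (ForkJoinPool.commonPool-worker-N), which is an implementation detail rather than a documented guarantee.

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CommonPoolDemo {

    // Records the name of every thread that processes an element of a parallel stream.
    public static Set<String> threadNamesOfParallelStream() {
        return IntStream.rangeClosed(1, 10_000)
          .parallel()
          .mapToObj(i -> Thread.currentThread().getName())
          .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println("Common pool parallelism: "
          + ForkJoinPool.getCommonPoolParallelism());
        // Typically prints the calling thread ("main") plus names such as
        // "ForkJoinPool.commonPool-worker-1"; the exact set depends on the machine.
        System.out.println(threadNamesOfParallelStream());
    }
}
```

The calling thread shows up too, because it takes part in the computation while waiting for the result.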

3. Custom Thread Pool

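The Stream API has no method that accepts a thread pool. However, if we submit the whole stream pipeline as a task to a ForkJoinPool of our own, the parallel Stream’s subtasks run in that pool rather than in the common pool. Note that this relies on how parallel Streams are implemented on top of the Fork/Join framework, not on documented Stream API behavior.

The following example has a parallel Stream use a custom ForkJoinPool to calculate the sum of long values from 1 to 1,000,000, inclusive: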

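import static org.junit.Assert.assertEquals;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Test;

@Test
public void givenRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
  throws InterruptedException, ExecutionException {

    long firstNum = 1;
    long lastNum = 1_000_000;

    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
      .collect(Collectors.toList());

    // a dedicated pool with a parallelism level of 4
    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    // the stream pipeline is submitted as a task, so its subtasks run in customThreadPool
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();

    // with firstNum = 1, this is the familiar n(n + 1) / 2
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}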

We used the ForkJoinPool constructor with a parallelism level of 4. Finding the optimal value for a given environment takes some experimentation, but a good rule of thumb is to base it on the number of CPU cores available.
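The rule of thumb can be applied with Runtime.availableProcessors(). The sketch below is one way to do it; the class and method names are hypothetical, and the core count is only a starting point for the experimentation mentioned above.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolSizing {

    // Hypothetical helper: one worker per available CPU core, following the rule of thumb.
    public static ForkJoinPool newPoolSizedToCores() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ForkJoinPool(cores);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newPoolSizedToCores();
        try {
            System.out.println("Parallelism: " + pool.getParallelism());
        } finally {
            // release the pool's threads once we're done with it
            pool.shutdown();
        }
    }
}
```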

In the test, we then submitted the stream pipeline to the pool as a task, summed the elements with the reduce call, and waited for the result with get().

This simple example may not demonstrate the full usefulness of a custom thread pool. The benefits become obvious, though, when we don’t want to tie up the common thread pool with long-running tasks, such as processing data from a network source, or when other components within the application are already using the common thread pool.
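To check that the work really moves off the common pool, the sketch below runs a parallel stream inside a dedicated pool and collects the names of the threads that process its elements. The class and method names are illustrative. It uses join() instead of get() only so callers don’t have to handle checked exceptions, and it shuts the pool down in a finally block. That the subtasks run in the submitting pool is observed JDK behavior, not something the Stream API documents.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class CustomPoolThreads {

    // Runs a parallel stream inside a dedicated pool and records which threads did the work.
    public static Set<String> threadNamesInCustomPool(int parallelism) {
        List<Long> numbers = LongStream.rangeClosed(1, 100_000).boxed()
          .collect(Collectors.toList());
        ForkJoinPool customThreadPool = new ForkJoinPool(parallelism);
        try {
            // join() waits like get(), but rethrows failures as unchecked exceptions
            return customThreadPool.submit(() -> numbers.parallelStream()
              .map(n -> Thread.currentThread().getName())
              .collect(Collectors.toSet())).join();
        } finally {
            customThreadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Typically prints names such as "ForkJoinPool-1-worker-1",
        // with no common pool workers and no "main" thread.
        System.out.println(threadNamesInCustomPool(4));
    }
}
```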

If we run the test method above, it’ll pass. So far, so good.

However, if we instantiate ForkJoinPool the same way in a regular method that gets called repeatedly, it may eventually lead to an OutOfMemoryError.

Next, let’s take a closer look at the cause of the memory leak.

4. Beware of the Memory Leak

As mentioned earlier, the common thread pool is used by the entire application by default. It is a single, static ForkJoinPool instance.

Therefore, no memory leak occurs if we use the default thread pool.
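A quick way to confirm that the common pool is one shared, long-lived instance is sketched below. The class name is illustrative. Calling shutdown() on the common pool is shown only to demonstrate that, per the ForkJoinPool Javadoc, it has no effect there.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolLifecycle {

    public static void main(String[] args) {
        ForkJoinPool first = ForkJoinPool.commonPool();
        ForkJoinPool second = ForkJoinPool.commonPool();

        // Every call returns the same static instance.
        System.out.println(first == second); // true

        // shutdown() does not change the common pool's execution state.
        first.shutdown();
        System.out.println(first.isShutdown()); // false
    }
}
```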

Now, let’s review our test method. In it, we created a new ForkJoinPool object. When the method finishes, customThreadPool is not simply garbage collected. Its worker threads stay alive, waiting for new tasks to be assigned, and as long as they are alive they keep the pool reachable.

That is to say, every call to the method creates a new pool and its threads, and none of them are promptly released. In frequently called code, they can accumulate.

The fix is simple: shut down the customThreadPool object once we’ve finished using it:

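ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
    // always release the pool's threads, even if the task or the assertion fails
    customThreadPool.shutdown();
}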
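If this pattern appears in several places, the create/submit/shut down sequence can be wrapped in a small helper. The sketch below assumes a hypothetical runInCustomPool method, which is not part of any library. On Java 8, ForkJoinPool is not AutoCloseable, so try/finally is used instead of try-with-resources.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class CustomPoolRunner {

    // Hypothetical helper: runs the task in a fresh ForkJoinPool and always shuts the pool down.
    public static <T> T runInCustomPool(int parallelism, Callable<T> task) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // join() waits for the result like get(), but rethrows failures unchecked
            return pool.submit(task).join();
        } finally {
            // already-submitted tasks still complete; the pool just accepts no new ones
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long total = runInCustomPool(4,
          () -> LongStream.rangeClosed(1, 1_000_000).parallel().sum());
        System.out.println(total); // 500000500000
    }
}
```

Creating a pool per call is only reasonable for coarse-grained work. For frequent, small tasks, a single long-lived pool that is shut down when the application stops avoids the repeated thread start-up cost.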

5. Conclusion

We have briefly looked at how to run a parallel Stream using a custom ThreadPool. In the right environment, and with a suitable parallelism level, this can improve performance in certain situations.

If we create a custom ThreadPool, we should remember to call its shutdown() method to avoid a memory leak.

The complete code samples referenced in this article can be found over on GitHub.
