Custom Thread Pools In Java 8 Parallel Streams

1. Overview

Java 8 introduced the concept of Streams as an efficient way of carrying out bulk operations on data. And parallel Streams can be obtained in environments that support concurrency.

These streams can come with improved performance – at the cost of multi-threading overhead.

In this quick tutorial, we’ll look at one of the biggest limitations of Stream API and see how to make a parallel stream work with a custom ThreadPool instance, alternatively – there’s a library that handles this.

2. Parallel Stream

Let’s start with a simple example – calling the parallelStream method on any of the Collection types – which will return a possibly parallel Stream:

@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
    List<Long> aList = new ArrayList<>();
    Stream<Long> parallelStream = aList.parallelStream();
        
    assertTrue(parallelStream.isParallel());
}

The default processing that occurs in such a Stream uses the ForkJoinPool.commonPool(), a thread pool shared by the entire application.

3. Custom Thread Pool

We can actually pass a custom ThreadPool when processing the stream.

The following example lets have a parallel Stream use a custom ThreadPool to calculate the sum of long values from 1 to 1,000,000, inclusive:

@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() 
  throws InterruptedException, ExecutionException {
    
    long firstNum = 1;
    long lastNum = 1_000_000;

    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
      .collect(Collectors.toList());

    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
 
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}

We used the ForkJoinPool constructor with a parallelism level of 4. Some experimentation is required to determine the optimal value for different environments, but a good rule of thumb is simply choosing the number based on how many cores your CPU has.

Next, we processed the content of the parallel Stream, summing them up in the reduce call.

This simple example may not demonstrate the full usefulness of using a custom thread pool, but the benefits become obvious in situations where we do not want to tie-up the common thread pool with long-running tasks – such as processing data from a network source – or the common thread pool is being used by other components within the application.

If we run the test method above, it’ll pass. So far, so good.

However, if we instantiate ForkJoinPool class in a normal method in the same way as we do in the test method, it may lead to the OutOfMemoryError.

Next, let’s take a closer look at the cause of the memory leak.

4. Beware of the Memory Leak

As we’ve talked about earlier, the common thread pool is used by the entire application by default. The common thread pool is a static ThreadPool instance.

Therefore, no memory leak occurs if we use the default thread pool.

Now, let’s review our test method. In the test method, we created an object of ForkJoinPool. When the test method is finished, the customThreadPool object won’t be dereferenced and garbage collected — instead, it will be waiting for new tasks to be assigned.

That is to say, every time we call the test method, a new customThreadPool object will be created and it won’t be released.

The fix to the problem is pretty simple: shutdown the customThreadPool object after we’ve executed the method:

try {
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
    customThreadPool.shutdown();
}

5. Conclusion

We have briefly looked at how to run a parallel Stream using a custom ThreadPool. In the right environment and with the proper use of the parallelism level, performance gains can be had in certain situations.

If we create a custom ThreadPool, we should keep in mind to call its shutdown() method to avoid a memory leak.

The complete code samples referenced in this article can be found over on GitHub.

Related posts:

Java Program to Find the Minimum Element of a Rotated Sorted Array using Binary Search approach
Converting Between a List and a Set in Java
Java Program to Perform Naive String Matching
Java Program to Check Whether a Given Point is in a Given Polygon
Spring MVC and the @ModelAttribute Annotation
Java Program to Implement Weight Balanced Tree
Java Program to Find Minimum Element in an Array using Linear Search
Java Program to Find a Good Feedback Vertex Set
Java Program to Implement Suffix Array
The HttpMediaTypeNotAcceptableException in Spring MVC
Spring Boot - Runners
Java Program to Implement a Binary Search Tree using Linked Lists
Finding Max/Min of a List or Collection
Java Program to Implement Fermat Primality Test Algorithm
Java Program to Implement Direct Addressing Tables
Introduction to Spring Security Expressions
Introduction to Using Thymeleaf in Spring
Semaphore trong Java
Intersection of Two Lists in Java
Giới thiệu về Stream API trong Java 8
Java Program to Implement Attribute API
An Intro to Spring Cloud Security
Java Program to Find Maximum Element in an Array using Binary Search
Extra Login Fields with Spring Security
Using a List of Values in a JdbcTemplate IN Clause
Runnable vs. Callable in Java
Java Program to Find the Peak Element of an Array O(n) time (Naive Method)
A Guide to EnumMap
Java Program to implement Bit Matrix
Java Program to Implement Graph Structured Stack
Java Program to Remove the Edges in a Given Cyclic Graph such that its Linear Extension can be Found
Java Program to Implement RenderingHints API