Custom Thread Pools In Java 8 Parallel Streams

1. Overview

Java 8 introduced Streams as an efficient way of carrying out bulk operations on data, and parallel Streams can be obtained in environments that support concurrency.

These streams can come with improved performance – at the cost of multi-threading overhead.

In this quick tutorial, we’ll look at one of the biggest limitations of the Stream API, namely that it offers no way to choose the thread pool a parallel stream runs on, and see how to make a parallel stream work with a custom ThreadPool instance. Alternatively, there’s a library that handles this.

2. Parallel Stream

Let’s start with a simple example – calling the parallelStream method on any of the Collection types – which will return a possibly parallel Stream:

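// JUnit 4 assumed
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import org.junit.Test;

@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream() {
    List<Long> aList = new ArrayList<>();
    Stream<Long> parallelStream = aList.parallelStream();

    assertTrue(parallelStream.isParallel());
}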

The default processing that occurs in such a Stream uses the ForkJoinPool.commonPool(), a thread pool shared by the entire application.
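To see the common pool at work, here is a small sketch (the class and method names are hypothetical) that records which threads process the elements of a parallel stream started from an ordinary thread. The thread that starts the terminal operation also takes part in the work, so the recorded names are the calling thread plus, on a multi-core machine, ForkJoinPool.commonPool worker threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolDemo {

    // Runs a parallel stream from the calling thread and records the name of
    // every thread that processes at least one element.
    static Set<String> threadNames() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
          .parallel()
          .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Size of the shared pool; by default one less than the number of
        // available processors (but at least 1)
        System.out.println("common pool parallelism: "
          + ForkJoinPool.commonPool().getParallelism());

        // Contains the calling thread's name and/or ForkJoinPool.commonPool-worker-N names
        System.out.println("threads used: " + threadNames());
    }
}
```

Because every parallel stream in the application draws on this one pool, a slow stream in one component can delay parallel streams everywhere else.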

3. Custom Thread Pool

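The Stream API has no parameter for passing a custom ThreadPool, but we can work around that: if we submit the stream operation as a task to our own ForkJoinPool, the parallel stream splits its work within that pool instead of the common pool. Keep in mind that this relies on how the JDK’s stream implementation forks its subtasks, not on behavior the Stream API documents.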

The following example has a parallel Stream use a custom ThreadPool to calculate the sum of long values from 1 to 1,000,000, inclusive:

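// JUnit 4 assumed
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Test;

@Test
public void givenRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
  throws InterruptedException, ExecutionException {

    long firstNum = 1;
    long lastNum = 1_000_000;

    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
      .collect(Collectors.toList());

    // Running the stream as a task of customThreadPool makes its
    // parallel work execute on that pool's threads
    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();

    // 1 + 2 + ... + n = n(n + 1) / 2
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}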

We used the ForkJoinPool constructor with a parallelism level of 4. Some experimentation is required to determine the optimal value for different environments, but a good rule of thumb is to base the number on how many cores the CPU has, as reported by Runtime.getRuntime().availableProcessors().

Next, we submitted a task to the pool that processes the content of the parallel Stream, summing the elements in the reduce call, and called get() to wait for the result.
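One way to check that the work really runs in customThreadPool is to ask, from inside the stream, which pool hosts the current thread. ForkJoinTask.getPool() returns that pool, or null when the thread isn’t a pool worker. In this sketch (class and method names are hypothetical) we’d expect a single-element set containing customThreadPool. It uses join() instead of get(): join() reports task failures as unchecked exceptions, so no checked exceptions need to be declared.

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class WhichPoolDemo {

    // For every element, records the pool hosting the thread that processed it.
    // ForkJoinTask.getPool() returns null when the thread isn't a pool worker.
    static Set<ForkJoinPool> poolsUsed() {
        return IntStream.range(0, 10_000)
          .parallel()
          .mapToObj(i -> ForkJoinTask.getPool())
          .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        try {
            // The whole pipeline, including its forked subtasks, runs inside customThreadPool
            Set<ForkJoinPool> pools = customThreadPool.submit(() -> poolsUsed()).join();
            System.out.println("pools used: " + pools.size()
              + ", only customThreadPool: "
              + (pools.size() == 1 && pools.contains(customThreadPool)));
        } finally {
            customThreadPool.shutdown();
        }
    }
}
```

Calling poolsUsed() directly from an ordinary thread would instead report null (the calling thread) and/or the common pool.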

This simple example may not demonstrate the full usefulness of a custom thread pool, but the benefits become obvious when we don’t want to tie up the common thread pool with long-running tasks (such as processing data from a network source), or when the common thread pool is already being used by other components within the application.
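Since the right parallelism level has to be found by experiment, a quick, rough comparison can help. The sketch below (names are hypothetical) runs the same sum in pools of several sizes, including one sized with Runtime.getRuntime().availableProcessors(), and prints wall-clock times. It does no warm-up or repetition, so the timings are only a hint; a benchmarking harness such as JMH is the better tool for real measurements.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ParallelismExperiment {

    // Sums the list inside a fresh pool with the given parallelism,
    // then shuts the pool down so no threads are left behind.
    static long sumWithParallelism(List<Long> values, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> values.parallelStream().reduce(0L, Long::sum)).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Long> values = LongStream.rangeClosed(1, 1_000_000).boxed()
          .collect(Collectors.toList());
        int cores = Runtime.getRuntime().availableProcessors();

        // Single rough run per level: no warm-up, no repetition
        for (int parallelism : new int[] {1, 2, 4, cores}) {
            long start = System.nanoTime();
            long total = sumWithParallelism(values, parallelism);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println("parallelism=" + parallelism
              + " total=" + total + " time=" + elapsedMs + " ms");
        }
    }
}
```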

If we run the test method above, it’ll pass. So far, so good.

However, if we instantiate the ForkJoinPool class in a regular method the same way we do in the test method, it may eventually lead to an OutOfMemoryError.

Next, let’s take a closer look at the cause of the memory leak.

4. Beware of the Memory Leak

As we’ve discussed earlier, the common thread pool is used by the entire application by default. It is a single, static ForkJoinPool instance.

Therefore, no memory leak occurs if we use the default thread pool.

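Now, let’s review our test method. In it, we created a ForkJoinPool object. When the method finishes, customThreadPool isn’t shut down: its worker threads stay alive, idling while they wait for new tasks. Live threads are never garbage collected, and because they reference their pool, the pool can’t be collected either. Depending on the JDK, idle workers may eventually time out and exit, but until they do, the pool and its threads keep holding memory.

That is to say, every time we call the method, a new customThreadPool object with its own threads is created and isn’t released right away. If the method is called often enough, these pools and threads pile up.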
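The sketch below (names are hypothetical) reproduces the problem outside a test: the method creates a pool, uses it, and returns it without shutting it down, only so that we can inspect it afterwards. isShutdown() reports false, and getPoolSize(), the number of worker threads that have started but not yet terminated, is usually non-zero right after the task completes.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class PoolLifecycleDemo {

    // Hypothetical stand-in for the test method: creates a pool, runs a parallel
    // stream in it, and returns WITHOUT calling shutdown(). The pool is returned
    // only so that we can inspect it afterwards.
    static ForkJoinPool runWithoutShutdown() {
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        customThreadPool.submit(
          () -> LongStream.rangeClosed(1, 1_000_000).parallel().sum()).join();
        return customThreadPool;
    }

    public static void main(String[] args) {
        for (int call = 1; call <= 3; call++) {
            ForkJoinPool pool = runWithoutShutdown();
            // Each call leaves behind another pool that is not shut down; its
            // started workers are counted by getPoolSize() until they terminate
            System.out.println("call " + call + ": shut down = " + pool.isShutdown()
              + ", live workers = " + pool.getPoolSize());
        }
    }
}
```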

The fix to the problem is pretty simple: shut down the customThreadPool object after we’ve executed the method:

try {
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
    customThreadPool.shutdown();
}
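Shutting the pool down after each use fixes the leak, but it still creates and destroys threads on every call. If a method like this runs frequently, another option we might consider (an alternative, not the approach above) is a single dedicated pool that lives as long as the application and is reused for every call. A sketch, with hypothetical names:

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class SharedPoolSummer {

    // One pool created once and reused by every call, instead of one pool per call.
    // Sized from the core count as a starting point; tune for the actual workload.
    private static final ForkJoinPool STREAM_POOL =
      new ForkJoinPool(Runtime.getRuntime().availableProcessors());

    private SharedPoolSummer() {
    }

    // Runs the parallel stream inside STREAM_POOL; no pool is created per call.
    // join() rethrows task failures as unchecked exceptions instead of ExecutionException.
    public static long sum(List<Long> values) {
        return STREAM_POOL.submit(() -> values.parallelStream().reduce(0L, Long::sum)).join();
    }

    // Call once when the application stops, e.g. from a shutdown hook.
    public static void shutdown() {
        STREAM_POOL.shutdown();
    }
}
```

ForkJoinPool worker threads are daemon threads, so an idle shared pool won’t keep the JVM from exiting; calling shutdown() once at application stop still releases its threads explicitly.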

5. Conclusion

We have briefly looked at how to run a parallel Stream using a custom ThreadPool. In the right environment and with the proper use of the parallelism level, performance gains can be had in certain situations.

If we create a custom ThreadPool, we should remember to call its shutdown() method to avoid a memory leak.

The complete code samples referenced in this article can be found over on GitHub.
