Sử dụng CyclicBarrier trong Java

Trong bài trước, chúng ta đã cùng tìm hiểu về cách sử dụng CountDownLatch trong Java. Trong bài này, chúng ta sẽ cùng tìm hiểu về cách sử dụng CyclicBarrier , sự khác biệt của nó so với CountDownLatch.

1. Giới thiệu CyclicBarrier

CyclicBarrier là một tiện ích thuộc gói java.util.concurrent, được giới thiệu trong Java 5. CyclicBarrier được sử dụng để tạo luồng chờ đợi cho nhau ở một điểm chung, còn được gọi là rào cản (barrier).

Nó được sử dụng khi các luồng khác xử lý một phần của tính toán và khi tất cả các luồng đã hoàn thành việc thực hiện, kết quả cần phải được kết hợp trong luồng chính. Nói cách khác, một CyclicBarrier được sử dụng khi nhiều luồng thực hiện các tác vụ phụ khác nhau và đầu ra của các nhiệm vụ phụ này cần phải được kết hợp để tạo thành đầu ra cuối cùng.

CyclicBarrier tương tự như CountDownLatch, nhưng sau khi hoàn thành việc thực thi, thay vì gọi countDown() mỗi thread gọi await(). Và khi Thread cuối cùng gọi await(), nó báo hiệu rằng nó đã đạt đến rào cản (barrier). Khi đó, tất cả các Thread bắt đầu xử lý lại, còn được gọi là một rào cản bị hỏng (broken).

Sự khác biệt giữa CyclicBarrier và CountDownLatch:

  • Một CountDownLatch chỉ có thể được sử dụng một lần trong một chương trình (cho đến khi nó đếm đến 0).
  • Một CyclicBarrier có thể được sử dụng lại khi tất cả các Thread trong một rào cản được hoàn thành (released).

Bạn có thể sử dụng CyclicBarrier bất cứ nơi nào sử dụng CountDownLatch, nhưng ngược lại là không thể bởi vì bạn không thể sử dụng lại latch một khi đếm (countdown) đạt đến số 0.

Một số cách sử dụng của CyclicBarrier là viết unit test cho chương trình đa luồng, để mô phỏng sự đa luồng trong một lớp test hoặc tính kết quả cuối cùng sau khi một task vụ đã hoàn thành.

2. Ví dụ sử dụng CyclicBarrier

Ví dụ: Một chương trình chỉ được start sau khi 3 service của nó đã được start. Với yêu cầu này, chúng ta có thể sử dụng phương thức join() của lớp Thread hoặc sử dụng lớp CountDownLatch theo hướng dẫn của bài trước. Tuy nhiên, còn một cách khác là sử dụng CyclicBarrier.

Để dễ hiểu, tôi thêm các dòng log trước và sau khi service thực thi để bạn thấy được cách hoạt động của CyclicBarrier. Khi service thực thi xong, sẽ gọi phương thức await() để thông báo là nó đã hoàn thành.

ServiceOne.java

package com.maixuanviet.cyclicbarrier;
 
import java.util.concurrent.CyclicBarrier;
 
public class ServiceOne implements Runnable {
 
    private final CyclicBarrier cyclicBarrier;
 
    public ServiceOne(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
 
    @Override
    public void run() {
        System.out.println("Starting service One...");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        System.out.println("Service One has finished its work... waiting for others...");
        try {
            cyclicBarrier.await();
        } catch (Exception e) {
            System.out.println("Service one interrupted!");
            e.printStackTrace();
        }
        System.out.println("The wait is over, lets complete Service One!");
 
    }
 
}

ServiceTwo.java

package com.maixuanviet.cyclicbarrier;
 
import java.util.concurrent.CyclicBarrier;
 
public class ServiceTwo implements Runnable {
 
    private final CyclicBarrier cyclicBarrier;
 
    public ServiceTwo(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
 
    @Override
    public void run() {
        System.out.println("Starting service Two....");
 
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        System.out.println("Service Two has finished its work.. waiting for others...");
        try {
            cyclicBarrier.await();
        } catch (Exception e) {
            System.out.println("Service two interrupted!");
            e.printStackTrace();
        }
        System.out.println("The wait is over, lets complete Service two!");
 
    }
 
}

ServiceThree.java

package com.maixuanviet.cyclicbarrier;
 
import java.util.concurrent.CyclicBarrier;
 
public class ServiceThree implements Runnable {
 
    private final CyclicBarrier cyclicBarrier;
 
    public ServiceThree(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
 
    @Override
    public void run() {
        System.out.println("Starting service Three....");
 
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        System.out.println("Service Three has finished its work.. waiting for others...");
        try {
            cyclicBarrier.await();
        } catch (Exception e) {
            System.out.println("Service Three interrupted!");
            e.printStackTrace();
        }
        System.out.println("The wait is over, lets complete Service Three!");
    }
}

Chương trình chính sẽ chờ tất cả các Service hoàn thành trước khi tiếp tục xử lý.

package com.maixuanviet.cyclicbarrier;
 
import java.util.Date;
import java.util.concurrent.CyclicBarrier;
 
public class CyclicBarrierExample {
    public static void main(String[] args) {
 
        // 4 threads are part of the barrier, ServiceOne, ServiceTwo, ServiceThree and
        // this main thread calling them.
        final CyclicBarrier barrier = new CyclicBarrier(4);
 
        Thread service1 = new Thread(new ServiceOne(barrier));
        Thread service2 = new Thread(new ServiceTwo(barrier));
        Thread service3 = new Thread(new ServiceThree(barrier));
 
        System.out.println("Starting both the services at" + new Date());
 
        service1.start();
        service2.start();
        service3.start();
 
        try {
            barrier.await();
        } catch (Exception e) {
            System.out.println("Main Thread interrupted!");
            e.printStackTrace();
        }
        System.out.println("Ending both the services at" + new Date());
    }
}

Thực thi chương trình trên, ta có kết quả sau:

Starting both the services atTue Mar 06 23:49:55 ICT 2018
Starting service One...
Starting service Three....
Starting service Two....
Service Three has finished its work.. waiting for others...
Service One has finished its work... waiting for others...
Service Two has finished its work.. waiting for others...
Ending both the services atTue Mar 06 23:50:00 ICT 2018
The wait is over, lets complete Service Three!
The wait is over, lets complete Service One!
The wait is over, lets complete Service two!

Như bạn thấy:

  • Tôi đã khởi tạo một CyclicBarrier với số lượng Thread là 4, bao gồm: service1, service2, service3 và luồng của chương trình chính.
  • Chương trình chính sẽ start 3 service: service, service2 và service3.
  • Khi service 1 hoàn thành, nó sẽ gọi await() để thông báo nó đã hoàn thành phần việc của nó, chờ service khác xử lý.
  • Khi service2 hoàn thành, nó cũng gọi await() để thông báo nó đã hoàn thành phần việc của nó, chờ service khác xử lý.
  • Khi service3 hoàn thành, nó cũng gọi await() để thông báo nó đã hoàn thành phần việc của nó, chờ service khác xử lý.
  • Khi cả 3 service đã hoàn thành, chương trình chính sẽ được gọi thực thi và các service cũng sẽ tiếp tục xử lý phần việc còn lại (resume).

Related posts:

Optional trong Java 8
Registration – Password Strength and Rules
Spring 5 Testing with @EnabledIf Annotation
Từ khóa this và super trong Java
Java Program to Generate a Random Subset by Coin Flipping
Java Program to Check Whether an Undirected Graph Contains a Eulerian Cycle
JUnit5 Programmatic Extension Registration with @RegisterExtension
Java – Generate Random String
Java Program to Implement String Matching Using Vectors
Java – Byte Array to Reader
Introduction to Spring Cloud Stream
Java Program to Implement Iterative Deepening
Jackson – Marshall String to JsonNode
Use Liquibase to Safely Evolve Your Database Schema
Java Program to Represent Graph Using 2D Arrays
REST Web service: Tạo ứng dụng Java RESTful Client với Jersey Client 2.x
Hướng dẫn Java Design Pattern – Decorator
The Difference Between map() and flatMap()
Wiring in Spring: @Autowired, @Resource and @Inject
Limiting Query Results with JPA and Spring Data JPA
Java Program to Implement Cartesian Tree
Spring Boot - Twilio
Java Program to Check Whether it is Weakly Connected or Strongly Connected for a Directed Graph
Java Program to Implement Johnson’s Algorithm
Thực thi nhiều tác vụ cùng lúc như thế nào trong Java?
Java Program to implement Bit Matrix
Java Program to Implement Hash Tables with Linear Probing
Java Program to Solve a Matching Problem for a Given Specific Case
Java 8 and Infinite Streams
Java Program to Implement Sieve Of Atkin
Java Program to Implement Lloyd’s Algorithm
Java Program to Implement Multi-Threaded Version of Binary Search Tree