Mục lục
Trong Java, khởi tạo một thread sẽ chiếm một phần tài nguyên của hệ thống, dùng để cấp phát cho thread mới, cho phép nó thực thi các nhiệm vụ được giao.
Nếu chúng ta tạo thread một cách không kiểm soát, có thể dẫn đến tình trạng cạn kiệt tài nguyên hệ thống.
Thread Pool được sinh giúp chúng ta quản lý các thread, ngăn chặn tối đa tình trạng cạn kiệt tài nguyên từ việc khởi tạo thread trong chương trình.
Thread Pool hoạt động như thế nào
Thread Pool là 1 collection các thread được khởi trước đó. Thông thường kích thước của collection này là cố định. Nó cho việc thực hiện N task trên các thread được quản lý bởi Thread Pool. Nếu cho nhiều task hơn số Thread hiện hành, những task vào sau sẽ được xếp vào hàng đợi.
Khi một thread hoàn thành task được giao, nó sẽ đợi để được giao một task khác từ hàng đợi và thực thi. Khi tất cả các task được hoàn thành, những thread này sẽ chờ các task mới để thực thi từ Thread Pool.
Executors, Executor and ExecutorService
Executors class cung cấp một số method cho phép tạo ra các thread pool instance cấu hình sẵn, chúng ta không cần phải tự cấu hình cho các thread pool này, đây là một lợi thế khi mới làm quen với thread pool.
Executor và ExecutorService là các interface được sử dụng trong quá trình thực thi của các thread pool. Executor chứa duy nhất 1 execute() method dùng gửi các Runnable instance để thực thi.
Ví dụ tạo một Executor instance trong thread pool với số lượng thread đối đa là 1, execute() method nhận một implement của Runnable interface dưới dạng lambda expression.
Executor executor = Executors.newSingleThreadExecutor(); executor.execute(() -> System.out.println("Hello World"));
ExecutorService interface implement từ Exectuor interface, chứa số lượng lớn method dùng để kiểm soát tiến độ của các task cho đến khi hoàn thành. Sử dụng service này, chúng ta cũng có thể gửi các Runnable instance để thực thi cũng như kiểm soát việc thực hiện của chúng với Future instance.
Ví dụ sử dụng ExecutorService để gửi các task cần thực thi, nó sẽ trả về Future instance. Sử dụng Future.get() method để chờ cho đến khi task được gửi đi hoàn tất và trả về giá trị.
ExecutorService executorService = Executors.newFixedThreadPool(10); Future<String> future = executorService.submit(() -> "Hello World"); // some operations String result = future.get();
Trong thực tế, thì chúng ta không cần gọi get() ngay lập tức, mà chỉ nên gọi khi thực sự cần thiết.
ThreadPoolExecutor
Từ Java 5 trở đi, Java concurrency API cung cấp Executor framework. Dựa trêm Executor interface, và các sub-interface của ExecutorService, và ThreadPoolExecutor class implement cả 2 interface này.
ThreadPoolExecutor tách công việc tạo task và thực thi chúng thành 2 phần. Khi làm việc với ThreadPoolExecutor chúng ta chỉ cần triển khai task bằng cách implement Runnable interface và gửi chúng vào executor, nó sẽ chịu trách nhiệm cho việc thực thi, khởi tạo và chạy các thread cần thiết để hoàn thành các task.
Chúng ta sẽ tìm hiểu về 3 tham số chính sử dụng trong ThreadPoolExecutor: corePoolSize, maximunPoolSize và keepAliveTime.
Mỗi thread pool sẽ chứa một số lượng thread nhất định, chúng sẽ được duy trì mọi lúc trong thread pool, khi số lượng task nhiều quá mức, thread pool có thể khởi tạo thêm một số thread tối đa lên đến maximunPoolSize để tăng hiệu suất làm việc. Các thread được tạo thêm này sẽ được chấm dứt khi không còn cần đến, và giữa lại số corePoolSize thread.
Tham số keepAliveTime là khoảng thời gian cho phép các thread được thread pool tạo thêm được phép tồn tại khi ở trạng thái không hoạt động
Ví dụ sử dụng newFixedThreadPool() method để khởi tạo 1 ThreadPoolExecutor với corePoolSize và maximumPoolSize bằng nhau, và keepAliveTime là 0. Như vậy, tại mọi thời thời điểm thì thread pool luôn chứa 2 thread .
executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); System.out.println("Pool size: " + executor.getPoolSize()); System.out.println("Queue size: "executor.getQueue().size());
Output:
Pool size: 2
Queue size: 1
Ở ví dụ trên, ThreadPoolExecutor được tạo ra với 2 thread bên trong, nghĩa là chúng ta có thể thực thi nhiều nhất là 2 task cùng lúc. Ở trên, chúng ta đã thực thi 3 task cùng lúc nên một task sẽ được đưa vào hàng đợi và 2 task đang được thực thi.
Một cách khác để khởi tạo ThreadPoolExecutor từ Executors.newCachedThreadPool() method, method này sẽ không nhận bất kỳ tham số nào, corePoolSize sẽ được gán bằng 0, maximunPoolSize sẽ là Interger.MAX, keepAliveTime là 60 seconds.
Với các thông số trên, newCachedThreadPool() sẽ tạo ra thread pool không giới hạn số lượng thread được tạo ra, số lượng thread sẽ tăng lên theo số lượng task, nhưng khi không còn cần đến, chúng sẽ được chấm dứt khi không hoạt động trong 60s.
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); System.out.println("Pool size: " + executor.getPoolSize()); System.out.println("Queue size: "executor.getQueue().size());
Output:
Pool size: 3
Queue size: 0
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor extends ThreadPoolExecutor và implement ScheduledExecutorService. Ngoài các tính năng tương tự ThreadPoolExecutor, ScheduledThreadPoolExecutor còn có một số chức năng khác như:
- schedule() method cho phép thực thi một task sau một khoảng thời gian delay xác định.
- scheduleAtFixedRate() method cho phép thực thi một task sau một khoảng thời gian delay xác định tính từ thời điểm khởi tạo, sau đó thực thi task lặp đi lặp lại trong một khoảng thời gian định trước, khoảng thời gian này được tính từ lúc bắt đầu task trước đó đến thời gian bắt đầu task tiếp theo.
- scheduleWithFixedDelay() method tương tự scheduleAtFixedRate() nhưng khoảng thời gian thực thi giữa 2 task được tính đừ lúc kết thúc task trước đi đến khi bắt đầu task kế tiếp.
Sử dụng Executors.newScheduledThreadPool() method để tạo một ScheduledThreadPoolExecutor với corePoolSize là tham số truyền vào, không giới hạn maximumPoolSize, và keepAliveTime là 0.
Ví dụ tạo một ScheduledThreadPoolExecutor, schedule() sẽ thực thi sau khoảng thời gian 500 milliseconds
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); executor.schedule(() -> { System.out.println("Hello World"); }, 500, TimeUnit.MILLISECONDS);
Ví dụ tạo một ScheduledThreadPoolExecutor, sử dụng scheduleAtFixedRate() để thực thi task sau khoảng thời gian delay 500 milliseconds từ khi khởi tạo, khoảng thời gian thực thi giữa 2 lần kể tiếp là 100 milliseconds; Chúng ta sẽ thực thi task 3 lần liên tiếp sau đó dừng với Future.cancel(), và CountDownLatch.
CountDownLatch lock = new CountDownLatch(3); ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> { System.out.println("Hello World"); lock.countDown(); }, 500, 100, TimeUnit.MILLISECONDS); lock.await(1000, TimeUnit.MILLISECONDS); future.cancel(true);
Output:
Hello World
Hello World
Hello World
ForkJoinPool
Các thuật toán đệ quy sinh ra rất nhiều task trong một lúc, nếu sử dụng ThreasPoolExecutor sẽ dẫn đến cạn kiệt nguồn tài nguyên của hệ thống nhanh chóng, vì mỗi task hoặc các subtask của nó đều yêu cầu một thread riêng để chạy. ForkJoinPool là một thành phần quan trọng trong fork/join framework được giới thiệu trong Java 7 để giải quyết vấn đề trên.
Trong fork/join framework, mỗi task có thể chia thành các subtask và chờ chúng hoàn thành với join() method. Một điểm đáng chú ý là fork/join framework sẽ không tạo ra các thread mới cho mỗi task và subtask của nó.
Chúng ta đi đến một ví dụ đơn giản sử dụng ForkJoinPool để duyệt qua tất cả các node của một cây và tính tổng giá trị của các node.
class TreeNode { int value; List<TreeNode> children; TreeNode(int value, List<TreeNode> children) { this.value = value; this.children = children; } }
Để tính tổng giá trị của các node, chúng ta cần tạo ra một CountingTask class implement RecursiveTask<Integer> interface. Mỗi task sẽ nhận một node và tính tổng giá trị các node con của nó bằng cách:
- Dùng Stream API để duyệt qua các node con của mỗi node
- Các node con tương ứng tạo một CountingTask thông qua map()method
- Gọi fork() để tính tổng giá trị các node con của một node.
- Tập hợp tất cả giá trị với join() method
class CountingTask extends RecursiveTask<Integer> { private final TreeNode node; public CountingTask(TreeNode node) { this.node = node; } @Override protected Integer compute() { return node.value + node.children.stream() .map(childNode -> new CountingTask(childNode).fork()) .mapToInt(ForkJoinTask::join).sum(); } }
class Main { public static void main(String[] agrs) throws InterruptedException { TreeNode tree = new TreeNode(5, Arrays.asList(new TreeNode(3, Collections.emptyList()), new TreeNode(2, Arrays.asList(new TreeNode(2, Collections.emptyList()), new TreeNode(8, Collections.emptyList()))))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); int sum = forkJoinPool.invoke(new CountingTask(tree)); System.out.println("Sum: " + sum); } }
Output: Sum: 20
Tóm lược
Như vậy Thread Pool quả là một công cụ hiệu quả khi làm việc với thread phải không nào. Chúng ta sẽ giảm thiểu đáng kể việc cạn kiệt tài nguyên hệ thống khi mà các thread được tạo ra vô số mà không được quản lý chặt chẽ.
Nguồn tham khảo
https://www.baeldung.com/thread-pool-java-and-guava
https://howtodoinjava.com/java/multi-threading/java-thread-pool-executor-example/