抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

摘要:本文学习了多线程的ForkJoin框架。

环境

Windows 10 企业版 LTSC 21H2
Java 1.8

1 简介

在JDK1.7之后引入了ForkJoin框架,将一个大任务分解成多个子任务,子任务可以继续往下分解,将多个子任务的结果合并成一个大结果,最终合并成大任务的结果。

ForkJoin框架要完成两件事情:

  • Fork:把大任务拆分成子任务。
  • Join:把子任务的结果合并成大任务的结果。

ForkJoin框架的实现非常复杂,内部大量运用了位操作和无锁算法,核心组件:

  • ForkJoinPool:基于工作窃取算法的线程池,负责全局任务调度与负载均衡。
  • ForkJoinTask:可递归Fork和Join的任务单元,自带状态机驱动完成通知。
  • ForkJoinWorkerThread:拥有独立队列的线程,优先执行本地任务,空闲时窃取外部队列任务。
  • WorkQueue:无锁双端队列,支持FIFO先进先出和LIFO后进先出,实现高效任务分发与窃取。

2 类和接口

2.1 ForkJoinPool

ForkJoinPool是分支合并池,类似于线程池ThreadPoolExecutor类,同样是ExecutorService接口的一个实现类。

在ForkJoinPool类中提供了三个构造方法:

java
1
2
3
public ForkJoinPool();
public ForkJoinPool(int parallelism);
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode);

最终调用的是下面这个私有构造器:

java
1
private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix);

参数含义:

  • parallelism:并行级别,默认为CPU核心数,ForkJoinPool里线程数量与该参数有关,但它不表示最大线程数。
  • factory:线程工厂,默认为DefaultForkJoinWorkerThreadFactory,其实就是用来创建ForkJoinWorkerThread线程对象。
  • handler:异常处理器。
  • mode:调度模式,FIFO_QUEUE表示本地队列先进先出,LIFO_QUEUE表示本地队列后进先出。
  • workerNamePrefix:线程的名称前缀。

成员变量:

  • config:创建ForkJoinPool的配置,int类型的变量,占32位内存:
    • 低16位表示parallelism。
    • 第17位表示mode,0表示队列后进先出,1表示队列先进先出。
    • 第32位表示是否共享模式,0表示普通模式,1表示共享模式,队列没有线程,只能被其他线程窃取任务。
  • ctl:ForkJoinPool的主要控制字段,long类型的变量,占64位内存:
    • 第63~48位表示激活线程数量,值为激活线程数减去parallelism(补码表示),线程激活则加1,线程停用则减1。当累积增加parallelism时第63位翻转为0,则不允许再激活线程。
    • 第47~32位表示所有线程数量,值为所有线程数减去parallelism(补码表示),创建线程则加1,终止线程则减1。当累积增加parallelism时第47位翻转为0,则不允许再创建线程。
    • 第31~0位表示非激活线程链中top线程的本地队列的scanState属性:
      • 第15~0位表示非激活线程链中top线程的本地队列在workQueues数组中的索引。
      • 第31~16位表示非激活线程链中top线程的版本计数和线程状态。
  • workQueues:WorkQueue数组,奇数索引的队列可以关联线程并接收线程提交的本地任务,偶数索引的队列只能接收外部任务。
  • factory:创建线程的工厂。

2.2 ForkJoinTask

ForkJoinTask是Future接口的抽象实现类,提供了用于分解任务的fork()方法和用于合并任务的join()方法。

在ThreadPoolExecutor类中,线程池执行任务调用的execute()方法中要求传入Runnable接口的实例。但是在ForkJoinPool类中,除了可以传入Runnable接口的实例外,还可以传入ForkJoinTask抽象类的实例,并且传入Runnable接口的实例也会被适配为ForkJoinTask抽象类的实例。

2.3 RecursiveTask

通常情况下使用ForkJoinTask抽象类的实例,并不需要直接继承ForkJoinTask类,只需要继承其子类:

  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask:用于有返回结果的任务,最常用。

2.4 ForkJoinWorkerThread

ForkJoinWorkerThread类是Thread的子类,作为线程池中的线程执行任务,其内部维护了一个WorkerQueue类型的双向任务队列。

线程在执行任务时,优先处理本地任务队列中的任务(支持FIFO和LIFO),当本地任务队列为空时,会窃取外部任务队列中的任务(FIFO)。

2.5 WorkerQueue

WorkerQueue类是ForkJoinPool类的一个内部类,存储ForkJoinTask实例的双端队列。

3 使用

任务类定义,因为需要返回结果,所以继承RecursiveTask,并覆写compute方法。

任务的拆分通过ForkJoinTask的fork方法执行,join方法用于等待任务执行后返回。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Demo {
public static void main(String[] args) {
SumTask sumTask = new SumTask(1, 100);
ForkJoinPool pool = new ForkJoinPool();
try {
ForkJoinTask<Integer> task = pool.submit(sumTask);
System.out.println(task.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
}
}

class SumTask extends RecursiveTask<Integer> {
// 拆分阈值
private static final int THRESHOLD = 10;
// 拆分开始值
private int begin;
// 拆分结束值
private int end;
public SumTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
Integer value = 0;
if (end - begin <= THRESHOLD) {
// 小于阈值,直接计算
for (int i = begin; i <= end; i++) {
value += i;
}
} else {
// 大于阈值,递归计算
int middle = (begin + end) / 2;
SumTask beginTask = new SumTask(begin, middle);
SumTask endTask = new SumTask(middle + 1, end);
beginTask.fork();
endTask.fork();
value = beginTask.join() + endTask.join();
}
return value;
}
}

结果:

log
1
5050

评论