抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

摘要:本文学习了如何在高并发场景下使用线程池创建线程。

环境

Windows 10 企业版 LTSC 21H2
Java 1.8

1 类和接口

1.1 Executor

Executor是一个顶层接口,在它里面只声明了一个execute()方法,用来在接下来的某个时刻执行提交的任务。

常用方法:

java
1
void execute(Runnable command);

1.2 ExecutorService

ExecutorService接口继承了Executor接口,并声明了一些方法。

常用方法:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 用来关闭线程池,此时线程池不能够接受新的任务,它会等待所有任务执行完毕
void shutdown();
// 用来关闭线程池,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务
List<Runnable> shutdownNow();
// 当调用shutdown或shutdownNow方法后返回为true
boolean isShutdown();
// 若关闭后所有任务都已完成,则返回true。注意除非首先调用shutdown或shutdownNow,否则isTerminated永不为true
// 当调用shutdown方法后,如果所有提交的任务都已完成,返回为true
// 当调用shutdownNow方法后,成功停止,返回为true
boolean isTerminated();
// 用来向线程池提交任务,并返回任务执行的结果
<T> Future<T> submit(Callable<T> task);
// 用来向线程池提交任务,并返回任务执行的结果
<T> Future<T> submit(Runnable task, T result);
// 用来向线程池提交任务,并返回任务执行的结果
Future<?> submit(Runnable task);

1.3 ThreadPoolExecutor

ThreadPoolExecutor是线程池中最核心的一个类,通过间接的方式实现了ExecutorService接口。

构造方法:

java
1
2
3
4
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);

参数:

  • corePoolSize:核心池的大小。当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
  • maximumPoolSize:线程池最大线程数,它表示在线程池中最多能创建多少个线程。
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。
  • unit:参数keepAliveTime的时间单位,有7种取值。在TimeUnit类中有7种静态属性:
    • TimeUnit.DAYS;// 天
    • TimeUnit.HOURS;// 小时
    • TimeUnit.MINUTES;// 分钟
    • TimeUnit.SECONDS;// 秒
    • TimeUnit.MILLISECONDS;// 毫秒
    • TimeUnit.MICROSECONDS;// 微妙
    • TimeUnit.NANOSECONDS;// 纳秒
  • workQueue:一个阻塞队列,用来存储等待执行的任务。一般来说,这里的阻塞队列有以下几种选择:
    • ArrayBlockingQueue:有界阻塞队列。
    • LinkedBlockingQueue:无界阻塞队列。
    • SynchronousQueue:不存储元素阻塞队列,即单个元素阻塞队列。
    • DelayQueue:延时阻塞队列。
  • threadFactory:线程工厂,主要用来创建线程。
  • handler:表示当拒绝处理任务时的策略,有以下四种取值:
    • ThreadPoolExecutor.AbortPolicy:抛异常,默认策略。
    • ThreadPoolExecutor.CallerRunsPolicy:调用线程处理新任务。
    • ThreadPoolExecutor.DiscardPolicy:丢弃新任务。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃等待最久任务,将新任务加入队列。

2 使用

2.1 线程池状态

常用属性:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 状态:RUNNING。工作线程数量:0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 使用32位的高3位表示状态,余下29位表示容量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大容量为:^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 运行状态用32位整型的高3位表示
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 封装和解析ctl值
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

状态的转换:

  • RUNNING -> SHUTDOWN:调用了shutdown()方法。
  • RUNNING / SHUTDOWN -> STOP:调用了shutdownNow()方法。
  • SHUTDOWN -> TIDYING:当队列中任务都被取出执行完成,并且所有工作线程都结束了任务,再没有未被执行的任务。
  • STOP -> TIDYING:线程池中没有正在运行的线程,任务队列中任务都被取消。
  • TIDYING -> TERMINATED:钩子方法terminated()执行完毕后。

状态说明:

  • RUNNING:运行态,可处理新任务并执行队列中的任务。
  • SHUTDOW:关闭态,不接受新任务,但处理队列中的任务。
  • STOP:停止态,不接受新任务,不处理队列中任务,且打断运行中任务。
  • TIDYING:整理态,所有任务已经结束,将执行terminated()方法。
  • TERMINATED:结束态,terminated()方法已完成。

创建线程池之后不会马上创建线程,在提交任务后才会创建线程,可以手动设置创建线程池后马上创建线程:

  • prestartCoreThread():初始化一个核心线程。
  • prestartAllCoreThreads():初始化所有核心线程。

关闭线程池:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。
  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。

动态调整线程池容量:

  • setCorePoolSize():设置核心池大小。
  • setMaximumPoolSize():设置线程池最大能创建的线程数目大小。

2.2 按需创建线程池

Executors类中提供了几个静态方法创建线程池:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 创建容量为1的缓冲池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
// 创建指定容量的缓冲池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
// 创建容量可变的缓冲池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
// 创建指定容量的定时缓冲池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}:

它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了:

  1. newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue。
  2. newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue。
  3. newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
  4. newScheduledThreadPool将corePoolSize设置为入参,将maximumPoolSize设置为Integer.MAX_VALUE,使用的DelayQueue。

不建议直接使用这几个静态方法创建线程池:

  • SingleThreadPool和FixedThreadPool允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
  • CachedThreadPool和ScheduledThreadPool允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

建议通过ThreadPoolExecutor的方式创建线程池,这样的处理方式能更加明确线程池的运行规则,规避资源耗尽的风险。

另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

2.3 线程创建策略

如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务。

如果当前线程池中的线程数目大于等于corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中:

  • 若添加成功,则该任务会等待空闲线程将其取出去执行。
  • 若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务。

如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理。

如果线程池中的线程数量大于corePoolSize时,当某线程空闲时间超过keepAliveTime时,线程将被终止,直至线程池中的线程数目不大于corePoolSize。

如果允许为核心池中的线程设置存活时间,当核心池中的线程空闲时间超过keepAliveTime时,核心线程也会被终止。

2.4 缓存队列

超出一定数量的任务会转移队列中,队列与池里的线程大小的关联表现在:

  • 如果运行的线程数小于corePoolSize,会创建线程执行任务。
  • 如果运行的线程已大于corePoolSize,会把新的任务放于队列中。如果队列已到最大时,会继续创建线程,直到超过maximumPoolSize。如果线程超过maximumPoolSize,将拒绝接收新的任务。

而添加任务到队列时,有几种常规的策略:

  • 有界队列。如ArrayBlockingQueue,当定义了maximumPoolSizes时,使用有界队列可以预防资源的耗尽,但是增加了调整和控制队列的难度,队列的大小和线程池的大小是相互影响的,使用很大的队列和较小的线程池会减少CPU消耗、操作系统资源以及线程上下文开销,但却人为的降低了吞吐量。如果任务是频繁阻塞型的(I/O),系统是可以把时间片分给多个线程的。而采用较小的队列和较大的线程池,虽会造成CPU繁忙,但却会遇到调度开销,这也会降低吞吐量。
  • 无界队列。如LinkedBlockingQueue,当核心线程正在工作时,使用不用预先定义大小的无界队列,使新任务等待,所以如果线程数是小于corePoolSize时,将不会有入队操作。这种策略将很适合那些相互独立的任务,如Web服务器。无界队列可能会堆积大量的请求,从而导致OOM。
  • 直接传递。如SynchronousQueue,不存储元素的阻塞队列,将任务直接交给线程。每一个入队操作必须等待另一个线程移除操作,否则入队将一直阻塞。当处理一些可能有内部依赖的任务时,这种策略避免了加锁操作。直接传递一般不能限制maximumPoolSizes以避免拒绝接收新的任务,可能会造成增加无限多的线程导致OOM。
  • 延时队列。如DelayQueue,队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。延时队列也是一个无界队列,因此往队列中插入数据的操作永远不会被阻塞,而只有获取数据的操作才会被阻塞。延时队列的maximumPoolSizes没有限制,可能会造成增加无限多的线程导致OOM。

2.5 拒绝策略

当线程数量达到缓存队列的最大容量时,线程池则已经饱和了,此时则不会接收新的任务。会调用RejectedExecutionHandler的rejectedExecution()方法执行饱和策略。

在线程池内部预定义了几种处理策略:

  • 终止执行(AbortPolicy)。默认策略,会抛出一个RejectedExecutionException运行异常到调用者线程来阻止系统运行。
  • 调用者线程来运行任务(CallerRunsPolicy)。这种策略会由调用execute()方法的线程来执行任务,它提供了一个简单的反馈机制并能降低新任务的提交频率。
  • 丢弃策略(DiscardPolicy)。丢弃提交的任务。
  • 丢弃队列里最老的一个任务(DiscardOldestPolicy)。丢弃工作队列中等待最久一个任务,并将提交的任务加入队列。

2.6 合理配置

一般需要根据任务的类型来配置线程池大小。

如果是CPU密集型任务,即需要执行大量运算且没有阻塞的任务,就需要设置尽量少的线程数,减少线程上下文切换,参考值可以设为NCPU+1

如果是IO密集型任务,即线程存在阻塞或等待,并不是一直再执行,就需要设置尽量多的线程数,参考值可以设置为2*NCPU

另外,如果是IO密集型任务,也可以根据NCPU/(1-阻塞系数)这个公式计算,阻塞系数是在0.8到0.9之间的一个值。

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

3 实现原理

3.1 属性

常用属性:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
// 线程池的主要状态锁,控制线程池状态和线程池大小的改变
private final ReentrantLock mainLock = new ReentrantLock();
// 用来存放工作集
private final HashSet<Worker> workers = new HashSet<Worker>();
// 用于延时的条件队列
private final Condition termination = mainLock.newCondition();
// 线程池中曾经出现过的最大线程数
private int largestPoolSize;
// 已经执行完毕的任务个数
private long completedTaskCount;
// 线程工厂,用来创建线程
private volatile ThreadFactory threadFactory;
// 任务拒绝策略
private volatile RejectedExecutionHandler handler;
// 线程存活时间
private volatile long keepAliveTime;
// 是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大小,即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列
private volatile int corePoolSize;
// 线程池最大能容忍的线程数
private volatile int maximumPoolSize;

3.2 源码

3.2.1 内部类

内部类:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 维护了自己的一个Thread对象
final Thread thread;
// 维护了自己的一个Runnable对象
Runnable firstTask;
// 记录完成的任务数
volatile long completedTasks;

// 使用传入的Runnable对象和ThreadFactory产生的Thread对象生成Worker对象
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

// 重写了Runnable里的run()方法
public void run() {
runWorker(this);
}

// ...
}

3.2.2 提交任务

最核心的任务提交方法是execute()方法,虽然通过submit()方法也可以提交任务,但是实际上submit()方法里面最终调用的还是execute()方法。

提交任务:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 在接下来的某个时刻执行任务
public void execute(Runnable command) {
// 如果是空任务则抛出异常
if (command == null)
throw new NullPointerException();
// 获取ctl的值
int c = ctl.get();
// 1 如果工作线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 尝试添加线程
if (addWorker(command, true))
return;
// 添加线程失败则重新获取ctl的值
c = ctl.get();
}
// 2 如果工作线程数大于或者等于核心线程数
// 2.1 如果线程池正在运行,则尝试将任务放入缓存队列
if (isRunning(c) && workQueue.offer(command)) {
// 重新根据ctl检查状态,因为可能在这段时间进程死掉了
int recheck = ctl.get();
// 2.1.1 如果线程池不处于运行状态,则尝试删除刚刚放入缓存队列的任务
if (! isRunning(recheck) && remove(command))
// 拒绝任务
reject(command);
// 2.1.2 如果线程池在运行状态
// 如果工作线程数是0
else if (workerCountOf(recheck) == 0)
// 添加无初始任务的线程
addWorker(null, false);
}
// 2.2 如果线程池不在运行状态,或者缓存队列已满
// 尝试添加线程
else if (!addWorker(command, false))
// 添加失败则拒绝任务
reject(command);
}

3.2.3 添加线程

添加线程:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// 添加线程,第一个参数是要添加的任务,第二个参数是是否核心线程,返回是否插入成功
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 获取ctl的值
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);

// 如果线程还未SHUTDOWN,此时是可以添加新线程的
// 如果线程是SHUTDOWN状态,而且传进来的任务为空,并且任务队列不为空的时候,此时是可以添加新线程的
// 将这两个条件取反,如果满足,则返回插入失败
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 如果线程数大于最大值,或者当core为true时大于核心线程数,当core为false时大于最大线程数,返回失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试通过CAS添加线程数目,成功则跳出retry循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 添加失败则重新获取ctl的值
c = ctl.get();
// 如果状态改变则重新进行retry循环,如果没有改变则重新尝试添加线程数
if (runStateOf(c) != rs)
continue retry;
}
}

// 线程运行标志位
boolean workerStarted = false;
// 线程添加标志位
boolean workerAdded = false;
// 封装了任务的Worker对象
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 使用全局锁添加线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 获取线程池状态
int rs = runStateOf(ctl.get());
// 如果处于运行状态,或者是SHUTDOWN状态并且添加的任务为空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程已经在运行,则抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加线程
workers.add(w);
// 更新线程数量的历史最大值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 更新线程添加标志位
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果添加成功,则运行线程,并更新线程运行标志位
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程运行标志位是false,则线程添加失败进行回滚
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

失败回滚:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 线程添加失败进行回滚
private void addWorkerFailed(Worker w) {
// 使用全局锁回滚线程个数的添加
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 移除线程
if (w != null)
workers.remove(w);
// 减少线程数
decrementWorkerCount();
// 尝试终止线程
tryTerminate();
} finally {
mainLock.unlock();
}
}

3.2.4 启动线程

启动线程:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 启动线程最终调用的还是runWorker()方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取初始Runnable对象
Runnable task = w.firstTask;
// 置空初始Runnable对象
w.firstTask = null;
// 释放锁,允许被中断
w.unlock();
// 因为运行异常导致线程突然终止的标志
boolean completedAbruptly = true;
try {
// 获取任务,如果没有任务可以获取,则此循环终止
while (task != null || (task = getTask()) != null) {
// 获取工作线程锁
w.lock();
// 如果线程池关闭,则确保线程被中断
// 如果线程池没有关闭,则确保线程不会被中断。这就要求进行重新获取ctl,以便在清除中断时处理shutdownNow竞争
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 回调方法,给子类具体实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 调用Runnable对象的run()方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 回调方法,给子类具体实现
afterExecute(task, thrown);
}
} finally {
// 置空,如果进入下一个循环可以继续取任务
task = null;
// 完成数累加
w.completedTasks++;
// 释放工作线程锁
w.unlock();
}
}
// 说明不是用户任务异常引起的
completedAbruptly = false;
} finally {
// 程序退出
processWorkerExit(w, completedAbruptly);
}
}

3.2.5 获取任务

获取任务:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 获取任务,控制线程池线程数量
private Runnable getTask() {
// 获取任务超时标志位
boolean timedOut = false;

for (;;) {
// 获取ctl的值
int c = ctl.get();
// 获取线程池的状态
int rs = runStateOf(c);
// 如果是STOP状态之后,或者在SHUTDOWN状态之后并且任务队列是空的
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 使用CAS让线程数量减一
decrementWorkerCount();
// 返回并使一个线程退出
return null;
}
// 获取线程数
int wc = workerCountOf(c);
// 如果允许为核心线程设置存活时间,或者线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果任务数大于最大线程数,或者超时并且允许为核心线程设置存活时间,或者超时并且任务数大于核心线程数
// 而且,线程数大于1,或者任务队列是空的
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 使用CAS让线程数量减一
if (compareAndDecrementWorkerCount(c))
// 线程数量减一成功,返回并使一个线程退出
return null;
// 线程数量减一失败,说明线程数量已被抢先改变,继续循环
continue;
}
try {
// 超时获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果在生效时间内获取到任务则返回
if (r != null)
return r;
// 否则将超时标志设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 如果有异常,则将超时标志设置为false
timedOut = false;
}
}
}

3.3.6 程序退出

程序退出:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 处理线程退出,第一个参数是要处理的Worker,第二个参数用来判断是否异常导致退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是异常退出,需要减少线程数。如果是正常退出,则不需要调整
if (completedAbruptly)
decrementWorkerCount();
// 获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 完成任务数自增
completedTaskCount += w.completedTasks;
// 移除线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终结线程
tryTerminate();
// 获取ctl的值
int c = ctl.get();
// 如果线程池关闭了
if (runStateLessThan(c, STOP)) {
// 如果线程正常退出
if (!completedAbruptly) {
// 如果允许超时关闭核心线程,那就是0,否则就取核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果没有核心线程,并且队列中的任务不为空,则设置最少线程为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果线程数量大于等于正常工作的数量则不再添加新的线程
if (workerCountOf(c) >= min)
return;
}
// 添加线程
addWorker(null, false);
}
}

4 使用

创建线程池,核心线程数为2,最大线程数为4,任务队列大小为3,启动7个线程。

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Demo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(3));
for (int i = 1; i <= 7; i++) {
Runnable runnable = new DemoThread(i);
executor.execute(runnable);
System.out.println("线程编号:" + i + ",线程池:" + executor.getPoolSize() + ",队列:" + executor.getQueue().size());
}
executor.shutdown();
}
}

class DemoThread implements Runnable {
int taskNo = 0;

public DemoThread(int taskNo) {
this.taskNo = taskNo;
}

@SuppressWarnings("static-access")
public void run() {
try {
System.out.println("task " + taskNo);
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

结果:

log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
线程编号:1,线程池:1,队列:0
task 1
线程编号:2,线程池:2,队列:0
线程编号:3,线程池:2,队列:1
线程编号:4,线程池:2,队列:2
task 2
线程编号:5,线程池:2,队列:3
线程编号:6,线程池:3,队列:3
线程编号:7,线程池:4,队列:3
task 6
task 7
// 等待1s
task 3
task 4
task 5

说明:

  • 在前2个任务放到线程池里时,没有超过核心线程数,所以创建新的线程,执行任务。
  • 在第3个任务放到线程池里时,超过了核心线程数,所以放到了任务缓存队列里,等待执行任务。
  • 在第4个任务放到线程池里时,超过了核心线程数,所以放到了任务缓存队列里,等待执行任务。
  • 在第5个任务放到线程池里时,超过了核心线程数,所以放到了任务缓存队列里,等待执行任务。
  • 在第6个任务放到线程池里时,超过了核心线程数,超过了缓存队列长度,线程池的线程数量小于线程池的最大容量,所以创建新的线程,执行任务。
  • 在第7个任务放到线程池里时,超过了核心线程数,超过了缓存队列长度,线程池的线程数量小于线程池的最大容量,所以创建新的线程,执行任务。
  • 等待任务执行完毕,释放线程,获取任务缓存队列里的任务,执行任务。
  • 如果有第8个任务放到线程池里,超过了核心线程数,超过了缓存队列长度,线程池的线程数量大于线程池的最大容量,所以产生RejectedExecutionException拒绝任务异常。

评论