抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

摘要:本文主要学习了JUC提供的三个常用的辅助类。

环境

Windows 10 企业版 LTSC 21H2
Java 1.8

1 CountDownLatch

1.1 简介

用于将线程阻塞某段时间,等其他线程完成后,唤醒被阻塞的线程继续执行。

CountDownLatch在内部维护了一个计数器,需要在构造方法中传入一个非负整数。

当线程调用await()方法后,判断计数器是否为0,如果为0则继续执行,如果不为0则阻塞线程。

当其他线程调用countDown()方法后,将计数器减1,并判断计数器是否为0,如果计数器不为0则无事发生,如果计数器为0则唤醒被阻塞的线程继续执行。

计数器无法被重置,只能使用一次,不能重复使用。

1.2 源码

1.2.1 构造方法

构造方法需要传入一个非负整数,否则会抛出异常:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// CountDownLatch的构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
...
// Sync继承自AQS
private static final class Sync extends AbstractQueuedSynchronizer {
...
// Sync的构造方法
Sync(int count) {
setState(count);
}
...
}
...
// AQS类的setState()方法
protected final void setState(int newState) {
state = newState;
}

1.2.2 阻塞方法

调用await()方法后,判断计数器是否为0,如果为0则继续执行,如果不为0则阻塞线程:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// CountDownLatch的await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
...
// AQS的acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
...
// Sync的tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

1.2.3 减少计数

调用countDown()方法将计数器减1,在执行会对进行双层判断:

  • 在减少计数前,判断计数器是否为0,为0则返回false,不为0则减少计数。
  • 在减少计数后,判断计数器是否为0,为0则返回true,不为0则返回false。

只有在减少后判断等于0的时候才会唤醒等待线程:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// CountDownLatch的countDown()方法
public void countDown() {
sync.releaseShared(1);
}
// AQS的releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Sync的tryReleaseShared()方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

1.3 使用

示例:

java
1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
CountDownLatch cdl = new CountDownLatch(2);
for (int i = 1; i <= cdl.getCount(); i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "离开课堂");
cdl.countDown();
}, i + "号").start();
}
cdl.await();
System.out.println("所有学生都已离开课堂");
}

结果:

log
1
2
3
1号离开课堂
2号离开课堂
所有学生都已离开课堂

2 CyclicBarrier

2.1 简介

用于将线程阻塞某段时间,等其他线程也被阻塞后,唤醒被阻塞的线程继续执行。

CyclicBarrier在内部维护了一个计数器和一个屏障操作,需要在构造方法中传入一个正整数和一个屏障方法。

当线程调用await()方法后,将计数器减1,并判断计数器是否为0,如果计数器不为0则阻塞线程,如果计数器为0则唤醒被阻塞的线程继续执行,并由当前线程执行屏障方法。

2.2 源码

2.2.1 构造方法

构造方法有两个,最终调用的是同一个。

要求传入一个正整数和一个屏障方法:

java
1
2
3
4
5
6
7
8
9
10
11
// CyclicBarrier的构造方法,指定计数器和屏障方法
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
// CyclicBarrier的构造方法,指定计数器,默认屏障方法为空
public CyclicBarrier(int parties) {
this(parties, null);
}

2.2.2 阻塞方法

调用await()方法后,线程会被阻塞,直到达到指定数量后,唤醒阻塞线程,并由当前线程执行屏障操作:

java
1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

2.3 使用

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
CyclicBarrier cb = new CyclicBarrier(7, () -> {
System.out.println(Thread.currentThread().getName() + "龙珠是最后一颗,龙珠集齐,召唤神龙");
});
for (int i = 1; i <= 7; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "龙珠被收集");
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}

结果:

log
1
2
3
4
5
6
7
8
1龙珠被收集
4龙珠被收集
5龙珠被收集
7龙珠被收集
3龙珠被收集
2龙珠被收集
6龙珠被收集
4龙珠是最后一颗,龙珠集齐,召唤神龙

3 Semaphore

3.1 简介

用于控制同时访问许可证的线程数量。

Semaphore在内部维护了一个许可证数量,需要在构造方法中传入一个非负整数。

当线程调用acquire()方法后,尝试将许可证数量减1并判断是否大于等于0,如果为true则继续执行,如果为false则阻塞并等待,直到有许可证释放重新尝试获取许可证。

当线程调用release()方法后,释放许可证,唤醒被阻塞的线程抢占许可证。

3.2 源码

3.2.1 构造方法

构造方法有两个,都需要传入整型的许可证数量。

第一个使用非公平的锁:

java
1
2
3
4
// Semaphore类的非公平锁的构造方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

第二个可以传入参数指定使用公平锁还是非公平锁:

java
1
2
3
4
// Semaphore类的公平锁的构造方法
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

3.2.2 获取许可证

调用acquire()方法后,线程尝试获取许可证,获取失败会被阻塞,当有许可证被释放时重新抢占许可证:

java
1
2
3
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

3.2.3 释放许可证

调用release()方法后,线程释放许可证,唤醒被阻塞的线程抢占许可证:

java
1
2
3
public void release() {
sync.releaseShared(1);
}

3.3 使用

示例:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
Semaphore s = new Semaphore(2);
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
s.acquire();
System.out.println(Thread.currentThread().getName() + "抢到了");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放了");
s.release();
}
}, String.valueOf(i)).start();
}
}

结果:

log
1
2
3
4
5
6
7
8
9
10
11
12
13
2抢到了
1抢到了
// 等待1s
1释放了
2释放了
4抢到了
3抢到了
// 等待1s
3释放了
4释放了
5抢到了
// 等待1s
5释放了

评论