0%

Java并发-CyclicBarrier

作用

  • CyclicBarrier构造函数传入一个数字N。
  • 线程调用CyclicBarrier.await()会阻塞等待,直到有第N个线程调用CyclicBarrier.await(),所有线程一起执行await()方法后续逻辑。
  • 再次调用await()可以继续这一波操作,循环使用。

比喻:

  • 人到齐了一起走,没到齐每个人都一直等着不走。
  • 走完了过后,下一波人来了继续这个流程。

可循环利用的屏障。

举例:

CyclicBarrier barrier = new CyclicBarrier(5);

然后各个线程调用barrier.await();

当有5个线程await()过后,会继续执行await()后续代码

底层原理

调用CyclicBarrier的await()方法,通过ReentrantLock先加锁,然后用Condition的await实现等待。

每次调用await()会计数,当第个N个线程执行await()后,会对Condition对象signalAll()来让所有等待线程的继续执行。

然后重新计数,继续同样的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

CountDownLatch和CyclicBarrier区别?

CountDownLatch主要是实现了1个或N个线程需要等待其他线程完成某项操作之后才能继续往下执行操作,描述的是1个线程或N个线程等待其他线程的关系。

CyclicBarrier主要是实现了多个线程之间相互等待,直到所有的线程都满足了条件之后各自才能继续执行后续的操作,描述的多个线程内部相互等待的关系。

CountDownLatch是一次性的,而CyclicBarrier则可以被重置而重复使用。