0%

同步方式

同步方式

除了使用wait()/notify()来实现同步之外,在java.util.concurrent包中还有几种进行同步的方式,如信号量、屏障、闭锁和交换器。

信号量Semaphore

对于synchronized和ReentrantLock,一次都只允许一个线程访问一个资源,而Semaphore却可以指定多个线程,同时访问某一个资源,主要作用就是控制并发的数量,并不能保证线程的安全性

Semaphore类内部继承AQS(AbstractQueuedSynchronizer)实现(使用的是共享锁),提供了允许的最大数量,以及公平策略,默认是非公平的,可以控制某个资源被同时访问的任务数

信号量就是一个计数器,用来保护一个或多个共享资源的访问,如果线程要访问一个共享资源,必须先获得信号量,信号量内部计数器大于0,则计数器减一,允许访问这个共享资源,一旦达到0,就会对新线程进行阻塞;当线程用完某个共享资源时,信号量必须释放,计数器加一,以使其他线程能够访问共享资源

源码

1
2
3
4
5
6
7
8
9
// permits是初始化信号量个数,还可以调用release方法来动态的增加个数
public Semaphore(int permits) {
// 默认使用的是非公平策略
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

可以使用信号量来进行限流,使用acquire()来获得许可,获得到许可之后,信号量的允许数量就减少一个,当全部被占用之后,就会阻塞,使用release()方法来释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// 尝试获取一个许可,获取不到则返回false,不会进行阻塞
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 在指定时间内尝试获取1个许可,如果获取不到则返回false
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 获取信号量许可,允许被中断
public void acquire() throws InterruptedException {
// 获取一个信号量资源
sync.acquireSharedInterruptibly(1);
}
// 与acquire类似,只是不允许被中断
public void acquireUninterruptibly() {
sync.acquireShared(1);
}

// AQS中的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 调用Sync子类方法尝试获取
if (tryAcquireShared(arg) < 0)
// 获取失败则放入阻塞队列
doAcquireSharedInterruptibly(arg);
}


// 释放信号量资源
public void release() {
sync.releaseShared(1);
}

// AQS中的方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 调用sync的方法来尝试释放资源
doReleaseShared();
return true;
}
return false;
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前信号量值
int current = getState();
// 信号量+所释放的值
int next = current + releases;
// 溢出
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // 更新信号量值
return true;
}
}

// 获取当前可用的许可数,但是许可的数量可能在实时的改变,并不是固定的数量,一般用于调试
public int availablePermits() {
return sync.getPermits();
}
// 获取并返回立即可用的所有许可个数,并将可用许可置为0
public int drainPermits() {
return sync.drainPermits();
}
// 获取等待许可的线程个数
public final int getQueueLength() {
return sync.getQueueLength();
}
// 判断有没有线程在等待许可
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

如果不想在获取的时候进行阻塞,可以使用tryAcquire()方法来尝试获取许可

非公平策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
// 父类Sync构造器为setState(permits);
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}


final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取当前信号量
int available = getState();
// 计算剩余值
int remaining = available - acquires;
// 没有剩余值或者cas操作成功,则返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
公平策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
// 查看队列中是否有等待的
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
1
2
3
4
5
6
7
8
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

可以将信号量看做是一个有界的阻塞容器

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
public class TestSemaphore {

public static void main(String[] args) {
UseSemaphore useSemaphore = new UseSemaphore();

for(int i = 0; i< 60;i++){
ThreadP threadP = new ThreadP(useSemaphore);
threadP.start();
}

for(int i = 0; i< 60;i++){
ThreadC threadC = new ThreadC(useSemaphore);
threadC.start();
}
}

// 生产者线程
static class ThreadP extends Thread{

UseSemaphore useSemaphore;
public ThreadP(UseSemaphore useSemaphore){
super();
this.useSemaphore = useSemaphore;
}

@Override
public void run() {
useSemaphore.put();
}
}
// 消费者线程
static class ThreadC extends Thread{

UseSemaphore useSemaphore;
public ThreadC(UseSemaphore useSemaphore){
super();
this.useSemaphore = useSemaphore;
}
@Override
public void run() {
useSemaphore.poll();
}
}

static class UseSemaphore{
private volatile Semaphore consumerSemaphore = new Semaphore(20); // 消费者
private volatile Semaphore producerSemaphore = new Semaphore(5); // 生产者

private volatile ReentrantLock lock = new ReentrantLock();
private volatile Condition consumerCondition = lock.newCondition();
private volatile Condition producerCondition = lock.newCondition();

private volatile Object[] store = new Object[10]; // 可存储的商品数量

// 是否已空
private boolean isEmpty(){
boolean isEmpty = true;
for(int i = 0;i < store.length;i++){
if(store[i] != null){
isEmpty = false;
break;
}
}

return isEmpty;
}

// 是否已满
private boolean isFull(){
boolean isFull = true;
for(int i = 0;i < store.length;i++){
if(store[i] == null){
isFull = false;
break;
}
}

return isFull;
}

// 生产
public void put(){
try{
producerSemaphore.acquire();
lock.lock();
while (isFull()){
System.out.println("仓库已满,生产者在等待生产");
producerCondition.await();
}

for(int i = 0;i<store.length;i++){
if(store[i] == null){
store[i] = "商品"+ UUID.randomUUID().toString();
System.out.println(Thread.currentThread().getName() + "生产了"+store[i]);
break;
}
}
consumerCondition.signalAll();
lock.unlock();
} catch (InterruptedException e){
e.printStackTrace();
} finally {
producerSemaphore.release();
}

}

// 消费
public void poll(){
try{
consumerSemaphore.acquire();
lock.lock();
while (isEmpty()){
System.out.println("仓库没货了,消费者在等待商品");
consumerCondition.await();
}

for(int i = 0;i<store.length;i++){
if(store[i] != null){
System.out.println(Thread.currentThread().getName()+"消费了商品"+store[i]);
store[i] = null;
break;
}
}
producerCondition.signalAll();
lock.unlock();
} catch (InterruptedException e){
e.printStackTrace();
} finally {
consumerSemaphore.release();
}

}
}
}

闭锁CountDownLatch

一个或多个线程使其在某个条件成熟后再执行

CountDownLatch类内部继承AQS(AbstractQueuedSynchronizer)实现(使用的是共享锁),在完成一组其他线程中正在执行的操作之前,允许一个或多个线程一直等待,内部提供了一个计数器count,判断count计数不为0时则调用了await()的线程呈wait状态,也就是在屏障处等待,提供了countDown方法使得计数器减一,直到计数器为0表示条件成熟,此时所有调用await方法而阻塞的线程都会被唤醒

在完成某些运算时,只有其他所有线程全部完成运算时,当前运算才会继续,但是当闭锁打开之后就只能保持打开的方式,不可以再次闭合,用来确保某个运算在其他运算之后

就像是一个门闩,门打开之前所有的线程都被阻断,一旦大门打开,所有的线程都将通过,且不能再次闭合

如:确保某个服务在其依赖的所有服务都已经启动后才启动

采用共享锁来实现

源码

闭锁状态是一个计数器,初始为正数,表示需要等待事件的数量。countDown()递减计数器,await()等待计数器达到0,表示所有需要等待的事件都已经发生了,如果计数器非0,会一直阻塞到计数器值为0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// CountDownLatch中持有的是共享锁
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 计数count无法被重置,只会进行减法操作
Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

// 尝试获取锁
// 负数:当前线程获取失败
// 零:当前线程获取成功,但是后续的线程不能再获取了
// 正数:当前线程获取成功,后续线程还可以获取
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 循环进行CAS,直到完成减一操作
for (;;) {
int c = getState();
// 如果当前状态为0,则直接返回
if (c == 0)
return false;
// 使用CAS让计算器减一
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
// CountDownLatch中的队列同步器
private final Sync sync;

// 这个数就是线程需要等待完成的操作数目
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

// 需要等待其他事件在完成时调用,获取共享锁,判断count是否为0,如果不为0则等待,减到0时,等待的线程继续运行
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}


public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 每个被等待的事件在完成时调用,释放共享锁
// 计算器的值count减一
public void countDown() {
sync.releaseShared(1);
}

// 获取当前计数值
public long getCount() {
return sync.getCount();
}


}

// AbstractQueuedSynchronizer中的acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// (getState() == 0) ? 1 : -1 state初始值为所传入的count,所以在countDown调用之前都是大于0的
// 如果state等于0,则会直接返回,否则的话就会阻塞
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

// AQS中的方法,释放共享锁
public final boolean releaseShared(int arg) {
// sync中重写的tryReleaseShared
if (tryReleaseShared(arg)) {
// AQS释放资源
doReleaseShared();
return true;
}
return false;
}

// AbstractQueuedSynchronizer中的doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Node.SHARED 为共享锁模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 检查当前节点的前驱节点是不是头节点
if (p == head) {
// 尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置头节点并且唤醒后继节点,调用doReleaseShared方法
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 检查线程是否被阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TestCountDownLatch {

public static void main(String[] args) {
// CountDownLatch内部存在一个计数器,声明数量5
// countDown方法对计数器进行减一操作,await方法会一直阻塞到计数器为0
CountDownLatch latch = new CountDownLatch(5);
LatchDemo latchDemo = new LatchDemo(latch);

long start = System.currentTimeMillis();

for (int i = 0;i<5;i++){
new Thread(latchDemo).start();
}

try {
// 阻塞一直到减至0时
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("累计耗时"+(System.currentTimeMillis() - start));

}
}

class LatchDemo implements Runnable{

private CountDownLatch latch;

public LatchDemo(CountDownLatch latch){
this.latch = latch;
}

@Override
public void run() {
try{
for(int i = 0;i<10;i++){
System.out.println(i);
}
} finally {
// 减一表示完成
latch.countDown();
}
}
}

当一个线程要等待某些操作先执行完时,需要调用await()方法,这个方法让线程进入休眠直到等待的所有操作都完成,当某一个操作完成后,调用countDown()方法将CountDownLatch类内部计数器减1,当计数器变成0时,CountDownLatch类将唤醒所有调用await()方法而进入休眠的线程

场景

如模拟并发,启动100个线程同时访问某一个地址,同时并发,可以使用CountDownLatch

屏障CyclicBarrier

可循环使用的屏障

CyclicBarrier类是借助ReentrantLock和Condition来实现的,提供了屏障的实现,允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务。屏障可以看做是一个篱栅,必须等待达到一定数量的人来推开这个篱栅(只有当线程数量足够时才可以继续进行),barrier在释放等待线程后可以重用

CyclicBarrier具有屏障重置性,parties的值可以重置归0

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/** The lock for guarding barrier entry */
// 同步操作锁
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
// 线程拦截器
private final Condition trip = lock.newCondition();
/** The number of parties */
// 每次拦截的线程数
private final int parties;
/* The command to run when tripped */
// 换代前执行的任务,当count减为0时表示当前代结束,需要转到下一代,在转到下一代之前会将所有阻塞的线程唤醒,在唤醒所有线程之前会执行该任务
private final Runnable barrierCommand;
/** The current generation */
// 栅栏的当前代
private Generation generation = new Generation();

/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
// 计数器,初始值为parties,每次await减一,直到为0
private int count;
// parties需要达到的数量
public CyclicBarrier(int parties) {
this(parties, null);
}
// 达到该数量时,执行该Runnable
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

当一个线程到达指定的点后,调用await()方法等待其他线程直到其他所有线程到达,当最后一个线程调用await()方法时,CyclicBarrier对象将唤醒所有在等待的线程,继续向后执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// 到达屏障点调用await方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// broken记录当前屏障是否被打破
if (g.broken)
throw new BrokenBarrierException();
// 如果线程被中断,则会通过 breakBarrier 方法将 broken 设置为true,也就是说,如果有线程收到中断通知,直接就打破屏障,停止 CyclicBarrier, 并唤醒所有线程

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 如果index==0,表示所有的线程都到了屏障点,此时执行初始化时传递的任务
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 执行任务
if (command != null)
command.run();
ranAction = true;
// 激活其他因调用await方法而被阻塞的线程,并重置CyclicBarrier,转到下一代
nextGeneration();
return 0;
} finally {
// 确保在任务未成功执行能将所有线程唤醒
if (!ranAction)
breakBarrier();
}
}

// index != 0,说明当前不是最后一个线程调用await方法
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 没有设置超时时间
if (!timed)
// 进行等待
trip.await();
// 设置了超时时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 条件等待被中断,则判断是否有其他线程已经使屏障破坏。若没有则进行屏障破坏处理,并抛出异常;否则再次中断当前线程
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 如果线程因打破栅栏而被唤醒则抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 如果线程因换代操作被唤醒则返回计数器的值
if (g != generation)
return index;
// 如果线程因时间到了被唤醒则打破栅栏抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

private void nextGeneration() {
// signal completion of last generation
// 唤醒条件队列所有线程
trip.signalAll();
// set up next generation
// 设置计数器的值为需要拦截的线程数
count = parties;
// 重新设置栅栏代次
generation = new Generation();
}
getNumberWaiting

获取有几个线程已经达到了屏障点在进行等待

1
2
3
4
5
6
7
8
9
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
isBroken

查询此屏障是否处于损坏状态

1
2
3
4
5
6
7
8
9
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
getParties

获取parties值

1
2
3
public int getParties() {
return parties;
}
reset

重置屏障

1
2
3
4
5
6
7
8
9
10
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class TestCyclicBarrier {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
CyclicBarrierDemo cyclicBarrierDemo = new CyclicBarrierDemo(cyclicBarrier,10);

for(int i = 0; i<10;i++){
new Thread(cyclicBarrierDemo).start();
}

}

static class CyclicBarrierDemo implements Runnable{
CyclicBarrier cyclicBarrier;
int taskSize;

volatile AtomicInteger count = new AtomicInteger(0);

public CyclicBarrierDemo(CyclicBarrier cyclicBarrier,int taskSize){
this.cyclicBarrier = cyclicBarrier;
this.taskSize = taskSize;
}


@Override
public void run() {
count.incrementAndGet();
if(count.get() == taskSize * 0.3){
System.out.println("30%");
} else if(count.get() == taskSize * 0.5){
System.out.println("50%");
} else if(count.get() == taskSize){
System.out.println("100%");
}
try {
cyclicBarrier.await();
System.out.println("完成");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}

场景

等待并发线程结束

对比CountDownLatch

  • CountDownLatch
    • 一个线程或者多个线程,等待另一个线程或者多个线程完成某个事情之后才能继续执行,减法操作
    • 只能触发一次
    • 基本操作是countDown和await,调用await的线程会阻塞等待countDown被调用足够的次数
  • CyclicBarrier
    • 多个线程相互等待,任何一个线程完成之前,所有的线程都必须等待,加法操作
    • 可重复使用
    • 基本操作只有await,当所有的线程都调用了await,才会继续执行任务,并自动进行重置

移相器Phaser

Phaser类是对于CountDownLatch和CyclicBarrier的升级版本,把并发任务分成多个阶段运行,在下一阶段之前,当前阶段中的所有线程都必须执行完成,类似于可以设置多个屏障

适用于当我们有并发任务需要分解成几步执行,Phaser类机制是在每一步结束的位置对线程进行同步,当所有线程都完成这一步,才允许执行下一步

源码

一个Phaser对象有两种状态

  • 活跃态Active : 当存在参与同步的线程时,Phaser就是活跃的,并且在每个阶段结束的时候进行同步,在这种状态下
  • 终止态Termination:当所有参与同步的线程都取消注册时,Phaser就处于终止状态,在这种状态下,Phaser没有任何参与者。当Phaser对象的onAdvance()方法返回true时,Phaser对象就处于终止状态了,当Phaser是终止态时,同步方法arriveAndAwaitAdvance()方法会立即返回,而不会做任何同步操作
1
2
3
4
5
6
7
8
9
10
11
// 通知phaser对象一个参与者已经完成了当前阶段,但是它不应该等待其他参与者都完成当前阶段,该方法不会与其他线程同步,直接执行后续代码 【慎用】
public int arrive() {
return doArrive(ONE_ARRIVAL);
}


// 在phaser阶段改变时会被自动执行,第一个参数是当前阶段数,第二个参数是注册参与者数量,如果返回false则表示phaser再继续执行,返回true表示phaser已经完成执行并进入终止态
// 可以重写该方法来自定义控制phaser的阶段改变
protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}
arriveAndAwaitAdvance

该方法类似于CountDownLatch类中的await方法,作用是当前线程已经达到屏障,在此等待一段时间,等条件满足后继续向下一个屏障执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 表示已完成了当前阶段
// 到达进行减一操作,会进行阻塞,等待其他线程到达
public int arriveAndAwaitAdvance() {
// Specialization of doArrive+awaitAdvance eliminating some reads/paths
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
if (root != this)
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT); // terminated
releaseWaiters(phase);
return nextPhase;
}
}
}
arriveAndDeregister

退出当前阶段,且当前阶段的parties-1,并且不会参与后续阶段

1
2
3
4
5
// 减少注册者的数目,通知phaser对象对应的线程已经完成了当前阶段,并且他不会参与到下一阶段的操作中
// 当前线程退出,并且parties-1
public int arriveAndDeregister() {
return doArrive(ONE_DEREGISTER);
}
getPhase

获取已经到达第几个屏障

1
2
3
public final int getPhase() {
return (int)(root.state >>> PHASE_SHIFT);
}
register

既然arriveAndDeregister可以动态的减少parties值,那么也就对应的可以动态的增加parties值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 动态的增加parties值
// 将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程,动态的parties+1
public int register() {
return doRegister(1);
}

// 将指定数目的参与者注册到Phaser中,所有这些参与者都将被当成没有执行完本阶段的线程
// 批量的进行新增parties值
public int bulkRegister(int parties) {
if (parties < 0)
throw new IllegalArgumentException();
if (parties == 0)
return getPhase();
return doRegister(parties);
}

// 获取注册的parties数量
public int getRegisteredParties() {
return partiesOf(state);
}
getArrivedParties
1
2
3
4
5
6
7
8
// 获取已经到达的Parties数
public int getArrivedParties() {
return arrivedOf(reconcileState());
}
// 获取还未到达的Parties数
public int getUnarrivedParties() {
return unarrivedOf(reconcileState());
}
awaitAdvance

如果传入阶段参数与当前阶段一致,这个方法会将当前线程置于休眠,直到这个阶段的所有参与者都运行完成,如果传入的阶段参数与当前阶段不一致,则立即返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 不可中断
public int awaitAdvance(int phase) {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}

// 可中断
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase) {
QNode node = new QNode(this, phase, true, false, 0L);
p = root.internalAwaitAdvance(phase, node);
if (node.wasInterrupted)
throw new InterruptedException();
}
return p;
}
termination
1
2
3
4
5
6
// 取消阶段,线程执行各自代码,不在阻塞等待
public void forceTermination()
// 判断Phaser是否为销毁状态
public boolean isTerminated() {
return root.state < 0L;
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class TestPhaser {

public static Phaser phaser = new Phaser(3);

public static void main(String[] args) {
Runner runner = new Runner();
for(int i=0;i<3;i++){
Thread thread = new Thread(runner);
thread.start();
}
}

public static void method(){
System.out.println(Thread.currentThread().getName()+"-A1 begin");
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"-A1 end");
}

static class Runner implements Runnable{

@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(10000));
TestPhaser.method();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

交换器Exchanger

Exchanger主要作用是可以使2个线程之间方便的互相通信,使两个线程之间传输数据,比wait/notify更加方便

Exchanger类用于两个线程交换数据,允许两个线程之间定义同步点,当两个线程都到达同步点时,它们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,同时第二个线程的数据结构进入到第一次线程中(只能同步两个线程),用于同步生产者和消费者的交换对象

源码

1
2
3
4
5
6
7
8
9
10
11
// 阻塞,被调用后等待其他线程来取得数据,如果没有其他线程取得数据,则一直阻塞等待
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class TestExchanger {

public static void main(String[] args) {
// 两个线程交换数据
Exchanger<String> exchanger = new Exchanger<>();
ThreadA threadA = new ThreadA(exchanger);
ThreadB threadB = new ThreadB(exchanger);
threadA.start();
threadB.start();
}

static class ThreadA extends Thread{
private Exchanger<String> exchanger;

public ThreadA(Exchanger<String> exchanger){
super();
this.exchanger = exchanger;
}

@Override
public void run() {
try{
System.out.println("线程A中得到的线程B的值为"+exchanger.exchange("【线程A的值】"));
} catch (InterruptedException e){
e.printStackTrace();
}
}
}

static class ThreadB extends Thread{
private Exchanger<String> exchanger;

public ThreadB(Exchanger<String> exchanger){
super();
this.exchanger = exchanger;
}

@Override
public void run() {
try{
System.out.println("线程B中得到的线程A的值为"+exchanger.exchange("【线程B的值】"));
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
}

---
线程B中得到的线程A的值为【线程A的值】
线程A中得到的线程B的值为【线程B的值】

欢迎关注我的其它发布渠道