0%

同步方式

同步方式

除了使用wait()/nitify()来实现同步之外,在java.util.concurrent包中还有几种进行同步的方式,如信号量、屏障、闭锁和交换器。

信号量Semaphore

Semaphore类内部继承AQS(AbstractQueuedSynchronizer)实现,提供了允许的最大数量,以及公平策略,默认是非公平的,可以控制某个资源被同时访问的任务数

信号量就是一个计数器,用来保护一个或多个共享资源的访问,如果线程要访问一个共享资源,必须先获得信号量,信号量内部计数器大于0,则计数器减一,允许访问这个共享资源,一旦达到0,就会对新线程进行阻塞;当线程用完某个共享资源时,信号量必须释放,计数器加一,以是其他线程能够访问共享资源

1
2
3
4
5
6
7
8

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

可以使用信号量来进行限流,使用acquire()来获得许可,获得到许可之后,信号量的允许数量就减少一个,当全部被占用之后,就会阻塞,使用release()方法来释放

如果不想在获取的时候进行阻塞,可以使用tryAcquire()方法来尝试获取许可

1
2
3
4
5
6
7
8
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

可以将信号量看做是一个有界的阻塞容器

屏障CyclicBarrier

CyclicBarrier类是借助ReentrantLock和Condition来实现的,提供了屏障的实现,允许多个线程在某个集合点处进行相互等待。屏障可以看做是一个篱栅,必须等待达到一定数量的人来推开这个篱栅(只有当线程数量足够时才可以继续进行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();

// 需要达到的数量
public CyclicBarrier(int parties) {
this(parties, null);
}
// 达到该数量时,执行该Runnable
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

当一个线程到达指定的点后,调用await()方法等待其他线程直到其他所有线程到达,当最后一个线程调用await()方法时,CyclicBarrier对象将唤醒所有在等待的线程,继续向后执行

闭锁CountDownLatch

CountDownLatch类内部继承AQS(AbstractQueuedSynchronizer)实现,在完成一组正在其他线程中执行的操作之前,允许线程一直等待

在完成某些运算时,只有其他所有线程全部完成运算时,当前运算才会继续,但是当闭锁打开之后就只能保持打开的方式,不可以再次闭合,用来确保某个运算在其他运算之后

就像是一扇大门,门打开之前所有的线程都被阻断,一旦大门打开,所有的线程都将通过,且不能再次闭合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// CountDownLatch中持有的是共享锁
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
// CountDownLatch中的队列同步器
private final Sync sync;

// 这个数就是线程需要等待完成的操作数目
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

// 需要等待其他事件在完成时调用,获取共享锁
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}


public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// 每个被等待的事件在完成时调用,释放共享锁
public void countDown() {
sync.releaseShared(1);
}


public long getCount() {
return sync.getCount();
}


}

// AbstractQueuedSynchronizer中的acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// (getState() == 0) ? 1 : -1 state初始值为所传入的count,所以在countDown调用之前都是大于0的
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

// AbstractQueuedSynchronizer中的doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Node.SHARED 为共享锁模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 检查当前节点的前驱结点是不是头节点
if (p == head) {
// 尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置头节点并且唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 检查线程是否被阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TestCountDownLatch {

public static void main(String[] args) {
// CountDownLatch内部存在一个计数器,声明数量5
// countDown方法对计数器进行减一操作,await方法会一直阻塞到计数器为0
CountDownLatch latch = new CountDownLatch(5);
LatchDemo latchDemo = new LatchDemo(latch);

long start = System.currentTimeMillis();

for (int i = 0;i<5;i++){
new Thread(latchDemo).start();
}

try {
// 阻塞一直到减至0时
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("累计耗时"+(System.currentTimeMillis() - start));

}
}

class LatchDemo implements Runnable{

private CountDownLatch latch;

public LatchDemo(CountDownLatch latch){
this.latch = latch;
}

@Override
public void run() {
try{
for(int i = 0;i<10;i++){
System.out.println(i);
}
} finally {
// 减一表示完成
latch.countDown();
}
}
}

当一个线程要等待某些操作先执行完时,需要调用await()方法,这个方法让线程进入休眠直到等待的所有操作都完成,当某一个操作完成后,调用countDown()方法将CountDownLatch类内部计数器减1,当计数器变成0时,CountDownLatch类将唤醒所有调用await()方法而进入休眠的线程

移相器Phaser

Phaser类,把并发任务分成多个阶段运行,在下一阶段之前,当前阶段中的所有线程都必须执行完成

适用于当我们有并发任务需要分解成几步执行,Phaser类机制是在每一步结束的位置对线程进行同步,当所有线程都完成这一步,才允许执行下一步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 通知phaser对象一个参与者已经完成了当前阶段,但是它不应该等待其他参与者都完成当前阶段,该方法不会与其他线程同步 【慎用】
public int arrive() {
return doArrive(ONE_ARRIVAL);
}


// 如果传入阶段参数与当前阶段一致,这个方法会将当前线程置于休眠,直到这个阶段的所有参与者都运行完成,如果传入的阶段参数与当前阶段不一致,则立即返回
public int awaitAdvance(int phase) {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}

// 将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程
public int register() {
return doRegister(1);
}


// 将指定数目的参与者注册到Phaser中,所有这些参与者都将被当成没有执行完本阶段的线程
public int bulkRegister(int parties) {
if (parties < 0)
throw new IllegalArgumentException();
if (parties == 0)
return getPhase();
return doRegister(parties);
}


// 减少注册者的数目,通知phaser对象对应的线程已经完成了当前阶段,并且他不会参与到下一阶段的操作中
public int arriveAndDeregister() {
return doArrive(ONE_DEREGISTER);
}

// 在phaser阶段改变时会被自动执行,第一个参数是当前阶段数,第二个参数是注册参与者数量,如果返回false则表示phaser再继续执行,返回true表示phaser已经完成执行并进入终止态
// 可以重写该方法来自定义控制phaser的阶段改变
protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}

一个Phaser对象有两种状态

  • 活跃态Active : 当存在参与同步的线程时,Phaser就是活跃的,并且在每个阶段结束的时候进行同步,在这种状态下
  • 终止态Termination:当所有参与同步的线程都取消注册时,Phaser就处于终止状态,在这种状态下,Phaser没有任何参与者。当Phaser对象的onAdvance()方法返回true时,Phaser对象就处于终止状态了,当Phaser是终止态时,同步方法arriveAndAwaitAdvance()方法会立即返回,而不会做任何同步操作

交换器Exchanger

Exchanger类用于两个线程交换数据,允许两个线程之间定义同步点,当两个线程都到达同步点时,它们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,同时第二个线程的数据结构进入到第一次线程中(只能同步两个线程),用于同步生产者和消费者的交换对象

1
2
3
4
5
6
7
8
9
10
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}