0%

同步方式

同步方式

除了使用wait()/nitify()来实现同步之外,在java.util.concurrent包中还有几种进行同步的方式,如信号量、屏障、闭锁和交换器。

信号量

Semaphore类内部继承AQS(AbstractQueuedSynchronizer)实现,提供了允许的最大数量,以及公平策略,默认是非公平的

1
2
3
4
5
6
7
8

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

可以使用信号量来进行限流,使用acquire()来获得许可,获得到许可之后,信号量的允许数量就减少一个,当全部被占用之后,就会阻塞,使用release()方法来释放

如果不想在获取的时候进行阻塞,可以使用tryAcquire()方法来尝试获取许可

1
2
3
4
5
6
7
8
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

可以将信号量看做是一个有界的阻塞容器

屏障

CyclicBarrier类是借助ReentrantLock和Condition来实现的,提供了屏障的实现。屏障可以看做是一个篱栅,必须等待达到一定数量的人来推开这个篱栅(只有当线程数量足够时才可以继续进行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();

// 需要达到的数量
public CyclicBarrier(int parties) {
this(parties, null);
}
// 达到该数量时,执行该Runnable
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

闭锁

CountDownLatch类内部继承AQS(AbstractQueuedSynchronizer)实现

在完成某些运算时,只有其他所有线程全部完成运算时,当前运算才会继续,但是当闭锁打开之后就只能保持打开的方式,不可以再次闭合,用来确保某个运算在其他运算之后

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class TestCountDownLatch {

public static void main(String[] args) {
// 声明数量5
CountDownLatch latch = new CountDownLatch(5);
LatchDemo latchDemo = new LatchDemo(latch);

long start = System.currentTimeMillis();

for (int i = 0;i<5;i++){
new Thread(latchDemo).start();
}

try {
// 阻塞一直到减至0时
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("累计耗时"+(System.currentTimeMillis() - start));

}
}

class LatchDemo implements Runnable{

private CountDownLatch latch;

public LatchDemo(CountDownLatch latch){
this.latch = latch;
}

@Override
public void run() {
try{
for(int i = 0;i<10;i++){
System.out.println(i);
}
} finally {
// 减一表示完成
latch.countDown();
}
}
}

移相器

Phaser类

交换器

Exchanger类用于两个线程交换数据