0%

Lock

Lock

由于synchronized在很多情况下是不可控的,所以在jdk5出现了一个新的加锁方式Lock,提供了无条件的,可轮询的,可定时的,可中断的锁获取操作,所有加锁和解锁都是显式的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Lock {
// 获取锁,会阻塞
void lock();
// 如果当前线程未被中断,则获取锁
void lockInterruptibly() throws InterruptedException;
// 尝试获取锁,如果锁可用,则获取锁,返回true;如果锁不可用,则返回false
boolean tryLock();
// 尝试获取锁,设置超时时间,如果该时间内没有获取到锁,则返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 将Condition绑定到该Lock实例上
Condition newCondition();
}

主要有三个实现类ReentrantLock重入锁、ReentrantReadWriteLock.ReadLock读锁、ReentrantReadWriteLock.WriteLock写锁

ReentrantLock重入锁

ReentrantLock为Lock的实现类,实现了标准的互斥锁,底层使用AQS实现线程同步,使用CAS,比synchronized效率略高,一次最多只有一个线程持有ReentrantLock —> 有公平锁和非公平锁,是可重入锁,每次重入会把拥有数++

1
2
3
4
5
6
7
8
9
10
11
private transient volatile Node head;

private transient volatile Node tail;

// Node存储等待的线程 本质是一个双向链表
static final class Node{
// 节点等待状态
volatile int waitStatus;
volatile Node prev;
volatile Node next;
}

公平锁和非公平锁

内部使用的是AQS,继承自AbstractQueuedSynchronizer来实现自身的功能,公平锁FairSync和非公平锁NonfairSync继承自Sync分别实现了获取锁的公平与非公平策略,公平与非公平的区别在于获取锁的时候是否按照FIFO的顺序来

1
2
// 继承自AbstractQueuedSynchronizer的state变量状态值表示的是线程获取该锁的可重入次数,为0则表示当前锁没有被任何线程持有;线程持有之后会变成1,并记录该锁的持有者为当前线程,在没有释放锁的情况下第二次获取该锁,状态值就会被设置为2;然后释放锁时,会进行减1操作,直到状态值为0,则当前线程释放锁
abstract static class Sync extends AbstractQueuedSynchronizer

ReentrantLock默认是非公平锁

1
2
3
4
5
6
7
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁

java.util.concurrent.locks.ReentrantLock.FairSync 公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。

  • 优点是等待锁的线程不会饿死。

  • 缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}
// AQS中的方法 队列中当前节点是否有前驱结点
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t表示头节点不等于尾结点,如果h==t表示当前队列为空
// h != t && s == null,则说明有一个元素将要作为AQS的第一个节点入队
// h != t && s != null && s.thread != Thread.currentThread() 说明当前队列里的第一个元素不是当前线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
// 会尝试再次通过CAS获取一次锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// getState 为0,表示没有线程持有锁
if (c == 0) {
// 公平策略的体现
// 看当前线程是否有前驱节点(即是否在队列头),如果在队头则尝试更改status是否成功
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 设置当前线程为占有线程
setExclusiveOwnerThread(current);
return true;
}
}
// getState 不为0,表示有线程持有锁,判断是否为当前线程占有
else if (current == getExclusiveOwnerThread()) {
// 将进入次数加一,由于是可重入锁,所以进入一次就加一,出去一次就减一
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

// AbstractQueuedSynchronizer中的acquire方法
public final void acquire(int arg) {
// tryAcquire会尝试再次通过CAS获取一次锁
// tryAcquire为true则成功获取到锁
// tryAcquire为false,则执行acquireQueued入队
if (!tryAcquire(arg) &&
// Node.EXCLUSIVE表示独占锁模式 ReentrantLock是独占锁
// acquireQueued 通过自旋,判断当前队列节点是否可以获取锁
// addWaiter 将当前线程加入上面锁的双向链表(等待队列)中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 尾结点不为null,说明队列已经进行初始化了
if (pred != null) {
node.prev = pred;
// 将当前节点加入到链表的尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 初始化队列,并且把node节点追加到队列尾部
enq(node);
return node;
}
非公平锁

java.util.concurrent.locks.ReentrantLock.NonfairSync 非公平锁

非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。

  • 优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU不必唤醒所有线程

  • 缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 非公平锁先尝试获取锁,获取到锁就直接占有
// 尝试将state从0变成1
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 所已经被占用,没有获取到锁,执行AQS的acquire方法,去同步队列中排队
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

// Sync的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// getState 为0,表示没有线程持有锁
if (c == 0) {
// 直接争抢锁资源
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// state不为0,说明此时有线程持有该锁,如果当前线程是锁的持有者
else if (current == getExclusiveOwnerThread()) {
// 将状态值加1
int nextc = c + acquires;
if (nextc < 0) // overflow 可重入次数溢出了
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 已经有线程持有该锁了,且不是当前线程,没有获取到锁资源,就放入到AQS阻塞队列中排队
return false;
}

state状态 0.未占用 1.已占用 大于1 重入 使用CAS来修改state状态

使用多条件Condition

使用Lock可以一个锁关联一个或多个条件,这些条件通过Condition接口声明,该接口中提供了挂起线程和唤醒线程的机制

Condition其实就对应于之前synchronized中的对象监视器

synchronized(obj){

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface Condition {

// 进入等待状态
void await() throws InterruptedException;


void awaitUninterruptibly();


long awaitNanos(long nanosTimeout) throws InterruptedException;


boolean await(long time, TimeUnit unit) throws InterruptedException;


boolean awaitUntil(Date deadline) throws InterruptedException;


void signal();


void signalAll();
}

其内部维护了一个FIFO队列,在队列中每个节点都包含了一个线程引用,该线程就是在该Condition对象上等待的线程

1
2
3
4
5
6
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

调用await方法时会创建一个新的Node,将Node加入到队列的队尾

使用方式

1
2
3
private volatile ReentrantLock lock = new ReentrantLock();
private volatile Condition consumerCondition = lock.newCondition();
private volatile Condition producerCondition = lock.newCondition();

此时就维护了两个不同的条件队列consumerCondition和producerCondition,会进入到不同的条件队列中进行排队阻塞

ReadWriteLock读写锁

ReentrantLock是重入排他锁,在一个时刻仅有一个线程可以进行访问,但是在大多数场景下,大部分时间都是进行读操作,写操作占用时间较少,而且读操作不存在数据竞争问题,所以提供了读写锁

ReadWriteLock读写锁暴露了两个Lock,一个用来读,一个用来写。加锁策略是允许多个读者同时读,但是只能有一个写者,多个读锁不互斥,读锁与写锁互斥,写锁与写锁互斥,采用了读写分离的策略

1
2
3
4
5
6
public interface ReadWriteLock {
// 读锁是共享锁
Lock readLock();
// 写锁是独占锁
Lock writeLock();
}

ReentrantReadWriteLock为ReadWriteLock的实现类,同样支持公平和非公平

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ReentrantReadWriteLock
// 返回当前读锁被获取的次数
public int getReadLockCount() {
return sync.getReadLockCount();
}
// 返回当前线程获取读锁的次数
public int getReadHoldCount() {
return sync.getReadHoldCount();
}
// 返回当前写锁被获取的次数
public int getWriteHoldCount() {
return sync.getWriteHoldCount();
}
// 写锁是否被获取
public boolean isWriteLocked() {
return sync.isWriteLocked();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;

/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
// AQS中的只有一个state状态,但是ReentrantReadWriteLock中需要维护读写两种状态,所以将state进行分割
// 高16位代表读状态,用来表示获取读锁的次数
// 低16位代表获取写锁的线程的重入次数
static final int SHARED_SHIFT = 16;
// 共享锁(读锁)状态单位值65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 共享锁(读锁)线程最大个数65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 排他锁掩码,二进制,15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
// 读锁线程数 右移16位
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
// 写锁重入数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

// 存放除去第一个获取读锁线程的其他线程获取读锁的重入次数
private transient ThreadLocalHoldCounter readHolds;
// 记录最后一个获取读锁的线程的重入次数
private transient HoldCounter cachedHoldCounter;
// 记录第一个获取到读锁的线程
private transient Thread firstReader = null;
// 记录第一个获取到读锁的线程获取读锁的重入次数
private transient int firstReaderHoldCount;

static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
}

其中包含有读锁ReadLock和写锁WriteLock

读锁ReadLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 获取读锁,如果当前没有线程持有写锁,则可以获取读锁
public void lock() {
sync.acquireShared(1);
}
// 可中断
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}


public boolean tryLock() {
return sync.tryReadLock();
}


public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}


public void unlock() {
sync.releaseShared(1);
}


}

Sync中读锁的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// 获取读锁
protected final int tryAcquireShared(int unused) {

Thread current = Thread.currentThread();
int c = getState();
// 写状态不等于0并且锁的持有者不是当前线程
// 已经有其他线程获取到写锁了,此时无法获取读锁
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读锁的数量
int r = sharedCount(c);
// 尝试获取锁,compareAndSetState多个读线程同时操作时,只有一个线程会成功,如果失败,会执行fullTryAcquireShared方法进行重试
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 读锁数量为0,说明该线程是第一个获取到读锁的
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果当前线程是第一个获取到读锁的线程,将数量+1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// cachedHoldCounter记录最后一个获取读锁的线程以及该线程获取读锁的重入数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
//readHolds记录了当前线程获取读锁的可重入数
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

// 释放读锁
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 循环读取计数,直到CAS更新成功
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

WriteLock写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;

protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

// 写锁是独占锁,只有一个线程可以获取该锁,如果当前没有线程获取到读锁和写锁,则当前线程可以获取到写锁
// 如果当前已有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞
// 且写锁是可重入锁,如果当前线程已经获取了该锁,再次获取只是简单的把重入次数+1
public void lock() {
// 会去执行Sync中重写的tryAcquire方法
sync.acquire(1);
}

// 会对中断进行响应
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

// 尝试获得写锁
public boolean tryLock( ) {
return sync.tryWriteLock();
}

// 超时重新激活
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}


public void unlock() {
// 会调用tryRelease
sync.release(1);
}


public Condition newCondition() {
return sync.newCondition();
}


public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}


public int getHoldCount() {
return sync.getWriteHoldCount();
}
}

Sync类中对于写锁的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 获取写锁
protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
// 在state中会使用高16位表示读锁状态(读锁个数),低16位表示写锁状态(写锁个数)
// 当前锁的个数
int c = getState();
// 写锁的个数
int w = exclusiveCount(c);
// 当前有线程持有锁(可能是读锁,也可能是写锁)
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// w==0 写锁数量为0,说明已经有线程获取了读锁
// w!=0 && current != getExclusiveOwnerThread() 说明当前线程不是写锁的持有者
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 说明当前线程获得了写锁,但是重入次数大于最大值会抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire 走到这里说明是当前线程重入
setState(c + acquires);
return true;
}
// 此时持有锁的数量是0,可以正常获取写锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

// 释放写锁
protected final boolean tryRelease(int releases) {
// 是否为写锁的拥有者调用的
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取重入值,这里不需要考虑高位,因为写锁是独占锁,此时读锁的数量肯定是0
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// 如果写锁的数量为0,则释放锁,将持有线程置为null
if (free)
setExclusiveOwnerThread(null);
// 设置state
setState(nextc);
return free;
}

与synchronized比较

  • Lock使用代码实现CAS,为API层面上的互斥锁;synchronized通过jvm的monitor实现的,为原语层面的互斥锁
  • 使用ReentrantLock可以设置尝试获取锁的等待时间,超过该时间则中断等待
  • Lock可以实现公平锁和非公平锁,synchronized是非公平锁,可能会造成线程饥饿现象
  • ReentrantLock可以绑定多个条件Condition对象,对线程的等待、唤醒操作更加详细灵活
  • ReentrantLock对于锁的可操作性强;而synchronized的操作都是JVM隐式操作的
  • Lock可以实现独占模式和共享模式,如读写锁,而synchronize只提供独占模式

锁的分类

以是否可重入分类

  • 可重入锁 synchronized、ReetrantLock
  • 不可重入锁

以是否公平分类

  • 公平锁 秉持着先来后到的原则
  • 非公平锁

以并发度来进行分类

  • 读写锁 同一时刻允许多个线程访问,ReentrantReadWriteLock
  • 排他锁 同一时刻只允许一个线程进行访问,synchronized、ReentrantLock

欢迎关注我的其它发布渠道