0%

AQS

AQS

但凡了解多线程的对于AQS应该都有所耳闻吧(我第一次知道AQS还是在一次面试中,那次被虐的老惨了),AQS即AbstractQueuedSynchronizer队列同步器,是一个抽象类,它是从java5开始的同步组件的基础框架,它仅仅只是定义了同步状态的获取和释放方法,很多同步类都继承该类来实现同步逻辑。

AQS的子类使用

源码解析

主要维护了一个state同步状态和一个FIFO队列,底层是一个FIFO的双向队列,通过head节点和tail节点来记录队首和队尾元素,主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,可重写的方法tryAcquire(int arg)、tryRelease(int arg)、tryAcquireShared(int arg)、tryReleaseShared(int arg)、isHeldExclusively()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

// 头节点
private transient volatile Node head;

// 尾结点
private transient volatile Node tail;


// 线程同步的关键
// 同步状态信息state,可以通过getState、setState、compareAndSetState来获取或者修改值
// 对于不同的实现类,其用法不同
private volatile int state;

// 获取当前同步状态
protected final int getState() {
return state;
}


protected final void setState(int newState) {
state = newState;
}

// 使用CAS设置当前状态,可以保证原子性
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}


static final long spinForTimeoutThreshold = 1000L;

// 获取锁 独占锁
public final void acquire(int arg) {
// tryAcquire需要具体子类去实现,保证线程安全的获取同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 同步状态获取失败,通过addWaiter将节点加入到同步队列尾部,acquireQueued以无限循环的方式获取同步状态
selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果node的前驱结点p是head,表示node是第二个节点,可以尝试去获取资源
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果可以休息了,就进入waiting状态,直接被unpark
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 将Node插入到队列中
Node pred = tail;
if (pred != null) { // 尾结点不为空
// 新加入节点的前驱节点指向尾结点
node.prev = pred;
// 设置当前节点为尾结点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS操作失败,则自旋CAS插入
//(可能有多个线程并发加入队尾产生竞争)
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 第一次循环,尾结点为null,需要进行初始化
if (t == null) { // Must initialize
// 构建一个哨兵节点作为头节点
if (compareAndSetHead(new Node()))
// 尾结点同样指向哨兵节点
tail = head;
} else { // 尾结点不为空
// 将节点的前驱节点指向尾结点
node.prev = t;
// 设置该节点为尾结点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}


public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}


public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

// 释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}


}

Node节点

队列中的元素存储的是Node节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
// 标记该线程是获取资源时被阻塞挂起后放入AQS队列的
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
// 标记线程是获取独占锁资源时被挂起后放入AQS队列的
static final Node EXCLUSIVE = null;


// 当前线程的等待状态
// SIGNAL 线程需要被唤醒 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
// CANCELLED 线程被取消了 由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
// CONDITION 线程在条件队列里面等待 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取
// PROPAGATE 释放共享资源时需要通知其他节点 下一次共享式同步状态获取将会无条件的被传播下去
// 0
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 记录当前节点的前驱节点
volatile Node prev;

// 记录当前节点的后继节点
volatile Node next;


// 节点对应的线程
volatile Thread thread;


// 等待队列中的后继节点,如果当前节点是共享的,那么这个字段将是一个SHARED常量
Node nextWaiter;

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

LockSupport工具类

在AQS中多次使用到了LockSupport来进行操作,LockSupport是用来干什么的呢?LockSupport定义了一组公共静态方法,提供了最基本的线程阻塞和唤醒功能,也成为了构建同步组件的基础工具,主要作用是来挂起和唤醒线程,LockSupport的底层是使用Unsafe实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class LockSupport {
private LockSupport() {} // Cannot be instantiated.

private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}

// 唤醒线程,如果thread之前因调用park()方法而被挂起,则调用unpark后,该线程会被唤醒;如果Thread之前没有调用park,则调用unpark后,再次调用park方法,会立即返回
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

// 当线程在没有持有许可的情况下调用park方法而被阻塞挂起,blocker对象会被记录到当前线程内部,此时可以通过getBlocker方法来获取blocker对象,推荐blocker对象设置为this,这样可以在打印线程堆栈时知道哪个类被阻塞了
// volatile Object parkBlocker; Thread类中有一个变量来专门存储该值
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
// 获取blocker对象
public static Object getBlocker(Thread t) {
if (t == null)
throw new NullPointerException();
return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}

// 有超时时间的阻塞
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}

// deadline的单位是ms,是一个时间戳,表示阻塞到什么时候
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null);
}


// 调用park方法来获取许可,调用该方法的线程会被挂起阻塞,等待其他线程调用unpark(Thread thread)方法来唤醒该线程
public static void park() {
UNSAFE.park(false, 0L);
}

// 挂起指定时间
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}


public static void parkUntil(long deadline) {
UNSAFE.park(true, deadline);
}


static final int nextSecondarySeed() {
int r;
Thread t = Thread.currentThread();
if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
r ^= r << 13; // xorshift
r ^= r >>> 17;
r ^= r << 5;
}
else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
r = 1; // avoid zero
UNSAFE.putInt(t, SECONDARY, r);
return r;
}

// Hotspot implementation via intrinsics API
private static final sun.misc.Unsafe UNSAFE;
private static final long parkBlockerOffset;
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
parkBlockerOffset = UNSAFE.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
SEED = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSeed"));
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
SECONDARY = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception ex) { throw new Error(ex); }
}

}

欢迎关注我的其它发布渠道