0%

AQS

AQS

但凡了解多线程的对于AQS应该都有所耳闻吧(我第一次知道AQS还是在一次面试中,那次被虐的老惨了),AQS即AbstractQueuedSynchronizer队列同步器,是一个抽象类,它是从java5开始的同步组件的基础框架,它仅仅只是定义了FIFO同步队列、同步状态的获取和释放方法,很多同步类都继承该类来实现同步逻辑,子类通过继承AQS并实现它的抽象方法来管理同步状态

是构建其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch等)

AQS的子类使用

源码解析

主要维护了一个volatile修饰的state同步状态,当线程调用lock方法时,如果state=0,说明该资源没有被占有,可以获得锁;如果state=1,说明该资源已被占用,需要加入同步队列等待。一个FIFO双向链表的同步队列,来完成线程获取锁的排队工作,线程获取锁失败后,会被添加至队尾。FIFO双向链表,通过head节点和tail节点来记录队首和队尾元素,主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,可重写的方法tryAcquire(int arg)、tryRelease(int arg)、tryAcquireShared(int arg)、tryReleaseShared(int arg)、isHeldExclusively()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态
// 头结点
private transient volatile Node head;

// 尾结点
private transient volatile Node tail;


// 线程同步的关键
// 同步状态信息state,可以通过getState、setState、compareAndSetState来获取或者修改值
// 对于不同的实现类,其用法不同
// 当state>0时表示已经获取了锁,当state = 0时表示释放了锁
private volatile int state;

// 获取当前同步状态
protected final int getState() {
return state;
}


protected final void setState(int newState) {
state = newState;
}

// 使用CAS设置当前状态,可以保证原子性
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}


static final long spinForTimeoutThreshold = 1000L;

// 获取锁 独占锁
public final void acquire(int arg) {
// tryAcquire需要具体子类去实现,保证线程安全的获取同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 同步状态获取失败,通过addWaiter将节点加入到同步队列尾部,acquireQueued以无限循环的方式获取同步状态
selfInterrupt();
}

// 以不可中断方式获取锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取给定节点的前驱节点,这里要么是头部节点head,要么是其他排队的节点
final Node p = node.predecessor();
// 如果node的前驱结点p是head,表示node是第二个节点,可以尝试去获取资源
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该节点
setHead(node);
p.next = null; // help GC
// 设置获取成功状态
failed = false;
return interrupted;
}
// 走到这里说明锁的状态不可以获取,判断是否可以挂起当前线程,如果判断为真,则可以挂起当前线程,否则继续循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 阻塞等待被唤醒
interrupted = true;
}
} finally {
// 如果获取失败则取消获取
if (failed)
cancelAcquire(node);
}
}

private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 获取当前尾结点
Node pred = tail;
if (pred != null) { // 尾结点不为空
// 当前节点的前驱节点指向尾结点
node.prev = pred;
// 设置当前节点为尾结点
if (compareAndSetTail(pred, node)) {
// 旧的尾结点的后继节点指向当前节点
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS操作失败,则自旋CAS插入
//(可能有多个线程并发加入队尾产生竞争)
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 第一次循环,尾结点为null,需要进行初始化
if (t == null) { // Must initialize
// 构建一个哨兵节点作为头节点
if (compareAndSetHead(new Node()))
// 尾结点同样指向哨兵节点
tail = head;
} else { // 尾结点不为空
// 将该节点的前驱节点指向当前的尾结点
node.prev = t;
// 设置该节点为尾结点
if (compareAndSetTail(t, node)) {
// 原本的尾结点的后继节点设置为当前节点
t.next = node;
return t;
}
}
}
}


public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}


public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

// 释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 获取head节点
Node h = head;
// 如果head节点不为空并且等待状态不为0则唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// 唤醒后继节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 获取给定节点的等待状态
int ws = node.waitStatus;
// 将等待状态更新为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取给定节点的后继节点
Node s = node.next;
// 后继节点为空或者等待状态为CANCELLED
if (s == null || s.waitStatus > 0) {
s = null;
// 从后向前遍历找到第一个不是CANCELLED状态的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒
if (s != null)
LockSupport.unpark(s.thread);
}

// 判断是否可以将当前节点挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点的等待状态
int ws = pred.waitStatus;
// 如果等待状态为SIGNAL,表明前驱结点会唤醒当前节点,可以挂起
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 大于0的状态目前是CANCELLED
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 清理队列中已取消的前驱节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 走到这里可能是CONDITION和PROPAGATE或者是0
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 尝试将前驱结点的等待状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}


}

Node节点

队列中的元素存储的是Node节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
// 标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
// 标记线程是获取独占锁资源时被挂起后放入AQS队列的
static final Node EXCLUSIVE = null;


// 当前线程的等待状态
// SIGNAL 线程需要被唤醒 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
// CANCELLED 线程被取消了 由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
// CONDITION 线程在条件队列里面等待 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取
// PROPAGATE 释放共享资源时需要通知其他节点 下一次共享式同步状态获取将会无条件的被传播下去
// 0
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 记录当前节点的前驱节点
volatile Node prev;

// 记录当前节点的后继节点
volatile Node next;


// 当前节点持有的线程
volatile Thread thread;


// 等待队列中的后继节点,如果当前节点是共享的,那么这个字段将是一个SHARED常量
Node nextWaiter;

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

ConditionObject

看到在AQS中还有一个ConditionObject类,其实现了Condition接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }

// Internal methods

/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
// 向后移一位
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

// public methods

/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
// 唤醒条件队列中的下一个节点
public final void signal() {
// 判断当前线程是否持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

/**
* Implements uninterruptible condition wait.
* <ol>
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

/*
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire.
*/

/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;

/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程添加到条件队列尾部
Node node = addConditionWaiter();
// 在进入条件等待之前先完全释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 线程一直进行条件等待
while (!isOnSyncQueue(node)) {
// 进行条件等待的都在这里挂起,等待被唤醒
LockSupport.park(this);
// 当前线程被唤醒后检查是否被中断,如果被中断表示节点取消条件等待,移出条件队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

/**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}

/**
* Implements absolute timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

/**
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

// support for instrumentation

/**
* Returns true if this condition was created by the given
* synchronization object.
*
* @return {@code true} if owned
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}

/**
* Queries whether any threads are waiting on this condition.
* Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}

/**
* Returns an estimate of the number of threads waiting on
* this condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}

/**
* Returns a collection containing those threads that may be
* waiting on this Condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
*
* @return the collection of threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}

LockSupport工具类

在AQS中多次使用到了LockSupport来进行操作,LockSupport是用来干什么的呢?LockSupport定义了一组公共静态方法,提供了最基本的线程阻塞和唤醒功能,也成为了构建同步组件的基础工具,主要作用是来挂起和唤醒线程,LockSupport的底层是使用Unsafe实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class LockSupport {
private LockSupport() {} // Cannot be instantiated.

private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}

// 唤醒线程,如果thread之前因调用park()方法而被挂起,则调用unpark后,该线程会被唤醒;如果Thread之前没有调用park,则调用unpark后,再次调用park方法,会立即返回
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

// 当线程在没有持有许可的情况下调用park方法而被阻塞挂起,blocker对象会被记录到当前线程内部,此时可以通过getBlocker方法来获取blocker对象,推荐blocker对象设置为this,这样可以在打印线程堆栈时知道哪个类被阻塞了
// volatile Object parkBlocker; Thread类中有一个变量来专门存储该值
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
// 获取blocker对象
public static Object getBlocker(Thread t) {
if (t == null)
throw new NullPointerException();
return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}

// 有超时时间的阻塞
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}

// deadline的单位是ms,是一个时间戳,表示阻塞到什么时候
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null);
}


// 调用park方法来获取许可,调用该方法的线程会被挂起阻塞,等待其他线程调用unpark(Thread thread)方法来唤醒该线程
public static void park() {
UNSAFE.park(false, 0L);
}

// 挂起指定时间
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}


public static void parkUntil(long deadline) {
UNSAFE.park(true, deadline);
}


static final int nextSecondarySeed() {
int r;
Thread t = Thread.currentThread();
if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
r ^= r << 13; // xorshift
r ^= r >>> 17;
r ^= r << 5;
}
else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
r = 1; // avoid zero
UNSAFE.putInt(t, SECONDARY, r);
return r;
}

// Hotspot implementation via intrinsics API
private static final sun.misc.Unsafe UNSAFE;
private static final long parkBlockerOffset;
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
parkBlockerOffset = UNSAFE.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
SEED = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSeed"));
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
SECONDARY = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception ex) { throw new Error(ex); }
}

}

欢迎关注我的其它发布渠道