0%

AQS

AQS

但凡了解多线程的对于AQS应该都有所耳闻吧(我第一次知道AQS还是在一次面试中,那次被虐的老惨了),AQS即AbstractQueuedSynchronizer队列同步器,是一个抽象类,它是从java5开始的同步组件的基础,它仅仅只是定义了同步状态的获取和释放方法,很多同步类都继承该类来实现同步逻辑。

AQS的子类使用

源码解析

主要维护了一个state和一个队列,底层是一个FIFO的双向队列,通过head节点和tail节点来记录队首和队尾元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

// 头节点
private transient volatile Node head;

// 尾结点
private transient volatile Node tail;


// 线程同步的关键
// 同步状态信息state,可以通过getState、setState、compareAndSetState来获取或者修改值
// 对于不同的实现类,其用法不同
private volatile int state;

// 获取当前同步状态
protected final int getState() {
return state;
}


protected final void setState(int newState) {
state = newState;
}

// 使用CAS设置当前状态,可以保证原子性
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}


static final long spinForTimeoutThreshold = 1000L;

// 获取锁 独占锁
public final void acquire(int arg) {
// tryAcquire需要具体子类去实现
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}


public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

// 释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}


}

Node节点

队列中的元素存储的是Node节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
// 标记该线程是获取资源时被阻塞挂起后放入AQS队列的
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
// 标记线程是获取独占锁资源时被挂起后放入AQS队列的
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;


// 当前线程的等待状态
// SIGNAL 线程需要被唤醒
// CANCELLED 线程被取消了
// CONDITION 线程在条件队列里面等待
// PROPAGATE 释放共享资源时需要通知其他节点
// 0
volatile int waitStatus;

// 记录当前节点的前驱节点
volatile Node prev;

// 记录当前节点的后继节点
volatile Node next;


// 存放进入AQS的线程
volatile Thread thread;

/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
}

LockSupport工具类

在AQS中多次使用到了LockSupport来进行操作,LockSupport是用来干什么的呢?主要作用是来挂起和唤醒线程,LockSupport的底层是使用Unsafe实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public class LockSupport {
private LockSupport() {} // Cannot be instantiated.

private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}

// 唤醒线程,如果thread之前因调用park()方法而被挂起,则调用unpark后,该线程会被唤醒;如果Thread之前没有调用park,则调用unpark后,再次调用park方法,会立即返回
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

// 当线程在没有持有许可的情况下调用park方法而被阻塞挂起,blocker对象会被记录到当前线程内部,此时可以通过getBlocker方法来获取blocker对象,推荐blocker对象设置为this,这样可以在打印线程堆栈时知道哪个类被阻塞了
// volatile Object parkBlocker; Thread类中有一个变量来专门存储该值
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
// 获取blocker对象
public static Object getBlocker(Thread t) {
if (t == null)
throw new NullPointerException();
return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}

//
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, nanos);
setBlocker(t, null);
}
}

// deadline的单位是ms,是一个时间戳,表示阻塞到什么时候
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(true, deadline);
setBlocker(t, null);
}





// 调用park方法来获取许可,调用该方法的线程会被挂起阻塞,等待其他线程调用unpark(Thread thread)方法来唤醒该线程
public static void park() {
UNSAFE.park(false, 0L);
}

// 挂起指定时间
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}


public static void parkUntil(long deadline) {
UNSAFE.park(true, deadline);
}


static final int nextSecondarySeed() {
int r;
Thread t = Thread.currentThread();
if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
r ^= r << 13; // xorshift
r ^= r >>> 17;
r ^= r << 5;
}
else if ((r = java.util.concurrent.ThreadLocalRandom.current().nextInt()) == 0)
r = 1; // avoid zero
UNSAFE.putInt(t, SECONDARY, r);
return r;
}

// Hotspot implementation via intrinsics API
private static final sun.misc.Unsafe UNSAFE;
private static final long parkBlockerOffset;
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
parkBlockerOffset = UNSAFE.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
SEED = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSeed"));
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
SECONDARY = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception ex) { throw new Error(ex); }
}

}