0%

ConcurrentLinkedQueue

ConcurrentLinkedQueue

ConcurrentLinkedQueue是一种适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,由于ConcurrentLinkedQueue是一种非阻塞的队列,通常ConcurrentLinkedQueue性能好于BlockingQueue。是一种基于链表节点的无界线程安全队列。该队列的元素遵循先进先出FIFO的原则,该队列不允许为null

源码分析

内部的队列使用单向链表的方式实现,新元素会被插入队列末尾,出队时从队列头部获取一个元素,队列进行出队入队时对节点的操作是通过CAS实现的,保证线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 队首
private transient volatile Node<E> head;
// 队尾
private transient volatile Node<E> tail;

public ConcurrentLinkedQueue() {
// 默认头节点、尾结点是Node中为null的哨兵节点
// 初始时,head、tail 都指向同一个 item 为 null 的节点
head = tail = new Node<E>(null);
}

private static class Node<E> {
// 存放节点的值
volatile E item;
// 存放下一个节点
volatile Node<E> next;
// 内部的操作全部依赖于UNSAFE的CAS操作来实现原子性
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
// 更改Node中的数据域item
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 更改Node中的指针域next
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// 更改Node中的指针域next
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

// Unsafe mechanics

private static final sun.misc.Unsafe UNSAFE;
// 偏移量
private static final long itemOffset;
// 下一个元素的偏移量
private static final long nextOffset;
}

重要方法

add()和offer()都是加入元素的方法,add方法内部也是调用的offer方法

offer方法入队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public boolean offer(E e) {
// 检查是否为null,会抛出空指针
checkNotNull(e);
// 构造Node节点
final Node<E> newNode = new Node<E>(e);
// 从尾结点进行插入,循环直到成功为止
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 如果q为null,说明当前p是尾结点,尝试加入到队尾,如果加入失败,表示其他线程已经修改了p的指向
if (q == null) { // 初始时,head、tail 都指向同一个 item 为 null 的节点
// 使用CAS操作设置p节点的next节点,但是没有更新尾结点
// 如果有多线程操作,会导致第一次CAS操作失败,再次执行for循环
if (p.casNext(null, newNode)) { // CAS操作成功,新增节点被放入到链表中

// p!=t,表示有多线程操作,导致第一次cas操作没有成功,此次不是第一次cas操作,此时在进行设置尾结点
if (p != t) // hop two nodes at a time
// 设置当前尾结点为新插入的节点
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q) // 多线程操作时,由于poll操作移除元素后可能会把head变成自引用(环形链表),此时head的next节点也是head,所以需要重新找到新的head
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.

p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 寻找尾结点,找到当前的尾结点之后,再次执行for循环
p = (p != t && t != (t = tail)) ? t : q;
}
}

poll()和peek()都是取头元素节点,前者会删除元素,后者不会删除元素

poll方法出队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E poll() {
restartFromHead:
for (;;) {
// 从头节点开始遍历
for (Node<E> h = head, p = h, q;;) {
// 保存当前节点
E item = p.item;
// 当前节点有值,并且使用CAS操作将当前节点变为null成功
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
// CAS操作成功,则标记当前节点从链表中移除
// 只有多线程操作时,使得第一次p!=h时才会设置头节点为p
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 当前队列为空
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 多线程同时操作时才会出现该情况,当前节点自引用,需要重新寻找新的队列头节点
else if (p == q)
continue restartFromHead;
else // 多线程操作时,会导致第一次判断时item为null,且此时已经有了新插入的节点了,需要重新指定头节点
p = q;
}
}
}
peek方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 与poll方法类似,只是少了cas操作来清空头节点的值
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
remove方法

如果队列中存在该元素,则删除该元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean remove(Object o) {
if (o != null) {
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
if (item != null) {
if (!o.equals(item)) {
next = succ(p);
continue;
}
// 使用cas操作来进行remove
removed = p.casItem(item, null);
}

next = succ(p);
// 如果有前驱节点,并且next不为空,则需要将这两个连接起来
if (pred != null && next != null) // unlink
pred.casNext(p, next);
if (removed)
return true;
}
}
return false;
}

head和tail的更新时机

tail 更新时机:tail 节点不总是尾节点,如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则只入队不更新尾节点。

head 更新时机:并不是每次出队时都更新 head 节点,当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点;只有当 head 节点里没有元素时,出队操作才会更新 head 节点。

head 和 tail 的更新总是间隔了一个,是为了减少CAS的更新操作,如果大量的入队操作,每次都要执行 CAS 进行 tail 的更新,汇总起来对性能也会是大大的损耗。如果能减少 CAS 更新的操作,无疑可以大大提升入队的操作效率

欢迎关注我的其它发布渠道