0%

BlockingQueue阻塞队列

BlockingQueue阻塞队列

BlockingQueue接口是在jdk5版本提供的,在线程池中用到了阻塞队列来实现,阻塞队列是深入学习线程池的基础,该队列通常是有限的容量,如果队列已满添加操作就会阻塞,如果队列为空,移除操作就会阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public interface BlockingQueue<E> extends Queue<E> {
// add/offer/put 插入数据,插入到队列尾部
// add添加元素,如果队列已满,直接抛出异常IllegalArgumentException
boolean add(E e);
// 添加元素,如果队列已满,返回false
boolean offer(E e);
// 添加元素,如果队列已满,阻塞
void put(E e) throws InterruptedException;
// 超时
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;


// take/poll/remove移除数据,移除队列头部
// 移除元素并返回头元素,如果队列为空,阻塞
E take() throws InterruptedException;
// 移除元素并返回头元素,如果队列为空,则返回null
void put(E e) throws InterruptedException;
// 超时
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// 移除元素并返回头元素,如果队列为空,抛出NoSuchElementException异常
boolean remove(Object o);

int remainingCapacity();

public boolean contains(Object o);

int drainTo(Collection<? super E> c);

int drainTo(Collection<? super E> c, int maxElements);
}

在使用阻塞队列时最好使用put()、take()以及可定时的offer()和poll(),而不要使用Queue接口中的方法,否则就丢失了阻塞的效果

BlockingQueue实现类

BlockingQueue接口有多个实现类:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue等

阻塞队列

ArrayBlockingQueue

ArrayBlockingQueue类底层是由数组支持的有界阻塞队列,创建时需要指定容量,实现了FIFO(先进先出)排序机制。新添加的元素都在队列的尾部,获取操作是从队列头部进行,可以设置公平策略,默认是非公平的

源码分析

内部没有实现读写分离,生产和消费不能完全并行,长度需要定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 存储队列元素
final Object[] items;

/** items index for next take, poll, peek or remove */
// 出队的数组下标(下一个待取出的元素索引)
int takeIndex;

/** items index for next put, offer, or add */
// 入队的数组下标(下一个待添加的元素索引)
int putIndex;

// 队列中的元素数量
int count;
// 锁
final ReentrantLock lock;

/** Condition for waiting takes */
// 出队的条件变量(消费者监视器)
private final Condition notEmpty;

/** Condition for waiting puts */
// 入队的条件变量(生产者监视器)
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 默认是非公平锁
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
重要方法

offer方法

offer方法向队列尾部添加元素,如果队列未满,则添加成功,返回true;如果队列已满则丢弃当前元素,返回false。该方法是不阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public boolean offer(E e) {
// 如果元素为null,抛出EPN,队列中不可存储null值
checkNotNull(e);
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列已满,直接返回false
if (count == items.length)
return false;
else {
// 添加元素
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// 计算下一个元素存放的下标位置
if (++putIndex == items.length)
putIndex = 0;
count++;
// 因为添加了一个元素,所以唤醒因为没有元素而被阻塞的take方法的一个线程
notEmpty.signal();
}

put方法

put方法向队列尾部添加元素,如果队列未满,则添加成功,返回true;如果队列已满则阻塞当前线程直到队列有空闲为止。该方法是阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void put(E e) throws InterruptedException {
// 如果元素为null,抛出EPN,队列中不可存储null值
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 获取可中断锁
lock.lockInterruptibly();
try {
// 队列已满,阻塞
while (count == items.length)
// 需要等待notFull.notify的唤醒
notFull.await();
// 如果没有满,添加元素进入队列
enqueue(e);
} finally { // 释放锁
lock.unlock();
}
}
  • 所有执行put操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁
  • 判断阻塞队列是否满了,如果满了,则调用await方法阻塞这个线程,并标记为notFull条件变量,同时释放lock锁,等待被消费者线程唤醒
  • 如果没有满,则调用enqueue方法将元素put进阻塞队列
  • 唤醒notEmpty条件变量

poll方法

从队列头获取并移除元素,如果队列为空,则直接返回null,该方法是不阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列为空,则直接返回null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// 会移除元素
items[takeIndex] = null;
// 队列头指针计算
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 因为移除了一个元素,所以唤醒因为队列已满而被阻塞的put方法的一个线程
notFull.signal();
return x;
}

take方法

从队列头获取并移除元素,如果队列为空,则阻塞当前线程直到不为空返回元素,该方法是阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取可中断的锁
lock.lockInterruptibly();
try {
// 队列为空,阻塞
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
  • 所有执行take操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁
  • 判断阻塞队列是否为空,如果是空,则调用await方法阻塞这个线程,并notEmpty.await条件变量,同时释放locak锁,等待被生产者线程唤醒
  • 如果没有空,则调用dequeue方法
  • 唤醒notFull.notify条件变量

peek方法

与poll方法类似,从队列头获取元素(不会移除元素),如果队列为空,则直接返回null,该方法是不阻塞的

1
2
3
4
5
6
7
8
9
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
使用案例

logback异步打印日志使用的ArrayBlockQueue

LinkedBlockingQueue

LinkedBlockingQueue类是底层为单向链表的有界阻塞队列,默认容量为Integer.MAX_VALUE,使用先进先出FIFO,线程池中newFixedThreadPool线程池就是使用了该队列

源码分析

可以很好的处理并发数据,其内部实现采用分离锁(读写分离两个锁),可以实现生产和消费操作的完全并行运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 头节点
transient Node<E> head;
//尾节点
private transient Node<E> last;
// 记录队列的个数
private final AtomicInteger count = new AtomicInteger();
// take、poll方法时获取该锁,控制元素出队,同时只有一个线程可以从队列头部获取元素,其他线程阻塞
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
// 条件变量,存放出栈被阻塞的线程
// 当队列为空时,执行出队操作的线程会被放入这个条件队列中阻塞
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
// put、offer方法时获取该锁,控制元素入队,同时只有一个线程可以在队尾添加元素,其他线程阻塞
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
// 条件变量,存放入栈被阻塞的线程
// 当队列满时,执行入队操作的线程会被放入这个条件队列中阻塞
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
重要方法

offer方法

向尾部插入元素,成功插入返回true,队列已满则丢弃当前元素,返回false,该方法是不阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public boolean offer(E e) {
// 如果元素为null,则抛出NPE,队列中不可存储null值
if (e == null) throw new NullPointerException();
// 获取当前队列元素数量
final AtomicInteger count = this.count;
// 如果已满,则直接返回false
if (count.get() == capacity)
return false;

int c = -1;
// 构造新节点
Node<E> node = new Node<E>(e);
// 获取putLock独占锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 队列不满,则进队
if (count.get() < capacity) {
// 进队
enqueue(node);
// 队列元素数量+1
c = count.getAndIncrement();
if (c + 1 < capacity) // 新元素入队后,队列依然没满,则唤醒notFull条件队列中的线程,通知可以进行入队操作了
notFull.signal();
}
} finally {
// 释放锁
putLock.unlock();
}
if (c == 0) // 说明该元素入队之前,队列为空,此时加入了一个元素,应该通知notEmpty条件队列中的线程,可以进行出队操作了
signalNotEmpty();
return c >= 0;
}

put方法

put方法与offer方法类似,只是如果队列已满,则阻塞当前线程,知道队列有空闲才会插入成功返回,该方法是阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void put(E e) throws InterruptedException {
// 如果元素为null,则抛出NPE,队列中不可存储null值
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 构建新节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 获取当前队列元素数量
final AtomicInteger count = this.count;
// 获取可中断的putLock锁
putLock.lockInterruptibly();
try {
// 如果队列已满,则进行等待,使用while循环防止被虚假唤醒
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// 队列中元素数量+1
c = count.getAndIncrement();
if (c + 1 < capacity)// 新元素入队后,队列依然没满,则唤醒notFull条件队列中的线程,通知可以进行入队操作了
notFull.signal();
} finally { // 释放锁
putLock.unlock();
}
if (c == 0)// 说明该元素入队之前,队列为空,此时加入了一个元素,应该通知notEmpty条件队列中的线程,可以进行出队操作了
signalNotEmpty();
}

poll方法

从队列头部获取元素,并移除该元素,如果队列为空,则直接返回null,该方法是不阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public E poll() {
// 获取队列元素数量
final AtomicInteger count = this.count;
// 队列为空,则直接返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
// 获取takeLock锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 队列不为空
if (count.get() > 0) {
// 出队,该方法会移除所取出的元素
x = dequeue();
// 队列元素数量-1
c = count.getAndDecrement();
if (c > 1) // c>1,表示该元素出队之后,至少还有一个元素,可以通知notEmpty条件队列中的线程进行唤醒,来执行出队操作
notEmpty.signal();
}
} finally { // 释放锁
takeLock.unlock();
}
if (c == capacity) // 在进行出队操作前队列中元素是满的,此时可能NotFull是被阻塞的,所以通知NotFull条件队列中的线程进行唤醒,来执行入队操作
signalNotFull();
return x;
}

// Removes a node from head of queue.
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

peek方法

从队列头部获取元素,但是不会移除该元素,如果队列为空,则直接返回null,该方法是不阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public E peek() {
// 队列为空,直接返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
// 获取takeLock锁
takeLock.lock();
try {
// 获取头部元素
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}

take方法

与poll方法类似,从队列头部获取元素,并移除该元素,但是如果队列为空,则会阻塞当前线程直到队列不为空然后返回元素,该方法是阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E take() throws InterruptedException {
E x;
int c = -1;
// 队列元素数量
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 获取可中断的takeLock锁
takeLock.lockInterruptibly();
try {
// 队列为空,则进行阻塞,使用while循环防止虚假唤醒
while (count.get() == 0) {
notEmpty.await();
}
// 出队并移除元素
x = dequeue();
// 队列元素数量-1
c = count.getAndDecrement();
if (c > 1)// c>1,表示该元素出队之后,至少还有一个元素,可以通知notEmpty条件队列中的线程进行唤醒,来执行出队操作
notEmpty.signal();
} finally { // 释放锁
takeLock.unlock();
}
if (c == capacity)// 在进行出队操作前队列中元素是满的,此时可能NotFull是被阻塞的,所以通知NotFull条件队列中的线程进行唤醒,来执行入队操作
signalNotFull();
return x;
}

PriorityBlockingQueue

PriorityBlockingQueue类是一个无界阻塞队列,与LinkedBlockingQueue类似,只是排序是基于优先级的阻塞队列,可以决定元素的优先顺序(使用自然排序或者比较器来进行排序,传入的对象必须实现Comparable接口),会自动进行扩容,内部控制线程同步的锁是公平锁,存储使用的是平衡二叉堆实现的

源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 存放队列元素
private transient Object[] queue;

// 队列元素数量
private transient int size;


// 比较器,使用该比较器来比较元素大小进行排序,如果为null,则使用元素的自然排序
private transient Comparator<? super E> comparator;

// 控制只能有一个线程进行入队、出队操作
private final ReentrantLock lock;

// 条件变量用来实现take方法阻塞
private final Condition notEmpty;

// 自旋锁,使用cas操作来保证只有一个线程可以扩容队列,状态为0表示当前没有进行扩容,1表示当前正在扩容
private transient volatile int allocationSpinLock;

// 默认队列大小为11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 最大队列大小
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* A plain PriorityQueue used only for serialization,
* to maintain compatibility with previous versions
* of this class. Non-null only during serialization/deserialization.
*/
private PriorityQueue<E> q;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
重要方法

offer方法

offer方法在队列中添加添加元素,由于该队列是无界的,所以不会阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
public boolean offer(E e) {
// 如果元素为null,则抛出NPE,队列中不可存储null值
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
int n, cap;
Object[] array;
// 当前元素个数>=队列容量,进行扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null) // 比较器为null,进行元素的自然排序
siftUpComparable(n, e, array);
else // 有自定义的比较器,使用自定义比较器进行排序
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal(); // 唤醒由于队列没有元素导致的take操作阻塞的一个线程
} finally {
lock.unlock();
}
return true;
}

// 扩容操作
private void tryGrow(Object[] array, int oldCap) {
// 先释放掉主锁,由于扩容比较费时,释放锁可以让其他线程可以做其他的操作
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 进行cas操作,成功则进行扩容,保证只有一个线程可以进行扩容操作
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// oldCap小于64,则执行oldCap + 2,否则乘以2
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 最大容量不能超过MAX_ARRAY_SIZE
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
// 在某线程进行扩容时,可能会有其他线程也来进行扩容,由于CAS操作失败,会执行该代码,让出cpu
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
// 扩容成功
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}

// 二叉堆操作
// k传入的是队列的元素数量,也就是所要添加的元素可能会放入的数组下标,x是所要添加的元素,array是当前的队列元素
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
// 第一个元素添加进来时不需要进入while循环,直接添加元素即可,因为只有一个元素,不需要排序
while (k > 0) {
// 因为是树形,找到该元素的父节点
int parent = (k - 1) >>> 1;
Object e = array[parent];
// 比较该节点和父节点
if (key.compareTo((T) e) >= 0)
break;
// 与父节点交换位置
array[k] = e;
k = parent;
}
array[k] = key;
}

put方法

与offer方法相同

1
2
3
public void put(E e) {
offer(e); // never need to block
}

poll方法

poll方法获取第一个元素,如果队列为空,则返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
int n = size - 1;
// 队列为空,则直接返回null
if (n < 0)
return null;
else {
Object[] array = queue;
// 获取队头元素
E result = (E) array[0];
// 获取队尾元素,并赋值为null
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 根节点没有了,需要重新进行二叉堆的构建
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}

// 由于根节点被移除了,所以需要重新构建二叉堆
// k为0,x为之前的队尾元素,array为队列,n为队尾数组下标
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}

take方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取可中断的锁
lock.lockInterruptibly();
E result;
try {
// 如果队列为空,则阻塞
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}

DelayQueue

DelayQueue类是一种延迟队列,带有延迟时间,只有当延迟时间到了,才能从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,该队列也是一个没有大小限制的队列,可以用做对缓存超时的数据移除、任务超时处理和空闲连接关闭等。

源码分析
1
2
3
4
5
6
7
8
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader变量的使用基于Leader-Follower模式的变体,用于减少不必要的线程等待
// 当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.awaitNanos等待delay时间,但是其他线程则会调用available.await()进行无限等待
private Thread leader = null;
// 条件变量
private final Condition available = lock.newCondition();
public DelayQueue() {}
重要方法

offer方法

插入元素到队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入队操作,如果为null会抛出NPE
q.offer(e);
if (q.peek() == e) { // 使用的是PriorityQueue,优先级队列,获取的是最先要过期的,所以当前元素时第一个元素,之前的队列没有元素
leader = null;
// 入队成功,通知被阻塞的线程进行唤醒
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

take方法

获取并移除队列里面延迟时间过期的元素,如果没有过期元素则等待,会阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取但不移除元素 1
E first = q.peek();
if (first == null) // 队列中没有元素
// 会进行阻塞,等待唤醒
available.await();
else {
// 获取该元素的过期时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) // 已经过期,取出
return q.poll();
first = null; // 走到这里说明没有过期 don't retain ref while waiting
if (leader != null) // leader不为null,说明有其他线程在进行take操作,进行阻塞等待
available.await();
else { // 当前没有其他线程在进行take操作,选取当前线程作为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 等待该元素过期,然后重新竞争锁
} finally {
// 重置leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null) // 当前线程已经移除了过期元素,并且队列中还有元素,唤醒被阻塞的线程
available.signal();
lock.unlock();
}
}

poll方法

获取并移除队列中的过期元素,没有则返回null,不会阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
// 队列为空,或者不为空但是没有过期,直接返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}

SynchronousQueue

SynchronousQueue类是一个没有缓冲的队列,不会在队列中维护任何的存储空间,没有存储能力,生产者生产的数据直接会被消费者获取并消费,只会在没有可消费的数据时,阻塞数据的消费者,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间,newCachedThreadPool线程池使用了该队列

TransferQueue

TransferQueue类主要新增了tryTransfer方法和transfer方法,实现类有LinkedTransferQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface TransferQueue<E> extends BlockingQueue<E> {

boolean tryTransfer(E e);
// 生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费
void transfer(E e) throws InterruptedException;

boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

boolean hasWaitingConsumer();


int getWaitingConsumerCount();
}

阻塞方法与非阻塞方法

非阻塞方法

  • add 将元素插入到队列尾部,插入成功返回true,插入失败抛出异常
  • remove 移除队首元素,移除成功返回true,移除失败,抛出异常
  • offer(E e) 将元素插入到队尾,插入成功,返回true,插入失败返回false
  • poll 移除并获取队首元素,成功返回队首元素,否则返回null
  • peek 获取队首元素,成功返回队首元素,否则返回null

阻塞方法

  • put 将元素插入到队尾,如果队列已满,则等待
  • take 从队首取元素,如果队列为空,则等待
  • offer(E e, long timeout, TimeUnit unit) 向队尾存入元素,如果队列已满,则等待一定的时间,如果时间已到,还是没有插入成功,则返回false,否则返回true
  • E poll(long timeout, TimeUnit unit) 从队首取元素,如果队列为空,则等待一段时间,当时间已到,如果没有取到,,则返回null,否则返回取得的元素

欢迎关注我的其它发布渠道