public E peek(){ final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } }
publicLinkedBlockingQueue(int capacity){ if (capacity <= 0) thrownew IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
public E poll(){ // 获取队列元素数量 final AtomicInteger count = this.count; // 队列为空,则直接返回null if (count.get() == 0) returnnull; E x = null; int c = -1; // 获取takeLock锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 队列不为空 if (count.get() > 0) { // 出队,该方法会移除所取出的元素 x = dequeue(); // 队列元素数量-1 c = count.getAndDecrement(); if (c > 1) // c>1,表示该元素出队之后,至少还有一个元素,可以通知notEmpty条件队列中的线程进行唤醒,来执行出队操作 notEmpty.signal(); } } finally { // 释放锁 takeLock.unlock(); } if (c == capacity) // 在进行出队操作前队列中元素是满的,此时可能NotFull是被阻塞的,所以通知NotFull条件队列中的线程进行唤醒,来执行入队操作 signalNotFull(); return x; }
// Removes a node from head of queue. private E dequeue(){ // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
peek方法
从队列头部获取元素,但是不会移除该元素,如果队列为空,则直接返回null,该方法是不阻塞的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
public E peek(){ // 队列为空,直接返回null if (count.get() == 0) returnnull; final ReentrantLock takeLock = this.takeLock; // 获取takeLock锁 takeLock.lock(); try { // 获取头部元素 Node<E> first = head.next; if (first == null) returnnull; else return first.item; } finally { takeLock.unlock(); } }
/** * A plain PriorityQueue used only for serialization, * to maintain compatibility with previous versions * of this class. Non-null only during serialization/deserialization. */ private PriorityQueue<E> q; publicPriorityBlockingQueue(){ this(DEFAULT_INITIAL_CAPACITY, null); }
publicPriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator){ if (initialCapacity < 1) thrownew IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
public E poll(){ final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } }
private E dequeue(){ int n = size - 1; // 队列为空,则直接返回null if (n < 0) returnnull; else { Object[] array = queue; // 获取队头元素 E result = (E) array[0]; // 获取队尾元素,并赋值为null E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; // 根节点没有了,需要重新进行二叉堆的构建 if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }
// 由于根节点被移除了,所以需要重新构建二叉堆 // k为0,x为之前的队尾元素,array为队列,n为队尾数组下标 privatestatic <T> voidsiftDownComparable(int k, T x, Object[] array, int n){ if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // assume left child is least Object c = array[child]; int right = child + 1; if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; } }
take方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
public E take()throws InterruptedException { final ReentrantLock lock = this.lock; // 获取可中断的锁 lock.lockInterruptibly(); E result; try { // 如果队列为空,则阻塞 while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; }