0%

阻塞队列

阻塞队列

BlockingQueue接口是在jdk5版本提供的,在线程池中用到了阻塞队列来实现,阻塞队列是深入学习线程池的基础,该队列通常是有限的容量,如果队列已满添加操作就会阻塞,如果队列为空,移除操作就会阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public interface BlockingQueue<E> extends Queue<E> {

boolean add(E e);


boolean offer(E e);


void put(E e) throws InterruptedException;


boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;


E take() throws InterruptedException;


E poll(long timeout, TimeUnit unit)
throws InterruptedException;


int remainingCapacity();


boolean remove(Object o);


public boolean contains(Object o);


int drainTo(Collection<? super E> c);


int drainTo(Collection<? super E> c, int maxElements);
}

在使用阻塞队列时最好使用put()、take()以及可定时的offer()和poll(),而不要使用Queue接口中的方法,否则就丢失了阻塞的效果

BlockingQueue实现类

阻塞队列

ArrayBlockingQueue

ArrayBlockingQueue类底层是由数组支持的有界阻塞队列,实现了FIFO(先进先出)排序机制。新添加的元素都在队列的尾部,获取操作是从队列头部进行,可以设置公平策略,默认是非公平的

内部没有实现读写分离,生产和消费不能完全并行,长度需要定义

1
2
3
4
5
6
7
8
9
10
11
12
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

LinkedBlockingQueue

LinkedBlockingQueue类底层为链表,无界队列被设计为没有容量限制,最大值为Integer.MAX_VALUE,当然也提供了设置容量的构造器

可以很好的处理并发数据,其内部实现采用分离锁(读写分离两个锁),可以实现生产和消费操作的完全并行运行。

1
2
3
4
5
6
7
8
9
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

PriorityBlockingQueue

PriorityBlockingQueue类是一个无界队列,基于优先级的阻塞队列,可以决定元素的优先顺序(使用自然排序或者比较器来进行排序,传入的对象必须实现Comparable接口),会自动进行扩容,内部控制线程同步的锁是公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}

DelayQueue

DelayQueue类是一种延迟队列,带有延迟时间,只有当延迟时间到了,才能从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,该队列也是一个没有大小限制的队列,可以用做对缓存超时的数据移除、任务超时处理和空闲连接关闭等。

1
public DelayQueue() {}

SynchronousQueue

SynchronousQueue类是一个没有缓冲的队列,不会在队列中维护任何的存储空间,没有存储能力,生产者生产的数据直接会被消费者获取并消费。

TransferQueue

TransferQueue类主要新增了tryTransfer方法和transfer方法,实现类有LinkedTransferQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface TransferQueue<E> extends BlockingQueue<E> {

boolean tryTransfer(E e);
// 生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费
void transfer(E e) throws InterruptedException;

boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

boolean hasWaitingConsumer();


int getWaitingConsumerCount();
}