1. 什么是阻塞队列?
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
操作\处理方式 | 抛出异常 | 返回特殊值 | 阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | — | — |
- 处理方式:
处理方式是指出现以下情况时,阻塞队列的不同操作方法对应的处理方式 :- 向已满的队列插入新的元素
- 从空队列中移除元素
- 检查空队列中的元素
- 抛出异常:<队列满时>llegalStateException(“Queue full”) / <队列空时>NoSuchElementException
- 返回特殊值:成功返回
true
或者元素,失败返回false
或者null
- 阻塞:
2. JDK 内置的阻塞队列
JDK7提供了7个阻塞队列。分别是
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
3. 阻塞队列的使用
生产者-消费者问题的 BlockingQueue 实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
/** * 生产者-消费者问题 BlockingQueue 简单实现 */ public class Main { private static final int MAX = 8; // 最大循环数 private static final int QUEUE_MAX_SIZE = 3; private static final int DELAYED_PRODUCER = 4; private static final int DELAYED_CUSTOMER = 3; private static BlockingQueue<Good> sBlockingQueue = new LinkedBlockingQueue<Good>(QUEUE_MAX_SIZE); public static void main(String[] args) { Thread producer = new Thread(new Runnable() { @Override public void run() { int i = 0; while (i < MAX) { try { Thread.sleep(new Random(System.currentTimeMillis()).nextInt(DELAYED_PRODUCER) * 1000); Good good = new Good(); good.index = i; System.out.println(">>> " + good + " produced."); if (sBlockingQueue.size() >= QUEUE_MAX_SIZE) System.out.println(" Queue is full, producer waiting ..."); sBlockingQueue.put(good); // 当队列满时,这里会阻塞 producer 线程 System.out.println(">>> " + good + " added."); } catch (InterruptedException e) { continue; } ++i; } } }); Thread customer = new Thread(new Runnable() { @Override public void run() { int i = 0; while (i < MAX) { try { Thread.sleep(new Random(System.currentTimeMillis()).nextInt(DELAYED_CUSTOMER) * 1000); if (sBlockingQueue.isEmpty()) System.out.println(" Queue is empty, customer waiting ..."); Good good = sBlockingQueue.take(); // 当队列空时,这里会阻塞 customer 线程 System.out.println(" " + good + " consumed. >>>"); } catch (InterruptedException e) { continue; } ++i; } } }); producer.start(); customer.start(); } private static class Good { private int index; @Override public String toString() { return "Good{" + "index=" + index + '}'; } } } |
随机运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
Queue is empty, customer waiting ... // 队列是空,等待生产者生产 >>> Good{index=0} produced. >>> Good{index=0} added. // 生产者生产 Good{index=0} consumed. >>> // 消费者消费 >>> Good{index=1} produced. >>> Good{index=1} added. >>> Good{index=2} produced. >>> Good{index=2} added. >>> Good{index=3} produced. >>> Good{index=3} added. >>> Good{index=4} produced. Queue is full, producer waiting ... // 队列已满,等待消费者消费 Good{index=1} consumed. >>> // 消费者消费 >>> Good{index=4} added. // 队列不满,生产者继续生产 >>> Good{index=5} produced. Queue is full, producer waiting ... Good{index=2} consumed. >>> >>> Good{index=5} added. Good{index=3} consumed. >>> >>> Good{index=6} produced. >>> Good{index=6} added. Good{index=4} consumed. >>> Good{index=5} consumed. >>> Good{index=6} consumed. >>> Queue is empty, customer waiting ... // 消费者连续消费,队列已空 >>> Good{index=7} produced. >>> Good{index=7} added. Good{index=7} consumed. >>> |
4. 参考
【1】: http://www.infoq.com/cn/articles/java-blocking-queue
【2】: http://developer.android.com/reference/java/util/concurrent/BlockingDeque.html