现在的位置: 首页 > 编程技术 > Java > 正文

Java 中的阻塞队列

2015年11月22日 Java ⁄ 共 3226字 ⁄ 字号 Java 中的阻塞队列已关闭评论 ⁄ 阅读 677 次

1. 什么是阻塞队列?

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

操作\处理方式 抛出异常 返回特殊值 阻塞 超时退出
插入 add(e) offer(e) put(e) offer(time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek()
  • 处理方式:
    处理方式是指出现以下情况时,阻塞队列的不同操作方法对应的处理方式 :

    1. 向已满的队列插入新的元素
    2. 从空队列中移除元素
    3. 检查空队列中的元素
  • 抛出异常:<队列满时>llegalStateException(“Queue full”) / <队列空时>NoSuchElementException
  • 返回特殊值:成功返回 true 或者元素,失败返回 false或者 null
  • 阻塞:

2. JDK 内置的阻塞队列

JDK7提供了7个阻塞队列。分别是

  1. ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  2. LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  3. PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  4. DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  5. SynchronousQueue:一个不存储元素的阻塞队列。
  6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

3. 阻塞队列的使用

生产者-消费者问题的 BlockingQueue 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
 * 生产者-消费者问题 BlockingQueue 简单实现
 */
public class Main {
    private static final int MAX = 8;   // 最大循环数
    private static final int QUEUE_MAX_SIZE = 3;
    private static final int DELAYED_PRODUCER = 4;
    private static final int DELAYED_CUSTOMER = 3;
    private static BlockingQueue<Good> sBlockingQueue = new LinkedBlockingQueue<Good>(QUEUE_MAX_SIZE);

    public static void main(String[] args) {
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (i < MAX) {
                    try {
                        Thread.sleep(new Random(System.currentTimeMillis()).nextInt(DELAYED_PRODUCER) * 1000);
                        Good good = new Good();
                        good.index = i;
                        System.out.println(">>> " + good + " produced.");

                        if (sBlockingQueue.size() >= QUEUE_MAX_SIZE) 
                            System.out.println("    Queue is full, producer waiting ...");

                        sBlockingQueue.put(good); // 当队列满时,这里会阻塞 producer 线程
                        System.out.println(">>> " + good + " added.");
                    } catch (InterruptedException e) {
                        continue;
                    }

                    ++i;
                }
            }
        });

        Thread customer = new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (i < MAX) {
                    try {
                        Thread.sleep(new Random(System.currentTimeMillis()).nextInt(DELAYED_CUSTOMER) * 1000);

                        if (sBlockingQueue.isEmpty())
                            System.out.println("    Queue is empty, customer waiting ...");

                        Good good = sBlockingQueue.take(); // 当队列空时,这里会阻塞 customer 线程
                        System.out.println("    " + good + " consumed. >>>");
                    } catch (InterruptedException e) {
                        continue;
                    }

                    ++i;
                }
            }
        });

        producer.start();
        customer.start();
    }

    private static class Good {
        private int index;
        @Override
        public String toString() { return "Good{" + "index=" + index + '}'; }
    }
}

随机运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    Queue is empty, customer waiting ...    // 队列是空,等待生产者生产
>>> Good{index=0} produced.
>>> Good{index=0} added.                    // 生产者生产
    Good{index=0} consumed. >>>             // 消费者消费
>>> Good{index=1} produced.
>>> Good{index=1} added.
>>> Good{index=2} produced.
>>> Good{index=2} added.
>>> Good{index=3} produced.
>>> Good{index=3} added.
>>> Good{index=4} produced.
    Queue is full, producer waiting ...     // 队列已满,等待消费者消费
    Good{index=1} consumed. >>>             // 消费者消费
>>> Good{index=4} added.                    // 队列不满,生产者继续生产
>>> Good{index=5} produced.
    Queue is full, producer waiting ...
    Good{index=2} consumed. >>>
>>> Good{index=5} added.
    Good{index=3} consumed. >>>
>>> Good{index=6} produced.
>>> Good{index=6} added.
    Good{index=4} consumed. >>>
    Good{index=5} consumed. >>>
    Good{index=6} consumed. >>>             
    Queue is empty, customer waiting ...    // 消费者连续消费,队列已空
>>> Good{index=7} produced.
>>> Good{index=7} added.
    Good{index=7} consumed. >>>

4. 参考

【1】: http://www.infoq.com/cn/articles/java-blocking-queue
【2】: http://developer.android.com/reference/java/util/concurrent/BlockingDeque.html

抱歉!评论已关闭.