线程池中常见的阻塞队列

比眉伴天荒 2024-04-24 23:01 147阅读 0赞

Java 中的 java.util.concurrent 包提供了多种阻塞队列,它们在多线程环境下非常有用,特别是在构建线程池时。阻塞队列(BlockingQueue)是一种特殊的队列,用于在生产者和消费者线程之间安全地传递数据。线程池中的工作队列通常就是阻塞队列,用于存储待执行的任务。

以下是 Java 中几种常见的阻塞队列,以及它们的特点和用途:

1. ArrayBlockingQueue

ArrayBlockingQueue 是一个由数组支持的有界阻塞队列。这个队列按 FIFO(先进先出)原则对元素进行排序。

源码片段和特点
  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. /** The queued items */
  4. final Object[] items;
  5. /** items index for next take, poll, peek or remove */
  6. int takeIndex;
  7. /** items index for next put, offer, or add */
  8. int putIndex;
  9. /** Number of elements in the queue */
  10. int count;
  11. // ... 省略构造器和其他方法
  12. public void put(E e) throws InterruptedException {
  13. // ... 省略 null 检查和中断检查
  14. final ReentrantLock lock = this.lock;
  15. lock.lockInterruptibly();
  16. try {
  17. while (count == items.length)
  18. notFull.await();
  19. enqueue(e);
  20. } finally {
  21. lock.unlock();
  22. }
  23. }
  24. public E take() throws InterruptedException {
  25. final ReentrantLock lock = this.lock;
  26. lock.lockInterruptibly();
  27. try {
  28. while (count == 0)
  29. notEmpty.await();
  30. return dequeue();
  31. } finally {
  32. lock.unlock();
  33. }
  34. }
  35. // ... 更多方法
  36. }

ArrayBlockingQueue 需要在创建时指定容量,且一旦创建后不能更改。此队列使用单个锁来控制插入和移除操作,从而导致这两种操作不能完全并行。

2. LinkedBlockingQueue

LinkedBlockingQueue 是一个由链表结构支持的可选有界队列。

源码片段和特点
  1. public class LinkedBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. /** The capacity bound, or Integer.MAX_VALUE if none */
  4. private final int capacity;
  5. /** Current number of elements */
  6. private final AtomicInteger count = new AtomicInteger();
  7. /** The head of the linked list */
  8. transient Node<E> head;
  9. /** The last node of the linked list */
  10. private transient Node<E> last;
  11. // ... 省略构造器和其他方法
  12. public void put(E e) throws InterruptedException {
  13. // ... 省略 null 检查和中断检查
  14. int c = -1;
  15. Node<E> node = new Node<>(e);
  16. final ReentrantLock putLock = this.putLock;
  17. final AtomicInteger count = this.count;
  18. putLock.lockInterruptibly();
  19. try {
  20. // 如果队列满,等待
  21. while (count.get() == capacity) {
  22. notFull.await();
  23. }
  24. enqueue(node);
  25. // ... 更新计数和唤醒等待线程的代码
  26. } finally {
  27. putLock.unlock();
  28. }
  29. // ... 如果队列是空的,唤醒取元素的线程
  30. }
  31. public E take() throws InterruptedException {
  32. E x;
  33. int c = -1;
  34. final AtomicInteger count = this.count;
  35. final ReentrantLock takeLock = this.takeLock;
  36. takeLock.lockInterruptibly();
  37. try {
  38. // 如果队列为空,等待
  39. while (count.get() == 0) {
  40. notEmpty.await();
  41. }
  42. x = dequeue();
  43. // ... 更新计数和唤醒等待线程的代码
  44. } finally {
  45. takeLock.unlock();
  46. }
  47. // ... 如果队列是满的,唤醒放入元素的线程
  48. return x;
  49. }
  50. // ... 更多方法
  51. }

LinkedBlockingQueue 内部使用两个锁,一个用于入队操作,一个用于出队操作,允许这两个操作并行进行,从而提高了队列在并发环境中的吞吐量。

3. PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级排序的无界阻塞队列。队列中元素的排序可以根据自然排序,或者根据构造时提供的 Comparator 来进行。

源码片段和特点
  1. public class PriorityBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. // 默认初始容量
  4. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  5. // 数组用来存储队列元素
  6. private transient Object[] queue;
  7. // 队列中元素的数量
  8. private transient int size;
  9. // 比较器,决定元素的顺序
  10. private transient Comparator<? super E> comparator;
  11. // ... 省略构造器和其他方法
  12. public void put(E e) {
  13. offer(e); // 在 PriorityBlockingQueue 中,put 和 offer 实际上是一样的。
  14. }
  15. public boolean offer(E e) {
  16. // ... 省略 null 检查
  17. final ReentrantLock lock = this.lock;
  18. lock.lock();
  19. try {
  20. int i = size;
  21. if (i >= queue.length)
  22. grow(i + 1);
  23. size = i + 1;
  24. if (i == 0)
  25. queue[0] = e;
  26. else
  27. siftUp(i, e);
  28. notEmpty.signal();
  29. } finally {
  30. lock.unlock();
  31. }
  32. return true;
  33. }
  34. public E take() throws InterruptedException {
  35. final ReentrantLock lock = this.lock;
  36. lock.lockInterruptibly();
  37. try {
  38. while (size == 0)
  39. notEmpty.await();
  40. return dequeue();
  41. } finally {
  42. lock.unlock();
  43. }
  44. }
  45. // ... 更多方法
  46. }

PriorityBlockingQueue 通常用于执行基于优先级的任务调度。注意此队列不阻塞数据插入操作,但如果队列为空,数据取出操作会阻塞。

4. SynchronousQueue

SynchronousQueue 是一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,反之亦然。

源码片段和特点
  1. public class SynchronousQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. // ... 省略不相关代码和变量
  4. public void put(E e) throws InterruptedException {
  5. if (e == null) throw new NullPointerException();
  6. // A dummy node for the put operation
  7. Node<E> node = new Node<>(e);
  8. // ... 尝试传输对象到一个消费者
  9. if (!transferer.transfer(node, null, PUT)) {
  10. // ... 如果失败,等待消费者线程
  11. }
  12. }
  13. public E take() throws InterruptedException {
  14. Node<E> result = transferer.transfer(null, null, TAKE);
  15. if (result != null) return result.item;
  16. int s = -1;
  17. // ... 如果失败,等待生产者线程
  18. throw new InterruptedException();
  19. }
  20. // ... 更多方法
  21. }

SynchronousQueue 适合于传递性的任务调度,每个任务都由一个线程提交,另一个线程接收执行。

示例代码

以下示例演示如何使用 ArrayBlockingQueue 在线程池中:

  1. import java.util.concurrent.*;
  2. public class ThreadPoolWithArrayBlockingQueue {
  3. public static void main(String[] args) {
  4. BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
  5. ThreadPoolExecutor executor = new ThreadPoolExecutor(
  6. 2, 4, 1, TimeUnit.MINUTES, queue);
  7. for (int i = 0; i < 20; i++) {
  8. final int taskId = i;
  9. executor.execute(() -> {
  10. System.out.println("Executing task " + taskId +
  11. " on thread " + Thread.currentThread().getName());
  12. try {
  13. Thread.sleep(1000);
  14. } catch (InterruptedException e) {
  15. Thread.currentThread().interrupt();
  16. }
  17. });
  18. }
  19. executor.shutdown();
  20. }
  21. }

在这个例子中,我们创建了一个有界队列 ArrayBlockingQueue,用作线程池的工作队列。线程池被配置为有 2 个核心线程,最多 4 个线程,并且有一个容量为 10 的队列用于存储待处理任务。

不同的阻塞队列实现为线程池的行为提供了不同的策略。选择合适的阻塞队列,可以根据应用程序的需求优化性能并提供更强的功能。

发表评论

表情:
评论列表 (有 0 条评论,147人围观)

还没有评论,来说两句吧...

相关阅读