线程池中常见的阻塞队列
Java 中的 java.util.concurrent
包提供了多种阻塞队列,它们在多线程环境下非常有用,特别是在构建线程池时。阻塞队列(BlockingQueue)是一种特殊的队列,用于在生产者和消费者线程之间安全地传递数据。线程池中的工作队列通常就是阻塞队列,用于存储待执行的任务。
以下是 Java 中几种常见的阻塞队列,以及它们的特点和用途:
1. ArrayBlockingQueue
ArrayBlockingQueue
是一个由数组支持的有界阻塞队列。这个队列按 FIFO(先进先出)原则对元素进行排序。
源码片段和特点
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
// ... 省略构造器和其他方法
public void put(E e) throws InterruptedException {
// ... 省略 null 检查和中断检查
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
// ... 更多方法
}
ArrayBlockingQueue
需要在创建时指定容量,且一旦创建后不能更改。此队列使用单个锁来控制插入和移除操作,从而导致这两种操作不能完全并行。
2. LinkedBlockingQueue
LinkedBlockingQueue
是一个由链表结构支持的可选有界队列。
源码片段和特点
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/** The head of the linked list */
transient Node<E> head;
/** The last node of the linked list */
private transient Node<E> last;
// ... 省略构造器和其他方法
public void put(E e) throws InterruptedException {
// ... 省略 null 检查和中断检查
int c = -1;
Node<E> node = new Node<>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 如果队列满,等待
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
// ... 更新计数和唤醒等待线程的代码
} finally {
putLock.unlock();
}
// ... 如果队列是空的,唤醒取元素的线程
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果队列为空,等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
// ... 更新计数和唤醒等待线程的代码
} finally {
takeLock.unlock();
}
// ... 如果队列是满的,唤醒放入元素的线程
return x;
}
// ... 更多方法
}
LinkedBlockingQueue
内部使用两个锁,一个用于入队操作,一个用于出队操作,允许这两个操作并行进行,从而提高了队列在并发环境中的吞吐量。
3. PriorityBlockingQueue
PriorityBlockingQueue
是一个支持优先级排序的无界阻塞队列。队列中元素的排序可以根据自然排序,或者根据构造时提供的 Comparator
来进行。
源码片段和特点
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 默认初始容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 数组用来存储队列元素
private transient Object[] queue;
// 队列中元素的数量
private transient int size;
// 比较器,决定元素的顺序
private transient Comparator<? super E> comparator;
// ... 省略构造器和其他方法
public void put(E e) {
offer(e); // 在 PriorityBlockingQueue 中,put 和 offer 实际上是一样的。
}
public boolean offer(E e) {
// ... 省略 null 检查
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow(i + 1);
size = i + 1;
if (i == 0)
queue[0] = e;
else
siftUp(i, e);
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (size == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
// ... 更多方法
}
PriorityBlockingQueue
通常用于执行基于优先级的任务调度。注意此队列不阻塞数据插入操作,但如果队列为空,数据取出操作会阻塞。
4. SynchronousQueue
SynchronousQueue
是一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,反之亦然。
源码片段和特点
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// ... 省略不相关代码和变量
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// A dummy node for the put operation
Node<E> node = new Node<>(e);
// ... 尝试传输对象到一个消费者
if (!transferer.transfer(node, null, PUT)) {
// ... 如果失败,等待消费者线程
}
}
public E take() throws InterruptedException {
Node<E> result = transferer.transfer(null, null, TAKE);
if (result != null) return result.item;
int s = -1;
// ... 如果失败,等待生产者线程
throw new InterruptedException();
}
// ... 更多方法
}
SynchronousQueue
适合于传递性的任务调度,每个任务都由一个线程提交,另一个线程接收执行。
示例代码
以下示例演示如何使用 ArrayBlockingQueue
在线程池中:
import java.util.concurrent.*;
public class ThreadPoolWithArrayBlockingQueue {
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 1, TimeUnit.MINUTES, queue);
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Executing task " + taskId +
" on thread " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
在这个例子中,我们创建了一个有界队列 ArrayBlockingQueue
,用作线程池的工作队列。线程池被配置为有 2 个核心线程,最多 4 个线程,并且有一个容量为 10 的队列用于存储待处理任务。
不同的阻塞队列实现为线程池的行为提供了不同的策略。选择合适的阻塞队列,可以根据应用程序的需求优化性能并提供更强的功能。
还没有评论,来说两句吧...