集合-Queue
集合-Queue
一、核心理论
1.1 Queue接口概述
Queue接口是Java集合框架中的队列数据结构,遵循FIFO(先进先出)原则,用于存储按顺序排列的元素。除了基本的Collection操作外,Queue还提供了专门的插入、删除和检查操作,并针对这些操作定义了两种行为模式:当操作失败时要么抛出异常,要么返回特殊值。
1.2 继承体系
1.3 主要实现类对比
| 实现类 | 底层结构 | 阻塞特性 | 有序性 | 容量限制 | 线程安全 | 适用场景 |
|---|---|---|---|---|---|---|
| PriorityQueue | 数组/堆 | 非阻塞 | 自然排序/定制排序 | 无(动态扩容) | 否 | 优先级任务调度 |
| ArrayDeque | 数组 | 非阻塞 | 插入顺序 | 无(动态扩容) | 否 | 栈/队列实现 |
| LinkedList | 双向链表 | 非阻塞 | 插入顺序 | 无 | 否 | 简单队列实现 |
| ArrayBlockingQueue | 数组 | 阻塞 | FIFO | 有界 | 是 | 固定容量并发场景 |
| LinkedBlockingQueue | 链表 | 阻塞 | FIFO | 可选有界 | 是 | 高并发生产者-消费者 |
| SynchronousQueue | 无缓冲 | 阻塞 | FIFO | 容量为0 | 是 | 线程间直接移交数据 |
| DelayQueue | 优先级队列 | 阻塞 | 延迟时间 | 无 | 是 | 定时任务调度 |
| ConcurrentLinkedQueue | 链表 | 非阻塞 | FIFO | 无 | 是 | 高并发非阻塞场景 |
1.4 核心操作对比
| 操作类型 | 抛出异常 | 返回特殊值 | 阻塞 | 超时退出 |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 删除 | remove() | poll() | take() | poll(time, unit) |
| 检查 | element() | peek() | - | - |
1.5 JDK版本特性
- JDK 1.5: 引入Queue接口及PriorityQueue、ConcurrentLinkedQueue
- JDK 1.6: 完善阻塞队列实现,新增SynchronousQueue等
- JDK 8: 新增Stream API支持,如queue.stream()
- JDK 9: 新增of()方法创建不可变队列
- JDK 16: 新增addAll()默认实现
二、代码实践
2.1 基本操作示例
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Queue接口基本操作示例
* 展示不同类型队列的创建和常用API
*/
public class QueueBasicOperations {
public static void main(String[] args) throws InterruptedException {
// 1. 优先级队列(自然排序)
Queue<Integer> priorityQueue = new PriorityQueue<>();
priorityQueue.add(3);
priorityQueue.add(1);
priorityQueue.add(2);
System.out.println("PriorityQueue: " + priorityQueue); // [1, 3, 2]
System.out.println("Poll: " + priorityQueue.poll()); // 1
// 2. 双端队列
Deque<String> arrayDeque = new ArrayDeque<>();
arrayDeque.addFirst("a");
arrayDeque.addLast("b");
arrayDeque.push("c"); // 等价于addFirst
System.out.println("ArrayDeque: " + arrayDeque); // [c, a, b]
System.out.println("Pop: " + arrayDeque.pop()); // c
// 3. 阻塞队列
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
blockingQueue.put("x");
blockingQueue.put("y");
System.out.println("BlockingQueue size: " + blockingQueue.size()); // 2
// 超时插入
boolean inserted = blockingQueue.offer("z", 1, TimeUnit.SECONDS);
System.out.println("插入成功? " + inserted); // false
// 4. 并发队列
Queue<String> concurrentQueue = new ConcurrentLinkedQueue<>();
concurrentQueue.offer("A");
concurrentQueue.offer("B");
concurrentQueue.offer("C");
System.out.println("ConcurrentLinkedQueue: " + concurrentQueue); // [A, B, C]
}
}2.2 生产者-消费者模型实现
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 使用阻塞队列实现生产者-消费者模型
* 展示ArrayBlockingQueue在并发场景下的应用
*/
public class ProducerConsumerExample {
// 缓冲区大小
private static final int BUFFER_SIZE = 5;
// 产品数量
private static final int PRODUCT_COUNT = 10;
public static void main(String[] args) {
// 创建有界阻塞队列作为缓冲区
BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
// 创建生产者和消费者
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Producer(buffer, PRODUCT_COUNT));
executor.submit(new Consumer(buffer, PRODUCT_COUNT));
executor.submit(new Consumer(buffer, PRODUCT_COUNT));
executor.shutdown();
}
/**
* 生产者线程
*/
static class Producer implements Runnable {
private final BlockingQueue<Integer> buffer;
private final int productCount;
public Producer(BlockingQueue<Integer> buffer, int productCount) {
this.buffer = buffer;
this.productCount = productCount;
}
@Override
public void run() {
try {
for (int i = 1; i <= productCount; i++) {
System.out.println("生产者生产产品: " + i);
buffer.put(i); // 若缓冲区满则阻塞
Thread.sleep(100); // 模拟生产耗时
}
System.out.println("生产者完成生产任务");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* 消费者线程
*/
static class Consumer implements Runnable {
private final BlockingQueue<Integer> buffer;
private final int productCount;
private static int totalConsumed = 0;
public Consumer(BlockingQueue<Integer> buffer, int productCount) {
this.buffer = buffer;
this.productCount = productCount;
}
@Override
public void run() {
try {
while (totalConsumed < productCount) {
Integer product = buffer.take(); // 若缓冲区空则阻塞
synchronized (Consumer.class) {
if (totalConsumed < productCount) {
System.out.println("消费者消费产品: " + product);
totalConsumed++;
Thread.sleep(200); // 模拟消费耗时
}
}
}
System.out.println("消费者完成消费任务");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}2.3 延迟队列应用
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延迟队列(DelayQueue)应用示例
* 实现定时任务调度功能
*/
public class DelayQueueExample {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
// 添加延迟任务
delayQueue.put(new DelayedTask("任务1", 1, TimeUnit.SECONDS));
delayQueue.put(new DelayedTask("任务2", 3, TimeUnit.SECONDS));
delayQueue.put(new DelayedTask("任务3", 2, TimeUnit.SECONDS));
System.out.println("开始执行延迟任务...");
// 执行延迟任务
while (!delayQueue.isEmpty()) {
DelayedTask task = delayQueue.take(); // 阻塞直到有任务到期
task.run();
}
}
/**
* 延迟任务类
*/
static class DelayedTask implements Delayed, Runnable {
private final String taskName;
private final long executeTime;
public DelayedTask(String taskName, long delay, TimeUnit unit) {
this.taskName = taskName;
this.executeTime = System.currentTimeMillis() + unit.toMillis(delay);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
}
@Override
public void run() {
System.out.println("执行任务: " + taskName + ", 当前时间: " + System.currentTimeMillis());
}
}
}2.4 双端队列实现栈和队列
import java.util.ArrayDeque;
import java.util.Deque;
/**
* 使用ArrayDeque实现栈和队列
* 展示双端队列的灵活性
*/
public class DequeAsStackAndQueue {
public static void main(String[] args) {
// 1. 作为栈使用(LIFO)
Deque<Integer> stack = new ArrayDeque<>();
stack.push(1);
stack.push(2);
stack.push(3);
System.out.println("栈顶元素: " + stack.peek()); // 3
System.out.println("弹出元素: " + stack.pop()); // 3
System.out.println("栈大小: " + stack.size()); // 2
// 2. 作为队列使用(FIFO)
Deque<Integer> queue = new ArrayDeque<>();
queue.offer(1);
queue.offer(2);
queue.offer(3);
System.out.println("队首元素: " + queue.peek()); // 1
System.out.println("出队元素: " + queue.poll()); // 1
System.out.println("队列大小: " + queue.size()); // 2
// 3. 双端操作
Deque<Integer> deque = new ArrayDeque<>();
deque.addFirst(1);
deque.addLast(2);
deque.addFirst(0);
System.out.println("双端队列: " + deque); // [0, 1, 2]
System.out.println("移除第一个: " + deque.removeFirst()); // 0
System.out.println("移除最后一个: " + deque.removeLast()); // 2
}
}三、设计思想
3.1 接口设计模式
Queue接口采用了"接口继承+功能扩展"的设计模式,在Collection接口基础上扩展了队列特有操作,并通过不同的实现类提供多样化的队列特性。这种设计保证了接口的简洁性和实现的灵活性。
3.2 阻塞队列实现原理
阻塞队列通过ReentrantLock和Condition实现线程间的协调:
3.3 优先级队列设计
PriorityQueue基于二叉小顶堆实现,通过数组存储元素,利用堆的特性实现元素的自动排序:
- 父节点索引:i
- 左子节点索引:2i+1
- 右子节点索引:2i+2
- 插入时"上浮"调整
- 删除时"下沉"调整
3.4 并发队列设计
ConcurrentLinkedQueue采用无锁CAS操作实现高并发性能:
- 基于单向链表实现
- 使用volatile变量保证可见性
- 通过CAS操作实现原子性
- 采用Michael-Scott非阻塞算法
四、避坑指南
4.1 常见异常及解决方案
4.1.1 NoSuchElementException
原因:队列为空时调用element()或remove()方法 解决方案:
Queue<String> queue = new LinkedList<>();
// 错误示例
queue.element(); // 抛出NoSuchElementException
// 正确示例
if (!queue.isEmpty()) {
String element = queue.element();
}
// 更优方案: 使用peek()
String element = queue.peek(); // 队列为空时返回null4.1.2 IllegalStateException
原因:队列已满时调用add()方法 解决方案:
Queue<String> queue = new ArrayBlockingQueue<>(1);
queue.add("a");
// 错误示例
queue.add("b"); // 抛出IllegalStateException
// 正确示例
boolean success = queue.offer("b"); // 队列满时返回false
if (!success) {
System.out.println("队列已满,添加失败");
}4.2 线程安全问题
问题:在多线程环境下使用非线程安全的队列(如LinkedList、PriorityQueue) 解决方案:
// 错误示例 - 多线程下不安全
Queue<String> unsafeQueue = new LinkedList<>();
// 正确方案1: 使用线程安全的实现类
Queue<String> safeQueue1 = new ConcurrentLinkedQueue<>();
// 正确方案2: 使用阻塞队列
BlockingQueue<String> safeQueue2 = new LinkedBlockingQueue<>();
// 正确方案3: 使用Collections.synchronized包装
Queue<String> safeQueue3 = Collections.synchronizedQueue(new LinkedList<>());4.3 PriorityQueue陷阱
4.3.1 无序遍历
问题:PriorityQueue的迭代器不保证有序遍历 解决方案:
PriorityQueue<Integer> pq = new PriorityQueue<>();
pq.add(3);
pq.add(1);
pq.add(2);
// 错误: 直接遍历不保证有序
for (Integer num : pq) {
System.out.print(num + " "); // 可能输出: 1 3 2
}
// 正确: 逐个poll()获取有序元素
while (!pq.isEmpty()) {
System.out.print(pq.poll() + " "); // 输出: 1 2 3
}4.3.2 null元素
问题:PriorityQueue不允许添加null元素 解决方案:
PriorityQueue<String> pq = new PriorityQueue<>();
// 错误示例
pq.add(null); // 抛出NullPointerException
// 正确示例: 使用特殊值代替null
pq.add("NULL_VALUE");五、深度思考题
思考题1: 阻塞队列如何实现线程间通信
思考题回答: 阻塞队列通过内置锁和条件变量实现线程间通信。以ArrayBlockingQueue为例,其内部维护了一个ReentrantLock和两个Condition(notEmpty和notFull)。当队列满时,生产者线程调用put()方法会被阻塞在notFull条件上;当队列空时,消费者线程调用take()方法会被阻塞在notEmpty条件上。当有元素入队时,会唤醒notEmpty条件上的消费者;当有元素出队时,会唤醒notFull条件上的生产者。这种机制实现了生产者和消费者之间的高效协作。
思考题2: ArrayDeque与LinkedList作为队列和栈的性能对比
思考题回答: ArrayDeque通常比LinkedList作为队列和栈时性能更好,原因如下:
- 内存结构:ArrayDeque基于数组实现,内存连续,缓存利用率高;LinkedList基于节点,内存分散,可能导致更多缓存失效
- 操作效率:ArrayDeque的add/remove操作都是O(1)时间复杂度且实现简单;LinkedList需要创建节点对象,涉及更多内存操作
- 空间开销:LinkedList每个元素需要额外存储前后指针,空间开销更大
- 迭代性能:ArrayDeque的迭代器实现更简单高效
唯一适合使用LinkedList的场景是需要在队列中间进行插入/删除操作,或者需要实现双向迭代器的场景。
思考题3: 如何实现一个线程安全的非阻塞队列
思考题回答: 可以使用CAS(Compare-And-Swap)操作实现非阻塞队列,核心思想是通过原子操作避免使用锁:
- 使用volatile变量存储头节点和尾节点,保证可见性
- 入队时通过CAS原子更新尾节点
- 出队时通过CAS原子更新头节点
- 处理并发情况下的节点引用更新冲突
简化实现示例:
import java.util.concurrent.atomic.AtomicReference;
public class ConcurrentNonBlockingQueue<T> {
// 节点定义
private static class Node<T> {
final T item;
AtomicReference<Node<T>> next;
Node(T item) {
this.item = item;
this.next = new AtomicReference<>(null);
}
}
// 头节点和尾节点(AtomicReference保证原子操作)
private final AtomicReference<Node<T>> head = new AtomicReference<>(new Node<>(null));
private final AtomicReference<Node<T>> tail = new AtomicReference<>(head.get());
// 入队操作
public void enqueue(T item) {
Node<T> newNode = new Node<>(item);
while (true) {
Node<T> currentTail = tail.get();
Node<T> tailNext = currentTail.next.get();
// 检查尾节点是否被修改
if (currentTail == tail.get()) {
// 如果尾节点的next为null,表示可以插入新节点
if (tailNext == null) {
if (currentTail.next.compareAndSet(null, newNode)) {
// 成功插入,更新尾节点
tail.compareAndSet(currentTail, newNode);
return;
}
} else {
// 尾节点已被其他线程修改,帮助更新尾节点
tail.compareAndSet(currentTail, tailNext);
}
}
}
}
// 出队操作
public T dequeue() {
while (true) {
Node<T> currentHead = head.get();
Node<T> currentTail = tail.get();
Node<T> headNext = currentHead.next.get();
// 检查头节点是否被修改
if (currentHead == head.get()) {
// 队列为空
if (currentHead == currentTail) {
if (headNext == null) {
return null; // 队列为空
}
// 帮助更新尾节点
tail.compareAndSet(currentTail, headNext);
} else {
T item = headNext.item;
// 尝试更新头节点
if (head.compareAndSet(currentHead, headNext)) {
return item;
}
}
}
}
}
}思考题4: DelayQueue的实现原理及应用场景
思考题回答: DelayQueue是一个支持延迟获取元素的无界阻塞队列,其实现原理如下:
- 底层基于PriorityQueue实现,元素必须实现Delayed接口
- Delayed接口定义了getDelay()方法返回剩余延迟时间和compareTo()方法定义排序规则
- 只有当元素的延迟时间<=0时才能被取出
- 内部通过ReentrantLock保证线程安全
- 使用Condition等待直到有元素延迟到期
应用场景包括:
- 定时任务调度:如定时执行的任务队列
- 缓存过期清理:存储带有过期时间的缓存项
- 订单超时处理:处理超过一定时间未支付的订单
- 会话超时管理:管理用户会话的过期时间
- 重试机制实现:失败操作的定时重试队列
