集合-并发集合
集合-并发集合
一、核心理论
1.1 并发集合概述
并发集合是Java集合框架中专门为多线程环境设计的集合类,位于java.util.concurrent包下。它们通过精细的同步机制或无锁算法,在保证线程安全的同时提供了较高的并发性能,解决了传统集合在多线程环境下需要手动同步的问题。
1.2 并发集合体系
1.3 并发集合对比
| 集合类 | 底层结构 | 并发机制 | 读写性能 | 迭代特性 | 适用场景 | 
|---|---|---|---|---|---|
| ConcurrentHashMap | 数组+链表+红黑树 | 分段锁/CAS+synchronized | 高 | 弱一致性 | 高并发键值对存储 | 
| CopyOnWriteArrayList | 数组 | 写时复制 | 读高写低 | 弱一致性 | 读多写少场景 | 
| ConcurrentLinkedQueue | 链表 | CAS | 高 | 弱一致性 | 高并发FIFO队列 | 
| ArrayBlockingQueue | 数组 | 显式锁 | 中 | 弱一致性 | 有界缓冲区 | 
| LinkedBlockingQueue | 链表 | 显式锁 | 中 | 弱一致性 | 无界/有界缓冲区 | 
| SynchronousQueue | 无缓冲 | 转移机制 | 高 | 不支持 | 线程间直接通信 | 
| ConcurrentSkipListMap | 跳表 | CAS | 高 | 弱一致性 | 有序并发映射 | 
1.4 并发级别
并发集合的并发度设计:
- 分段锁:将数据分成多个段,每段单独加锁(如ConcurrentHashMap早期版本)
- CAS操作:无锁算法,通过比较并交换实现原子操作
- 写时复制:写操作时复制整个数组,读操作无锁(如CopyOnWriteArrayList)
- 分离锁:读写操作使用不同的锁(如LinkedBlockingQueue的takeLock和putLock)
1.5 JDK版本演进
- JDK 1.5:引入ConcurrentHashMap、ConcurrentLinkedQueue等早期并发集合
- JDK 1.6:优化ConcurrentHashMap性能,引入TransferQueue
- JDK 1.7:ConcurrentHashMap使用分段锁(Segment)实现
- JDK 1.8:ConcurrentHashMap彻底重构,使用CAS+synchronized替代分段锁
- JDK 9:新增不可变集合工厂方法,如List.of()、Map.of()
- JDK 16:ConcurrentHashMap新增stream()相关方法优化
二、代码实践
2.1 ConcurrentHashMap使用示例
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * ConcurrentHashMap并发操作示例
 * 展示高并发环境下的安全读写操作
 */
public class ConcurrentHashMapExample {
    private static final int THREAD_COUNT = 10;
    private static final int OPERATIONS_PER_THREAD = 10000;
    private static final Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();
    public static void main(String[] args) throws InterruptedException {
        // 初始化计数器
        concurrentMap.put("counter", 0);
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        // 提交增量任务
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.submit(() -> {
                for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
                    // 原子操作: increment by 1
                    concurrentMap.computeIfPresent("counter", (k, v) -> v + 1);
                }
            });
        }
        // 关闭线程池并等待完成
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        // 输出结果
        System.out.println("预期结果: " + (THREAD_COUNT * OPERATIONS_PER_THREAD));
        System.out.println("实际结果: " + concurrentMap.get("counter"));
    }
}2.2 CopyOnWriteArrayList使用示例
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * CopyOnWriteArrayList使用示例
 * 展示读多写少场景下的并发安全
 */
public class CopyOnWriteArrayListExample {
    private static final List<String> cowList = new CopyOnWriteArrayList<>();
    public static void main(String[] args) throws InterruptedException {
        // 初始化列表
        cowList.add("元素1");
        cowList.add("元素2");
        cowList.add("元素3");
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // 读线程
        executor.submit(() -> {
            System.out.println("读线程开始遍历");
            Iterator<String> iterator = cowList.iterator();
            while (iterator.hasNext()) {
                String element = iterator.next();
                System.out.println("读取元素: " + element);
                try {
                    Thread.sleep(1000); // 模拟读取耗时
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        // 写线程
        executor.submit(() -> {
            try {
                Thread.sleep(500); // 等待读线程开始遍历
                System.out.println("写线程添加元素");
                cowList.add("元素4");
                System.out.println("写线程修改完成,当前列表大小: " + cowList.size());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // 关闭线程池
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        // 最终列表内容
        System.out.println("最终列表内容: " + cowList);
    }
}2.3 并发队列性能对比
import java.util.Queue;
import java.util.concurrent.*;
/**
 * 不同并发队列的性能对比测试
 */
public class ConcurrentQueuePerformanceTest {
    private static final int THREADS = 4;
    private static final int OPERATIONS = 100000;
    public static void main(String[] args) {
        testQueue("ConcurrentLinkedQueue", new ConcurrentLinkedQueue<>());
        testQueue("ArrayBlockingQueue", new ArrayBlockingQueue<>(1000));
        testQueue("LinkedBlockingQueue", new LinkedBlockingQueue<>());
        testQueue("SynchronousQueue", new SynchronousQueue<>());
    }
    private static void testQueue(String name, Queue<Integer> queue) {
        ExecutorService executor = Executors.newFixedThreadPool(THREADS);
        long startTime = System.nanoTime();
        // 生产者
        Runnable producer = () -> {
            for (int i = 0; i < OPERATIONS; i++) {
                try {
                    queue.add(i);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        // 消费者
        Runnable consumer = () -> {
            for (int i = 0; i < OPERATIONS; i++) {
                try {
                    queue.poll();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        // 提交任务
        executor.submit(producer);
        executor.submit(consumer);
        executor.submit(producer);
        executor.submit(consumer);
        // 等待完成
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 计算耗时
        long duration = (System.nanoTime() - startTime) / 1_000_000;
        System.out.printf("%s: 耗时 %d ms, 最终大小: %d%n", name, duration, queue.size());
    }
}2.4 并发集合在实际项目中的应用
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * 并发集合在实际项目中的应用示例
 * 实现一个线程安全的用户会话管理器
 */
public class ConcurrentSessionManager {
    // 存储用户会话: 用户ID -> 会话对象
    private final Map<String, Session> sessions = new ConcurrentHashMap<>();
    // 存储在线用户ID
    private final Set<String> onlineUsers = new CopyOnWriteArraySet<>();
    /**
     * 创建新会话
     */
    public Session createSession(String userId) {
        // 原子操作: 如果不存在则创建
        Session session = sessions.computeIfAbsent(userId, k -> new Session(userId));
        onlineUsers.add(userId);
        return session;
    }
    /**
     * 获取会话
     */
    public Session getSession(String userId) {
        return sessions.get(userId);
    }
    /**
     * 销毁会话
     */
    public void destroySession(String userId) {
        sessions.remove(userId);
        onlineUsers.remove(userId);
    }
    /**
     * 获取在线用户数
     */
    public int getOnlineUserCount() {
        return onlineUsers.size();
    }
    /**
     * 会话类
     */
    public static class Session {
        private final String userId;
        private final long createTime;
        private long lastAccessTime;
        public Session(String userId) {
            this.userId = userId;
            this.createTime = System.currentTimeMillis();
            this.lastAccessTime = createTime;
        }
        // 省略getter和setter
    }
}三、设计思想
3.1 ConcurrentHashMap实现原理
JDK 8中ConcurrentHashMap的实现:
核心优化点:
- 取消分段锁:使用CAS+synchronized实现更细粒度的同步
- 红黑树转换:链表长度超过阈值(8)时转为红黑树
- volatile可见性:节点值和next指针使用volatile修饰
- 懒加载初始化:首次使用时才初始化数组
- 并发扩容:支持多线程同时参与扩容
3.2 写时复制容器原理
CopyOnWriteArrayList的实现机制:
优缺点分析:
- 优点:读操作无锁,性能高;读多写少场景下效率极高
- 缺点:写操作复制整个数组,内存开销大;数据一致性弱
3.3 非阻塞队列算法
ConcurrentLinkedQueue采用Michael-Scott非阻塞队列算法:
- 使用CAS操作实现无锁入队和出队
- 维护头节点和尾节点的volatile引用
- 允许短暂的不一致状态,但最终会达到一致
- 失败重试机制保证操作最终成功
3.4 并发集合的迭代特性
并发集合的迭代器特性:
- 弱一致性:迭代器可以看到迭代开始时的元素,可能看不到后续修改
- 快速失败:传统集合的迭代器在检测到并发修改时抛出ConcurrentModificationException
- 安全失败:并发集合的迭代器从不抛出ConcurrentModificationException
四、避坑指南
4.1 ConcurrentHashMap常见问题
4.1.1 size()方法的陷阱
问题:ConcurrentHashMap的size()方法返回的不是精确值 解决方案:
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 添加元素...
// 错误: 期望精确计数
int size = map.size(); // 可能不是最新值
// 正确: 需要精确计数时使用
long accurateSize = map.mappingCount(); // JDK 8+提供,返回long类型
// 更好的方式: 避免依赖精确计数
// 设计不需要精确知道集合大小的算法4.1.2 原子操作的重要性
问题:错误地组合多个操作,导致竞态条件 解决方案:
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 错误: 非原子操作组合
if (map.containsKey("key")) {
    map.put("key", map.get("key") + 1); // 存在竞态条件
}
// 正确: 使用原子方法
map.computeIfPresent("key", (k, v) -> v + 1);
// 或者
map.putIfAbsent("key", 0);
map.compute("key", (k, v) -> v + 1);
// 对于简单计数
map.merge("key", 1, Integer::sum);4.2 CopyOnWriteArrayList使用误区
4.2.1 内存占用问题
问题:在大数据量和频繁修改场景下使用CopyOnWriteArrayList 解决方案:
// 错误: 大数据量频繁修改场景
List<LargeObject> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 100000; i++) {
    list.add(new LargeObject()); // 每次添加都会复制整个数组
}
// 正确: 根据场景选择合适的集合
if (readHeavy && writeRare) {
    // 读多写少场景使用CopyOnWriteArrayList
    List<LargeObject> list = new CopyOnWriteArrayList<>();
} else if (concurrentModification) {
    // 一般并发场景使用ConcurrentLinkedQueue或其他
    Queue<LargeObject> queue = new ConcurrentLinkedQueue<>();
} else {
    // 单线程或低并发场景使用普通ArrayList
    List<LargeObject> list = new ArrayList<>();
    // 手动同步
    Collections.synchronizedList(list);
}4.2.2 迭代器数据一致性
问题:期望迭代器反映最新数据 解决方案:
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("a");
list.add("b");
Iterator<String> iterator = list.iterator();
list.add("c");
// 迭代器不会看到添加的"c"元素
while (iterator.hasNext()) {
    System.out.println(iterator.next()); // 只输出a和b
}
// 如果需要最新数据,应重新获取迭代器
iterator = list.iterator();
while (iterator.hasNext()) {
    System.out.println(iterator.next()); // 输出a、b和c
}4.3 阻塞队列使用不当
4.3.1 队列容量设置不合理
问题:ArrayBlockingQueue容量设置过小导致频繁阻塞 解决方案:
// 错误: 容量设置过小
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 正确: 根据实际吞吐量设置合理容量
int expectedThroughput = 1000;
BlockingQueue<String> queue = new ArrayBlockingQueue<>(expectedThroughput / 2);
// 或者使用可动态调整的容量策略
BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 无界队列
// 或者使用饱和策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize, maxPoolSize, keepAliveTime, unit, queue, handler);五、深度思考题
思考题1: ConcurrentHashMap在JDK 7和JDK 8中的实现差异
思考题回答: ConcurrentHashMap在JDK 7和JDK 8中的实现有显著差异:
- 锁机制: - JDK 7: 使用分段锁(Segment),将整个数组分成多个Segment,每个Segment是一个小的HashMap,拥有独立的锁
- JDK 8: 取消分段锁,使用CAS+synchronized实现同步,只锁定链表头节点或红黑树的根节点
 
- 数据结构: - JDK 7: Segment数组 + HashEntry数组 + 链表
- JDK 8: Node数组 + 链表/红黑树,与HashMap结构类似
 
- 并发度: - JDK 7: 并发度由Segment数量决定,默认16,且不可动态调整
- JDK 8: 理论上并发度为数组长度,可动态扩容,并发性能更高
 
- 扩容机制: - JDK 7: 每个Segment独立扩容,扩容粒度大
- JDK 8: 支持多线程并发扩容,每个线程负责一部分桶的迁移
 
- 功能增强: - JDK 8新增了compute(), forEach(), merge()等原子操作方法
- 支持Stream API和Lambda表达式
 
这些改进使JDK 8的ConcurrentHashMap在高并发场景下具有更好的性能和可扩展性。
思考题2: 如何选择合适的并发集合
思考题回答: 选择并发集合应考虑以下因素:
- 数据结构需求: - 键值对存储:ConcurrentHashMap、ConcurrentSkipListMap
- 列表存储:CopyOnWriteArrayList、ConcurrentLinkedQueue
- 队列操作:根据是否阻塞、是否有界、是否需要优先级选择
 
- 操作类型比例: - 读多写少:CopyOnWriteArrayList、CopyOnWriteArraySet
- 读写均衡:ConcurrentHashMap、ConcurrentLinkedQueue
- 写操作频繁:考虑ConcurrentHashMap或阻塞队列
 
- 线程安全需求: - 强一致性:阻塞队列如ArrayBlockingQueue
- 弱一致性:ConcurrentHashMap、ConcurrentLinkedQueue
- 原子操作支持:优先选择提供原子方法的集合
 
- 性能要求: - 高并发读:CopyOnWriteArrayList性能最佳
- 高并发写:ConcurrentHashMap (JDK8+)性能优异
- 有序性需求:ConcurrentSkipListMap/Set提供排序功能
 
- 内存占用: - 大数据量:避免使用CopyOnWrite容器
- 小数据量:CopyOnWrite容器的性能优势更明显
 
选择流程建议:先确定数据结构类型,再根据读写比例和一致性需求选择具体实现,最后进行性能测试验证。
思考题3: 并发集合的弱一致性实现原理
思考题回答: 并发集合的弱一致性(weakly consistent)是指:
- 定义:迭代器可以看到迭代开始时集合的状态,可能看不到迭代过程中的修改,但不会抛出ConcurrentModificationException 
- 实现原理: - ConcurrentHashMap:迭代器遍历的是数组的快照,对于已遍历的节点,即使发生修改也不会影响迭代结果
- ConcurrentLinkedQueue:迭代器保存了初始节点引用,后续修改不会影响已创建的迭代器
- CopyOnWriteArrayList:迭代器基于创建时的数组快照,后续修改操作会创建新数组,不影响旧迭代器
 
- 实现机制: - 使用volatile变量保证节点引用的可见性
- 不使用快速失败机制(fail-fast),避免抛出ConcurrentModificationException
- 迭代过程中不加锁,允许并发修改
- 通过内存屏障或原子操作保证部分操作的有序性
 
- 优缺点: - 优点:迭代过程中不需要加锁,性能高;不会抛出异常中断迭代
- 缺点:可能读取到过期数据;无法保证数据的实时一致性
 
弱一致性是并发性能和数据一致性之间的权衡,适用于可以接受短暂数据不一致的高并发场景。
思考题4: 如何实现一个自定义的并发安全集合
思考题回答: 实现自定义并发安全集合可采用以下方案:
- 基于现有并发集合封装:
public class CustomConcurrentSet<T> {
    private final ConcurrentHashMap<T, Boolean> backingMap;
    public CustomConcurrentSet() {
        this.backingMap = new ConcurrentHashMap<>();
    }
    public boolean add(T element) {
        return backingMap.putIfAbsent(element, Boolean.TRUE) == null;
    }
    public boolean remove(T element) {
        return backingMap.remove(element) != null;
    }
    public boolean contains(T element) {
        return backingMap.containsKey(element);
    }
    // 其他方法...
}- 基于锁机制实现:
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
public class LockBasedConcurrentList<T> {
    private final List<T> list = new ArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();
    public void add(T element) {
        lock.lock();
        try {
            list.add(element);
        } finally {
            lock.unlock();
        }
    }
    public T get(int index) {
        lock.lock();
        try {
            return list.get(index);
        } finally {
            lock.unlock();
        }
    }
    // 其他方法...
}- 基于CAS操作实现:
import java.util.concurrent.atomic.AtomicReference;
public class CASBasedStack<T> {
    private static class Node<T> {
        T value;
        Node<T> next;
        Node(T value, Node<T> next) {
            this.value = value;
            this.next = next;
        }
    }
    private final AtomicReference<Node<T>> top = new AtomicReference<>();
    public void push(T value) {
        Node<T> newNode;
        Node<T> currentTop;
        do {
            currentTop = top.get();
            newNode = new Node<>(value, currentTop);
        } while (!top.compareAndSet(currentTop, newNode));
    }
    public T pop() {
        Node<T> currentTop;
        Node<T> newTop;
        do {
            currentTop = top.get();
            if (currentTop == null) {
                return null;
            }
            newTop = currentTop.next;
        } while (!top.compareAndSet(currentTop, newTop));
        return currentTop.value;
    }
}实现要点:
- 选择合适的基础数据结构
- 根据并发需求选择同步机制(锁、CAS或两者结合)
- 实现原子操作避免竞态条件
- 考虑迭代器的一致性保证
- 提供适当的性能优化(如锁粒度控制、无锁设计)
- 进行充分的并发测试验证线程安全性
