摘要:本文学习了能在高并发场景下使用的线程安全集合。
环境
Windows 10 企业版 LTSC 21H2
Java 1.8
1 集合中的线程安全问题
1.1 List
示例:
1 | public static void main(String[] args) { |
结果:
1 | Exception in thread "1" Exception in thread "8" java.util.ConcurrentModificationException |
说明出现了并发修改异常。
1.2 Set
示例:
1 | public static void main(String[] args) { |
结果:
1 | Exception in thread "1" Exception in thread "8" java.util.ConcurrentModificationException |
说明出现了并发修改异常。
1.3 Map
示例:
1 | public static void main(String[] args) { |
结果:
1 | Exception in thread "1" Exception in thread "8" java.util.ConcurrentModificationException |
说明出现了并发修改异常。
2 线程安全的集合
JUC下有关Collection的类:
- CopyOnWriteArrayList:实现了List接口,相当于线程安全的ArrayList。
- CopyOnWriteArraySet:继承于AbstractSet类,实现了Set接口,相当于线程安全的HashSet。通过CopyOnWriteArrayList实现。
- ConcurrentSkipListSet:继承于AbstractSet类,实现了NavigableSet接口,相当于线程安全的TreeSet。通过ConcurrentSkipListMap实现。
- ConcurrentLinkedQueue:继承于AbstractQueue类,实现了Queue接口,是单向链表实现的线程安全的无界队列,该队列支持FIFO方式操作元素。
- ConcurrentLinkedDeque:继承于AbstractCollection类,实现了Deque接口,是双向链表实现的线程安全的无界队列,该队列支持FIFO和FILO方式操作元素,相当于线程安全的LinkedList。
- ArrayBlockingQueue:实现了BlockingQueue接口,是数组实现的线程安全的有界阻塞队列。
- LinkedBlockingQueue:实现了BlockingQueue接口,是单向链表实现的线程安全的无界阻塞队列,该队列支持FIFO方式操作元素。
- LinkedBlockingDeque:实现了BlockingDeque接口,是双向链表实现的线程安全的无界阻塞队列,该队列支持FIFO和FILO方式操作元素。
- SynchronousQueue:实现了BlockingQueue接口,是不存储元素的阻塞队列,put操作必须等待take操作,否则不能添加元素并产生中断抛出异常。
- DelayQueue:实现了BlockingQueue接口,是支持延迟获取的阻塞队列。
JUC下有关Map的类:
- ConcurrentHashMap:继承于AbstractMap类,相当于线程安全的HashMap,是线程安全的哈希表。使用数组加链表加红黑树结构和CAS操作实现。
- ConcurrentSkipListMap:继承于AbstractMap类,相当于线程安全的TreeMap,是线程安全的有序的哈希表。通过跳表实现的。
Collections工具类下的方法:
- synchronizedList()方法:将普通的List转换为线程安全的List,通过synchronized同步锁保证对集合的访问是线程安全的。
- synchronizedSet()方法:将普通的Set转换为线程安全的Set,通过synchronized同步锁保证对集合的访问是线程安全的。
- synchronizedMap()方法:将普通的Map转换为线程安全的Map,通过synchronized同步锁保证对集合的访问是线程安全的。
3 典型分析
2.1 CopyOnWriteArrayList
2.1.1 说明
CopyOnWriteArrayList使用写时复制技术,其内部有个volatile修饰的数组用于保存数据,在操作数据时会创建新数组,并将更新后的数据拷贝到新数组中,最后再将该数组赋值给原数组。
通过volatile关键字保证数据修改时的可见性,通过在操作数据前使用Lock互斥锁来保护数据。所以涉及到修改数据的操作,CopyOnWriteArrayList效率很低。
使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。迭代器支持hasNext()
方法和next()
方法等不可变操作,但不支持add()
方法和remove()
方法等可变操作。
2.1.2 源码
2.1.2.1 设置元素
设置元素:
1 | public E set(int index, E element) { |
2.1.2.2 添加元素
添加元素:
1 | public void add(int index, E element) { |
2.1.2.3 删除元素
删除元素:
1 | public E remove(int index) { |
2.2 ConcurrentHashMap
2.2.1 说明
在JDK1.8之前,ConcurrentHashMap使用分段锁机制实现,其最大并发度受Segment的个数限制。
在JDK1.8之后,ConcurrentHashMap使用与HashMap类似的数组配合链表或红黑树的方式实现,加锁采用CAS自旋锁和synchronized可重入锁等机制实现。
2.2.2 属性
使用sizeCtl属性表示标志控制符,这个参数非常重要,出现在ConcurrentHashMap的各个阶段,不同取值的含义:
- 负数:表示正在进行初始化或扩容操作:
- -1表示正在进行初始化操作。
- -N表示正在进行扩容操作:高16位是扩容标识,与数组长度有关,最高位固定为1,低16位减1表示扩容线程数。
- 0:表示数组还未初始化。
- 正数:表示下一次进行扩容的大小,类似于扩容阈值。它的值始终是当前容量的0.75倍,如果数组节点个数大于等于sizeCtl,则进行扩容。
内部类Node中的hash属性:
- 负数:-1表示该节点为转发节点,-2表示该节点为红黑树节点。
- 正数:表示根据key计算得到的hash值。
2.2.3 源码
2.2.3.1 构造方法
需要说明的是,在构造方法里并没有对集合进行初始化操作,而是等到了添加节点的时候才进行初始化,属于懒汉式的加载方式。
另外,loadFactor参数在JDK1.8中不再有加载因子的意义,仅为了兼容以前的版本,加载因子默认为0.75并通过移位运算计算,不支持修改。
同样,concurrencyLevel参数在JDK1.8中不再有多线程运行的并发度的意义,仅为了兼容以前的版本。
构造方法:
1 | // 空参构造器 |
2.2.3.2 初始化方法
集合并不会在构造方法里进行初始化,而是在用到集合的时候才进行初始化,在初始化的同时会设置集合的阈值sizeCtl。
在初始化的过程中为了保证线程安全,总共使用了两步操作:
- 通过CAS原子更新方法将sizeCtl设置为-1,保证只有一个线程执行初始化。
- 线程获取初始化权限后内部进行二次判断,保证只有在未初始化的情况下才能执行初始化。
初始化集合数组,使用CAS原子更新保证线程安全,使用volatile保证顺序和可见性:
1 | private final Node<K,V>[] initTable() { |
2.2.3.3 添加节点
添加节点的代码逻辑:
- 校验数据,判断传入一个key和value是否为空,如果为空就直接报错。ConcurrentHashMap是不可为空的,HashMap是可以为空的。
- 初始化数组,判断数组是否为空,如果为空就执行初始化方法。
- 插入或更新节点,如果数组插入位置的节点为空就通过CAS操作插入节点,如果数组正在扩容就执行协助扩容方法,如果产生哈希碰撞就找到节点并更新节点或者插入节点。
- 插入节点后,链表节点判断是否要转为红黑树节点,并且需要增加容量并判断是否需要扩容。
添加节点:
1 | final V putVal(K key, V value, boolean onlyIfAbsent) { |
2.2.3.4 修改容量
修改容量时,使用的是分散热点机制,借鉴了LongAdder类的add()
方法以及Striped64类的longAccumulato()
方法。
修改容量后需要判断是否要执行扩容方法,支持多个线程同时扩容。
修改容量:
1 | private final void addCount(long x, int check) { |
2.2.3.5 协助扩容
协助扩容的代码逻辑:
- 判断集合数组不为空,并且节点是转发节点,并且转发节点的子节点不为空,如果不成立则不需要扩容。
- 循环判断是否扩容成功,如果没有就进行协助扩容,并增加sizeCtl的值,扩容结束后会减少sizeCtrl的值。
协助扩容:
1 | final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { |
2.2.3.6 扩容方法
扩容方法的核心思路是根据CPU内核数将原数组分成多个区间,每个线程都领取一个区间,对区间内的每个根节点进行扩容:
- 计算区间长度,根据CPU核心数平均分配给每个CPU相同大小的区间,如果不够16个,默认就是16个。
- 创建扩容后数组,有且只能由一个线程构建一个新数组。
- 双层循环完成扩容,外层for循环处理区间节点,内层使用while循环处理扩容区间。
- 处理区间节点时,判断如果完成扩容则返回跳出for循环,否则判断节点状态。如果是空节点和处理过的节点则设置为转发节点,否则对要扩容的节点使用高低节点进行拆分。
- 处理扩容区间时,线程会循环获取所有待处理区间,直至完成扩容,修改while循环标志位跳出循环。
- 当线程完成扩容后,会返回线程并减少sizeCtl的值,但最后一个线程返回时,更新数组和阈值。
扩容方法:
1 | private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { |
2.2.3.7 获取方法
根据指定的键,返回对应的键值对,由于是读操作,所以不涉及到并发问题,步骤如下:
- 判断key对应数组位置上的节点是否为空,为空则返回空表示没有找到,不为空则继续判断。
- 如果位置节点是转发节点或者红黑树节点,执行相应节点的查询方法,如果位置节点是链表节点,遍历链表节点并查询。
获取节点:
1 | public V get(Object key) { |
2.2.3.8 删除节点
删除节点可以看成是替换操作。
删除节点:
1 | final V replaceNode(Object key, V value, Object cv) { |
2.2.3.9 统计容量
ConcurrentHashMap中baseCount用于保存数组中节点个数,但是并不准确,因为多线程同时增删改,会导致baseCount修改失败,此时会将节点个数存储于counterCells数组内。
当需要统计节点个数的时候,除了要统计baseCount之外,还需要加上counterCells中的每个counterCell的值。
值得一提的是即使如此,统计出来的依旧不是当前数组中节点的准确值,在多线程环境下统计前后并不能暂停线程操作,因此无法保证准确性。
统计集合容量:
1 | public int size() { |
统计节点数量,即baseCount和counterCells元素个数的总和:
1 | final long sumCount() { |
条