摘要:本文学习了Atomic原子操作类常用的类和方法。
环境
Windows 10 企业版 LTSC 21H2
Java 1.8
1 原子操作类
原子操作类指的是java.util.concurrent.atomic
包中的类,可以分成六种:
- 普通类型原子类:AtomicInteger,AtomicBoolean,AtomicLong,AtomicReference。
- 数组类型原子类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray。
- 复合类型原子类:AtomicStampedReference,AtomicMarkableReference。
- 对象属性原子类:AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater。
- 累加操作原子类:DoubleAccumulator,DoubleAdder,LongAccumulator,LongAdder。
- 累加操作基础类:Striped64,Number。
1.1 普通类型原子类
AtomicInteger,AtomicBoolean,AtomicLong,AtomicReference是操作标量类型的原子类,其内部实现使用volatile关键字和native方法,从而避免了synchronized的高开销。
场景:
- AtomicInteger和AtomicLong用于保证整数类型操作的线程安全。
- AtomicBoolean可以作为中断标识,用于停止线程。
- AtomicReference用于保证引用类型操作的线程安全,也可以封装多个共享变量,保证多个共享变量操作的线程安全。
使用AtomicInteger完成多线程自增操作:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class Demo { public AtomicInteger atomicInteger = new AtomicInteger();
public static void main(String[] args) throws InterruptedException { Demo demo = new Demo(); CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 1; i <= 10; i++) { new Thread(() -> { try { for (int j = 1; j <= 100; j++) { demo.addPlusPlus(); } } finally { countDownLatch.countDown(); } }, String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "获取到的result:" + demo.get()); }
public void addPlusPlus() { atomicInteger.incrementAndGet(); }
public int get() { return atomicInteger.get(); } }
|
1.2 数组类型原子类
AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray是操作数组类型的原子类,对数组提供了支持,其内部没有维持volatile变量,而是全部由native方法实现。
场景:
- AtomicIntegerArray和AtomicLongArray用于保证整数类型数组操作的线程安全。
- AtomicReferenceArray用于保证引用类型数组操作的不安全问题。
1.3 复合类型原子类
AtomicStampedReference,AtomicMarkableReference是复合类型的原子类,将某种值和引用关联起来。
场景:
- AtomicMarkableReference将单个布尔值与引用关联起来,维护带有标记位的对象引用,可以原子更新带有标记位的引用类型。
- AtomicStampedReference将整数值与引用关联起来,维护带有版本号的对象引用,可以原子更新带有版本号的引用类型,可以解决使用CAS出现的ABA问题。
使用AtomicStampedReference类进行更新:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class Demo { public static void main(String[] args) { AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(0, 0); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "进入"); int stamp = atomicStampedReference.getStamp(); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "将0更新为1:" + atomicStampedReference.compareAndSet(0, 1, stamp, stamp + 1)); }, "t1").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "进入"); int stamp = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + "将0更新为1:" + atomicStampedReference.compareAndSet(0, 1, stamp, stamp + 1)); }, "t2").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "进入"); int stamp = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + "将1更新为0:" + atomicStampedReference.compareAndSet(1, 0, stamp, stamp + 1)); }, "t3").start(); } }
|
1.4 对象属性原子类
AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater是操作对象属性的原子类,基于反射的原理,可以提供对关联字段类型的访问,用于对类中的volatile字段进行原子操作。
场景:
- AtomicIntegerFieldUpdater和AtomicLongFieldUpdater用于解决多线程环境下整数类型属性操作的不安全问题。
- AtomicReferenceArray用于解决多线程环境下引用类型属性操作的不安全问题。
要求:
- 更新的属性必须使用volatile修饰符。
- 因为操作对象属性的原子类都是抽象类,所以每次使用都必须使用静态方法
newUpdater()
创建一个更新器,并且需要设置类和属性。
优势:
- 使用AtomicInteger获取结果值需要调用
get()
方法,但是AtomicIntegerFieldUpdater获取属性值可以通过对象获取,减少性能消耗。
- 使用AtomicIntegerFieldUpdater作为类的静态成员,多个对象可以共同使用,但是AtomicInteger对象不允许多个对象共同使用,造成资源浪费。
使用AtomicIntegerFieldUpdater完成多线程自增操作:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class Demo { public static AtomicIntegerFieldUpdater atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Demo.class, "count"); public volatile int count = 0;
public static void main(String[] args) throws InterruptedException { Demo demo = new Demo(); CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 1; i <= 10; i++) { new Thread(() -> { try { for (int j = 1; j <= 100; j++) { demo.addPlusPlus(); } } finally { countDownLatch.countDown(); } }, String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "获取到的result:" + demo.get()); }
public void addPlusPlus() { atomicIntegerFieldUpdater.incrementAndGet(this); }
public int get() { return count; } }
|
1.5 累加操作相关类
DoubleAccumulator,DoubleAdder,LongAccumulator,LongAdder是累加操作的原子类,是JDK1.8引进的并发新技术,可以看做AtomicLong和AtomicDouble的部分加强类型。
Striped64,Number是累加操作的基础类,是JDK1.8引进的并发新技术,是累加操作原子类的父类。
对比:
- 在低并发的场景下,AtomicLong和LongAdder的性能相似,并且AtomicLong提供了更加丰富的功能。
- 在高并发的场景下,多个线程同时进行自旋操作,会出现大量失败并不断自旋的场景,此时AtomicLong的自旋会成为瓶颈,所以LongAdder具有更好的性能,但是代价是消耗更多的内存空间。
缺点:
- LongAdder只提供了加减法操作,功能过于单一,更多地用于收集统计数据,而不是细粒度的同步控制。
- LongAdder的
sum()
方法并不精确,所以不能保证强一致性(在任何时刻查询到的都是最近更新的数据),只能保证最终一致性(最终更新的数据都会被查询到)。
原理:
- LongAdder类和DoubleAdder类都继承自Striped64类,其底层代码调用来自于Striped64类的
longAccumulate()
方法和doubleAccumulate()
方法。
- Striped64类的基本思路是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样发生冲突的概率就小很多。如果要获取整体的value值,只要将各个槽中的变量值累加返回即可。
2 分散热点
在多线程高并发的情况下,为了避免自旋锁带来极大的性能开销,在Striped64类中使用了分散热点机制。
多线程自旋争抢的是value值的修改权,可以将value值看做热点,通过将value值分散到数组中实现热点分散,使用类似于哈希碰撞的机制,让每个线程通过计算得到数组中的位置,各个线程只对自己位置的value值进行CAS操作,降低了自旋发生争抢的概率。
Striped64类的计数方法在ConcurrentHashMap中也有使用,ConcurrentHashMap中的baseCount对应着Striped64中的base变量,而counterCells则对应着Striped64中的cells数组,他们的实现是一样的。
3 生产者消费者
使用原子操作类实现生产者消费者:
java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| public class Demo { public static void main(String[] args) throws InterruptedException { Resource r = new Resource(new ArrayBlockingQueue<>(3)); new Thread(() -> { try { r.product(); } catch (InterruptedException e) { e.printStackTrace(); } }, "生产者").start(); new Thread(() -> { try { r.consume(); } catch (InterruptedException e) { e.printStackTrace(); } }, "消费者").start(); Thread.sleep(5000); r.stop(); } }
class Resource { private volatile boolean state = true; private AtomicInteger count = new AtomicInteger(0); private BlockingQueue<String> queue;
public Resource(BlockingQueue<String> queue) { this.queue = queue; }
public void product() throws InterruptedException { String data; while (state) { data = count.incrementAndGet() + ""; boolean result = queue.offer(data, 2L, TimeUnit.SECONDS); if (result) { System.out.println(Thread.currentThread().getName() + " product " + data); } else { System.out.println(Thread.currentThread().getName() + " product 超时"); } Thread.sleep(1000); } System.out.println(Thread.currentThread().getName() + "停止"); }
public void consume() throws InterruptedException { String data; while (state) { data = queue.poll(2L, TimeUnit.SECONDS); if (data == null || data.equals("")) { System.out.println(Thread.currentThread().getName() + " consume 超时"); } else { System.out.println(Thread.currentThread().getName() + " consume " + data); } Thread.sleep(1000); } System.out.println(Thread.currentThread().getName() + "停止"); }
public void stop() { state = false; } }
|
条