Striped64
是一个抽象类,继承了 Number
类。它是 Java 并发包中的一部分,用于支持多线程环境下的原子性操作。
在 Striped64
中,使用 int
数组 base
和 CounterCell
数组 cells
来存储计数器的值。其中,base
数组存储未被分配到 CounterCell
的线程的计数器值,而 cells
数组则存储已经被分配到 CounterCell
的线程的计数器值。通过使用数组来存储计数器值,可以减少锁的竞争,从而提高并发性能。
Striped64
实现了一些基本的原子操作,例如加、减、乘、除等。它内部实现使用了无锁算法(CAS),保证了操作的原子性和线程安全性。
Number
是 Java 中的一个抽象类,它是所有数字类型的父类,包括整数类型和浮点类型。Number
中定义了一系列抽象方法,包括将数字转换为特定数据类型的方法,例如 intValue()
、longValue()
、floatValue()
和 doubleValue()
等。
由于 Striped64
继承了 Number
类,因此它也具有将数字转换为特定类型的方法,可以方便地将计数器的值转换为 int、long、float 或 double 类型。
示例代码:
java
import java.util.concurrent.atomic.AtomicInteger;public abstract class Striped64 extends Number {private static final long serialVersionUID = -1182697631144950088L;// base 数组用于存储未分配到 CounterCell 的线程的计数器值volatile long base;// cells 数组用于存储已经分配到 CounterCell 的线程的计数器值volatile transient CounterCell[] cells;// 使用无锁算法添加 delta 到 base 或 cells 中指定下标的计数器值final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {int h;if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); // 强制初始化 ThreadLocalRandomh = getProbe();wasUncontended = true;}boolean collide = false; // 如果 hash 冲突,即当前线程在 cells 中对应的元素已被占用,值为 truefor (;;) {CounterCell[] as;CounterCell a;int n;long v;if ((as = cells) != null && (n = as.length) > 0) {if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { // 当前没有其他线程修改 cells 数组,尝试为当前线程分配 CounterCellCounterCell r = new CounterCell(x);if (cellsBusy == 0 && casCellsBusy()) { // 成功获取 cellsBusy 锁boolean created = false;try {CounterCell[] rs;int m, j;if ((rs = cells) != null && (m = rs.length) > 0&& rs[j = (m - 1) & h] == null) { // 仍然没有其他线程修改 cells 数组,将 CounterCell 设置到 cells 数组中rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created) {break;}continue; // 竞争失败,继续循环}}collide = false;} else if (!wasUncontended) {wasUncontended = true;} else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) { // 使用 CAS 更新计数器值break;} else if (n >= NCPU || cells != as) { // 由于 hash 冲突,在 cells 中对应的元素已被占用,尝试随机获取一个空闲的元素collide = false;} else if (!collide) { // 记录当前线程在 cells 数组中对应的元素已被占用collide = true;} else if (cellsBusy == 0 && casCellsBusy()) { // 成功获取 cellsBusy 锁try {if (cells == as) { // double checkCounterCell[] rs = new CounterCell[n << 1];for (int i = 0; i < n; ++i) {rs[i] = as[i];}cells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // 竞争失败,继续循环}h = advanceProbe(h);} else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 成功获取 cellsBusy 锁boolean init = false;try {if (cells == as) { // double checkCounterCell[] rs = new CounterCell[2];rs[h & 1] = new CounterCell(x);cells = rs;init = true;}} finally {cellsBusy = 0;}if (init) {break;}} else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) { // 通过 CAS 更新 base 中的计数器值break;}}}// 获取当前线程对应的 hash 值,用于将计数器添加到 cells 数组中指定下标的位置static final int getProbe() {return UNSAFE.getInt(Thread.currentThread(), PROBE);}// 随机化当前线程的 hash 值,用于避免不同线程的索引值冲突static final int advanceProbe(int probe) {probe ^= probe << 13; // xorshiftprobe ^= probe >>> 17;probe ^= probe << 5;UNSAFE.putInt(Thread.currentThread(), PROBE, probe);return probe;}// AtomicLong 中用于控制对 cells 数组的竞争static final long CELLSBUSY = 0x00000000L;volatile long cellsBusy;// 获取对 cells 数组的独占锁,避免多线程修改 cells 数组时发生竞争final boolean casCellsBusy() {return UNSAFE.compareAndSwapLong(this, CELLSBUSY, 0, 1);}// 计算集合中所有 CounterCell 的值的总和final long sumCells() {CounterCell[] as = cells;long sum = base;if (as != null) {int n = as.length;for (int i = 0; i < n; ++i) {CounterCell a = as[i];if (a != null) {sum += a.value;}}}return sum;}
}// 定义 CounterCell 类,用于存储已经被分配到的线程的计数器值
class CounterCell {private static final Unsafe UNSAFE;private static final long valueOffset;static {try {UNSAFE = getUnsafe();Class<?> ak = CounterCell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}// 使用 CAS 更新计数器值final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}volatile long value;CounterCell(long x) {value = x;}// Unsafe 相关操作private static Unsafe getUnsafe() {try {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");theUnsafe.setAccessible(true);return (Unsafe) theUnsafe.get(null);} catch (Exception e) {throw new Error(e);}}
}
在上述代码中,Striped64
实现了 longAccumulate
方法,用于实现多线程环境下的原子操作。其中使用了无锁算法(CAS)来保证线程安全,并且继承自 Number
类,具有将计数器值转换为各种数值类型的方法。另外,定义了 CounterCell
类,用于存储已经被分配到的线程的计数器值。