问题
- LongAdder 的实现原理?
- LongAdder 与 AtomicLong 差别?
- LongAdder 是强一致性还是最终一致性的?
- LongAdder 中的Cell 数组是无限制扩容的吗?
- LongAdder 是如何消除伪共享的?
简介
LongAdder 类是jdk1.8新增的原子类,在多线程环境下,它的性能比普通的Atomic类性能高很多, 继承 Striped64,通过Striped64的Cell来实现功能,并且在ConcurrentHashMap中也用了Striped64的Cell。
源码分析
实现原理
在初始无竞争时,只更新base的值,当有多线程竞争时通过
分段的思想
,让不同的线程更新不同的段,最后把这些段相加(sum)就得到了完整的LongAdder存储的值,最终一致性。
内部类
// Striped64中的内部类,使用@sun.misc.Contended注解,说明里面的值消除伪共享。
@sun.misc.Contended static final class Cell {
//每个cell中的值,在LongAdder中就是每次添加的值。使用volatile 来确保内存可见
volatile long value;
//构造器
Cell(long x) { value = x; }
//cas修改value的值
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe 实力 在静态代码块中实例化
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
主要属性
//这些属性都是在Striped64中的
//CPU数
static final int NCPU = Runtime.getRuntime().availableProcessors();
//cells数组,存储各个段的值,Cell的数组长度最大是CPU * 2 在下面的#longAccumulate方法中会有体现
transient volatile Cell[] cells;
//初始值,如果没有线程修改的话,就会直接通过CAS来修改这个值,不会创建cell
transient volatile long base;
//标记是否有线程在扩容或者创建Cell,会通过cas来更新该值
transient volatile int cellsBusy;
add() 方法
public void add(long x) {
Cell[] as; //as 是 Striped64中的cells属性 存储各个段的值
long b, v; //b是Striped64中的base属性 最初的base属性,还没发生竞争; v是当前线程hash到的Cell中存储的值
int m; //m是cells的长度减1,hash时作为掩码使用
Cell a; //a是当前线程hash到的Cell
//条件1: cells不为空说明已经出现过竞争了,cells已经创建了
//条件2: cas修改base值 失败,说明已经被其他线程改过了,出现竞争
if ((as = cells) != null || !casBase(b = base, b + x)) {
//true表示当前竞争还不激烈; false表示竞争激烈,多个线程hash到同一个Cell,可能要扩容
boolean uncontended = true;
//条件1:cells为空,说明正在出现竞争,上面是从条件2过来的
//条件2:从上面的条件2过来,正在出现竞争,还没有创建cell放入数组,所以这个时候会满足此条件
//条件3:当前线程所在的Cell为空,说明当前线程还没有更新过Cell,应初始化一个Cell
// 条件4:更新当前线程所在的Cell失败,说明现在竞争很激烈,多个线程hash到了同一个Cell,应扩容
if (as == null || (m = as.length - 1) < 0 ||
// getProbe()方法返回的是线程中的threadLocalRandomProbe字段
// 它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的
// 除非刻意修改它
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
// 调用Striped64中的方法处理
longAccumulate(x, null, uncontended);
}
}
- 最初无竞争时只更新base
- 直到更新base失败时就是出现了竞争,创建cells数组
- 当多个线程竞争同一个Cell比较激烈时,可能要扩容
longAccumulate 方法
只有当在上面的#add方法出现了竞争,需要创建或者扩容cell的时候才会调用此方法
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
//<1> 获取存储线程的probe值,会通过#ThreadLocalRandom.current()方法随机生成一个,之后就不会改变除非自己手动去修改
if ((h = getProbe()) == 0) {
//如果没有初始化,那么进行初始化
ThreadLocalRandom.current(); // force initialization
h = getProbe();
// 还没初始化,说明竞争的不激烈
wasUncontended = true;
}
//是否出现碰撞
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//<2> 如果Cells数组已经初始化
if ((as = cells) != null && (n = as.length) > 0) {
//<2.1> 当前线程进行hash获取在Cells数组中的值,判断是否为空,如果为空的话,进行初始化当前线程的cell然后塞入Cells数组中
if ((a = as[(n - 1) & h]) == null) {
//判断是否有竞争,其他线程在创建或者扩容Cells
if (cellsBusy == 0) { // Try to attach new Cell
//创建cell
Cell r = new Cell(x); // Optimistically create
//再次判断cellsBusy的值,并且cas的修改 意味着获取锁
if (cellsBusy == 0 && casCellsBusy()) {
//标记是否创建成功
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
//重新获取Cells数组,找到当前线程hash到Cells数组中的位置。如果为null的话,则加入Cells数组中,create标记为true
//note : 这里重新获取Cells数组的原因是因为可能有其他线程已经扩容了或者修改了里面的值
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
//释放锁
cellsBusy = 0;
}
//创建成功直接返回
if (created)
break;
continue; // Slot is now non-empty
}
}
//标记没有发生冲突
collide = false;
}
// 这里对应了<2.1> 如果当前线程hash到Cells数组的位置不为null,并且wasUncontended的值为false 出现了竞争。则更新失败,继续自旋
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 上面的else if 中如果wasUncontended为true 则没有出现竞争,尝试修改当前线程所在的cell的值,成功直接返回
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 查看是否要进行扩容,因为每次扩容是 * 2 所以当到了cpu的核数就不会在扩容了
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
//如果上上个elseif 修改cell的值失败了 且上个条件不成立 出现了冲突,collide值改为true
else if (!collide)
collide = true;
// 到这里就说明已经出现了冲突了,尝试获取占有锁,并且扩容
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//检查是否有其它线程已经扩容过了
if (cells == as) { // Expand table unless stale
//扩大2倍 旧数组拷贝到新数组中,重新赋值
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁
cellsBusy = 0;
}
//解决了冲突,继续尝试
collide = false;
continue; // Retry with expanded table
}
//更新失败或者达到了CPU核心数,重新生成probe,并重试
h = advanceProbe(h);
}
//<3> 对应<2> Cell数组还没初始化,cas修改cellsBusy的值,相当于是一个独占锁
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//初始化是否成功
boolean init = false;
try { // Initialize table
//检测是否有其它线程初始化过
if (cells == as) {
//初始容量为2
Cell[] rs = new Cell[2];
//当前线程会进行hash然后放置到cell数组中
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//释放锁
cellsBusy = 0;
}
// 初始化成功直接返回
// 因为增加的值已经同时创建到Cell中了
if (init)
break;
}
//<4> 对应<3> 已经有线程在初始化Cell数组了 也就是<3>中的cellsBusy = 1,那么尝试cas更新base。如果成功直接返回
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
总结一下:
- 如果Cells数组未初始化,当前线程会尝试占有cellsBusy锁并创建cells数组;
- 如果当前线程尝试创建Cells数组时,发现有其它线程已经在创建了,就尝试更新base,如果成功就返回;
- 通过线程的probe值找到当前线程应该更新Cells数组中的哪个Cell;
- 如果当前线程所在的Cell未初始化,就尝试占有cellsBusy锁并在相应的位置创建一个Cell;
- 尝试CAS更新当前线程所在的Cell,如果成功就返回,如果失败说明出现冲突;
- 当前线程更新Cell失败后并不是立即扩容,而是尝试更新probe值后再重试一次;
- 如果在重试的时候还是更新失败,就扩容;
- 扩容时当前线程占有cellsBusy锁,并把数组容量扩大到两倍,再迁移原cells数组中元素到新数组中;
- cellsBusy在创建Cells数组、创建Cell、扩容Cells数组三个地方用到;
sum 方法
获取LongAdder中真正存储的值的大小,通过把base和所有段相加得到。
public long sum() {
Cell[] as = cells;
Cell a;
//<1> sum初始等于base
long sum = base;
//<2> 如果Cells不为空
if (as != null) {
//遍历所有的Cell
for (int i = 0; i < as.length; ++i) {
//如果所在的Cell不为空,就把它的value累加到sum中
if ((a = as[i]) != null)
sum += a.value;
}
}
//返回sum
return sum;
}
note:
如果前面已经累加到sum上的Cell的value有修改,没法计算到了。所以说LongAdder是最终一致性
LongAdder VS AtomicLong
public class LongAddrVSAtomicLongTest {
public static void main(String[] args) {
testAtomicLongVSLongAdder(1,10000000);
testAtomicLongVSLongAdder(10,10000000);
testAtomicLongVSLongAdder(20,10000000);
testAtomicLongVSLongAdder(40,10000000);
testAtomicLongVSLongAdder(80,10000000);
}
static void testAtomicLongVSLongAdder(final int threadCount,final int times){
try {
System.out.println("threadCount: " + threadCount + ",times: " + times);
long start = System.currentTimeMillis();
testLongAdder(threadCount,times);
System.out.println("LongAdder elapse: " + (System.currentTimeMillis() - start) + "ms");
long start2 = System.currentTimeMillis();
testAtomicLong(threadCount,times);
System.out.println("Atomic elapse: " + (System.currentTimeMillis() - start2) + "ms");
} catch (Exception e) {
e.printStackTrace();
}
}
static void testLongAdder(final int threadCount,final int times) throws InterruptedException {
LongAdder longAdder = new LongAdder();
List<Thread> list = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
list.add(new Thread(()->{
for (int j = 0; j < times; j++) {
longAdder.add(1);
}
}));
}
for (Thread thread : list) {
thread.start();
}
for (Thread thread : list) {
thread.join();;
}
}
static void testAtomicLong(final int threadCount,final int times) throws InterruptedException {
AtomicLong atomicLong = new AtomicLong();
List<Thread> list = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
list.add((new Thread(()->{
for (int j = 0; j < times; j++) {
atomicLong.incrementAndGet();
}
})));
}
for (Thread thread : list) {
thread.start();
}
for (Thread thread : list) {
thread.join();
}
}
}
结果:
threadCount: 1,times: 10000000
LongAdder elapse: 219ms
Atomic elapse: 72ms
threadCount: 10,times: 10000000
LongAdder elapse: 232ms
Atomic elapse: 1948ms
threadCount: 20,times: 10000000
LongAdder elapse: 462ms
Atomic elapse: 4113ms
threadCount: 40,times: 10000000
LongAdder elapse: 952ms
Atomic elapse: 9520ms
threadCount: 80,times: 10000000
LongAdder elapse: 1508ms
Atomic elapse: 21143ms
可以看到线程数越多,竞争越多的情况下,对LongAdder的性能没有太大的影响而AtomicInteger则越来越慢。
总结
- LongAdder通过base和cells数组来存储值,无竞争的时候直接cas的修改base的值,出现竞争创建cell使用分段的思想来提交性能
- 不同的线程会hash到不同的cell上去更新,减少了竞争
- LongAdder中Cell的数组最大容量就是当前cpu数
- LongAddr 是最终一致性的。