LongAdder简单解析
前⾯介绍了原⼦类,接下来我们看看JDK8中新增的LongAdder
LongAdder
1.思想
1.⽆竞争只更新ba的值
2.有竞争时,采⽤分段思想,每个线程都映射到不同的Cell中去更新
最终结果是ba+各个⼦cell的值
因此当竞争激烈时,对⼀个值进⾏修改,冲突的概率很⾼,需要不断CAS,导致⼤量的时间在⾃旋,但如果采⽤分段思想将⼀个值分为多个
值,分散压⼒,性能就会变⾼。
publiclongsum(){
Cell[]as=cells;Cella;
longsum=ba;
if(as!=null){
for(inti=0;i<;++i){
if((a=as[i])!=null)
sum+=;
}
}
returnsum;
}
2.源码分析
publicclassLongAdderextendsStriped64implementsSerializable{
publicLongAdder(){
}
}
可以看到LongAdder继承了Striped64
LongAdder中的ba、Cell类、longAccumulate⽅法都是在Striped64中定义。
2.1Striped64
属性和内部类
//使⽤Contended注解修饰,解决了value的伪共享问题
@dedstaticfinalclassCell{
//volatile修饰的值,就是要存储的long类型的值
volatilelongvalue;
Cell(longx){value=x;}
//CAS更新value
finalbooleancas(longcmp,longval){
eAndSwapLong(this,valueOfft,cmp,val);
}
//Unsafemechanics
UNSAFE;
//value的内存偏移地址
privatestaticfinallongvalueOfft;
static{
try{
//获取unsafe实例
UNSAFE=afe();
Class<?>ak=;
//将value的内存偏移地址保存到valueOfft
valueOfft=FieldOfft
(laredField("value"));
}catch(Exceptione){
thrownewError(e);
}
}
}
/**CPUS的数量,要限制表⼤⼩*/
staticfinalintNCPU=time().availableProcessors();
/**
*当存在过竞争后,每个Cell数组每个元素存储各个段位的值
*如果为⾮空,说明存在过竞争,且⼤⼩为2的幂,
*/
transientvolatileCell[]cells;
/**
*基本值,主要在没有争⽤时使⽤,也⽤作表初始化过程中的后备。通过CAS更新。
*/
transientvolatilelongba;
/**
*⽤于初始化和调整表的⼤⼩或创建单元时使⽤的⾃旋锁(通过CAS锁定)。
⽆需阻塞锁;当锁不可⽤时,线程会尝试其他cell或ba。
*/
transientvolatileintcellsBusy;
⼏个属性的内存偏移地址:
//Unsafemechanics
UNSAFE;
privatestaticfinallongBASE;
privatestaticfinallongCELLSBUSY;
privatestaticfinallongPROBE;
static{
try{
UNSAFE=afe();
Class<?>sk=;
BASE=FieldOfft
(laredField("ba"));
CELLSBUSY=FieldOfft
(laredField("cellsBusy"));
Class<?>tk=;
PROBE=FieldOfft
(laredField("threadLocalRandomProbe"));
}catch(Exceptione){
thrownewError(e);
}
}
PROBE通过ThreadLocalRandom维护的“线程”探针字段⽤作每个线程的哈希码。
LongAdder⽅法
add
publicvoidadd(longx){
Cell[]as;longb,v;intm;Cella;
if((as=cells)!=null//数组⾮空,说明存在多线程竞争
||!casBa(b=ba,b+x)){//2.如果cells为空,则就cas更新ba值,如果失败,说明ba已经被另外的线程动过
//3.字⾯义,为true说明没有竞争,为fal就是多个线程在竞争操作
booleanuncontended=true;
//满⾜if的条件,都要调⽤longAccumulate处理
if(as==null//为空,即上⾯cas更新ba失败
||(m=-1)<0//长度⼩于1,emm跟4差不多意思
||(a=as[getProbe()&m])==null||//6.当前线程维护的cell为空
!(uncontended=(v=,v+x)))//更新当前线程维护的cell的值,如果更新失败,即uncontented为fal,说明有多个线程在竞争
longAccumulate(x,null,uncontended);
}
}
getProbe()
获取当前线程的hash码,就是该线程要维护Cell数组的哪个元素。
staticfinalintgetProbe(){
(tThread(),PROBE);
}
longAccumulate()
该⽅法在Striped64中
finalvoidlongAccumulate(longx,LongBinaryOperatorfn,
booleanwasUncontended){
inth;
/*
类注释中有说明,若当前cell的probe没有初始化时,为0值,就要将其初始化为尽量不与其他对象冲突的值
*/
if((h=getProbe())==0){
//调⽤ThreadLocalRandom初始化
t();//forceinitialization
//初始化后重新获取probe
h=getProbe();
//probe是0,那就说明是没有竞争,置为true
//probe是0,那就说明是没有竞争,置为true
wasUncontended=true;
}
//
booleancollide=fal;//Trueiflastslotnonempty
for(;;){
Cell[]as;Cella;intn;longv;
if((as=cells)!=null&&(n=)>0){//cells已经初始化
if((a=as[(n-1)&h])==null){//当前线程维护的cell段未初始化
if(cellsBusy==0){//为0时,说明没有线程获取该锁去创建cell或者在扩容cells,则以要增加的值x作为当前cell维护的值,尝试创建新的cell
Cellr=newCell(x);//Optimisticallycreate
if(cellsBusy==0&&
casCellsBusy()){//cellBusy为0时,尝试CAS更新其为1,即尝试获取该⾃选锁
//是否成功创建cell
booleancreated=fal;
try{//Recheckunderlock
Cell[]rs;intm,j;
if((rs=cells)!=null&&//cells⾮空
(m=)>0&&
rs[j=(m-1)&h]==null){//且当前cell为空
rs[j]=r;//将新的cell让在该位置
created=true;//创建成功
}
}finally{
//重置为0,释放该⾃选所
cellsBusy=0;
}
//如果创建成功,就break退出循环
if(created)
break;
//如果创建失败,即当前cell⾮空则继续
continue;
}
}
//没有出现竞争,不需要扩容
collide=fal;
}
elif(!wasUncontended)//前⾯add⽅法cas更新value失败
wasUncontended=true;//重置为true,后⾯会重hash,做简单⾃旋
elif((v=,((fn==null)?v+x:
sLong(v,x))))//再次对当前线程维护的cell更新值,作累加,CAS成功就退出
break;
elif(n>=NCPU//cells数组长度超过了CPU核数
||cells!=as)//或cells数组已经扩容
collide=fal;//置为fal,表明不要扩容,就不会⾛到下⾯那个elif
elif(!collide)//cas失败且没有满⾜上⾯的条件,说明有竞争
collide=true;//置为true,这样下次就可以直接进⼊下⼀个elif进⾏扩容
elif(cellsBusy==0&&casCellsBusy()){//尝试获取锁
try{
if(cells==as){//如果还没有其他线程扩容,则进⾏扩容2倍
Cell[]rs=newCell[n<<1];
for(inti=0;i
rs[i]=as[i];
cells=rs;
}
}finally{
//释放锁
cellsBusy=0;
}
//竞争解决
collide=fal;
continue;//Retrywithexpandedtable
}
//更新probe值
h=advanceProbe(h);
}
}
//cells为空,因此要初始化数组,cas获取cellsBusy锁
elif(cellsBusy==0&&cells==as&&casCellsBusy()){
//是否初始化成功
booleaninit=fal;
try{//Initializetable
if(cells==as){
Cell[]rs=newCell[2];
rs[h&1]=newCell(x);
cells=rs;
//初始化成功
init=true;
}
}finally{
//释放锁
cellsBusy=0;
}
//初始化成功则退出
if(init)
break;
}
//CAS获取cellsBusy锁失败,说明其他线程在更新当前线程维护的cell或扩容cells,则cas更新ba值,更新成功则退出
elif(casBa(v=ba,((fn==null)?v+x:
sLong(v,x))))
break;//Fallbackonusingba
}
}
流程:
1.如果add中cas更新ba失败,说明存在竞争,则在该⽅法中先通过probe判断当前线程的hash是否为0,如果为0则强制初始化
probe;此时cells为空,尝试获取锁对cells进⾏初始化,并将新值的x放在该cell中,退出
2.如果初始化为cas获取cellsBusy锁失败,说明有线程在创建cell或进⾏初始化,则尝试更新ba,如果更新成功则退出,否则继续循
环。
3.如果cells数组已经初始化,若当前线程维护的cell未初始化,则尝试获取锁创建该cell对象,值为新增的x,如果创建成功则退出;否
则其他线程已经创建,继续循环
4.否则如果当前线程维护的cell已经创建,则CAS更新,如果成功则退出;否则说明有多个线程在竞争,对当前线程重hash后继续
5.4失败后会再进⾏⼀次CAS更新,如果还是失败,就会进⾏尝试获取锁扩容(上⼀次已经将扩容标识置为true),扩容⼤⼩时2倍(n
<<1),扩容成功会重新计算线程的hash,再次进⾏CAS更新
本文发布于:2022-12-07 11:54:34,感谢您对本站的认可!
本文链接:http://www.wtabcd.cn/fanwen/fan/88/59418.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |