Akka's Actor Model and a Usage Example
Most of this article is reposted from this blog; the original author's writing really is good.
INTRODUCTION TO ACTORS
Anyone who has done multithreading in the past won't deny how hard and painful it is to manage multithreaded applications. I said manage because it starts out simple. However, it aches when you see that you don't have an easier way to recover from errors in your sub-tasks, OR those zombie bugs that you find hard to reproduce, OR when your profiler shows that your threads are spending a lot of time blocking wastefully before writing to a shared state.
I prefer not to talk about how the Java concurrency API and its collections made it better and easier, because I am sure if you are here, you probably needed more control over the sub-tasks, or simply because you don't like to write locks and synchronized blocks and would prefer a higher level of abstraction.
In this series of Akka Notes, we will go through simple Akka examples to explore the various features that we have in the toolkit.
WHAT ARE ACTORS?
Treat Actors like people: people who don't talk to each other in person. They just talk through mails.
Let's expand on that a bit.
1. MESSAGING
Consider two people: a wise Teacher and a Student. The Student sends a mail every morning to the Teacher and the wise Teacher sends a wise quote back.
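Points to note:
1. The student sends a mail. Once sent, the mail couldn't be edited. Talk about natural immutability.
2. The Teacher checks his mailbox when he wishes to do so.
3. The Teacher also sends a mail back (immutable again).
4. The student checks the mailbox at his own time.
5. The student doesn't wait for the reply. (no blocking)
That pretty much sums up the basic block of the Actor Model - passing messages.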
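2. CONCURRENCY
Now, imagine there are 3 wise teachers and 3 students. What happens then? Nothing changes, actually. Everybody has their own mailbox. One subtle point to note here is this:
By default, mails in the mailbox are read/processed in the order they arrived.
Internally, the default mailbox is backed by a ConcurrentLinkedQueue. And since nobody waits for the mail to be picked up, it is simply a non-blocking message. (There are a variety of built-in mailboxes, including bounded and priority-based ones. In fact, we could build one ourselves too.)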
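To make the mailbox idea concrete, here is a minimal sketch in plain Java with no Akka involved. The names MailboxSketch and Mail are made up for illustration. It only shows the three properties from the text: the message is immutable, the sender does not wait, and mails come out in arrival order.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration of a mailbox; this is not Akka's own Mailbox class.
final class MailboxSketch {
    /** Immutable message: the field is final and there is no setter. */
    static final class Mail {
        final String text;
        Mail(String text) { this.text = text; }
    }

    /** Drops three mails into a mailbox and reads them back in arrival order. */
    static String demo() {
        Queue<Mail> mailbox = new ConcurrentLinkedQueue<>();
        // offer() returns immediately; the sender never waits for the reader.
        mailbox.offer(new Mail("first"));
        mailbox.offer(new Mail("second"));
        mailbox.offer(new Mail("third"));

        StringBuilder order = new StringBuilder();
        Mail next;
        // poll() hands the mails back oldest first (FIFO).
        while ((next = mailbox.poll()) != null) {
            order.append(next.text).append(' ');
        }
        return order.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "first second third"
    }
}
```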
3. FAILOVER
Imagine the 3 teachers are from three different departments - History, Geography and Philosophy.
The History teacher replies with a note on an event in the past, the Geography teacher sends an interesting place, and the Philosophy teacher, a quote. The student doesn't care which teacher in the department sends the reply. What if, one day, a teacher falls sick? In that case, another teacher in the department steps up and does the job.
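Points to note:
1. There could be a pool of Actors who do different things.
2. An Actor could do something that causes an exception, and it wouldn't be able to recover by itself. In that case a new Actor could be created in place of the old one. Alternatively, the Actor could just ignore that one particular message and proceed with the rest of the messages. These are called Directives and we'll discuss them later.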
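The difference between the two reactions in point 2 can be sketched without Akka. Everything below (FailoverSketch, Teacher, the Directive enum and its two values) is a hypothetical stand-in, not Akka's supervision API. "Resume" keeps the same instance and skips the bad mail, while "restart" swaps in a fresh instance, so its internal state starts over.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration of "ignore the message" versus "replace the actor".
final class FailoverSketch {
    enum Directive { RESUME, RESTART }

    /** A toy teacher that counts its replies and fails on the mail "boom". */
    static final class Teacher {
        private int handled = 0;

        String reply(String mail) {
            if (mail.equals("boom")) {
                throw new IllegalStateException("cannot handle " + mail);
            }
            handled++;
            return "reply#" + handled + " to " + mail;
        }
    }

    static List<String> run(List<String> mails, Directive directive) {
        Supplier<Teacher> factory = Teacher::new;
        Teacher teacher = factory.get();
        List<String> replies = new ArrayList<>();
        for (String mail : mails) {
            try {
                replies.add(teacher.reply(mail));
            } catch (RuntimeException e) {
                if (directive == Directive.RESTART) {
                    teacher = factory.get(); // fresh instance, counter back to 0
                }
                // RESUME: keep the same instance and just skip the failing mail.
            }
        }
        return replies;
    }

    public static void main(String[] args) {
        List<String> mails = java.util.Arrays.asList("a", "boom", "b");
        System.out.println(run(mails, Directive.RESUME));  // [reply#1 to a, reply#2 to b]
        System.out.println(run(mails, Directive.RESTART)); // [reply#1 to a, reply#1 to b]
    }
}
```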
4. MULTITASKING
For a twist, let's assume that each of the teachers also sends the exam score through mail too, if the student asks for it. Similarly, an Actor could handle more than one type of message comfortably.
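Handling several message types usually boils down to a type check on the incoming message. Here is a minimal sketch in plain Java. ScoreRequest, the reply strings and the class name MultiMessageSketch are invented for the illustration.

```java
// Hypothetical illustration: one receive method, several message types.
final class MultiMessageSketch {
    static final class QuoteRequest { }

    static final class ScoreRequest {
        final String student;
        ScoreRequest(String student) { this.student = student; }
    }

    /** Picks a reply based on the runtime type of the message. */
    static String receive(Object message) {
        if (message instanceof QuoteRequest) {
            return "a wise quote";
        } else if (message instanceof ScoreRequest) {
            return "exam score for " + ((ScoreRequest) message).student;
        } else {
            return "unhandled";
        }
    }

    public static void main(String[] args) {
        System.out.println(receive(new QuoteRequest()));       // a wise quote
        System.out.println(receive(new ScoreRequest("Anna"))); // exam score for Anna
        System.out.println(receive(42));                       // unhandled
    }
}
```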
5. CHAINING
What if the student would like to get only one final consolidated trivia mail instead of three?
We could do that with Actors too. We'll come back to that later when we talk about Supervisors and revisit the same thought when we talk about Futures.
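As requested by Mohan, let's just try to map the analogy components with the components in the Actor Model.
The Students and the Teachers become our Actors. The email inbox becomes the Mailbox component. The request and the response can't be modified; they are immutable objects. Finally, the MessageDispatcher component in Actor manages the mailbox and routes the messages to the respective Mailbox.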
ACTOR MESSAGING
From the first part of the Akka Notes, we saw a bird's eye view of Actors. In this second part of the Akka Notes, we'll look at the messaging part of Actors. As for the example, we will use the same Student-Teacher example that we discussed earlier.
In this first part of Actor Messaging, we'll create the TeacherActor and, instead of the StudentActor, we'll use a main program called StudentSimulatorApp.
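REVISITING STUDENT-TEACHER IN DETAIL
Let's for now consider the message sent by the StudentSimulatorApp to the TeacherActor alone. When I say StudentSimulatorApp, I just mean a normal main program.
The picture conveys this:
(if the terms are overwhelming, don't worry, we'll go through them in detail)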
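1. Student creates something called an ActorSystem.
2. It uses the ActorSystem to create something called an ActorRef. The QuoteRequest message is sent to the ActorRef (a proxy to TeacherActor).
3. ActorRef passes the message along to a Dispatcher.
4. The Dispatcher enqueues the message in the target Actor's MailBox.
5. The Dispatcher then puts the Mailbox on a Thread (more on that in the next section).
6. The MailBox dequeues a message and eventually delegates that to the actual TeacherActor's receive method.
Like I said, don't worry about it. You can come back and revisit these steps once we are done.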
THE STUDENTSIMULATORAPP PROGRAM
We will use this StudentSimulatorApp to bring up the JVM and initialize the ActorSystem.
As we understand from the picture, the StudentSimulatorApp
1. Creates an ActorSystem
2. Uses the ActorSystem to create a proxy to the TeacherActor (ActorRef)
3. Sends the QuoteRequest message to the proxy.
Let's focus on these three points alone now.
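1. Creating an ActorSystem
The ActorSystem is the entry point into the Actor world. Through it you can create and stop Actors, or even shut down the entire Actor environment.
On the other end of the spectrum, Actors are hierarchical and the ActorSystem is the root for all Actors - meaning, when you create an Actor using the ActorSystem's actorOf method, you create an Actor just below the ActorSystem.
The code for initializing the ActorSystem looks like
val system = ActorSystem("UniversityMessageSystem")
The UniversityMessageSystem is simply a cute name you give to your ActorSystem.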
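2. Creating a Proxy for TeacherActor?
Let's consider the following snippet:
val teacherActorRef: ActorRef = actorSystem.actorOf(Props[TeacherActor])
The actorOf is the Actor creation method in ActorSystem. But, as you can see, it doesn't return a TeacherActor, which is what we need. It returns something of type ActorRef.
The ActorRef acts as a proxy for the actual Actor. Clients do not talk directly with the Actor. This is the Actor Model's way of avoiding direct access to any custom/private methods or variables in the TeacherActor or any Actor for that sake.
To repeat, you send messages only to the ActorRef and they eventually reach your actual Actor. You can NEVER talk to your Actor directly.
People will hate you to death if you find some mean ways to do that.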
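3. Send a QuoteRequest to the Proxy
It's a one-liner again. You just tell the QuoteRequest message to the ActorRef. The tell method in Actor is actually !. (There's also a tell method in ActorRef which just delegates the call back to !.)
// send a message to the TeacherActor
teacherActorRef ! QuoteRequest
That's it!
If you think I am lying, check the entire code of the StudentSimulatorApp below: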
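// package line truncated in the source (…sg1)
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
// import of TeacherProtocol._ (package path truncated in the source)

object StudentSimulatorApp extends App {
  // Initialize the ActorSystem
  val actorSystem = ActorSystem("UniversityMessageSystem")
  // construct the TeacherActor ref
  val teacherActorRef = actorSystem.actorOf(Props[TeacherActor])
  // send a message to the TeacherActor
  teacherActorRef ! QuoteRequest
  // Let's wait for a couple of seconds before we shut down the system
  Thread.sleep(2000)
  // Shut down the ActorSystem.
  actorSystem.shutdown()
}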
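Well, I cheated a little. You'll have to shut down the ActorSystem, or otherwise the JVM keeps running forever. And I am making the main thread sleep for a little while just to give the TeacherActor time to finish its task. We'll write some neat test cases in the next part in order to avoid this hack.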
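THE MESSAGE
We just told a QuoteRequest to the ActorRef but we didn't see the message class at all!!
Here it comes:
(It is a recommended practice to wrap your messages in a nice object for easier organization)
TeacherProtocol
// package line truncated in the source (…ols)
object TeacherProtocol {
  case class QuoteRequest()
  case class QuoteResponse(quoteString: String)
}
As you know, the QuoteRequest is for the requests that come to the TeacherActor. The Actor would respond back with a QuoteResponse.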
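DISPATCHER AND A MAILBOX
The ActorRef delegates the message handling to the Dispatcher. Under the hood, while we created the ActorSystem and the ActorRef, a Dispatcher and a MailBox were created. Let's see what they are about.
MailBox
Every Actor has one MailBox (we'll see one special case later). Per our analogy, every Teacher has one mailbox too. The Teacher has to check the mailbox and process the message. In the Actor world, it's the other way round - the mailbox, when it gets a chance, uses the Actor to accomplish its work.
Also, the mailbox has a queue to store and process the messages in a FIFO fashion - a little different from our regular inbox, where the latest mail is the one at the top.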
Now, the dispatcher
From the looks of it, the Dispatcher just gets the message from the ActorRef and passes it on to the MailBox. But there's one amazing thing happening behind the scenes:
The Dispatcher wraps an ExecutorService (ForkJoinPool or ThreadPoolExecutor). It executes the MailBox against this ExecutorService.
Check out this snippet from the Dispatcher source:
protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = {
  ...
  try {
    executorService execute mbox
  ...
}
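The "mailbox runs on an ExecutorService" idea can be sketched in plain Java. This is a toy under stated assumptions, not Akka's implementation: DispatcherSketch and its nested Mailbox are made-up names, and the real dispatcher additionally makes sure a mailbox is not scheduled twice at the same time, which this sketch skips.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration: a mailbox that is itself a Runnable.
final class DispatcherSketch {
    static final class Mailbox implements Runnable {
        private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
        private final Consumer<Object> receive; // stands in for the actor's receive method

        Mailbox(Consumer<Object> receive) { this.receive = receive; }

        void enqueue(Object message) { queue.offer(message); }

        /** When a thread picks the mailbox up, it drains the queue into receive. */
        @Override
        public void run() {
            Object message;
            while ((message = queue.poll()) != null) {
                receive.accept(message);
            }
        }
    }

    static List<Object> demo() {
        List<Object> seen = new CopyOnWriteArrayList<>();
        Mailbox mailbox = new Mailbox(seen::add);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            mailbox.enqueue("QuoteRequest-1");
            mailbox.enqueue("QuoteRequest-2");
            executor.execute(mailbox); // same shape as "executorService execute mbox"
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [QuoteRequest-1, QuoteRequest-2]
    }
}
```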
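SAMPLE CODE
This is a set of code for computing entropy gain, used in similarity detection and content recommendation. The main idea is that the Master actor dispatches computation tasks to Worker actors; when a Worker finishes its task, it sends a message to the Master, which combines the results. The final result is output by the Listener actor.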
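Before reading the Akka version, it may help to see the same fan-out/fan-in shape with nothing but the JDK. This is a deliberately different technique (a thread pool with futures rather than actors and messages), and every name in it, including FanOutFanInSketch and the length-of-string "computation", is a placeholder of mine rather than part of the sample.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of "master hands out work, collects and sorts results".
final class FanOutFanInSketch {
    static List<Integer> run(List<String> items, int workerCount) {
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        try {
            // Fan out: one task per item (string length stands in for the real computation).
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String item : items) {
                tasks.add(() -> item.length());
            }
            // Fan in: wait for every result, then sort, as the Master does before reporting.
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : workers.invokeAll(tasks)) {
                results.add(future.get());
            }
            Collections.sort(results);
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(java.util.Arrays.asList("ccc", "a", "bb"), 2)); // [1, 2, 3]
    }
}
```

The actor version below differs in one important way: the Master never blocks waiting for a result, it just counts Result messages as they arrive.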
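EntropyMain
// The package line, the project-specific imports (Logger, Listener, Master, Calculate),
// the logger factory and the log levels are truncated in the source; info-level calls are assumed.
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class EntropyMain
{
    private static Logger logger = ….getLogger(); // receiver truncated in the source

    /**
     * Number of workers to start
     */
    private static final int WORKER_COUNT = 10;

    public static void main(String[] args)
    {
        logger.info("begin to execute entropy for recommendation");
        calculate(WORKER_COUNT);
        logger.info("execute entropy for recommendation finished.");
    }

    /**
     * Start the calculation with the given number of workers
     */
    private static void calculate(int numOfWorkers)
    {
        // Create the Akka system
        ActorSystem system = ActorSystem.create("Entropy-calc");
        // Result listener, used to output the result
        ActorRef listener = system.actorOf(Props.create(Listener.class),
                "listener");
        // Start the master actor, which drives the calculation
        ActorRef master = system.actorOf(
                Props.create(Master.class, numOfWorkers, listener), "master");
        // Send the start message
        master.tell(new Calculate(), master);
    }
}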
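Master
// The package line, the project-specific imports (Logger, CollectionUtils, Calculate, Result,
// SortedEntropy, Work, Entropy, ProbabilityCalcService, ProbabilityCalcServiceImpl),
// the logger factory and the log levels are truncated in the source; info-level calls are assumed.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.RoundRobinRouter;

@SuppressWarnings("deprecation")
public class Master extends UntypedActor
{
    private static Logger logger = ….getLogger(); // receiver truncated in the source

    /**
     * Number of results received so far, used to decide whether the calculation is complete
     */
    private int numOfResults;

    /**
     * Used to pass the result to the outside
     */
    private ActorRef listener;

    /**
     * Router in front of the workers
     */
    private ActorRef workerRouter;

    /**
     * Content keys
     */
    private List<String> contentKeys;

    /**
     * Feature (item) keys
     */
    private List<String> itemKeys;

    /**
     * Intermediate results
     */
    private List<Entropy> middleResult = new ArrayList<Entropy>();

    /**
     * Start time of the calculation
     */
    private final long start = System.currentTimeMillis();

    /**
     * Calculation service
     */
    private ProbabilityCalcService pcService = new ProbabilityCalcServiceImpl();

    /**
     * @param nrOfWorkers
     *            how many workers to start
     * @param listener
     *            actor that receives the final result
     */
    public Master(int nrOfWorkers, ActorRef listener)
    {
        this.contentKeys = pcService.…Codes(); // method name truncated in the source
        this.itemKeys = pcService.getItems();  // receiver assumed; only "…ms()" survives in the source
        this.listener = listener;
        workerRouter = this.getContext().actorOf(
                Props.create(Worker.class).withRouter(
                        new RoundRobinRouter(nrOfWorkers)));
    }

    @Override
    public void onReceive(Object message) throws Exception
    {
        if (message instanceof Calculate)
        {
            logger.info("receive calculate message.");
            if (CollectionUtils.isNotEmpty(itemKeys)
                    && CollectionUtils.isNotEmpty(contentKeys))
            {
                long count = 0L;
                for (String item : itemKeys)
                {
                    count++;
                    logger.info("send message to worker, item=" + item + ","
                            + count + "/" + itemKeys.size());
                    workerRouter.tell(new Work(item, contentKeys), getSelf());
                }
            }
            else
            {
                logger.info("stop process, no data to calculate.");
                // Stop the whole system
                getContext().system().shutdown();
            }
        }
        else if (message instanceof Result)
        {
            Result result = (Result) message;
            numOfResults += 1;
            logger.info("receive result message from worker, item="
                    + result.getItem() + "," + numOfResults + "/"
                    + itemKeys.size());
            // Keep the result in the in-memory list
            middleResult.add(new Entropy(result.getItem(), result.getValue()));
            // All results are in
            if (numOfResults == itemKeys.size())
            {
                logger.info("calculate finished, begin to sort the result.");
                Duration duration = Duration.create(System.currentTimeMillis()
                        - start, TimeUnit.MILLISECONDS);
                // Sort the output
                Collections.sort(middleResult);
                // Send the message to the listener
                logger.info("send SortedEntropy message to listener.");
                listener.tell(new SortedEntropy(middleResult, duration),
                        getSelf());
                // Stop the master and the actors it controls
                getContext().stop(getSelf());
            }
        }
        else
        {
            unhandled(message);
        }
    }
}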
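Worker
// The package line, the project-specific imports (Logger, Result, Work, ProbabilityCalcService,
// ProbabilityCalcServiceImpl, Logarithm), the logger factory and the log levels are truncated
// in the source; info-level calls are assumed.
import java.math.BigDecimal;
import java.util.List;
import akka.actor.UntypedActor;

/**
 * Calculates the entropy gain of one item
 */
public class Worker extends UntypedActor
{
    private static Logger logger = ….getLogger(); // receiver truncated in the source

    /**
     * Number of decimal places kept in the result
     */
    private static final int DECIMAL_SIZE = 4;

    private ProbabilityCalcService pcService = new ProbabilityCalcServiceImpl();

    @Override
    public void onReceive(Object message) throws Exception
    {
        if (message instanceof Work)
        {
            Work work = (Work) message;
            logger.info("receive work message from master, item="
                    + work.getItem());
            double result = calculateEntropyForItem(work.getItem(),
                    work.…KeyList()); // getter name truncated in the source
            logger.info("send result message to master, item=" + work.getItem()
                    + ",value=" + result);
            getSender().tell(new Result(result, work.getItem()), getSelf());
        }
        else
        {
            unhandled(message);
        }
    }

    /**
     * Calculates the entropy gain for one item
     *
     * @param item
     *            name of the phrase to calculate
     * @param cntCodes
     *            all content codes in the system
     * @return the entropy gain
     */
    private double calculateEntropyForItem(String item, List<String> cntCodes)
    {
        Double entropyIncr = 0.0;
        // Intermediate logic: compute the entropy gain for the item and return it
        if (cntCodes != null && !cntCodes.isEmpty())
        {
            // The three probability method names below are truncated in the source
            Double x = pcService.…tProbability();
            Double y = pcService.…cProbability(item);
            for (String cntCode : cntCodes)
            {
                Double z = pcService.…dProbability(cntCode, item);
                if (isParamLegal(x, y, z))
                {
                    Double temp = calculateByFormular(x, y, z);
                    if (temp != null && !temp.isInfinite() && !temp.isNaN())
                    {
                        entropyIncr = entropyIncr + temp;
                    }
                    else
                    {
                        logger.info("calculate entropy error, item=" + item
                                + ",cntCode=" + cntCode + ",x=" + x + ",y="
                                + y + ",z=" + z);
                    }
                }
            }
        }
        return format(entropyIncr, DECIMAL_SIZE);
    }

    /**
     * Computes the value
     */
    private double calculateByFormular(Double x, Double y, Double z)
    {
        Double a1 = -x * Logarithm.log2(x);
        Double a2 = z * x * Logarithm.log2((z * x) / y);
        Double a3 = (1 - z) * x * Logarithm.log2(((1 - z) * x) / (1 - y));
        return a1 + a2 + a3;
    }

    private boolean isParamLegal(Double x, Double y, Double z)
    {
        boolean res = false;
        if (x != null && y != null && z != null)
        {
            res = true;
        }
        else
        {
            logger.info("parameter error, cancel this round, x=" + x + ",y="
                    + y + ",z=" + z);
        }
        return res;
    }

    /**
     * Rounds the result; the value is returned in full so the algorithm can be tuned
     */
    private Double format(Double org, int decimalSize)
    {
        try
        {
            BigDecimal b = new BigDecimal(org);
            Double f = b.setScale(decimalSize, BigDecimal.ROUND_HALF_UP)
                    .doubleValue();
            return f;
        }
        catch (Exception e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return 0.0;
    }
}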
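To see what the formula in calculateByFormular and the rounding in format do to concrete numbers, here is a small self-contained sketch. The term is a1 + a2 + a3 with a1 = -x·log2(x), a2 = z·x·log2(z·x / y) and a3 = (1 - z)·x·log2((1 - z)·x / (1 - y)). I am assuming log2 is the ordinary base-2 logarithm (the project's Logarithm class is not shown), and I use RoundingMode.HALF_UP in place of the older BigDecimal.ROUND_HALF_UP constant. EntropyFormulaSketch and the input values are made up.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical worked example of the per-content entropy term and its rounding.
final class EntropyFormulaSketch {
    /** Base-2 logarithm; assumed to match the sample's Logarithm.log2. */
    static double log2(double v) {
        return Math.log(v) / Math.log(2);
    }

    /** Same three terms as calculateByFormular in the Worker. */
    static double entropyTerm(double x, double y, double z) {
        double a1 = -x * log2(x);
        double a2 = z * x * log2((z * x) / y);
        double a3 = (1 - z) * x * log2(((1 - z) * x) / (1 - y));
        return a1 + a2 + a3;
    }

    /** Rounds half-up to the given number of decimal places. */
    static double round(double value, int decimals) {
        return new BigDecimal(value).setScale(decimals, RoundingMode.HALF_UP).doubleValue();
    }

    public static void main(String[] args) {
        // a1 = 0.5, a2 = 0.25 * log2(1) = 0, a3 = 0.25 * log2(1/3), about -0.3962
        System.out.println(round(entropyTerm(0.5, 0.25, 0.5), 4)); // prints 0.1038
    }
}
```

Note that a probability of exactly 0 makes a term evaluate to 0 times negative infinity, which is NaN in floating point; that is why the Worker checks isInfinite and isNaN before adding a term to the sum.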
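Listener
// The package line, the project-specific imports (Logger, SortedEntropy, FileUtil),
// the logger factory and the log levels are truncated in the source; info-level calls are assumed.
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import akka.actor.UntypedActor;

/**
 * Actor that outputs the result
 */
public class Listener extends UntypedActor
{
    private static Logger logger = ….getLogger(); // receiver truncated in the source

    @Override
    public void onReceive(Object message) throws Exception
    {
        if (message instanceof SortedEntropy)
        {
            SortedEntropy sortedEntropy = (SortedEntropy) message;
            // Write the result to the file system (method name truncated in the source)
            FileUtil.…2File(getOutputPath(), sortedEntropy.getEntorpyList());
            logger.info("output result file finished, takes "
                    + sortedEntropy.getDuration().toSeconds() + "s.");
            getContext().system().shutdown();
        }
        else
        {
            unhandled(message);
        }
    }

    private String getOutputPath()
    {
        DateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
        String date = df.format(new Date());
        return "./entropy_" + date + ".out";
    }
}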
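SUMMARY
Akka really can simplify multithreaded programming. Developers are freed from tedious multithreaded development and can focus more on the business logic. This abstraction suits the vast majority of applications.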
Published: 2022-11-24 15:53:48
Article link: http://www.wtabcd.cn/fanwen/fan/90/12787.html