actor

更新时间:2022-11-24 15:53:48 阅读: 评论:0


2022年11月24日发(作者:烟波什么淼)

Akka的Actor模型及使⽤实例

本⽂的绝⼤部分内容转载⾃这⼀blog,⽼外写的东西就是好啊。

ACTORS介绍

Anyonewhohasdonemultithreadinginthepastwon'tdenyhowhardandpainfulitistomanagemultithreadedapplications.I

saidmanagebecauitstartsoutsimpleandr,it

acheswhenyouethatyoudon'thaveaeasierwaytorecoverfromerrorsinyoursub-tasksORthozombiebugsthatyoufindhardto

reproduceORwhenyourprofilershowsthatyourthreadsarespendingalotoftimeblockingwastefullybeforewritingtoasharedstate.

IprefernottotalkabouthowJavaconcurrencyAPIandtheircollectionsmadeitbetterandeasierbecauIamsureifyouarehere,you

probablyneededmorecontroloverthesub-tasksorsimplybecauyoudon'tliketowritelocksandsynchronizedblocksandwouldprefer

ahigherlevelofabstraction.

InthisriesofAkkaNotes,wewouldgothroughsimpleAkkaexamplestoexplorethevariousfeaturesthatwehaveinthetoolkit.

WHATAREACTORS?

whodon'sttalkthroughmails.

Let'xpandonthatabit.

ING

dentndsamaileverymorningtotheTeacherandthewiTeachernds

awiquoteback.

Pointstonote:

nt,themailcouldn'outnaturalimmutability.

chercheckshismailboxwhenhewishestodoso.

cheralsondsamailback(immutableagain).

dentchecksthemailboxathisowntime.

dentdoesn'twaitforthereply.(noblocking)

ThatprettymuchsumsupthebasicblockoftheActorModel-passingmessages.

RENCY

Now,imaginethereare3wisppensthen?Nothing

tlepointtonotehereisthis:

Bydefault,Mailsinthemailboxareread/procesdintheordertheyarrived.

Internally,cenobodywaitsforthemailtobepickedup,itissimplyanon-blockingmessage.(Thereare

,wecouldbuildoneourlftoo)

ER

Imaginethe3teachersarefromthreedifferentdepartments-History,GeographyandPhilosophy.

HistoryteachersreplieswithanoteonanEventinthepast,GeographyteachersndsanInterestingPlaceandPhilosophyteachers,a

dentdoesntcarewhichteacherinthedepartmentnds

oneday,ateacherfallssick?

ca,anotherteacherinthedepartmentstepsupanddoesthejob.

Pointstonote:

ouldbeapoolofActorswhodoesdifferentthings.

dn'hcaanewActorcould

atively,theActorcouldjustignorethatoneparticularmessageandproceedwiththerestofthe

recalledDirectivesandwe'lldiscussthemlater.

ASKING

Foratwist,let'sassumethateachoftheteachersalsondtheexamscorethroughmailtoo,rly,anthe

Actorcouldhandlemorethanonetypeofmessagecomfortably.

NG

Whatifthestudentwouldliketogetonlyonefinalconsolidatedtriviamailinsteadofthree?

'llcomebacktothatlaterwhenwetalkabout

SupervisorsandrevisitthesamethoughtwhenwetalkaboutFutures.

AsrequestedbyMohan,let'sjusttrytomaptheanalogycomponentswiththethecomponentsintheActorModel.

uestandtheresponcan'tbe

y,theMessageDispatchercomponentinActormanagesthemailboxandroutesthemessagestothe

respectiveMailbox.

ACTOR消息机制

FromtheoftheAkkaNotes,wesawabird'condpartoftheAkkaNotes,we'lllookatthe

heexample,wewoulduthesameStudent-Teacherexamplethatwediscusdearlier.

InthisfirstpartofActorMessaging,we'llcreatetheTeacherActorandinsteadoftheStudentActor,we'lluamainprogramcalled

StudentSimulatorApp.

REVISITINGSTUDENT-TEACHERINDETAIL

Let'sfornowconsiayStudentSimulatorApp,Ijustmean

anormalmainprogram.

Thepictureconveysthis:

(ifthetermsareoverwhelming,don'tworry,we'llgothroughthemindetail)

tcreatessomethingcalledanActorSystem

teRequestmessageisnttotheActorRef(aproxyto

TeacherActor)

efpassthemessagealongtoaDispatcher

patcherenqueuesthemessageinthetargetActor'sMailBox.

patcherthenputstheMailboxonaThread(moreonthatinthenextction).

lBoxdequeuesamessageandeventuallydelegatesthattotheactualTeacherActor'sreceivemethod.

LikeIsaid,don''comebackandrevisitthefivestepsoncewearedone.

THESTUDENTSIMULATORAPPPROGRAM

WewoulduthisStudentSimulatorApptobringuptheJVMandinitializetheActorSystem.

Asweunderstandfromthepicture,theStudentSimulatorApp

sanActorSystem

eActorSystemtocreateaproxytotheTeacherActor(ActorRef)

heQuoteRequestmessagetotheproxy.

Let'sfocusonthethreepointsalonenow.

nganActorSystem

shutdowntheentireActor

environment.

Ontheotherendofthespectrum,allActors-

meaning,ucreateanActorusingtheActorSystem'sactorOfmethod,youcreateanActorjustbelowthe

ActorSystem.

ThecodeforinitializingtheActorSystemlookslike

valsystem=ActorSystem("UniversityMessageSystem")

TheUniversityMessageSystemissimplyacutenameyougivetoyourActorSystem.

ngaProxyforTeacherActor?

Let'sconsiderthefollowingsnippet:

valteacherActorRef:ActorRef=f(Props[TeacherActor])

,asyoucane,itdoesn'rns

somethingoftypeActorRef.

ActorModel'swayofavoidingdirect

accesstoanycustom/privatemethodsorvariablesintheTeacherActororanyActorforthatsake.

Torepeat,younNEVERtalktoyourActordirectly.

Peoplewillhateyoutodeathifyoufindsomemeanwaystodothat.

uoteRequesttotheProxy

It'lmethodinActorisactually!.(there'salsoatellmethod

inActorRefwhichjustdelegatesthecallbackto!)

//ndamessagetotheTeacherActor

teacherActorRef!QuoteRequest

That'sit

IfyouthinkIamlying,checktheentirecodeoftheStudentSimulatorAppbelow:

sg1

ystem

ef2Scala

rProtocol._

objectStudentSimulatorAppextendsApp{

//InitializetheActorSystem

valactorSystem=ActorSystem("UniversityMessageSystem")

//constructtheTeacherActorRef

valteacherActorRef=f(Props[TeacherActor])

//ndamessagetotheTeacherActor

teacherActorRef!QuoteRequest

//Let'swaitforacoupleofcondsbeforeweshutdownthesystem

(2000)

//ShutdowntheActorSystem.

wn()

}

Well,'llhavetoshutdowntheActorSystemorotherwi,makingthemain

thread''llwrite

someneattestcasinthenextpartinordertoavoidthishack.

THEMESSAGE

WejusttoldaQuoteRequesttotheActorRefbutwedidn'tethemessageclassatall!!

Hereitcomes:

(Itisarecommendedpracticetowrapyourmessagesinaniceobjectforeasierorganization)

TeacherProtocol

ols

objectTeacherProtocol{

caclassQuoteRequest()

caclassQuoteRespon(quoteString:String)

}

Asyouknow,orwouldrespondbackwithaQuoteRespon.

DISPATCHERANDAMAILBOX

Thehehood,whilewecreatedtheActorSystemand

theActorRef,'sewhattheyareabout.

MailBox

EverActorhasoneMailBox(we'lleonespecialcalater).Perouranalogy,cherhasto

rworld,it'stheotherwayround-themailbox,whenitgetsachanceustheActorto

accomplishitswork.

AlsothemailboxhasaqueuetostoreandprocessthemessagesinaFIFOfashion-alittledifferentfromourregularinboxwherethemost

latestistheoneatthetop.

Now,thedispatcher

elooksofit,theDispatcherjustgetsthemessagefromtheActorRefandpassitontothe

re'soneamazingthinghappeningbehindthescenes:

elooksofit,theDispatcherjustgetsthemessagefromtheActorRefandpassitontothe

re'soneamazingthinghappeningbehindthescenes:

TheDispatcherwrapsan(ForkJoinPoolorThreadPoolExecutor).ItexecutestheMailBoxagainstthisExecutorService.

Checkoutthissnippetfromthe

protected[akka]overridedefregisterForExecution(mbox:Mailbox,...):Boolean={

...

try{

executorServiceexecutembox

...

}

⽰例代码

这是⼀套⽤于计算熵增益的代码,⽤在相似度检测和内容推荐领域,主要思想是通过MasterActor派发计算任务给WorkerActor完成计算任

务后,发消息通知Master完成计算结果的综合⼯作,最后结果的展⽰由listenerActor完成。

EntropyMain

y;

;

ef;

ystem;

;

er;

;

ate;

publicclassEntropyMain

{

privatestaticLoggerlogger=ger();

/**

*启动的⼯作线程数

*/

privatestaticfinalintWORKER_COUNT=10;

publicstaticvoidmain(String[]args)

{

("begintoexecuteentorpyforrecommand");

calculate(WORKER_COUNT);

("executeentorpyforrecommandfinished.");

}

/**

*⼯作线程

*/

privatestaticvoidcalculate(intnumOfWorkers)

{

//创建Akka系统

ActorSystemsystem=("Entropy-calc");

//结果监听器,⽤于结果输出

ActorReflistener=f((),

"listener");

//启动actor,驱动计算开始

ActorRefmaster=f(

(,numOfWorkers,listener),"master");

//发送启动消息

(newCalculate(),master);

}

}

Master

;

ist;

tions;

;

it;

tionUtils;

;

on;

ef;

;

dActor;

obinRouter;

ate;

;

Entropy;

;

y;

ilityCalcService;

ilityCalcServiceImpl;

@SuppressWarnings("deprecation")

publicclassMasterextendsUntypedActor

{

privatestaticLoggerlogger=ger();

/**

*返回的结果数量,⽤于判断计算是否完成

*/

privateintnumOfResults;

/**

*⽤于将结果传递到外部

*/

privateActorReflistener;

/**

*执⾏器

*/

privateActorRefworkerRouter;

/**

*内容key

*/

privateListcontentKeys=null;

/**

*特征key

*/

privateListitemKeys=null;

/**

*中间结果集合

*/

privateListmiddleResult=newArrayList();

/**

*计算开始时间

*/

privatefinallongstart=tTimeMillis();

/**

*计算服务

*/

privateProbabilityCalcServicepcService=newProbabilityCalcServiceImpl();

/**

*

*@paramnrOfWorkers

*表⽰要启动多少worker

*@paramnrOfElements

*分配给每个worker的分块数量

*@paramlistener

*/

publicMaster(intnrOfWorkers,ActorReflistener)

{

tKeys=Codes();

ys=ms();

er=listener;

workerRouter=text().actorOf(

().withRouter(

newRoundRobinRouter(nrOfWorkers)));

}

@Override

publicvoidonReceive(Objectmessage)throwsException

{

if(messageinstanceofCalculate)

{

("receivecalculatemesssage.");

if(mpty(itemKeys)

&&mpty(contentKeys))

{

longcount=0L;

for(Stringitem:itemKeys)

{

count++;

("ndmessagetoworker,item="+item+","

+count+"/"+());

(newWork(item,contentKeys),getSelf());

}

}

el

{

("stopprocess,nodatatocalculate.");

//停⽌整个系统

getContext().system().shutdown();

}

}

elif(messageinstanceofResult)

{

Resultresult=(Result)message;

numOfResults+=1;

("receiveresultmesssagefromworker,item="

+m()+","+numOfResults+"/"

+());

//内存存⼊缓存

(newEntropy(m(),ue()));

//计算完毕

if(numOfResults==())

{

("calculatefinished,begintosorttheresult.");

//发送消息到listener

Durationduration=(tTimeMillis()

-start,ECONDS);

//对输出结果进⾏派讯

(middleResult);

("ndSortedEntropymessagetolistener.");

(newSortedEntropy(middleResult,duration),

getSelf());

//停⽌master以及其控制的actor

getContext().stop(getSelf());

}

}

el

{

unhandled(message);

}

}

}

Worker

;

imal;

;

;

dActor;

;

;

ilityCalcService;

ilityCalcServiceImpl;

thm;

/**

*计算某个item的熵增益

*/

publicclassWorkerextendsUntypedActor

{

privatestaticLoggerlogger=ger();

/**

*计算结果保留位数

*/

privatestaticfinalintDECIMAL_SIZE=4;

privateProbabilityCalcServicepcService=newProbabilityCalcServiceImpl();

@Override

publicvoidonReceive(Objectmessage)throwsException

{

if(messageinstanceofWork)

{

Workwork=(Work)message;

("receiveworkmessagefrommaster,item="

+m());

doubleresult=calculateEntropyForItem(m(),

KeyList());

("ndresultmessagetomaster,item="+m()

+",value="+result);

getSender().tell(newResult(result,m()),getSelf());

}

el

{

unhandled(message);

}

}

/**

*计算对某个item的熵增益

*

*@paramitem

*要计算的短语名称

*@paramcntKeys

*系统内全部的cnt

*@return熵增益

*/

privatedoublecalculateEntropyForItem(Stringitem,ListcntCodes)

{

DoubleentropyIncr=0.0;

//中间计算逻辑,计算出对某个item的熵增益并返回

if(cntCodes!=null&&!y())

{

Doublex=tProbability();

Doubley=cProbability(item);

for(StringcntCode:cntCodes)

{

Doublez=dProbability(cntCode,item);

if(isParamLegal(x,y,z))

{

Doubletemp=calculateByFormular(x,y,z);

if(temp!=null&&!nite()&&!())

{

entropyIncr=entropyIncr+temp;

}

el

{

("calculateectropyerror,item="+item

+",cntCode="+cntCode+",x="+x+",y="

+y+",z="+z);

}

}

}

}

returnformat(entropyIncr,DECIMAL_SIZE);

}

/**

*计算值

*/

privatedoublecalculateByFormular(Doublex,Doubley,Doublez)

{

Doublea1=-x*2(x);

Doublea2=z*x*2((z*x)/y);

Doublea3=(1-z)*x*2(((1-z)*x)/(1-y));

returna1+a2+a3;

}

privatebooleanisParamLegal(Doublex,Doubley,Doublez)

{

booleanres=fal;

if(x!=null&&y!=null&&z!=null)

{

res=true;

}

el

{

("parametererror,cancelthisround,x="+x+",y="

+y+",z="+z);

}

returnres;

}

/**

*计算结果完整的返回⽤于调整算法

*/

privateDoubleformat(Doubleorg,intdecimalSize)

{

try

{

BigDecimalb=newBigDecimal(org);

Doublef=le(decimalSize,_HALF_UP)

.doubleValue();

returnf;

}

catch(Exceptione)

{

//TODOAuto-generatedcatchblock

tackTrace();

}

return0.0;

}

}

Listener

;

rmat;

DateFormat;

;

;

dActor;

Entropy;

il;

/**

*结果输出actor

*/

publicclassListenerextendsUntypedActor

{

privatestaticLoggerlogger=ger();

@Override

publicvoidonReceive(Objectmessage)throwsException

{

if(messageinstanceofSortedEntropy)

{

SortedEntropysortedEntropy=(SortedEntropy)message;

//结果输出⾄⽂件系统

2File(getOutputPath(),orpyList());

("outputresultfilefinished,takes"

+ation().toSeconds()+"s.");

getContext().system().shutdown();

}

el

{

unhandled(message);

}

}

privateStringgetOutputPath()

{

DateFormatdf=newSimpleDateFormat("yyyyMMddHHmmss");

Stringdate=(newDate());

return"./entropy_"+date+".out";

}

}

使⽤总结

Akka确实可以简化多线程编程,开发⼈员可以从繁复的多线程开发中解脱出来,更加专注于业务实现,这种抽象适合绝⼤多数应⽤。

本文发布于:2022-11-24 15:53:48,感谢您对本站的认可!

本文链接:http://www.wtabcd.cn/fanwen/fan/90/12787.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

上一篇:century
标签:actor
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图