两种常⽤的并发模型:CSP和Actor
概述
现如今的机器⼤都是多核的CPU架构,为了充分利⽤计算机的资源,我们要了解⼀些并发编程的思想。
⼤家应该都了解传统的并发编程模式,多线程编程。
形容女生好的词语传统的多线程编程实际上是使⽤的ShreadMemory的⽅式来推动程序的前进。
为什么说new⼀个thread的⽅式是共享内存呢?
有并发的地⽅就有竞争,传统多线程的并发模式使⽤locks(锁),condition variable(条件变量)等同步原语来强制规定了进程的推进顺序,⽽这些同步原语本质上都是使⽤了在各个线程都可见的锁来实现,有⼀种全局变量的味道。(少数由硬件指令直接⽀持的除外,例如atomic_int。那么除了直接控制thread,使⽤shared memory之外我们还有什么别的并发模型吗?
答案是有的~
今天我要分享的CSP和Actor模型都是基于消息传递的。(Message Passing)
CSP
CSP的是Communicating Sequential Process (CSP)的缩写,翻译成中⽂是顺序通信进程,不过这个名字⽐较拗⼝,下⽂将⽤CSP来代替。CSP的核⼼思想是多个线程之间通过Channel来通信(对应到golang中的chan结构),有点像是管道的概念。(Pipe)
Actor
Actor模式有⼀点类似⾯向对象模型,世界上所有的东西都被命名为Actor。
单个Actor会拥有⼀些状态,⽐如为名字是cat的Actor可能被描述为:
name : cat
age : 3
type : British shorthair
color : white
....
到此为⽌好像和对象object没有什么不同,但是Actor不会给外界提供任何的⾏为接⼝.
⽐如cat.Move()这是很⾃然的⾯向对象的写法,在Actor是不被允许的。
每⼀个Actor的属性绝不对外暴露,想和外界进⾏通信必须发送message,所以每个Actor⾃⾝都有⼀个邮箱。
⽆论是CSP还是Actor模型,他们都完完全全贯彻了⼀句⾄理名⾔:
Don't communicate by sharing memory; share memory by communicating. (R. Pike)
CSP :Goroutine
golang中的goroutine我们可以理解为是⼀个thread,但是它⾮常精简,调度的开销也⾮常⼩。
goroutine之间的通信使⽤名为chan的数据结构,对应CSP模型中的channel。
goroutine的使⽤很简单,仅需要关键词go即可启动⼀个goroutine,不同的goroutine之间使⽤channel进⾏通讯,这都是很基本的⽤法,就不背语法书了。
关于goroutine,还是想多聊⼀聊golang是如何调度goroutine的。
Go的调度器内部有三个重要的结构:M,P,S。
M:代表真正的内核OS线程,真正⼲活的⼈
G:代表⼀个goroutine,它有⾃⼰的栈,指令集信息(要执⾏的指令)和其他信息(正在等待的channel等等),调度的单位。
P:代表调度的上下⽂,可以把它看做⼀个局部的调度器,使go代码在某个M上跑
当go运⾏起来的时候,⼤致是下图的样⼦:
简单解释⼀下:
图中的两个M表⽰当前go运⾏的机器上有两个线程可以让我们使⽤。
P是goland的调度上下⽂,它必须运⾏在某个M上,实际上就类似:
new Thread(P.run())
P上挂着好⼏个goroutine :
蓝⾊的G表⽰当前P正在调度的goroutine,此时相当于把蓝⾊的G扔到了M上。
灰⾊的G表⽰正在等待P调度的goroutine,轮到他们的时候就会被扔到M上。(如何公平的调度灰⾊的G?)⽬前来看,P好像是多余的,我直接想办法把G扔到M上就好了,为何还要有⼀个P这样的中间层的。
当然P很有必要,如果某个线程M被阻塞了,那么P可以被扔到其他的线程上,去合理的调度它挂着的G。
这个图表⽰的就是M0被阻塞了,转⽽把P挂到了M1上。
可以看到M0此时有⼀个G0在运⾏,这是啥意思…
我们在具体⼀点吧:
G0
n = read(fd, buf, size)
print(n)
G0之前在P上挂着,开⼼的在M0上运⾏(蓝⾊的G0),此时G0调⽤了系统调⽤read,显然G0被阻塞了。golang为了充分利⽤cpu资源,不让那些被排队的goroutine浪费时间,P挂到了M1上。
那么问题来了,如果read完了,系统调⽤结束,那么该打印n了,可是这时候没有P了,怎么办?
苹果8尺寸golang会维护⼀个全局队列,把G0放到⼀个全局队列了,然后M0就去sleep了。
所有的P会周期性的检查全局队列⾥的G,否则这些G就都饿死了。
由此可见,P是很有必要的。
还有很神奇的⼀点,P可以“偷“任务。
北京烟花爆竹考虑⼀种情况:
P1⾝上的G不巧都是和⽹络IO有关,运⾏的⾮常缓慢。
P2⾝上的G都是⼀些⽐较快的运算函数,他很快就完成了,那么他就会尝试从P1那⾥偷⼀些G来运⾏。channel的结构
type hchan struct {
qcount uint // 缓存当前包含数据个数
dataqsiz uint // 缓存容量
buf unsafe.Pointer // 缓存指针,缓存是⼀个循环数组,数据⼤⼩为dataqsiz
elemsize uint16 //
clod uint32 // channel开关状态
elemtype *_type // 数据类型
ndx uint // nd index
recvx uint // receive index
recvq waitq // list of recv waiters(阻塞的接受goroutine队列)
ndq waitq // list of nd waiters(阻塞的发送goroutine队列)
lock mutex
}
// 阻塞goroutine队列
type waitq struct {
first *sudog
last *sudog
}
会字组词/
/ goroutine的封装(公⽤数据结构)
type sudog struct {
g *g
lectdone *uint32 // CAS to 1 to win lect race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // 发送goroutine为发送变量地址,接收goroutine反之
......
}
这⾥可能有点抽象,上⼀个图更直观⼀点:
ch := make(chan int , 10 ) ,对应的runtime中的实现是:
func makechan(t *chantype, size int64) *hchan
该函数的主要作⽤就是从堆上分配内存,当然和c的malloc的有些不同,可以⼤略看⼀下:
var c *hchan
switch {
ca size == 0 || elem.size == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector us this location for synchronization.
c.buf = unsafe.Pointer(c)
}
注意使⽤的是mallocgc,go会在合适的时候gc掉这块内存
channel的读写
var val int = 666
ch := make(chan int , 10)
ch <- val
这是我们在go中向channel写⼊数据的结构的⽅法,对应到:
func channd(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
参数c就是实际对应的chan
参数ep是变量val的地址
0.各种参数的有效性的校验
1.获取channel上的锁,如果是已经关闭的channel,会释放锁否则继续逻辑:
lock(&c.lock)
if c.clod != 0 {
unlock(&c.lock)
panic(plainError("nd on clod channel"))
}
2.获取channel的recv队列,dequeue出的sg就是上⽂中的sudog,向等待的这个channel的goroutine会发送刚才的val(在这⾥就是ep)
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to nd
// directly to the receiver, bypassing the channel buffer (if any).
nd(c, sg, ep, func() { unlock(&c.lock) }, 3) // 发送val给sg(goroutine),sg在等待c
return true
}
钝角三角形面积公式3. 如果没有等待该channel的goroutine,看⼀下channel的剩余缓存是不是够⼤,如果可以,把数据放进去
if c.qcount < c.dataqsiz { //缓存中的数据个数 < 缓存的容量
// Space is available in the channel buffer. Enqueue the element to nd.
qp := chanbuf(c, c.ndx)
if raceenabled {
统计员
raceacquire(qp)
racerelea(qp)
}
typedmemmove(c.elemtype, qp, ep)//把数据放进去
c.ndx++
if c.ndx == c.dataqsiz {
c.ndx = 0
}计算机公式
c.qcount++
楷树unlock(&c.lock)
return true
}