PubSub——“发布订阅”模式
订阅者(Sub)通过SUBSCRIBE 命令和PSUBSCRIBE命令向redis 服务订阅频道(channel),当发布者通过PUBLISH 命令向chinnel发布命令时,订阅该频道的客户端都会受到此消息。
##PUB/SUB 机制
三个客户端都订阅channel1频道
当有新消息通过PUBLISH命令发布到channel1时,这个消息会被发送给订阅这个频道的客户端。
redis中的pubsub机制:
⼀个Redis client发布消息,其他多个redis client订阅消息,发布的消息“即发即失”,redis不会持久保存发布的消息;消息订阅者也将只能得到订阅之后的消息,通道中此前的消息将⽆从获得。
消息发布者,即publish客户端,⽆需独占链接,你可以在publish消息的同时,使⽤同⼀个redis-client链接进⾏其他操作(例如:INCR 等)
消息订阅者,即subscribe客户端,需要独占链接,即进⾏subscribe期间,redis-client⽆法穿插其他操作,
此时client以阻塞的⽅式等待“publish端”的消息;因此这⾥subscribe端需要使⽤单独的链接,甚⾄需要在额外的线程中使⽤。
moss
Tcp默认连接时间固定,如果在这时间内sub端没有接收到pub端消息,或pub端没有消息产⽣,sub端的连接都会被强制回收,这⾥就需要使⽤特殊⼿段解决,⽤定时器来模拟pub和sub之间的保活机制,定时器时间不能超过TCP最⼤连接时间,具体根据机器环境来定;
⼀旦subscribe端断开链接,将会失去部分消息,即链接失效期间的消息将会丢失,所以这⾥就需要考虑到借助redis的list来持久化;单价英文
总结:pub发布的消息不会持久化,sub是阻塞等待消息,只能获取订阅之后的产⽣的消息,⼀段时间内sub没有收到消息或pub没有⽣产消息,sub连接会被回收(因为sub是阻塞的).
高中英语短文改错如果你⾮常关注每个消息,那么你应该基于Redis做⼀些额外的补充⼯作,如果你期望订阅是持久的,那么如下的设计思路可以借鉴:
1) subscribe端:
⾸先向⼀个Set集合中增加“订阅者ID”, 此Set集合保存了“活跃订阅”者,订阅者ID标记每个唯⼀的订阅者,此Set为 "活跃订阅者集合"
2) subcribe端开启订阅操作,并基于Redis创建⼀个以 "订阅者ID" 为KEY的LIST数据结构,此LIST中存储了所有的尚未消费的消息,此List称为 "订阅者消息队列"
3) publish端:
每发布⼀条消息之后,publish端都需要遍历 "活跃订阅者集合",并依次向每个 "订阅者消息队列" 尾部追加此次发布的消息.
4) 到此为⽌,我们可以基本保证,发布的每⼀条消息,都会持久保存在每个 "订阅者消息队列" 中.
5) subscribe端,每收到⼀个订阅消息,在消费之后,必须删除⾃⼰的 "订阅者消息队列" 头部的⼀条记录.
6) subscribe端启动时,如果发现⾃⼰的 "订阅者消息队列" 有残存记录, 那么将会⾸先消费这些记录,然后再去订阅.
以上⽅法可以保证成功到达的消息必消费不丢失;
pub/sub中消息发布者不需要独占⼀个Redis的链接,⽽消费者则需要单独占⽤⼀个Redis的链接,在java中便不得独⽴出分出⼀个线程来处理消费者。这种场景⼀般对应这多个消费者,此时则有着过⾼的资源消耗。
对于如上的⼏种不⾜,如果在项⽬中需要考虑的话可以使⽤JMS来实现该功能。JMS提供了消息的持久化/耐久性等各种企业级的特性。如果依然想使⽤Redis来实现并做⼀些数据的持久化操作,则可以根据JMS的特性来通过Redis模拟出来.
subscribe端⾸先向⼀个Set集合中增加“订阅者ID”,此Set集合保存了“活跃订阅”者,订阅者ID标记每个唯⼀的订阅者,例
如:sub:email,sub:web。此SET称为“活跃订阅者集合”
subcribe端开启订阅操作,并基于Redis创建⼀个以“订阅者ID”为KEY的LIST数据结构,此LIST中存储了所有的尚未消费的
消息。此LIST称为“订阅者消息队列”
publish端:每发布⼀条消息之后,publish端都需要遍历“活跃订阅者集合”,并依次向每个“订阅者消息队列”尾部追加此次发
布的消息。
到此为⽌,我们可以基本保证,发布的每⼀条消息,都会持久保存在每个“订阅者消息队列”中。
生活大爆炸第三季第九集
subscribe端,每收到⼀个订阅消息,在消费之后,必须删除⾃⼰的“订阅者消息队列”头部的⼀条记录。galgame翻译器
subscribe端启动时,如果发现⾃⼰的⾃⼰的“订阅者消息队列”有残存记录,那么将会⾸先消费这些记录,然后再去订阅。gis是什么
蒙昧协程通信机制——Pub/Sub
surpris新的协程框架VLCP。它使⽤的是⼀种Pub/Sub模型,即发布者、订阅者模型,这是常⽤于消息队列中的模型,熟悉消息队列⽤法就会⾮常熟悉这⼀套⽤法。接下来我们详细介绍⼀下这⼀套⽅法,并且看⼀下这种设计如何结合前⼏种⽅案的优点,弥补前⼏种⽅案的缺点。
在Pub/Sub模型中,主要分为三个不同的⾓⾊:
1. 发送⽅:通过nd⽅法发送⼀个事件
2. 接收⽅:通过参数订阅并接收⼀个事件
3. 管理:调整事件的优先顺序等
管理也可以由发送⽅或接收⽅中任意⼀个来兼任,主要⽬的是在许多事件同时存在时,调整优先级顺序,从⽽影响协程的执⾏先后次序。
在VLCP当中,发送通过scheduler.nd(或者更⾼层的RoutinerContainer.waitForSend),接收则通过yield语句,可以⾮常容易的进⾏。管理则通过调整调度器队列设置进⾏。
VLCP中的事件是vlcp.event.Event的⼦类,它⾸先根据⼦类类型进⾏区分,但与其他框架不同,⼦类可以进⼀步携带⼀组索引,⽤来标识这个事件的不同性质,它与事件类型⼀起共同起着类似于Pub/Sub中的主题(Topic)的作⽤。⽐如说,我们处理OpenFlow协议中的PACKET_IN消息,现在希望定义⼀种事件来表⽰有⼀个PACKET_IN消息到来了,对于接收⽅来说,可能关⼼的信息有:消息来⾃于哪个datapath;来⾃于哪个连接对象;由哪个table中、cookies为多少的流表⽣成。我们可以将这些信息作为索引来定义这个事件:
from vlcp.event import Event, withIndices
@withIndices('datapath', 'connection', 'table' 'cookies')
class OpenFlowPacketInEvent(Event):
pass
如你所见,定义⼀个事件⾮常容易,⽽定义⼀个事件⼏乎就完成了通信需要进⾏的所有准备⼯作——不需要创建额外的Future或者Channel 对象,甚⾄,不需要关⼼要进⾏通信的双⽅究竟是谁、在哪、有多少个。使⽤注解withIndices来定义⼀个Event的索引,这是必须的,即使Event没有可选的索引,也必须⽤@withIndices()来表明Event没有索引。
接下来,处理OpenFlow协议的协程会在这个事件发⽣时通知需要处理事件的协程,它只需要调⽤发送⽅的标准⽅法:
for m in container.waitForSend(OpenFlowPacketInEvent(
conn.datapath,
conn,
message.table,
message = message
)):
yield m
在这个过程中,我们创建了⼀个新的Event的实例,并提供了相应的索引的值。除了规定的索引值以外,我们还可以给这个Event对象提供额外的属性,它可以直接通过keyword-argument在构造函数中初始化,也可以在创建后再进⾏属性赋值。所有的索引也会⾃动被赋给相应的属性,⽐如说newevent.datapath就会得到datapath索引的值。将这个对象传递给waitForSend过程就完成了发送,waitForSend是个协程过程,使⽤for来在外层协程中代理这个过程,在Python3当中也可以更简单写成yield from container.waitForSend(...)
safe and sound 歌词
那么接下来是接收⽅的问题,接收⽅不需要关⼼事件何时由谁发出,当需要等待⼀个新的事件发⽣的时候,只需要简单使⽤:
packet_in_matcher = ateMatcher(None, None, my_table, my_cookies)
# Or:
高兴的英文
# packet_in_matcher = ateMatcher(table = my_table, cookies = my_cookies)
yield (packet_in_matcher,)
在协程中使⽤yield会暂停协程执⾏,在VLCP中,yield返回的是⼀个EventMatcher构成的元组,它可以包含⼀个或多个EventMatcher。EventMatcher通过Event⼦类的createMatcher⽅法创建,它代表⼀种匹配规则,即匹配这个⼦类的Event中,相应索引匹配相应值的事件。返回多个EventMatcher时,yield语句会在某个事件匹配任意⼀个EventMatcher时返回。匹配到的EventMatcher会保存在
container.matcher,⽽发⽣的事件会保存在container.event。
VLCP内部使⽤前缀树的数据结构对Event和EventMatcher进⾏匹配,这是⼀个很有效率的数据结构,将Event匹配到相应的EventMatcher只需要O(1)的时间。
对同⼀个事件,不同的协程可以通过createMatcher时的不同参数,来匹配事件集合的不同的⼦集,这在处理量⾮常⼤的时候可以有效提⾼处理效率,同时不增加程序复杂度。除了使⽤索引以外,还可以增加⼀个⾃定义的筛选过程:
customized_matcher = ateMatcher(
table = my_table,
cookies = my_cookies,
_ismatch = lambda x: ssage.data) < 100)
_ismatch的keyword参数⽤来指定⼀个函数⽤于筛选,它接受Event作为唯⼀的参数,返回True或者Fal表⽰是否应当匹配这个Event。
_ismatch只有指定索引值已经匹配的情况下才会进⾏计算。
Event的⼦类可以进⼀步派⽣。进⼀步派⽣的Event会继承⽗类的类型和索引,但也会有⾃⼰的类型和索引。⼦类的⼦类遵循⼀般的继承派⽣的规则:⽗类的EventMatcher可以匹配⼦类的Event,但⼦类的EventMatcher不能匹配⽗类的Event。⽐如:
@withIndices('a', 'b')
class MyEventBa(Event):
pass
@withIndices('c', 'd')
class MyEventChild(MyEventBa):
pass
MyEventChild(1,2,3,4) # a = 1, b = 2, c = 3, d = 4
利⽤这种特殊的继承关系可以拓展原有逻辑,在兼容以前代码的情况下提供新的功能。
VLCP的事件循环结构
VLCP的事件循环结构可以⽤上图表⽰。所有在调度器中运⾏的协程,都会在暂停运⾏时将⾃⼰注册到匹配树中,与⼀个或多个EventMatcher进⾏关联,这个过程通过yield语句完成。在事件循环运⾏过程中,调度器每次从中⼼队列中取出⼀个事件,在匹配树中查找与这个事件匹配的EventMatcher和相关联的协程,然后依次唤醒这些协程,通知它们等待的事件已经发⽣;协程在运⾏时,可以将事件通过nd过程发送到中⼼消息队列。在协程停⽌运⾏时,协程重新使⽤yield语句将⾃⼰注册到匹配树中,等待下⼀个循环。
当消息队列为空或⽆法出队时,调度器会调⽤Poller(在Linux当中由EPoll实现,其他操作系统当中使⽤Select)等待socket活动。Poller会将socket的活动返回成PollEvent,这同样是Event的⼦类,这些事件会由负责处理socket活动的协程进⾏处理,然后进⼀步产⽣后续的事件。当没有活动的socket时,调度器会开始引导整个框架退出。