ZeroMQXPubXSub模式

更新时间:2023-06-27 18:50:04 阅读: 评论:0

ZeroMQXPubXSub模式
ZeroMQ XPub/XSub模式
Motivation
相对于XPub/XSub模式,我们很容易想到Pub/Sub模式,即订阅发布模式。当我们使⽤ZeroMQ创建⼀个包含订阅发布模式的系统时,我们通常创建⼀个消息的发布者,即Publisher,和若⼲个消息订阅者(Subscriber)。消息的发布者绑定端⼝,订阅者通过发布者的IP和端⼝连接发布者,并且注册消息主题(Topic),然后进⾏接收匹配主题的消息。整体结构如下图:
在订阅发布模式下,订阅者可以动态加⼊,随时连接消息的发布者,然后接收消息。但是,在这种结构中,如果有新的Publisher加⼊,那么所有订阅者都需要连接到这个Publisher上。如果系统中有成百上千的订阅者,每⼀个新的Publisher的加⼊都会给系统造成很⼤的操作成本,这显然限制了系统规模。要解决这个问题,也很简单,就像只有⼀个发布者情况,所有的订阅者都只与这⼀个消息发布者交互,不管是Publisher内部发⽣什么变化,Subscriber都可以动态感知这种变化。所以很容易我们可以想到创建⼀个中间件来解耦Publishers和Subscribers,所有Subscriber都只与这⼀个中间件交互,换句话说,这个中间件从很多个Publisher那⾥接收消息,然后转发给Subscibers。事实上,有了这个中间件,我们可以做很多Pub/Sub模式做不了的事情,⽐如说对传送过程中的消息进⾏管理,重构,或者对系统进⾏负载均衡等等。我们把这个中间件称为Broker,上⾯说的这种模式,我们称之为XPub/XSub模式。
XPub/XSub
在XPub/XSub模式中,对Publisher来说由Pub/Sub模式中的bind操作变成了connect操作,connect的对象为Broker中的XSub端⼝。对Subscriber⽽⾔,和Publisher的操作⼀样,只不过connect的是Broker的XPub端⼝。在Broker中我们绑定XSub和XPub这两个端⼝。Proxy的作⽤即为中转消息,在ZMQ的API中提供了zmq.proxy⽅法来中转消息,其实Proxy就是⼀个代码块,在这个代码块中可以做任何我们想做的操作。后⾯会介绍⼀个简单的例⼦。从XPub/XSub这个模式中,我们可以发现,不管
是Publisher还是Subscriber,它们的加⼊和离开都可以被系统动态发现。
XPub/XSub例⼦
在这个例⼦中,我们让Publisher从CSV⽂件中读取数据,在Broker中维护⼀个buffer,如果有Subscriber加⼊,我们⾸先发送缓冲区的历史消息,然后转发新消息给Subscribers。
封装ZMQ API
# -*- coding: utf-8 -*-
# utl.py
import zmq
def get_publisher(address, port):
入党自我总结
context = zmq.Context()
socket = context.socket(zmq.PUB)
connect_addr ='tcp://%s:%s'%(address, port)
return socket
def get_subscriber(address, port, topics):
# Subscriber can register one more topics once
context = zmq.Context()
socket = context.socket(zmq.SUB)
connect_addr ='tcp://%s:%s'%(address, port)
if isinstance(topics,str):
socket.subscribe(topics)
elif isinstance(topics,list):
[socket.subscribe(topic)for topic in topics]
return socket
def get_broker(xsub_port, xpub_port):
context = zmq.Context()
xsub_socket = context.socket(zmq.XSUB)
xsub_addr ='tcp://*:%s'% xsub_port
xsub_socket.bind(xsub_addr)
# make xsub receive any message
xsub_socket.nd(b'\x01')
健康教育与健康促进xpub_addr ='tcp://*:%s'% xpub_port
xpub_socket = context.socket(zmq.XPUB)
xpub_socket.bind(xpub_addr)
# make xpub receive verbo messages
xpub_socket.tsockopt(zmq.XPUB_VERBOSE,1)
# zmq.proxy(xsub_socket, xpub_socket)
return xsub_socket, xpub_socket
Publisher类
import argpar
import utl
class Publisher(object):
def__init__(lf, topic, broker_address, broker_port, data, rate):
'''
:param topic: the topic associated with messages
:param broker_address: broker public IP
:
param broker_port: XSub port number
:param data: csv file path
:param rate: publishing rate, unit is cond
社会实践成果'''
lf.pub_socket = _publisher(broker_address, broker_port)
lf.data = data
lf.rate = rate
def publish_data(lf):
with open(lf.data, newline='')as csv_file:
reader = ader(csv_file, delimiter=',')
for row in reader:
row.inrt(0, lf.topic)
record =','.join(row)
lf.pub_socket.nd_string(record)
print('[Publisher] Published message: %s'% record)校园景色
time.sleep(lf.rate)
if __name__ =='__main__':
parr = argpar.ArgumentParr()
parr.add_argument('-t','--topic',type=str,help='Topic')
parr.add_argument('-a','--address',type=str,help='Broker public IP address')    parr.add_argument('-p','--port',type=str,help='Broker XSub port number')    parr.add_argument('-f','--file',type=str,help='Data file path')
parr.add_argument('-r','--rate',type=int,help='Publishing rate in cond')
args = parr.par_args()
pub = pic, args.address, args.port, args.file, args.rate)
pub.publish_data()
Broker类
import utl
import zmq
class Broker(object):
def__init__(lf, xsub_port, xpub_port):
lf.xsub_socket, lf.xpub_socket = _broker(xsub_port, xpub_port)        lf.poller = zmq.Poller()
ister(socket=lf.xpub_socket, flags=zmq.POLLIN)
ister(socket=lf.xsub_socket, flags=zmq.POLLIN)
lf.buffer={}
def update_buffer(lf, msg):
topic = msg.split(',')[0]
if topic in lf.buffer:
lf.buffer[topic].append(msg)
el:
lf.buffer.update({topic:[msg]})
def handler(lf):
while True:
events =dict(lf.poller.poll(1000))
# events from publishers
if lf.xsub_socket in events:
msg = lf.v_string()
lf.xpub_socket.nd_string(msg)
print('[Broker] Forwarded message: %s'% msg)
lf.update_buffer(msg)
# events from subscribers
if lf.xpub_socket in events:
topic =''.join(list(lf.v_string())[1:])
if topic in lf.buffer:
# nd history messages
[lf.xpub_socket.nd_string(item)for item in lf.buffer[topic]]
el:
lf.xsub_socket.nd_string(topic)艾草洗澡
if __name__ =='__main__':
# The 1st argument is XSub port number, the 2nd is XPub port number
broker = Broker(sys.argv[1], sys.argv[2])
芙蓉寺broker.handler()
Subscriber类
import argpar
import utl
class Subscriber(object):
def__init__(lf, broker_address, broker_port, topics):
lf.socket = _subscriber(broker_address, broker_port, topics)
def subscribe(lf):
while True:
msg = v_string()
print('[Subscriber] Received message: %s'% msg)
if __name__ =='__main__':
parr = argpar.ArgumentParr()
parr.add_argument('-t','--topics',type=str,help='Topics parated by comma')    parr.add_argument('-a','--address',type=str,help='Broker address')
parr.add_argument('-p','--port',type=str,help='Broker port number')
露出研究会
args = parr.par_args()
topics = pics.split(',')
sub = Subscriber(args.address, args.port, topics)
二力平衡sub.subscribe()

本文发布于:2023-06-27 18:50:04,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/89/1057596.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   订阅   模式   发布者   系统   操作   发布
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图