zeromq简介及各个通讯模式实例详解(附java实现)
⼀、Zeromq简介
ZeroMQ是⼀种基于消息队列的多线程⽹络库,其对套接字类型、连接处理、帧、甚⾄路由的底层细节进⾏抽象,提供跨越多种传输协议的套接字。ZeroMQ是⽹络通信中新的⼀层,介于应⽤层和传输层之间(按照TCP/IP划分),其是⼀个可伸缩层,可并⾏运⾏,分散在分布式系统间。
zeroMQ在设计上主要采⽤了以下⼏个⾼性能的特征:
1、⽆锁的队列模型
对于跨线程间的交互(⽤户端和ssion)之间的数据交换通道pipe,采⽤⽆锁的队列CAS;在pipe的两端注册有异步事件,在读或者写消息到pipe的时,会⾃动触发读写事件。
2、批量处理的算法
对于传统的消息处理,每个消息在发送和接收的时候,都需要系统的调⽤,这样对于⼤量的消息,系统的开销⽐较
⼤,zeroMQ对于批量的消息,进⾏了适应性的优化,可以批量的接收和发送消息。
3、多核下的线程绑定,⽆须CPU切换evenif
区别于传统的多线程并发模式,信号量或者临界区, zeroMQ充分利⽤多核的优势,每个核绑定运⾏⼀个⼯作者线程,避免多线程之间的CPU切换开销。
ZMQ连接和传统的TCP连接是有区别的,主要有:
使⽤多种协议,inproc(进程内)、ipc(进程间)、tcp、pgm(⼴播)、epgm;
当客户端使⽤zmq_connect()时连接就已经建⽴了,并不要求该端点已有某个服务使⽤zmq_bind()进⾏了绑定;
连接是异步的,并由⼀组消息队列做缓冲;
连接会表现出某种消息模式,这是由创建连接的套接字类型决定的;
⼀个套接字可以有多个输⼊和输出连接;
ZMQ没有提供类似zmq_accept()的函数,因为当套接字绑定⾄端点时它就⾃动开始接受连接了;
应⽤程序⽆法直接和这些连接打交道,因为它们是被封装在ZMQ底层的。
ZMQ提供了⼀组单播传输协议(inporc, ipc, tcp),和两个⼴播协议(epgm, pgm)。⼴播协议是⽐较⾼级的协议,我们会在以后讲述。如果你不能回答我扇出⽐例会影响⼀对多的单播传输时,就先不要去学习⼴播协议了吧。
tcp作为传输协议,这种TCP连接是可以脱机运作的,它灵活、便携、且⾜够快速。为什么称之为脱机,是因为ZMQ中⼀般⽽⾔我们会使⽤tcp
的TCP连接不需要该端点已经有某个服务进⾏了绑定,客户端和服务端可以随时进⾏连接和绑定,这对应⽤程序⽽⾔都是透明的。
ipc,和tcp的⾏为差不多,但已从⽹络传输中抽象出来,不需要指定IP地址或者域名。这种协议很多时候会很⽅便,本指南进程间协议,即ipc
中的很多⽰例都会使⽤这种协议。ZMQ中的ipc协议同样可以是脱机的,但有⼀个缺点——⽆法在Windows操作系统上运作,这⼀点也许会在未来的ZMQ版本中修复。我们⼀般会在端点名称的末尾附上.ipc的扩展名,在UNIX系统上,使⽤ipc协议还需要注意权限问题。你还需要保证所有的程序都能够找到这个ipc端点。
进程内协议,即inproc
inproc,可以在同⼀个进程的不同线程之间进⾏消息传输,它⽐ipc或tcp要快得多。这种协议有⼀个要求,必须先绑定到端点,才能建⽴连接,也许未来也会修复。通常的做法是先启动服务端线程,绑定⾄端点,后启动客户端线程,连接⾄端点。
TCP套接字和ZMQ套接字之间在传输数据⽅⾯的区别:
ZMQ套接字传输的是消息,⽽不是字节(TCP)或帧(UDP)。消息指的是⼀段指定长度的⼆进制数据块,我们下⽂会讲到消息,这种设计是为了性能优化⽽考虑的,所以可能会⽐较难以理解。
ZMQ套接字在后台进⾏I/O操作,也就是说⽆论是接收还是发送消息,它都会先传送到⼀个本地的缓冲队列,这个内存队列的⼤⼩是可以配置的。
ZMQ套接字可以和多个套接字进⾏连接(如果套接字类型允许的话)。TCP协议只能进⾏点对点的连接,⽽ZMQ则可以进⾏⼀对多(类似于⽆线⼴播)、多对多(类似于邮局)、多对⼀(类似于信箱),当然也包括⼀对⼀的情况。
ZMQ套接字可以发送消息给多个端点(扇出模型),或从多个端点中接收消息(扇⼊模型)
⼆、Zeromq模式详解
1.Request-Reply模式
拼写问答模式,由请求端发起请求,然后等待回应端应答。⼀个请求必须对应⼀个回应,从请求端的⾓度来看是发-收配对,从回应端的⾓度是收-发对。请求端可以是1~N个。该模型主要⽤于远程调⽤及任务分配等。
使⽤REQ-REP套接字发送和接受消息是需要遵循⼀定规律的。客户端⾸先使⽤zmq_nd()发送消息,再⽤zmq_recv()接收,如此循环。如果打乱了这个顺序(如连续发送两次)则会报错。类似地,服务端必须先进⾏接收,后进⾏发送。
也就是说Request-Reply模式是严格同步的,Request端必须先发送后接受,reply端必须先接受后发送。
深⼊到信封通信原理,Request在发送数据帧之前⼀并包含了⼀个空⽩的数据分割符数据帧,即在程序中虽然只是发送了⼀个数据帧作为参数,实际Request套接字⼜在数据帧的基础上添加了空字符数据帧。Request在接受的时候会去掉空⽩分隔符数据帧,直接将实际的数据返回到应⽤程序。reply套接字在接受的时候会读取消息帧并存储⼀直遇到空⽩分隔符,然后将剩余的消息返回到应⽤程序,在发送的时候会将存储的消息与待发送的数据⼀并发送出去。
Server:
public static void main(String[] args) throws Exception {
ZMQ.Context context = t(1);
// Socket to talk to clients
ZMQ.Socket responder = context.socket(ZMQ.REP);
responder.bind("tcp://*:5555");
while (!Thread.currentThread().isInterrupted()) {
// Wait for next request from the client
byte[] request = v(0);
System.out.println("Received Hello");
// Do some 'work'
Thread.sleep(1000);
// Send reply back to client
String reply = "World";
responder.Bytes(), 0);
}
cet4准考证号查询
responder.clo();
<();
}
Client:
public static void main(String[] args) {
ZMQ.Context context = t(1);
// Socket to talk to rver
System.out.println("Connecting to hello world rver…");
ZMQ.Socket requester = context.socket(ZMQ.REQ);
champion
for (int requestNbr = 0; requestNbr != 100; requestNbr++) {
String request = "Hello";
System.out.println("Sending Hello " + requestNbr);
requester.Bytes(), 0);
byte[] reply = v(0);
System.out.println("Received " + new String(reply) + " " + requestNbr);
}
requester.clo();
<();
}
eclip下maven项⽬⼯程下载地址,可直接运⾏:
2.Pub-Sub模式
发布订阅模式发布端单向分发数据,且不关⼼是否把全部信息发送给订阅端。如果发布端开始发布信息时,订阅端尚未连接上来,则这些信息会被直接丢弃。订阅端未连接导致信息丢失的问题,可以通过与请求回应模型组合来解决。订阅端只负责接收,⽽不能反馈,且在订阅端消费速度慢于发布端的情况下,会在订阅端堆积数据。该模型主要⽤于数据分发。天⽓预报、微博明星粉丝可以应⽤这种经典模型。
在使⽤SUB套接字时,必须使⽤subscribe()⽅法来设置订阅的内容。如果你不设置订阅内容,那将什么消息都收不到。订阅信息可以是任何字符串,可以设置多次。只要消息满⾜其中⼀条订阅信息,SUB套接字就会收到。
PUB-SUB套接字组合是异步的。客户端在⼀个循环体中使⽤zmq_recv()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使⽤zmq_nd()发送消息,但不能在PUB套接字上使⽤zmq_recv()。
关于PUB-SUB套接字,还有⼀点需要注意:你⽆法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失⼀些消息的,因为建⽴连接是需要⼀些时间的。很少,但并不是零。
我们知道在建⽴TCP连接时需要进⾏三次握⼿,会耗费⼏毫秒的时间,⽽当节点数增加时这个数字也会上升。在这么短的时间⾥,ZMQ就可以发送很多很多消息了。举例来说,如果建⽴连接需要耗时5毫秒,⽽ZMQ只需要1毫秒就可以发送完这1000条消息。
所以需要发布者和订阅者同步,只有当订阅者准备好时发布者才会开始发送消息。有⼀种简单的⽅法来同步PUB和SUB,就是让PUB延迟⼀段时间再发送消息。现实编程中我不建议使⽤这种⽅式,因为它太脆弱了,⽽且不好控制。不过这⾥我们先暂且使⽤sleep的⽅式来解决。
另⼀种同步的⽅式则是认为发布者的消息流是⽆穷⽆尽的,因此丢失了前⾯⼀部分信息也没有关系。我们的⽓象信息客户端就是这么做的。
⽰例中的⽓象信息客户端会收集指定邮编的⼀千条信息,其间⼤约有1000万条信息被发布。你可以先打开客户端,再打开服务端,⼯作⼀段时间后重启服务端,这时客户端仍会正常⼯作。当客户端收集完所需信息后,会计算并输出平均温度。
关于发布-订阅模式的⼏点说明:
订阅者可以连接多个发布者,轮流接收消息;
如果发布者没有订阅者与之相连,那它发送的消息将直接被丢弃;
如果你使⽤TCP协议,那当订阅者处理速度过慢时,消息会在发布者处堆积。
在⽬前版本的ZMQ中,消息的过滤是在订阅者处进⾏的。也就是说,发布者会向订阅者发送所有的消息,订阅者会将未订阅的消息丢弃。
Server:
public static void main (String[] args) throws Exception {
// Prepare our context and publisher
ZMQ.Context context = t(1);
ZMQ.Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://*:5556");
// Initialize random number generator
Random srandom = new Random(System.currentTimeMillis());
while (!Thread.currentThread ().isInterrupted ()) {
// Get values that will fool the boss
int zipcode, temperature, relhumidity;
zipcode = 10000 + Int(10000) ;
temperature = Int(215) - 80 + 1;
relhumidity = Int(50) + 10 + 1;
// Send message to all subscribers
String update = String.format("%05d %d %d", zipcode, temperature, relhumidity); publisher.nd(update, 0);
}
publisher.clo ();
< ();
}一建合格线
Client1:
magnetopublic static void main (String[] args) throws IOException {
ZMQ.Context context = t(1);
// Socket to talk to rver
System.out.println("Collecting updates from weather rver"); ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
// Subscribe to zipcode, default is NYC, 10001
vtime
String filter = (args.length > 0) ? args[0] : "10001 ";
subscriber.Bytes());
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
// U trim to remove the tailing '0' characteramtc
帽子的英文单词String string = vStr(0).trim();
StringTokenizer sscanf = new StringTokenizer(string, " "); int zipcode = Integer.Token());
int temperature = Integer.Token());
int relhumidity = Integer.Token());
total_temp += temperature;
}
System.out.println("Average temperature for zipcode '"
+ filter + "' was " + (int) (total_temp / update_nbr));
System.out.println("...");
ad();
subscriber.clo();
<();
}
Client2;
public static void main (String[] args) throws IOException {
ZMQ.Context context = t(1);
// Socket to talk to rver
System.out.println("Collecting updates from weather rver"); ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
// Subscribe to zipcode, default is NYC, 10001
String filter = (args.length > 0) ? args[0] : "10002";
subscriber.Bytes());
// Process 100 updates
int update_nbr;
long total_temp = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++) {
// U trim to remove the tailing '0' character
String string = vStr(0).trim();