⼀篇⽂章讲透zmq的四种模型以及代码实现全过程。
之前有相关的笔记及内容:
安装及简单⽰例见以下:
# Zeromq的⼏种模式⽐较
* **REQ-REP:** 请求-应答型,同步,TCP,1对n(服务器),n对1(服务器).
* **PUB-SUB:** 发布-订阅,经典模式适⽤于单向的数据发布,更新流既没有起点也没有终点,类似于⼴播,异步,UDP,1对n(服务器)* **PULL-PUSH:** 推拉模式,TCP,异步,1对n(服务器)
* **ROUTER-REQ-DEALER:** 请求应答型,同步,tcp,添加代理可以n(client)对m(rver),分配⾝份。
### 注意:
> 1.DEALER就像是⼀个异步的REQ,⽽ROUTER就像⼀个异步的REP。所以可以相互使⽤。
### > 2.ROUTER做代理可以提供可靠的模式来分别识别客户端和后端服务器。
> 3.**⼤部分**的关键字都是可以相互之间建⽴新的模式或者组合新的模式。
> 4.这些只是基础的模式,根据需求可以设计⾃⼰需要的模式。
## REP-REQ
```
graph LR
客户端-->服务器
```
```
graph LR
客户端A-->服务器
客户端B-->服务器
```
```
ps如何替换颜色graph LR
客户端-->服务器A
客户端-->服务器B
```
## PUB-SUB
```
graph TB
发布者PUB-->A订阅者SUB
发布者PUB-->B订阅者SUB
发布者PUB-->C订阅者SUB
```
## PUSH-PULL
```
graph TB
发⽣器PUSH-->A⼯⼈PULL 发⽣器PUSH-->B⼯⼈PULL 发⽣器PUSH-->C⼯⼈PULL A⼯⼈PULL-->接收器PUSH B⼯⼈PULL-->接收器PUSH C⼯⼈PULL-->接收器PUSH ```
## ROUTER担任代理
```
graph LR
clientA-->ROUTER
clientB-->ROUTER
clientC-->ROUTER ROUTER-->workerA ROUTER-->workerB ROUTER--&
1.REQ/REP模型
rvice.c
// 绑定⼀个REP套接字⾄tcp://*:5555
// 从客户端接收Hello,并应答World
//
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <asrt.h>
/
/gcc -o hwrver hwrver.c -lzmq
int main (void)
{
// Socket to talk to clients
void *context = zmq_ctx_new ();
// 与客户端通信的套接字
void *responder = zmq_socket (context, ZMQ_REP);
int rc = zmq_bind (responder, "tcp://*:5555"); // 服务器要做绑定 asrt (rc == 0);
while (1) {
// 等待客户端请求
char buffer [10];
团结的口号
int size = zmq_recv (responder, buffer, 10, 0);
buffer[size] = '\0';
printf ("收到 %s\n", buffer);
sleep (1); // Do some 'work'
// 返回应答
zmq_nd (responder, "World", 5, 0);
}
return 0;
}
client.c
// 连接REQ套接字⾄ tcp://localhost:5555
/
/ 发送Hello给服务端,并接收World
//
// Hello World client
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
//编译:gcc -o hwclient hwclient.c -lzmq
int main (void)
{
printf ("Connecting to hello \n");
void *context = zmq_ctx_new ();
// 连接⾄服务端的套接字
void *requester = zmq_socket (context, ZMQ_REQ);
zmq_connect (requester, "tcp://localhost:5555");
int request_nbr;
for (request_nbr = 0; request_nbr != 10; request_nbr++) {
char buffer [10];
printf ("A 正在发送 Hello %d...\n", request_nbr);
zmq_nd (requester, "Hello", 5, 0);
牛角挂书的故事//zmq_recv (requester, buffer, 5, 0); // 收到响应才能再发
//printf ("接收到 Hello World %d\n", request_nbr);
printf ("B 正在发送 Darren %d...\n", request_nbr);
zmq_nd (requester, "Darren", 6, 0); // ⽆效
zmq_nd (requester, "king", 4, 0); // ⽆效
zmq_recv (requester, buffer, 6, 0); // 收到响应才能再发
printf ("接收到Darren World %d\n", request_nbr);
}
zmq_clo (requester);
zmq_ctx_destroy (context);
return 0;
}
2.PUB/SUB模型
中国警衔
头⽂件 zhelpers.h
/* ===================================================================== zhelpers.h
Helper header file for example applications.
少林传世===================================================================== */
#ifndef __ZHELPERS_H_INCLUDED__
#define __ZHELPERS_H_INCLUDED__
// Include a bunch of headers that we will need in the examples
#include <zmq.h>
#include <asrt.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#if (!defined (WIN32))
# include <sys/time.h>
松茸种植
#endif
#if (defined (WIN32))
# include <windows.h>
#endif
/
/ Version checking, and patch up missing constants to match 2.1
#if ZMQ_VERSION_MAJOR == 2
# error "Plea upgrade to ZeroMQ/3.2 for the examples"
#endif
// On some version of Windows, POSIX subsystem is not installed by default.
// So define srandom and random ourlf.
#if (defined (WIN32))
# define srandom srand
# define random rand
#endif
// Provide random number from 0..(num-1)
#define randof(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0))
// Receive 0MQ string from socket and convert into C string
// Caller must free returned string. Returns NULL if the context
// is being terminated.
static char *
s_recv (void *socket) {
enum { cap = 256 };
char buffer [cap];
int size = zmq_recv (socket, buffer, cap - 1, 0);
if (size == -1)
return NULL;
buffer[size < cap ? size : cap - 1] = '\0'; // 发送字符的时候不发结束符
#if (defined (WIN32))
return strdup (buffer);
#el
return strndup (buffer, sizeof(buffer) - 1);
#endif
// remember that the strdup family of functions u malloc/alloc for space for the new string. It must be manually // freed when you are done with it. Failure to do so will allow a heap attack.
亚卫论坛
}
// Convert C string to 0MQ string and nd to socket
static int
s_nd (void *socket, char *string) {
int size = zmq_nd (socket, string, strlen (string), 0);
return size;
}
// Sends string as 0MQ string, as multipart non-terminal
static int
s_ndmore (void *socket, char *string) {
int size = zmq_nd (socket, string, strlen (string), ZMQ_SNDMORE);
return size;
}
// Receives all message parts from socket, prints neatly
//
static void
s_dump (void *socket)
{
int rc;
>梦的解析读后感