Zookeeper的通信协议详解
通信协议
基于TCP/IP协议,zk实现了⾃⼰的通信协议来完成客户端与服务端,服务端与服务端之间的⽹络通信,zk的通信协议整体上的设计⾮常简
单,
客户端发起连接,发送握⼿包进⾏timeout协商,协商成功后会返回⼀个ssion id和timeoout值.随后就可以进⾏正常通信,通信过程中要在
timeout范围内发送ping包. zookeeper client和rver之间的通信协议基本规则就是发送请求获取响应.并根据响应做不同的动作.
发送数据格式为:
消息长度+xid+request. xid每次请求必须是唯⼀的.消息长度和xid同为4字节,命令长度为4字节且必须为request的开始4字节.
命令是从1到11的数字表⽰,clo的命令为-11.不同类型请求request有些差异
特殊请求具有固定的xid:watch_xid固定为-1,ping_xid为-2,auth_xid固定为-4.普通请求⼀般从0开始每次请求累加⼀次xid.
响应数据为:
消息长度+header+respon.消息长度为4字节,表明header+respon的总长度.
二十五味鬼臼丸
header为xid,zxid,err对应长度为4,spon根据请求类型不同具有差别
根据header⾥xid的区别分为watch,ping,auth,data这四种类型
根据这四种类型来区分返回消息是事件,还是认证,⼼跳和请求数据.client并以此作出不同响应.
消息结构
握⼿消息
request消息体
protocol_version+zxid+timeout+ssion_id+passwd_len+passwd+read_only.对应的字节长度为4,8,4,8,4,16,1 取值除timeout外
其他⼏个皆可为0,password可以为任意16个字符.read_only为0或1(是布尔值). 注:握⼿包没有xid和命令
respon消息体
protocol_version+timeout+ssion_id+passwd_len+passwd+read_only. 注:握⼿响应包没有header.
效果展⽰
2020-04-21 19:26:53.990 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [connection.py:646 _connect] Connecting to 30.3.3.60:988
2020-04-21 19:26:53.990 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [connection.py:650 _connect] Using ssion_id: 144131667822 2020-04-21 19:26:53.990 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [connection.py:299 _submit] Sending request(xid=None): Connec 2020-04-21 19:26:53.991 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [connection.py:285 _invoke] Read respon Connect(protocol_ve 2020-04-21 19:26:53.991 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [connection.py:694 _connect] Session created, ssion_id: 14413 negotiated ssion timeout: 30000
connect timeout: 10000.0
拍摄全家福read timeout: 20000.0
2020-04-21 19:26:53.991 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [client.py:463 _ssion_callback] test: cur state CONNECTED, old
ping消息
request消息体
type (ping包只有⼀个字段就是命令值是11,它完整的发送包是4字节长度,4字节xid,4字节命令.)
respon消息体
res_len+header+res (ping响应包⼀般只拆到header即可通过xid确认)
效果展⽰
2020-04-21 20:05:03.971 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [connection.py:603 _connect_attempt] test: nd ping
油炸鸡块2020-04-21 20:05:03.971 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [connection.py:490 _nd_ping] test: nd ping
2020-04-21 20:05:03.971 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [connection.py:299 _submit] Sending request(xid=-2): Ping() 2020-04-21 20:05:03.973 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [connection.py:606 _connect_attempt] test: read socket
2020-04-21 20:05:03.973 localhost cms_watcher info INFO cms_watcher [pid:240] [Thread-4] [connection.py:415 _read_socket] test: Received Ping
getdata消息
request消息体
type+path_len+path+watcher type=4. path_len,是4字节,为path的长度 path为需要查询的路径,⽀持utf8 watcher为布尔值.判断是否
有事件注册.为1或0. 1字节
respon消息体
data_len+data+stat data_len为data长度,4字节. stat由8,8,8,8,4,4,4,8,4,4,8字节顺序组成.
效果展⽰
2020-04-21 20:25:13.460 localhost cms_watcher info INFO cms_watcher [pid:9078] [Thread-4] [connection.py:610 _connect_attempt] test: nd request
2020-04-21 20:25:13.460 localhost cms_watcher info INFO cms_watcher [pid:9078] [Thread-4] [connection.py:482 _nd_request] test: nd request xid 4 2020-04-21 20:25:13.460 localhost cms_watcher info INFO cms_watcher [pid:9078] [Thread-4] [connection.py:299 _submit] Sending request(xid=4): GetData(p 2020-04-21 20:25:13.461 localhost cms_watcher info INFO cms_watcher [pid:9078] [Thread-4] [connection.py:606 _connect_attempt] test: read socket
草书练字2020-04-21 20:25:13.461 localhost cms_watcher info INFO cms_watcher [pid:9078] [Thread-4] [connection.py:448 _read_socket] test: Reading for header Repl 序列化和反序列化
下⾯看下kazoo这个库是怎样根据zk的这个协议来组装数据和解析数据的
request字节流
下⾯的代码展⽰了将请求对象序列化成socket字节流的过程
def _submit(lf, request, timeout, xid=None):
"""Submit a request object with a timeout value and optional
xid"""
b = bytearray()
if xid:
pe:
b += request.rialize()
COM端口
lf.logger.log(
(BLATHER if isinstance(request, Ping) el logging.DEBUG),思考人生
"Sending request(xid=%s): %s", xid, request)
lf._write(int_struct.pack(len(b)) + b, timeout)
从上⾯的代码可以看出,⾸先根据不同的请求,决定是否发送xid字段、type字段(也就是上⾯协议所说的),最后根据request对象序列
化成字节流。这⾥的request就是kazoo/protocol/rialization.py定义的各个类实例 ⽐如连接类:
class Connect(namedtuple('Connect', 'protocol_version last_zxid_en'
' time_out ssion_id passwd read_only')):
type = None
def rialize(lf):
b = bytearray()
lf.protocol_version, lf.last_zxid_en, lf.time_out,
lf.ssion_id))
return b
@classmethod
def derialize(cls, bytes, offt):
proto_version, timeout, ssion_id = int_int_long_struct.unpack_from(
bytes, offt)
offt += int_int_long_struct.size
password, offt = read_buffer(bytes, offt)
try:
read_only = bool_struct.unpack_from(bytes, offt)[0] is 1
offt += bool_struct.size
:
read_only = Fal
return cls(proto_version, 0, timeout, ssion_id, password,
read_only), offt
respon字节流
分包管理制度下⾯的代码展⽰了将socket字节流反序列化成对象的过程
英语短故事def _read_header(lf, timeout):
b = lf._read(4, timeout)
length = int_struct.unpack(b)[0]
b = lf._read(length, timeout)
header, offt = ReplyHeader.derialize(b, 0)
return header, b, offt
从上⾯的代码可以看出,⾸先从socket中读取4个字节,根据上⾯的协议,我们知道,这4个字节是data_len,即包的⼤⼩ 然后在根据len 继续读取该⼤⼩的字节流,最后解析成具体的对象。
class ReplyHeader(namedtuple('ReplyHeader', 'xid, zxid, err')):
@classmethod
def derialize(cls, bytes, offt):
"""Given bytes and the current bytes offt, return a
:class:`ReplyHeader` instance and the new offt"""
new_offt = offt + reply_header_struct.size
return cls._make(
reply_header_struct.unpack_from(bytes, offt)), new_offt