RedisStream类型的使⽤详解
⽬录
6年级上册英语书⼀、背景
⼆、redis中Stream类型的特点
三、Stream的结构
四、Stream的命令
1、XADD 往Stream末尾添加消息
1、命令格式
2、举例
2、XRANGE查看Stream中的消息mtx
1、命令格式
2、准备数据
3、举例
3、XREVRANGE反向查看Stream中的消息
4、XDEL删除消息
1、命令格式
2、准备数据
3、举例
5、XLEN查看Stream中元素的长度
1、命令格式
2、举例
6、XTRIM对Stream中的元素进⾏修剪
1、命令格式
2、准备数据
3、举例
7、XREAD独⽴消费消息
1、命令格式
2、准备数据
3、举例
8、消费者组相关操作
1、消费者组命令
2、准备数据
3、创建消费者组
4、创建⼀个从某个消息之后消费的消费者组
6、⼀些监控命令
五、参考⽂档
⼀、背景
最近在看redis这⽅⾯的知识,发现在redis5中产⽣了⼀种新的数据类型Stream,它和kafka的设计有些类似,可以当作⼀个简单的消息队列来使⽤。⼆、redis中Stream类型的特点
是可持久化的,可以保证数据不丢失。
⽀持消息的多播、分组消费。
⽀持消息的有序性。
三、Stream的结构
解释:
消费者组: Consumer Group,即使⽤XGROUP CREATE命令创建的,⼀个消费者组中可以存在多个消费者,这些消费者之间是竞争关系。
同⼀条消息,只能被这个消费者组中的某个消费者获取。
多个消费者之间是相互独⽴的,互不⼲扰。
消费者: Consumer 消费消息。
last_delivered_id:这个id保证了在同⼀个消费者组中,⼀个消息只能被⼀个消费者获取。每当消费者组的某个消费者读取到了这个消息后,这个last_delivered_id的值会往后移动⼀位,保证消费者不会读取到重复的消息。
pending_ids:记录了消费者读取到的消息id列表,但是这些消息可能还没有处理,如果认为某个消息处理,需要调⽤ack命令。这样就确保了某个消息⼀定会被执⾏⼀次。
消息内容:是⼀个键值对的格式。
Stream 中消息的 ID:默认情况下,ID使⽤*,redis可以⾃动⽣成⼀个,格式为时间戳-序列号,也可以⾃⼰指定,⼀般使⽤默认⽣成的即可,且后⽣成的id号要⽐之前⽣成的⼤。
四、Stream的命令
1、XADD 往Stream末尾添加消息
1、命令格式
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
2、举例
xadd 命令返回的是数据的id, xx-yy (xx指的是毫秒数,yy指的是在这个毫秒内的第⼏条消息)
1、向流中增加⼀条数据,
127.0.0.1:6379> xadd stream-key * urname zhangsan # 向stream-key这个流中增加⼀个 urname 是zhangsan的数据 *表⽰⾃动⽣成id "1635999858912-0" # 返回的是ID
127.0.0.1:6379> keys *
1) "stream-key" # 可以看到stream⾃动创建了
127.0.0.1:6379>
2、向流中增加数据,不⾃动创建流
127.0.0.1:6379> xadd not-exists-stream nomkstream * urname lisi # 因为指定了nomkstream参数,⽽not-exists-stream之前不存在,所以加⼊失败(nil)
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379>
3、⼿动指定ID的值
127.0.0.1:6379> xadd stream-key 1-1 urname lisi # 此处id的值是⾃⼰传递的1-1,⽽不是使⽤*⾃动⽣成
"1-1" # 返回的是id的值
127.0.0.1:6379>
4、设置⼀个固定⼤⼩的Stream1、精确指定Stream的⼤⼩
指定指定Stream的⼤⼩⽐模糊指定Stream的⼤⼩会稍微多少消耗⼀些性能。
2、模糊指定Stream的⼤⼩
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first
"1636001034141-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * cond cond
"1636001044506-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third
"1636001057846-0"
127.0.0.1:6379> xinfo stream stream-key
1) "length"
2) (integer) 3
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1636001057846-0"
9) "groups"
12) 1) "1636001034141-0"
2) 1) "first"
2) "first"
13) "last-entry"
14) 1) "1636001057846-0"
2) 1) "third"
2) "third"
127.0.0.1:6379>
英语周报 答案~模糊指定流的⼤⼩,可以看到指定的是1,实际上已经到了3.
2、XRANGE查看Stream中的消息
1、命令格式
xrange key start end [COUNT count]
2、准备数据
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * urname zhangsan
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * urname lisi
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636003481706-0"
2) "1636003481706-1"
grivel127.0.0.1:6379> xadd stream-key * urname wangwu
老友记第八季下载"1636003499055-0"
127.0.0.1:6379>
使⽤redis的事务操作,获取到同⼀毫秒产⽣的多条数据,时间戳⼀样,序列号不⼀样3、举例
1、获取所有的数据(-和+的使⽤)
127.0.0.1:6379> xrange stream-key - +
1) 1) "1636003481706-0"
2) 1) "urname"
2) "zhangsan"
2) 1) "1636003481706-1"
2) 1) "urname"
2) "lisi"
3) 1) "1636003499055-0"
2) 1) "urname"
2) "wangwu"
127.0.0.1:6379>
-:表⽰最⼩id的值
+:表⽰最⼤id的值
2、获取指定id范围内的数据,闭区间
127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-0
1) 1) "1636003481706-1"
2) 1) "urname"
2) "lisi"
2) 1) "1636003499055-0"
2) 1) "urname"
2) "wangwu"
127.0.0.1:6379>
3、获取指定id范围内的数据,开区间
127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-0
1) 1) "1636003481706-1"
2) 1) "urname"
2) "lisi"
127.0.0.1:6379>
(:表⽰开区间
4、获取某个毫秒后所有的数据
127.0.0.1:6379> xrange stream-key 1636003481706 +
1) 1) "1636003481706-0"
2) 1) "urname"
2) "zhangsan"
2) 1) "1636003481706-1"
2) 1) "urname"
2) "lisi"
3) 1) "1636003499055-0"
2) 1) "urname"
2) "wangwu"
127.0.0.1:6379>
直接写毫秒不写后⾯的序列号即可。
5、获取单条数据
127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-0
1) 1) "1636003499055-0"
127.0.0.1:6379>
start和end的值写的⼀样即可获取单挑数据。
6、获取固定条数的数据
127.0.0.1:6379> xrange stream-key - + count 1
1) 1) "1636003481706-0"
2) 1) "urname"
2) "zhangsan"
127.0.0.1:6379>
使⽤count进⾏限制
3、XREVRANGE反向查看Stream中的消息
XREVRANGE key end start [COUNT count]
使⽤⽅式和XRANGE类似,略。
4、XDEL删除消息
1、命令格式
yes maden
xdel key ID [ID ...]
2、准备数据
127.0.0.1:6379> xadd stream-key * urname zhangsan
"1636004176924-0"
127.0.0.1:6379> xadd stream-key * urname lisi
"1636004183638-0"
127.0.0.1:6379> xadd stream-key * urname wangwu
"1636004189211-0"
127.0.0.1:6379>
john怎么读英语3、举例
需求:往Stream中加⼊3条消息,然后删除第2条消息
127.0.0.1:6379> xdel stream-key 1636004183638-0
(integer) 1 # 返回的是删除记录的数量
127.0.0.1:6379> xrang stream -key - +
127.0.0.1:6379> xrange stream-key - +
1) 1) "1636004176924-0"
2) 1) "urname"
2) "zhangsan"
2) 1) "1636004189211-0"
2) 1) "urname"
2) "wangwu"
127.0.0.1:6379>
注意:
需要注意的是,我们从Stream中删除⼀个消息,这个消息并不是被真正的删除了,⽽是被标记为删除,这个时候这个消息还是占据着内容空间的。如果所有Stream中所有的消息都被标记删除,这个时候才会回收内存空间。但是这个Stream并不会被删除。
5、XLEN查看Stream中元素的长度
1、命令格式
xlen key
2、举例
查看Stream中元素的长度
127.0.0.1:6379> xadd stream-key * urname zhangsan
"1636004690578-0"
127.0.0.1:6379> xlen stream-key
(integer) 1haowang
127.0.0.1:6379> xlen not-exists-stream-key
(integer) 0
127.0.0.1:6379>
注意:
如果xlen后⽅的key不存在则返回0,否则返回元素的个数。
6、XTRIM对Stream中的元素进⾏修剪
1、命令格式
xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]
2、准备数据
127.0.0.1:6379> xadd stream-key * urname zhangsan
"1636009745401-0"
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * urname lisi
地址寄存器QUEUED
127.0.0.1:6379(TX)> xadd stream-key * urname wangwu
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636009763955-0"
2) "1636009763955-1"
127.0.0.1:6379> xadd stream-key * urname zhaoliu
"1636009769625-0"
127.0.0.1:6379>
3、举例
1、maxlen精确限制
xtrap
127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最后的2个消息
(integer) 2
127.0.0.1:6379> xrange stream-key - + # 可以看到之前加⼊的2个消息被删除了
1) 1) "1636009763955-1"