rocketMQ安装配置及使用(技术文档)

更新时间:2023-06-30 10:35:32 阅读: 评论:0

rocketMQ安装配置及使⽤(技术⽂档)
RocketMQ
简介:
Name Server:是⼀个⼏乎⽆状态节点,可集群部署,节点之间⽆任何信息同步
Broker分为Master与Slave,⼀个Master可以对应多个Slave,但是⼀个Slave只能对应⼀个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表⽰Master,⾮0表⽰Slave。Master也可以部署多个。
每个Broker与Name Server集群中的所有节点建⽴长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到⼼跳,则Name Server断开与Broker的链接
Producer与Name Server集群中的其中⼀个节点(随机选择)建⽴长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建⽴长连接,且定时向Master发送⼼跳。Producer完全⽆状态,可集群部署。
Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name rver获取所有topic队列的最新情况,这意味着如果Broker 不可⽤,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送⼼跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到⼼跳数据,则关闭与Producer的连接
Consumer与Name Server集群中的其中⼀个节点(随机选择)建⽴长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建⽴长连接,且定时向Master、Slave发送⼼跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
Consumer每隔30s从Name rver获取topic的最新队列情况,这意味着Broker不可⽤时,Consumer最多最需要30s才能感知。Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送⼼跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送⼼跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。
当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来
了,因此会有少量的消息丢失。但是⼀旦master恢复,未同步过去的消息会被最终消费掉。
Rocketmq的三种⾃带模式为:
1: 双MASTER : 优点配置简单,快捷 但是⼀旦MASTER机器宕机或出现问题就⽆法提供服务
名刀网2:双MASTER双SLAVE 同步双写:
⽐异步复制的性能差10% 但能保证数据不丢失
3: 双MASTER双SLAVE 异步复制:
性能最好,但是遇到突发情况会有少量数据丢失
以centOS7(4台),rocketMQ4.4.0 (2m-2s-sync)为实例:
实例模式为:双MASTER双SLAVE 同步双写
以下操作4台主机操作相同
192.168.1.111 (MASTER)
192.168.1.112 (MASTER)
192.168.1.113 (SLAVE)
192.168.1.114 (SLAVE)
1 依赖环境 jdk1.8
2 wget 安装jdk1.8, 修改/etc/profile 配置环境变量 完成后测试:java -version 出现版 本信息后表⽰成功。
3 安装rocketmq ⼆进制⽂件zip, 解压到/usr/local/ 下 (unzip rocketMQ.zip) 使⽤unzip 命令解压
4 切换到BIN ⽬录下 修改 runrver.sh 中的Xms1g -Xmx1g -Xmn512m (设置的参数按实际服务器参数为准),修改runbroker.sh 中的 Xms1g -Xmx1g -Xmn512m (设置的参数按实际服务器参数为准),保存(参数不要⼩于1G,如果⼩于1G 可能会导致rocketMQ⽆法启动)
5 在 rocketmq主⽬录下 创建logs (⽤户存放⽇志) 然后进⼊conf⽬录 执⾏ d -i ‘s#${ur.home}#/usr/local/rocketmq#g’*.xml
创建store⽬录 进⼊store⽬录 创建 mkdir /www/rocketmq/store/commitlog (存储⽂件)mkdir
/www/rocketmq/store/consumequeue (消息队列存储)mkdir /www/rocketmq/store/index (消息索引),
6 修改/etc/hosts ⽂件 给ip地址取别名
192.168.1.111 namerver1
192.168.1.112 namerver2
192.168.1.113 namerver3
192.168.1.114 namerver4
设置好保存。
修改/rocketmq/conf/2m-2s-sync/ broker-a.properties : (内容如下)
brokerClusterName=twoCluster  #集群名⼀个集群⾥的MQ 集群名相同
brokerName=broker-a  # 基本与⽂件名保持⼀致 slave机器上设置名字与⾃⼰的主master保持⼀致
brokerId=0  #master设置为0 ,slave 设置⼤于0
namesrvAddr=namerver1:9876;namerver2:9876;namerver3:9876;namerver4:9876 #此处为设置的IP别名,多个以;号隔开defaultTopicQueueNums=4 # ⾃动创建队列数量(以需求为主)
autoCreateTopicEnbale=true #⾃动创建标题正式环境设置为fal
autoCreateSubscriptionGroup=true #⾃动创建组正式环境设置为fal
listenPort=10911  #对外的监听端⼝
deleteWhen=04  #凌晨4点删除⽂件
fileRervedTime=120 #⽂件默认保留时间48h
mapedFileSizeCommitLog=1073741824 #设置⽂件的容量为1G
mapedFileSizeConsumeQueue=300000 #设置存放的消息条数为30w
storePathRootDir=/www/rocketmq/store #指定⽬录地址
storePathCommitLog=/www/rocketmq/store/commitlog #指定⽬录地址
storePathConsumeQueue=/www/rocketmq/store/consumequeue #指定⽬录地址
storePathIndex=/www/rocketmq/store/index #指定⽬录地址
storeCheckpoint=/www/rocketmq/store/checkpoint #指定⽬录地址
abortFile=/www/rocketmq/store/abort #指定⽬录地址
maxMessageSize=65536 #限制消息的⼤⼩
brokerRote=SYNC_MASTER #rocketmq的⾓⾊ slave 上设置为SLAVE
flushDiskType=SYNC_FLUSH #刷盘⽅式
惺惺的意思还有很多配置项 具体参考官⽅⽂档,
7:配置完成后 启动 : 先启动4台机器的 namerver 在启动master 在启动 slave
启动进⼊bin⽬录:启动namerver: nohup sh mqnamesrv &
启动broker(192.168.1.111):nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties >/dev/null 2>&1 &
启动broker(192.168.1.112):nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties >/dev/null 2>&1 &
启动broker(192.168.1.113):nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties
>/dev/null 2>&1 &
启动broker(192.168.1.111):nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties
>/dev/null 2>&1 &
2进⼊主题查看路由 可以看到主从关系及路由
3 进⼊驾驶舱可以看到集群中主机的动态
9: JAVA API:陶渊明独爱菊
1:引⼊MAVEN包:(本例以4.4.0为主)
<groupId>ketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.4.0</version>
<exclusions>
<exclusion>
<groupId>ioty</groupId>
<artifactId>netty-tcnative</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>韩式婚礼
<groupId>ketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
<dependency>
<groupId>ioty</groupId>
<artifactId>netty-tcnative</artifactId>
<version>1.1.33.Fork2</version>
</dependency>
2: producer提供者:(三种⽅式发送消息)
1 同步发送
1:同步发送
DefaultMQProducer.nd(message, timeout)
2:顺序发送
DefaultMQProducer.nd(message,new MessageQueueSelector(){
@Override
public MessageQueue lect(List list, Message message, Object o)
},订单ID
);
3: 事务发送
TransactionMQProducer.ndMessagenTransaction(message,callBack,String args),
callback : 回调函数 必须实现LocalTransactionExecuter 接⼝ 并重写⾥⾯的⽅法 ,回调函数与发送消息是并⾏的,事务发送的消息消费者⽆需做什么改变 ⼤多数使⽤顺序消费(根据情况⽽定)
2 异步发送
DefaultMQProducer.nd(message, new SendCallback()
{},timeout)
3单点发送
DefaultMQProducer.Sendoneway();
2: comsumer 消费者
Pull :消费
1 DefaultMQPullConsumer
此⽅式需要⾃⼰提供定时器,让consumer定时去broker拉去消息
2 MQPullConsumerScheduleService
此⽅法⽆需⾃⼰提供定时器
Push: 消费
建⽴长连接, 让broker主动把消息推给consumer
泡泡游戏1:顺序消费DefaultMQPushConsumer .registerMessageListener(new MessageListenerOrderly(){})
2: 普通消费
牛喉咙
1: 使⽤mqFilfer组件必须要启动mqfilersrv
在2m-2s-sync 的配置⽂件中 加上配置:
filterServerNums=1
启动⽅式与启动briker的⽅式相同
Myfilersrv 放在最后启动
2:新建类 实现messageFilter接⼝ 重写match()⽅法
注意: 类中不可有中⽂
3:使⽤MixAll.File2string() ⽅法 把类转成字符串
4:consumer.subscribe(A,B,C)
A:消费的主题宝马的广告语
B: 是过滤器类的名称
C:是过滤器类转成字符串的值
送行祝福语过滤器可以设置好⼏个,使⽤过滤器减少了⽆⽤的消息,但也增加broker的负担11 消息的幂等去重
1 使⽤Redis 去重 reids的key 天然⽀持不重复
2 使⽤redis 和数据库⼀起去重

本文发布于:2023-06-30 10:35:32,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/1070031.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   配置   设置
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图