Canal和Kafka整合⽅案——解决Canal写⼊Kafka并发消费问题
⽂章⽬录
⼀、问题描述
在使⽤Canal读取binlog来对数据库增量进⾏同步时遇到⼀下⼏个问题
⾸先是在使⽤Canal⾃带客户端进⾏同步时需要⾃⼰⼿动调⽤get()或者getWithoutAck()进⾏拉取
拉取⽇志后进⾏同步只能⼀条⼀条处理,效率⽐较低
如果处理⽇志过慢或者其他原因容易导致⽇志堆积在服务器
为了解决上⾯的问题打算在⽇志同步过程中引⼊MQ来作为中间同步,Canal⽀持RocketMQ和Kafka两种,最终选⽤Kafka来进⾏
⼆、引⼊Kafka
1.Canal整合Kafka及项⽬初步搭建
Kafka与Canal整合在中已经介绍过,整体搭建过程不变。照片日记
这次以⼀组房间数据作为⽰例,数据库:demo
表名:pricing_hou_info
菠菜豆腐汤Kafka的topic:hou-topic
完成Canal搭建和整合Kafka之后,启动Canal,然后往pricing_hou_info表中插⼊数据,并在Kafka消费数据,搭建SpringBoot项⽬并整合Kafka(SpringBoot整合Kafka在之前的博⽂中有描述:),然后完成相关配置后启动项⽬在Kafka控制台中看到Canal同步过来的数据表⽰搭建成功,并且在项⽬中也成功消费。
2.整合Kafka后引出新问题
此时如果按照这样的⽅式,确实能够缓解Canal服务器压⼒以及不再⼿动拉取⽇志的问题,但是这样的⽅式依然是在⼀条⼀条的消费消息,性能并未得到提升。
土建施工合同如何解决这样的问题?⾸先肯定想到的是多线程并发消费,如果我们单纯地⽤多线程并发消费的话并不能保证消息的有序性,这种binlog⽇志同步是需要严格有序性的,否则会导致数据错乱。那有没有办法能够保证顺序的情况下并发消费呢?答案是有的,即将指定数据发送到指定分区当中,然后起多个消费者消费不同分区的数据即可,并且Canal提供写⼊指定分区的配置
三、最终⽅案
1.修改Canal配置⽂件
在Canal配置⽂件中的mq config⾥⾯配置如下
# mq config
pic=hou-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.ur,mytest2\\..*,.*\\..*
# canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=demo\\.pricing_hou_info:hou_code
这⾥⾯主要配置了canal.mq.partitionsNum和canal.mq.partitionHash两个参数,他们的意思如下:
canal.mq.partitionsNum:指定当前topic的分区数
canal.mq.partitionHash:指定到分区的分区规则,可以细化到字段
这⾥我们指定topic有3个分区,并且使⽤pricing_hou_info表中的hou_code字段来进⾏划分,即让相同hou_code的数据全部到⼀个分区当中
2.修改项⽬代码
原先项⽬中只有⼀个消费,现在再添加两个消费的⽅法,让三个消费者能够消费不同分区的数据,通过@TopicPartition注解指定topic和对应的分区,并且可以同时消费多个分区的数据,三个消费者的groupId⼀定要保持⼀致,因为Kafka指定在⼀个group⾥⾯⼀条partition的消息只能被⼀个消费者消费
@Component
public class MessageListener {
@KafkaListener(topicPartitions ={@TopicPartition(topic ="hou-topic",partitions ={"0"})}, groupId ="hou-consumer-group")
public void partition1(ConsumerRecord<?,?> record){
receive(record);
}
@KafkaListener(topicPartitions ={@TopicPartition(topic ="hou-topic",partitions ={"1"})}, groupId ="hou-consumer-group")
public void partition2(ConsumerRecord<?,?> record){
receive(record);
}
@KafkaListener(topicPartitions ={@TopicPartition(topic ="hou-topic",partitions ={"2"})}, groupId ="hou-consumer-group")
public void partition3(ConsumerRecord<?,?> record){
receive(record);
}
private void receive(ConsumerRecord<?,?> record){
final String value =(String) record.value();
曹操大将FlatMessage flatMessage = JSONObject.parObject(value, FlatMessage.class);
final String houCode = Data().get(0).get("hou_code");
System.out.println("分区:"+record.partition()+"\t接收到数据的code:"+houCode+"\t操作类别:"+Type());
}
}
通过这样的⽅式我们可以确保相同hou_code的数据到同⼀个分区被同⼀个消费者有序消费且只消费⼀次,这样即可达到⽬的
3.整体架构
4.结果验证
全部配置修改好后重启Canal和项⽬,然后往数据库⾥写数据,控制台结果如下:
分区:0 接收到数据的code:CD000001 操作类别:INSERT
分区:1 接收到数据的code:CD000002 操作类别:INSERT
分区:0 接收到数据的code:CD000003 操作类别:INSERT
分区:1 接收到数据的code:CD000004 操作类别:INSERT
分区:2 接收到数据的code:CD000005 操作类别:INSERT
分区:0 接收到数据的code:CD000006 操作类别:INSERT
分区:2 接收到数据的code:CD000007 操作类别:INSERT
分区:0 接收到数据的code:CD000008 操作类别:INSERT
分区:1 接收到数据的code:CD000009 操作类别:INSERT
分区:0 接收到数据的code:CD000010 操作类别:INSERT
可以看到不同hou_code的数据被写⼊了不同的分区,并且三个消费者消费到了来⾃三个分区的数据,此时如果在hou_code为
CD000010的数据上进⾏修改,得到如下结果:
分区:0 接收到数据的code:CD000010 操作类别:UPDATE
分区:0 接收到数据的code:CD000010 操作类别:UPDATE
分区:0 接收到数据的code:CD000010 操作类别:UPDATE
分区:0 接收到数据的code:CD000010 操作类别:UPDATE
此时更进⼀步证明了同⼀个partition只存固定的hou_code的数据,保证了单个partition和消费者的消息有序性。
四、总结思考
利⽤Canal将数据根据字段写⼊不同分区且消费者消费指定分区数据,增加了消费的吞吐量,并且保证了单个消费者的消息有序性以及单条记录(同⼀hou_code的数据)的处理有序性,本⽅案是在单⾏数据基础上来进⾏分区匹配的,还可以在表和数据库的基础上进⾏分区匹配,修改Canal参数即可。
这个⽅案是通过增加消费者来提⾼MQ的吞吐量,使数据处理更快实时性更⾼,如果要更进⼀步提⾼吞吐量还有什么⽅法呢?这⾥我能够想到的是在Consumer这⾥接收到消息后将消息放⼊队列中⽽不是直接处理,然后再根据单个Consumer中该队列中的hou_code并发消费,结构如下:
但是就⽬前⽽⾔上⾯的⽅案已经⾜以解决问题,就不需要再将程序搞复杂了,待以后确实需要性能优化再考虑这样的⽅案吧
五、参考
Canal官⽅⽂档提供的相关配置
花卷怎么卷
canal.mq.partitionHash 表达式说明:
canal 1.1.3版本之后, ⽀持配置格式:schema.table:pk1^pk2,多个配置之间使⽤逗号分隔
例⼦1:test\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
例⼦2:.\…:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
pk
例⼦3:.\…: 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(⾃动查找)
例⼦4: 匹配规则啥都不写,则默认发到0这个partition上
例⼦5:.\… ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
按表hash: ⼀张表的所有数据可以发到同⼀个分区,不同表之间会做散列 (会有热点表分区过⼤问题)
例⼦6: test\.test:id,.\…* , 针对test的表按照id散列,其余的表按照table散列
注意:⼤家可以结合⾃⼰的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进⾏匹配(命中⼀条规则就返回)
mq顺序性问题竹笋烧肉
binlog本⾝是有序的,写⼊到mq之后如何保障顺序是很多⼈会⽐较关注,在issue⾥也有⾮常多⼈咨询了类似的问题,这⾥做⼀个统⼀的解答
1. canal⽬前选择⽀持的kafka/rocketmq,本质上都是基于本地⽂件的⽅式来⽀持了分区级的顺序消息的能⼒,也就是binlog写⼊mq是可以有⼀些
好听的网名四个字顺序性保障,这个取决于⽤户的⼀些参数选择
2. canal⽀持MQ数据的⼏种路由⽅式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
3. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
单topic单分区,可以严格保证和binlog⼀样的顺序性,缺点就是性能⽐较慢,单分区的性能写⼊⼤概在2~3k的TPS
多topic单分区,可以保证表级别的顺序性,⼀张表或者⼀个库的所有数据都写⼊到⼀个topic的单分区中,可以保证有序性,针对热点表也存在写⼊分区的性能问题
单topic、多topic的多分区,如果⽤户选择的是指定table的⽅式,那和第⼆部分⼀样,保障的是表级别的顺序性(存在热点表写⼊分区的性能问题),如果⽤户选择的是指定pk hash的⽅式,那只能保障的是⼀个pk的多次binlog顺序性 ** pk hash的⽅式需要业务权衡,这⾥性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产⽣业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区⾥,业务消费就会有先后顺序的问题,需要注意折纸乌龟