阿⾥开源Canal--④投递数据到Kafka
基本说明
canal1.1.1版本之后,默认⽀持将canalrver接收到的binlog数据直接投递到MQ,⽬前默认⽀持的MQ系统有:
kafka:()
RocketMQ?)
##1.配置修改
基于前⾯第⼆章节内容我们搭建的canalrver(Billow⾃编译版本为1.1.3,⽀持动态Topic配置),我们在instance的配置⽂件中做配置
的修改。
###1.1.修改instance配置⽂件
viconf/example/ties
#按需修改成⾃⼰的数据库信息
#...
s=192.168.1.20:3306
#urname/password,数据库的⽤户名和密码
...
name=canal
word=canal
...
#mqconfig
=example
#针对库名或者表名发送动态topic
#cTopic=mytest,.*,,mytest..*,.*..*
ion=0
#hashpartitionconfig
#ionsNum=3
#库名.表名:唯⼀主键,多个表之间⽤逗号分隔
#ionHash=:id,:id
#对应ip地址的MySQL数据库需进⾏相关初始化与设置,可参考前⾯Billow发的⽂章:
dynamicTopic规则:表达式如果只有库名则匹配库名的数据都会发送到对应名称topic,如果是库名.表名则匹配的数据会发送到以’库名_
表名’为名称的topic。如要指定topic名称,则可以配置:
cTopic=examp2:.*;exmaple3:mytest..*,mytest2..*;example4:
以topic名‘:’正则规则作为配置,多个topic配置之间以';'隔开,message会发送到所有符合规则的topic
###1.2.修改canal配置⽂件
vi/usr/local/canal/conf/ties
#...
#可选项:tcp(默认),kafka,RocketMQ
Mode=kafka
#...
#kafka/rocketmq集群配置:192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
s=127.0.0.1:6667
s=0
ize=16384
uestSize=1048576
Ms=1
Memory=33554432
#Canal的batchsize,默认50K,由于kafka最⼤消息体限制请勿超过1M(900K以下)
atchSize=50
#Canalget数据的超时时间,单位:毫秒,空为不限超时
etTimeout=100
#是否为flatjson格式对象
ssage=true
ssionType=none
=all
#kafka消息投递是否使⽤事务
ction=fal
mq相关参数说明
###cTopic表达式说明
canal1.1.3版本之后,⽀持配置格式:schema或,多个配置之间使⽤逗号分隔
例⼦1:指定匹配的单表,发送到以test_test为名字的topic上
例⼦2:.…*匹配所有表,每个表都会发送到各⾃表名的topic上
例⼦3:test指定匹配对应的库,⼀个库的所有表都会发送到库名的topic上
例⼦4:test.*指定匹配的表达式,针对匹配的表会发送到各⾃表名的topic上
例⼦5:test,1,指定多个表达式,会将test库的表都发送到test的topic上,1的表发送到对应的test1_test1
topic上,其余的表发送到默认的值
⽀持指定topic名称匹配,配置格式:topicName:schema或,多个配置之间使⽤逗号分隔,多组之间使⽤;分隔
例⼦:test:test,1;test2:test2,1针对匹配的表会发送到指定的topic上
⼤家可以结合⾃⼰的业务需求,设置匹配规则,建议MQ开启⾃动创建topic的能⼒
###表达式说明
canal1.1.3版本之后,⽀持配置格式::pk1^pk2,多个配置之间使⽤逗号分隔
例⼦1::pk1^pk2指定匹配的单表,对应的hash字段为pk1+pk2
例⼦2:.…*:id正则匹配,指定所有正则匹配的表对应的hash字段为id
例⼦3:.…*:正则匹配,指定所有正则匹配的表对应的hash字段为表主键(⾃动查找)
例⼦4:匹配规则啥都不写,则默认发到0这个partition上
例⼦5:.…*,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
按表hash:⼀张表的所有数据可以发到同⼀个分区,不同表之间会做散列(会有热点表分区过⼤问题)
例⼦6::id,.…*,针对test的表按照id散列,其余的表按照table散列
注意:⼤家可以结合⾃⼰的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进⾏匹配(命中⼀条规则就返回)
###mq顺序性问题
binlog本⾝是有序的,写⼊到mq之后如何保障顺序是很多⼈会⽐较关注,在issue⾥也有⾮常多⼈咨询了类似的问题,这⾥做⼀个统⼀的解
答
⽬前选择⽀持的kafka/rocketmq,本质上都是基于本地⽂件的⽅式来⽀持了分区级的顺序消息的能⼒,也就是binlog写⼊mq
是可以有⼀些顺序性保障,这个取决于⽤户的⼀些参数选择
⽀持MQ数据的⼏种路由⽅式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
cTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默
认topicname
ionsNum、ionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做
到按表级做分区、pk级做分区等
的消费顺序性,主要取决于描述2中的路由选择,举例说明:
单topic单分区,可以严格保证和binlog⼀样的顺序性,缺点就是性能⽐较慢,单分区的性能写⼊⼤概在2~3k的TPS
多topic单分区,可以保证表级别的顺序性,⼀张表或者⼀个库的所有数据都写⼊到⼀个topic的单分区中,可以保证有序性,针对热点
表也存在写⼊分区的性能问题
单topic、多topic的多分区,如果⽤户选择的是指定table的⽅式,那和第⼆部分⼀样,保障的是表级别的顺序性(存在热点表写⼊分区
的性能问题),如果⽤户选择的是指定pkhash的⽅式,那只能保障的是⼀个pk的多次binlog顺序性**pkhash的⽅式需要业务权衡,
这⾥性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产⽣业务处理错乱的情况.如果有pk变更,pk变更前和
变更后的值会落在不同的分区⾥,业务消费就会有先后顺序的问题,需要注意
pk
###1.4.启动
cd/usr/local/canal/
shbin/
1.5.查看⽇志
a.查看logs/canal/
vilogs/canal/
b.查看instance的⽇志:
vilogs/example/
1.6关闭
cd/usr/local/canal/
shbin/
数据消费
canal源码中有实例代码;如下
publicclassCanalKafkaClientExample{
protectedfinalstaticLoggerlogger=ger();
privateKafkaCanalConnectorconnector;
privatestaticvolatilebooleanrunning=fal;
privateThreadthread=null;
htExceptionHandlerhandler=htExceptionHandler(){
publicvoiduncaughtException(Threadt,Throwablee){
("pareventshasanerror",e);
}
};
publicCanalKafkaClientExample(StringzkServers,Stringrvers,Stringtopic,Integerpartition,StringgroupId){
connector=newKafkaCanalConnector(rvers,topic,partition,groupId,null,fal);
}
publicstaticvoidmain(String[]args){
try{
finalCanalKafkaClientExamplekafkaCanalClientExample=newCanalKafkaClientExample(
ers,
s,
,
ion,
d);
("##startthekafkaconsumer:{}-{}",,d);
();
("##thecanalkafkaconsumerisrunningnow......");
time().addShutdownHook(newThread(){
publicvoidrun(){
try{
("##stopthekafkaconsumer");
();
}catch(Throwablee){
("##somethinggoeswrongwhenstoppingkafkaconsumer:",e);
}finally{
("##kafkaconsumerisdown.");
}
}
});
while(running)
;
}catch(Throwablee){
("##Somethinggoeswrongwhenstartingupthekafkaconsumer:",e);
(0);
}
}
publicvoidstart(){
l(connector,"connectorisnull");
thread=newThread(newRunnable(){
publicvoidrun(){
process();
}
});
aughtExceptionHandler(handler);
();
running=true;
}
publicvoidstop(){
if(!running){
return;
}
running=fal;
if(thread!=null){
try{
();
}catch(InterruptedExceptione){
//ignore
}
}
}
privatevoidprocess(){
while(!running){
try{
(1000);
}catch(InterruptedExceptione){
}
}
while(running){
try{
t();
ibe();
while(running){
try{
List
if(messages==null){
continue;
}
for(Messagemessage:messages){
longbatchId=();
intsize=ries().size();
if(batchId==-1||size==0){
//try{
//(1000);
//}catch(InterruptedExceptione){
//}
}el{
//printSummary(message,batchId,size);
//printEntry(ries());
(ng());
}
}
();//提交确认
}catch(Exceptione){
(sage(),e);
}
}
}catch(Exceptione){
(sage(),e);
}
}
try{
cribe();
}catch(WakeupExceptione){
//ueprocess
}
nect();
}
}
本文发布于:2022-11-25 23:41:48,感谢您对本站的认可!
本文链接:http://www.wtabcd.cn/fanwen/fan/90/21450.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |