billow

更新时间:2022-11-25 23:41:48 阅读: 评论:0


2022年11月25日发(作者:bumper)

阿⾥开源Canal--④投递数据到Kafka

基本说明

canal1.1.1版本之后,默认⽀持将canalrver接收到的binlog数据直接投递到MQ,⽬前默认⽀持的MQ系统有:

kafka:()

RocketMQ?)

##1.配置修改

基于前⾯第⼆章节内容我们搭建的canalrver(Billow⾃编译版本为1.1.3,⽀持动态Topic配置),我们在instance的配置⽂件中做配置

的修改。

###1.1.修改instance配置⽂件

viconf/example/ties

#按需修改成⾃⼰的数据库信息

#...

s=192.168.1.20:3306

#urname/password,数据库的⽤户名和密码

...

name=canal

word=canal

...

#mqconfig

=example

#针对库名或者表名发送动态topic

#cTopic=mytest,.*,,mytest..*,.*..*

ion=0

#hashpartitionconfig

#ionsNum=3

#库名.表名:唯⼀主键,多个表之间⽤逗号分隔

#ionHash=:id,:id

#对应ip地址的MySQL数据库需进⾏相关初始化与设置,可参考前⾯Billow发的⽂章:

dynamicTopic规则:表达式如果只有库名则匹配库名的数据都会发送到对应名称topic,如果是库名.表名则匹配的数据会发送到以’库名_

表名’为名称的topic。如要指定topic名称,则可以配置:

cTopic=examp2:.*;exmaple3:mytest..*,mytest2..*;example4:

以topic名‘:’正则规则作为配置,多个topic配置之间以';'隔开,message会发送到所有符合规则的topic

###1.2.修改canal配置⽂件

vi/usr/local/canal/conf/ties

#...

#可选项:tcp(默认),kafka,RocketMQ

Mode=kafka

#...

#kafka/rocketmq集群配置:192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092

s=127.0.0.1:6667

s=0

ize=16384

uestSize=1048576

Ms=1

Memory=33554432

#Canal的batchsize,默认50K,由于kafka最⼤消息体限制请勿超过1M(900K以下)

atchSize=50

#Canalget数据的超时时间,单位:毫秒,空为不限超时

etTimeout=100

#是否为flatjson格式对象

ssage=true

ssionType=none

=all

#kafka消息投递是否使⽤事务

ction=fal

mq相关参数说明

###cTopic表达式说明

canal1.1.3版本之后,⽀持配置格式:schema或,多个配置之间使⽤逗号分隔

例⼦1:指定匹配的单表,发送到以test_test为名字的topic上

例⼦2:.…*匹配所有表,每个表都会发送到各⾃表名的topic上

例⼦3:test指定匹配对应的库,⼀个库的所有表都会发送到库名的topic上

例⼦4:test.*指定匹配的表达式,针对匹配的表会发送到各⾃表名的topic上

例⼦5:test,1,指定多个表达式,会将test库的表都发送到test的topic上,1的表发送到对应的test1_test1

topic上,其余的表发送到默认的值

⽀持指定topic名称匹配,配置格式:topicName:schema或,多个配置之间使⽤逗号分隔,多组之间使⽤;分隔

例⼦:test:test,1;test2:test2,1针对匹配的表会发送到指定的topic上

⼤家可以结合⾃⼰的业务需求,设置匹配规则,建议MQ开启⾃动创建topic的能⼒

###表达式说明

canal1.1.3版本之后,⽀持配置格式::pk1^pk2,多个配置之间使⽤逗号分隔

例⼦1::pk1^pk2指定匹配的单表,对应的hash字段为pk1+pk2

例⼦2:.…*:id正则匹配,指定所有正则匹配的表对应的hash字段为id

例⼦3:.…*:正则匹配,指定所有正则匹配的表对应的hash字段为表主键(⾃动查找)

例⼦4:匹配规则啥都不写,则默认发到0这个partition上

例⼦5:.…*,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名

按表hash:⼀张表的所有数据可以发到同⼀个分区,不同表之间会做散列(会有热点表分区过⼤问题)

例⼦6::id,.…*,针对test的表按照id散列,其余的表按照table散列

注意:⼤家可以结合⾃⼰的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进⾏匹配(命中⼀条规则就返回)

###mq顺序性问题

binlog本⾝是有序的,写⼊到mq之后如何保障顺序是很多⼈会⽐较关注,在issue⾥也有⾮常多⼈咨询了类似的问题,这⾥做⼀个统⼀的解

⽬前选择⽀持的kafka/rocketmq,本质上都是基于本地⽂件的⽅式来⽀持了分区级的顺序消息的能⼒,也就是binlog写⼊mq

是可以有⼀些顺序性保障,这个取决于⽤户的⼀些参数选择

⽀持MQ数据的⼏种路由⽅式:单topic单分区,单topic多分区、多topic单分区、多topic多分区

cTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默

认topicname

ionsNum、ionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做

到按表级做分区、pk级做分区等

的消费顺序性,主要取决于描述2中的路由选择,举例说明:

单topic单分区,可以严格保证和binlog⼀样的顺序性,缺点就是性能⽐较慢,单分区的性能写⼊⼤概在2~3k的TPS

多topic单分区,可以保证表级别的顺序性,⼀张表或者⼀个库的所有数据都写⼊到⼀个topic的单分区中,可以保证有序性,针对热点

表也存在写⼊分区的性能问题

单topic、多topic的多分区,如果⽤户选择的是指定table的⽅式,那和第⼆部分⼀样,保障的是表级别的顺序性(存在热点表写⼊分区

的性能问题),如果⽤户选择的是指定pkhash的⽅式,那只能保障的是⼀个pk的多次binlog顺序性**pkhash的⽅式需要业务权衡,

这⾥性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产⽣业务处理错乱的情况.如果有pk变更,pk变更前和

变更后的值会落在不同的分区⾥,业务消费就会有先后顺序的问题,需要注意

pk

###1.4.启动

cd/usr/local/canal/

shbin/

1.5.查看⽇志

a.查看logs/canal/

vilogs/canal/

b.查看instance的⽇志:

vilogs/example/

1.6关闭

cd/usr/local/canal/

shbin/

数据消费

canal源码中有实例代码;如下

publicclassCanalKafkaClientExample{

protectedfinalstaticLoggerlogger=ger();

privateKafkaCanalConnectorconnector;

privatestaticvolatilebooleanrunning=fal;

privateThreadthread=null;

htExceptionHandlerhandler=htExceptionHandler(){

publicvoiduncaughtException(Threadt,Throwablee){

("pareventshasanerror",e);

}

};

publicCanalKafkaClientExample(StringzkServers,Stringrvers,Stringtopic,Integerpartition,StringgroupId){

connector=newKafkaCanalConnector(rvers,topic,partition,groupId,null,fal);

}

publicstaticvoidmain(String[]args){

try{

finalCanalKafkaClientExamplekafkaCanalClientExample=newCanalKafkaClientExample(

ers,

s,

,

ion,

d);

("##startthekafkaconsumer:{}-{}",,d);

();

("##thecanalkafkaconsumerisrunningnow......");

time().addShutdownHook(newThread(){

publicvoidrun(){

try{

("##stopthekafkaconsumer");

();

}catch(Throwablee){

("##somethinggoeswrongwhenstoppingkafkaconsumer:",e);

}finally{

("##kafkaconsumerisdown.");

}

}

});

while(running)

;

}catch(Throwablee){

("##Somethinggoeswrongwhenstartingupthekafkaconsumer:",e);

(0);

}

}

publicvoidstart(){

l(connector,"connectorisnull");

thread=newThread(newRunnable(){

publicvoidrun(){

process();

}

});

aughtExceptionHandler(handler);

();

running=true;

}

publicvoidstop(){

if(!running){

return;

}

running=fal;

if(thread!=null){

try{

();

}catch(InterruptedExceptione){

//ignore

}

}

}

privatevoidprocess(){

while(!running){

try{

(1000);

}catch(InterruptedExceptione){

}

}

while(running){

try{

t();

ibe();

while(running){

try{

Listmessages=tWithoutAck(100L,ECONDS);//获取message

if(messages==null){

continue;

}

for(Messagemessage:messages){

longbatchId=();

intsize=ries().size();

if(batchId==-1||size==0){

//try{

//(1000);

//}catch(InterruptedExceptione){

//}

}el{

//printSummary(message,batchId,size);

//printEntry(ries());

(ng());

}

}

();//提交确认

}catch(Exceptione){

(sage(),e);

}

}

}catch(Exceptione){

(sage(),e);

}

}

try{

cribe();

}catch(WakeupExceptione){

//ueprocess

}

nect();

}

}

本文发布于:2022-11-25 23:41:48,感谢您对本站的认可!

本文链接:http://www.wtabcd.cn/fanwen/fan/90/21450.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

上一篇:larger than life
下一篇:when you belive
标签:billow
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图