A Solution for Flume Resumable Collection (Preventing Duplicate Consumption)
Background:
A while back I wrote a post in which we used an exec source running the tail command to monitor and collect logs. That approach has a problem: if the agent process suddenly dies, the next restart of the collection job re-reads log file content that was already collected, producing duplicates. A crashed process may be rare, but we should still do our best to avoid the fallout.
1. Choosing an Approach
I have discussed Flume's resume-from-checkpoint problem with a few friends. Their usual answer is to patch it themselves: write a custom source that extends AbstractSource and implements the EventDrivenSource and Configurable interfaces. That works, but it has two drawbacks: reinventing the wheel carries development and maintenance costs, and if the team isn't strong enough it invites a steady stream of bugs later on.
So does Flume ship a ready-made wheel for this? It does: the Taildir Source.
On the Flume website, the Changes list for Version 1.7.0 includes it.
So what does Taildir Source offer? From the official documentation:
Note: This source is provided as a preview feature. It does not work on Windows.
It watches the specified files and tails them in near real-time once new lines are detected in each file. If new lines are still being written, the source retries reading them until the write completes.
This source is reliable and will not miss data even when the tailed files rotate. It periodically writes the last read position of every file, in JSON format, to a given position file. If Flume is stopped or goes down for any reason, it can restart tailing from the positions recorded in that file.
It can also start tailing each file from an arbitrary position via the given position file. When no position file exists at the specified path, it tails each file from its first line by default.
Files are consumed in order of modification time; the file with the oldest modification time is consumed first.
This source does not rename, delete, or otherwise modify the files being tailed. It currently does not support binary files; it reads text files line by line only.
The key point is the position record it keeps for every collected file: each restart can read the previous position from that record and resume collection right where it left off, which is exactly the resume-from-checkpoint behavior we need.
2. Configuration
Create a configuration file f under Flume's conf directory with the following content:
pro.sources = s1
pro.channels = c1
pro.sinks = k1

pro.sources.s1.type = TAILDIR
pro.sources.s1.positionFile = /home/dev/flume/flume-1.8.0/log/taildir_position.json
pro.sources.s1.filegroups = f1
pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log
pro.sources.s1.headers.f1.headerKey1 = aaa
pro.sources.s1.fileHeader = true

pro.channels.c1.type = memory
pro.channels.c1.capacity = 1000
pro.channels.c1.transactionCapacity = 100

pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
pro.sinks.k1.kafka.topic = moercredit_log_test
pro.sinks.k1.kafka.bootstrap.servers = cdh1:9092,cdh2:9092,cdh3:9092
pro.sinks.k1.kafka.flumeBatchSize = 20
pro.sinks.k1.kafka.producer.acks = 1
pro.sinks.k1.kafka.producer.linger.ms = 1
pro.sinks.k1.kafka.producer.compression.type = snappy

pro.sources.s1.channels = c1
pro.sinks.k1.channel = c1
Compared with the previous post, only the source section has changed; the intent should be clear at a glance.
Note that filegroups defines a set of file groups, separated by spaces, and the path of each group supports regular expressions.
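As a hedged sketch of that (the second group name and its path are made up for illustration), a source watching a fixed file plus a regex that matches rotated application logs might look like:

```properties
# two file groups: one fixed file, one regex matching rotated app logs
pro.sources.s1.filegroups = f1 f2
pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log
pro.sources.s1.filegroups.f2 = /home/dev/log/app/app.*.log
```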
The meaning of every parameter of this source is documented on the official site.
3. Testing and a Closer Look
I suggest reading this section to the end; it will deepen your understanding.
Run the following from the bin directory:
nohup ./flume-ng agent -n pro -c ../conf/ -f ../f >/dev/null 2>&1 &
Afterwards a logs directory appears under the current directory, containing a flume.log file. Part of its contents:
19 Apr 2019 14:41:03,800 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration.validateConfiguration:140) - Post-validation flume configuration contains configuration for agents: [pro]
19 Apr 2019 14:41:03,800 INFO [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:147) - Creating channels
19 Apr 2019 14:41:03,807 INFO [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:42) - Creating instance of channel c1 type memory
19 Apr 2019 14:41:03,816 INFO [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:201) - Created channel c1
19 Apr 2019 14:41:03,817 INFO [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41) - Creating instance of source s1, type TAILDIR
19 Apr 2019 14:41:03,908 INFO [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42) - Creating instance of sink: k1, type: org.apache.flume.sink.kafka.KafkaSink
19 Apr 2019 14:41:03,916 INFO [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:314) - Using the static topic moercredit_log_test. This may be overridden by event headers
19 Apr 2019 14:41:03,929 INFO [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.getConfiguration:116) - Channel c1 connected to [s1, k1]
19 Apr 2019 14:41:03,937 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:137) - Starting new configuration:{ sourceRunners:{s1=PollableSourceRunner: { source:Taildir source: { positionFile: /home/dev/flume/flume-1.8.0/log/taildir_position.json, skipToEnd: false, byteOffsetHeader: false, idleTimeout: 120000, writePosInterval: 3000 } counterGroup:{ name:null counters:{} } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@30010525 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
19 Apr 2019 14:41:03,938 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:144) - Starting Channel c1
19 Apr 2019 14:41:04,011 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:119) - Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
19 Apr 2019 14:41:04,011 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:95) - Component type: CHANNEL, name: c1 started
19 Apr 2019 14:41:04,011 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:171) - Starting Sink k1
19 Apr 2019 14:41:04,012 INFO [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:182) - Starting Source s1
19 Apr 2019 14:41:04,014 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.TaildirSource.start:92) - s1 TaildirSource source starting with directory: {f1=/home/dev/log/moercredit/logstash.log}
19 Apr 2019 14:41:04,018 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>:83) - taildirCache: [{filegroup='f1', filePattern='/home/dev/log/moercredit/logstash.log', cached=true}]
19 Apr 2019 14:41:04,024 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>:84) - headerTable: {f1={headerKey1=aaa}}
19 Apr 2019 14:41:04,029 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile:283) - Opening file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 0
19 Apr 2019 14:41:04,031 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>:94) - Updating position from position file: /home/dev/flume/flume-1.8.0/log/taildir_position.json
19 Apr 2019 14:41:04,031 INFO [lifecycleSupervisor-1-0] (org.apache.flume.source.taildir.ReliableTaildirEventReader.loadPositionFile:144) - File not found: /home/dev/flume/flume-1.8.0/log/taildir_position.json, not updating position
19 Apr 2019 14:41:04,033 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:119) - Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
19 Apr 2019 14:41:04,033 INFO [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:95) - Component type: SOURCE, name: s1 started
This log lets us follow Flume's startup in detail. The lines to focus on are:
Opening file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 0
Updating position from position file: /home/dev/flume/flume-1.8.0/log/taildir_position.json
File not found: /home/dev/flume/flume-1.8.0/log/taildir_position.json, not updating position
On the first run of this Flume agent, the source finds that the log file to collect has inode 807943550, then creates taildir_position.json and records the pos in it. Shortly after startup the whole log file has been collected and the position updated; at that point taildir_position.json contains:
[{"inode":807943550,"pos":579077,"file":"/home/dev/log/moercredit/logstash.log"}]
It is simply a JSON array with one element per collected file, and each element has three attributes: inode (the file's unique identification number), pos (the last collected position, i.e., the byte offset into the file), and file (the absolute path of the collected file).
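Since the position file is plain JSON, it is easy to inspect or post-process. A minimal Python sketch, using the sample content shown above rather than reading a real file:

```python
import json

# sample content of taildir_position.json, taken from the output above
raw = '[{"inode":807943550,"pos":579077,"file":"/home/dev/log/moercredit/logstash.log"}]'

entries = json.loads(raw)  # a JSON array: one element per collected file
for e in entries:
    # pos is the byte offset of the last read position within the file
    print(f"inode={e['inode']} pos={e['pos']} file={e['file']}")
```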
Background on inodes:
All file metadata except the file name lives in the inode. Every inode has a number, and the operating system uses inode numbers to identify files.
This bears repeating: internally, Unix/Linux does not use file names; it identifies files by inode number. To the system, a file name is merely a human-friendly alias for the inode number.
On the surface, a user opens a file by name. Internally this takes three steps: first, the system looks up the inode number for the file name; second, it fetches the inode information via that number; finally, it uses the inode information to locate the data blocks and reads the data.
With ls -i you can see the inode number for a file name:
[dev@localhost log]$ ls -i taildir_position.json
542922380 taildir_position.json
Or view the inode metadata with stat:
[dev@localhost log]$ stat taildir_position.json
  File: ‘taildir_position.json’
  Size: 81         Blocks: 8          IO Block: 4096   regular file
Device: fd02h/64770d    Inode: 542922380   Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  dev)   Gid: ( 1000/  dev)
Context: unconfined_u:object_r:user_home_t:s0
Access: 2019-04-19 15:18:19.034511139 +0800
Modify: 2019-04-19 15:19:24.806511139 +0800
Change: 2019-04-19 15:19:24.806511139 +0800
 Birth: -
Because inode numbers are decoupled from file names, Unix/Linux exhibits a few distinctive behaviors:
1. Sometimes a file name contains special characters and cannot be deleted normally; deleting the inode directly removes the file.
2. Moving or renaming a file only changes the file name; the inode number is untouched.
3. Once a file is open, the system identifies it by inode number and no longer cares about the name.
As a consequence, the system generally cannot recover a file name from an inode number. Point 3 is also what makes software updates painless: a program can be updated without being closed or restarted, because the system tracks the running file by inode number, not by name. The update writes the new version under the same file name but with a new inode, which does not disturb the running file. The next time the program starts, the file name automatically resolves to the new version, and the old version's inode is reclaimed.
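The rename behavior in point 2 is easy to verify. A small Python sketch using a temporary directory (on Unix-like systems, os.stat exposes the inode number as st_ino):

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as d:
    old_path = os.path.join(d, "logstash.log")
    new_path = os.path.join(d, "logstash.log.bak")
    with open(old_path, "w") as f:
        f.write("some log line\n")

    inode_before = os.stat(old_path).st_ino
    os.rename(old_path, new_path)  # rename the file within the same filesystem...
    inode_after = os.stat(new_path).st_ino

    # ...the name changed but the inode did not
    print(inode_before == inode_after)  # True on Unix-like systems
```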
Now let's test whether a restart after the Flume process dies causes duplicate consumption:
The topic currently holds 1661 records.
[{"inode":807943550,"pos":585006,"file":"/home/dev/log/moercredit/logstash.log"}]
After the restart:
The topic holds 1663 records, with no duplicates (the two extra records are new data written during the operation). Resume-from-checkpoint achieved!
Checking flume.log:
Updating position from position file: /home/dev/flume/flume-1.8.0/log/taildir_position.json
Updated position, file: /home/dev/log/moercredit/logstash.log, inode: 807943550, pos: 585006
This raises two questions:
1. Does this source track the file it collects by file name or by inode?
2. taildir_position.json records both the inode and the file path; which one wins?
For the first question, note that the source in our conf is configured with a file path:
pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log
So the guess is that it collects by file name: renaming the file would interrupt collection, and creating a new file with the original name would start collecting the new file.
Test:
[dev@localhost moercredit]$ mv logstash.log logstash.log.bak
Watching flume.log at this point shows that the renamed file is closed, and its record disappears from taildir_position.json: