Flume
概述
一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。基于流式架构,灵活简单。
可以实时读取服务器本地磁盘的数据,将数据写入到HDFS。
组件
source
收集数据 以event为单元进行封装发送给channel
常见的source有:
1)
netcat tcp source
:用来监听端口数据
2)exec source
:监听单个追加文件,支持文件实时追加,不能断点续度
3)spooling Directory Source
:监听目录下新增文件,不支持实时
4)Taildir Source
:监听目录下新增文件以及追加文件,支持实时,可以断点续读
5)kafka source
:监听Kafka topics
channel
source 和 sink之间的缓冲区
是线程安全的
允许source和sink运作在不同的速率上
可以同时处理多个source的写入和多个sink的读取操作
Flume自带两种channel:
Memory channel:内存中的队列
使用场景:不需要关心内存丢失的情况下
因为程序死亡和机器宕机或者重启都会造成数据丢失
File Channel:将所有事件写到磁盘
使用场景:需要关心数据丢失的情况
因为事件被写入到磁盘 所以程序关闭、宕机并不会造成数据丢失
sink
不断从channel取出数据,发送到设定的目标地
常用的sink:
1)
logger sink
:将数据写入日志 主要用来测试
2)hdfs sink
:将输出写到hdfs上 工作中主要使用
3)Avro sink
:将数据发送到其他的Flume
4)File Roll sink
:将数据保存到本地磁盘
事务
一个 event 在 Flume agent 的 channel 中被分阶段处理。
Sink将该event传递给流程中的下一个agent或终端存储库(如HDFS)。
只有在 event 被存储到下一个 agent 的 channel中或存储到终端存储库中后,Sink 才会从通道中删除 event 。
这就是 Flume 中的单跳消息传递语义如何提供流的端到端可靠性。
Flume 使用事务性方法来保证事件的可靠交付。
sources和sinks将events的存储/检索封装在一个由channel提供的transaction中。
这确保了events在流中从一个点到另一个点可靠地传递。
在多跳流的情况下,上一跳的sink和下一跳的source都打开他们的transactionas,以确保event数据安全地存储在下一跳的channel中。
source使用ChannelProcessor来管理事务,sink通过其配置的Channel管理它们。
数据丢失
Flume是不可能丢失数据的。
source:有事务支持,不会丢失。
sink:有事务支持,不会丢失。
channel:
采用memoryChannel,agent宕机导致数据丢失。
存储数据满了,source不能写了,未写入的丢失。
Flume不会丢失数据,但是有可能造成数据的重复。
例如数据已经成功由Sink发出,但是没有接收到响应,Sink会再次发送数据,此时可能会导致数据的重复。
拦截器
静态拦截器:
//如果没有使用静态拦截器
Event: { headers:{} body: 36 Sun Jun 2 18:26 }//使用静态拦截器之后 自己添加kv标识对
Event: { headers:{type=access} body: 36 Sun Jun 2 18:26 }
Event: { headers:{type=nginx} body: 36 Sun Jun 2 18:26 }
Event: { headers:{type=web} body: 36 Sun Jun 2 18:26 }//后续在存放数据的时候可以使用flume的规则语法获取到拦截器添加的kv内容
%{type}
自定义ETL拦截器
主要是用来判断json数据是否完整,将Json不完整的数据过滤掉。没有做复杂的清洗操作主要是防止过多的操作降低传输速率。
自定义时间戳拦截器
主要是解决零点漂移问题
hdfs sink默认会用linux系统时间,作为输出到HDFS路径的时间。
获取日志产生的实际时间,将时间写入拦截器的header头,header的key必须是timestamp,flume框架会根据key的值识别为时间,写入到HDFS
自定义拦截器步骤
//1.实现 Interceptor
public class xxx implements Interceptor {//2.重写四个方法@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {}@Overridepublic List<Event> intercept(List<Event> list) {}@Overridepublic void close() {}//3.静态内部类,实现 Interceptor.Builderpublic static class Builder implements Interceptor.Builder {}}
选择器
Replicating Channel Selector (default)
Load Balancing Channel Selector
Multiplexing Channel Selector
Custom Channel Selector
Replicating 会将source过来的events发往所有channel,而 Multiplexing可以选择该发往哪些Channel。
监控器
Ganglia
调优
source
batchSize:source一次批量运输到Channel的event条数,适当调大这个参数可以提高Source搬运Event到Channel时的性能。
channel
Memory channel 的性能更好 但是如果出现进程意外挂掉会丢失数据
File channel:容错性更好 但是性能会比Memory channel差
capacity:channel可容纳最大的event条数
transactionCapacity:每次Source往channel里面写的最大event条数和每次Sink从channel里面读的最大event条数
sink
batchSize:Sink一次批量从Channel读取的event条数。适当调大这个参数可以提高Sink从Channel搬出event的性能。