这里写目录标题
- 一、Flume环境搭建
- 1.前提准备
- 2.搭建
- 二、Flume介绍
- 1.简介
- 2.Flume NG介绍
- Flume特点
- Flume的核心概念
- 3.Flume简单实用示例
- 安装netcat和telnet
- netcat 数据源测试 Flume
- Exec数据源测试Flume
- Spooling Directory Source
- Spooling Directory Source数据源测试Flume,并上传至HDFS
- Spooling Directory Source数据源带有过滤器
一、Flume环境搭建
1.前提准备
- hadoop环境搭建完成
可参考:Centos7中安装配置Hadoop(伪分布式搭建) - 安装包
apache-flume-1.6.0-bin.tar.gz ,需要自取:Flume安装包(提取码:6z6z)
2.搭建
- 将压缩包上传至/opt下
- 解压压缩包
tar -zxvf flume-ng-1.6.0-cdh5.14.0.tar.gz
- 修改名称
mv apache-flume-1.6.0-cdh5.14.0-bin/ flume160
- 修改配置
①进入conf目录:cd /opt/flume160/conf
②复制flume-env.sh.template --> flume-env.sh:cp ./flume-env.sh.template flume-env.sh
③修改 flume-env.sh
vi /opt/flume160/conf/ flume-env.sh
修改JAVA_HOME:export JAVA_HOME=/opt/jdk1.8.0_221
调整内存大小:export JAVA_OPTS="-Xms2048m -Xmx2048m -Dcom.sun.management.jmxremote"
二、Flume介绍
1.简介
Flume 是 cloudera 开发的实时日志收集系统,与之相似的还有facebook的scribe,apache的chukwa,linkedin的kafka等。
本次将对Flume进行介绍。Flume 初始的发行版本目前被统称为 Flume OG(original generation),但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来。为了解决这些问题,cloudera 对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation),并纳入了apache旗下。
Flume 在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG,0.9.x的称为Flume OG。
2.Flume NG介绍
Flume特点
Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source
-
Flume的可靠性
当节点出现故障时,日志能够被传送到其他节点上而不会丢失
Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end、Store on failure、Besteffort
①end-to-end:收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。
②Store on failure:这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送。
③Besteffort:数据发送到接收方后,不会进行确认。 -
Flume的可恢复性
依靠Channel,Flume中的数据可以恢复。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。
Flume的核心概念
- Client:Client生产数据,运行在一个独立的线程
- Event: 一个数据单元,消息头和消息体组成。Events可以是日志记录、 avro 对象等。
- Flow: Event从源点到达目的点的迁移的抽象。
- Agent: 一个独立的Flume进程,包含组件Source、 Channel、 Sink。Agent使用JVM运行Flume,每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
- Source: 数据收集组件。Flume提供了各种source的实现,包括Avro Source、Exec Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。如果内置的Source无法满足需要, Flume还支持自定义Source
- Channel: 中转Event的一个临时存储,保存由Source组件传递过来的Event
- Sink: 从Channel中读取并移除Event,也可以将Event传递到FlowPipeline中的下一个Agent。Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。
3.Flume简单实用示例
安装netcat和telnet
- netcate
1.监听服务器端口,并与客户端通信(最多只能接收一个客户端)
2.对指定服务器进行端口扫描
3.作为客户端连接到远程服务器进行通信 - telnet
1.连接服务器端口,并进行通信
2.登录远程telnet服务器,使用命令行对其进行控制 - 安装命令
yum install -y nc
yum install telnet-server.x86_64
yum install telnet.x86_64
- 测试
开启两个窗口
一个充当server端,输入:nc -lk 7777
一个充当client端,输入:telnet localhost 7777
在客户端发送消息,服务端可以接收到,则成功
netcat 数据源测试 Flume
这个source十分像nc -k -l [host] [port]
这个命令,监听一个指定的端口,把从该端口收到的TCP协议的文本数据按行转换为Event,它能识别的是带换行符的文本数据,同其他Source一样,解析成功的Event数据会发送到channel中
- 在Flume的conf目录下新建一个文件夹,用于存放配置文件,我创建的为jobFile
mkdir /opt/flume160/conf/jobFile
- 在jobFile中新建 netcat-flume-loggere.conf
vi /opt/flume160/conf/jobFile/netcat-flume-logger.conf
# 配置Agent a1各个组件的名称
a1.sources=r1 #Agent a1 的source有一个,叫做r1
a1.channels=c1 #Agent a1 的channel也有一个,叫做c1
a1.sinks=k1 #Agent a1 的sink有一个,叫做k1# 配置Agent a1的source r1的属性
a1.sources.r1.type=netcat #使用的是NetCat TCP Source,这个的是别名,Flume内置的一些组件都是有别名的,没有别名填全限定类名
a1.sources.r1.bind=localhost #NetCat TCP Source监听的hostname,这个是本机
a1.sources.r1.port=7777 #监听的端口# 配置Agent a1的sink k1的属性
a1.sinks.k1.type=logger # sink使用的是Logger Sink,这个配的也是别名# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的
a1.channels.c1.type=memory #channel的类型是内存channel
a1.channels.c1.capacity=1000 #缓冲数据容量
a1.channels.c1.transactionCapacity=1000 #事务容量# 把source和sink绑定到channel上
a1.sources.r1.channels=c1 #与source r1绑定的channel有一个,叫做c1
a1.sinks.k1.channel=c1 #与sink k1绑定的channel有一个,叫做c1
- 加载这个配置文件启动Flume
bin目录下的flume-ng是Flume的启动脚本,启动时需要指定Agent的名字、配置文件的目录和配置文件的名称。
进入flume目录执行以下语句:
// --conf/-c:表示配置文件存储在conf/目录
// --name/-n:表示给agent起名为a1,需要与自定义conf文件中的一致
// --conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件
// -Dflume.root.logger=INFO,console:-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error
./bin/flume-ng agent --name a1 --conf ./conf/ --conf-file ./conf/jobFile/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
- 打开另一个窗口,开启客户端
telnet localhost 7777
- 测试
Exec数据源测试Flume
这个source在启动时运行给定的Unix命令,并期望该进程在标准输出上连续生成数据。 如果进程因任何原因退出, 则source也会退出并且不会继续生成数据。 综上来看cat [named pipe]或tail -F [file]这两个命令符合要求可以产生所需的结果
- 在jobFile中新建 file-flume-logger.conf
vi /opt/flume160/conf/jobFile/file-flume-logger.conf
a2.sources=r1
a2.channels=c1
a2.sinks=k1
// 指定source的type为exec类型
a2.sources.r1.type=exec
// tail -f会把文件里的最尾部的内容显示在屏幕上,并且不断刷新
a2.sources.r1.command=tail -f /opt/bigdata/flume160/conf/jobFile/tmp/tmp.txta2.channels.c1.type=memory
a2.channels.c1.capacity=1000
a2.channels.c1.transactionCapacity=1000a2.sinks.k1.type=loggera2.sources.r1.channels=c1
a2.sinks.k1.channel=c1
- 加载配置文件启动Flume
./bin/flume-ng agent --name a2 --conf ./conf/ --conf-file ./conf/jobFile/file-flume-logger.conf -Dflume.root.logger=INFO,console
Spooling Directory Source
这个Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(1.txt被Flume收集完成后会重命名为1.txt.COMPLETED)。与Exec Source不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的文件必须是不可变的、唯一命名的。
- 在jobFile中新建events-flume-logger.conf
`vi /opt/flume160/conf/jobFile/events-flume-logger.conf
events.sources=eventsSource
events.channels=eventsChannel
events.sinks=eventsSinkevents.sources.eventsSource.type=spooldir
events.sources.eventsSource.spoolDir=/opt/bigdata/flume160/conf/jobFile/dataSourceFile/events
events.sources.eventsSource.deserializer=LINE
events.sources.eventsSource.deserializer.maxLineLength=10000
// 指定会被收集的文件名正则表达式,此处格式必须满足如:events_2020-12-03.csv形式
events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csvevents.channels.eventsChannel.type=file
// 记录检查点的文件的存储目录
events.channels.eventsChannel.checkpointDir=/opt/bigdata/flume160/conf/jobFile/checkPointFile/events
用于存储data文件
events.channels.eventsChannel.dataDirs=/opt/bigdata/flume160/conf/jobFile/dataChannelFile/eventsevents.sinks.eventsSink.type=loggerevents.sources.eventsSource.channels=eventsChannel
events.sinks.eventsSink.channel=eventsChannel
- 加载配置文件启动Flume
./bin/flume-ng agent --name events --conf ./conf/ --conf-file ./conf/jobFile/events-flume-logger.conf -Dflume.root.logger=INFO,console
- 将符合要求的文件名称的文件传入dataSource/events下
cp ./events.csv /opt/bigdata/flume160/conf/jobkb09/dataSourceFile/events/events_2020-11-30.csv
Spooling Directory Source数据源测试Flume,并上传至HDFS
- 在jobFile中新建events-flume-logger2.conf
`vi /opt/flume160/conf/jobFile/events-flume-logger2.conf
events.sources=eSource
events.channels=eChannel
events.sinks=eSinkevents.sources.eSource.type=spooldir
events.sources.eSource.spoolDir=/opt/flumevents60/conf/jobFile/dataSourceFile/events2
events.sources.eSource.deserializer=LINE
events.sources.eSource.deserializer.maxLineLength=10000
events.sources.eSource.includePattern=es_[0-9]{4}-[0-9]{2}-[0-9]{2}.csvevents.channels.eChannel.type=file
events.channels.eChannel.checkpointDir=/opt/flumevents60/conf/jobFile/checkPointFile/events2
events.channels.eChannel.dataDirs=/opt/flumevents60/conf/jobFile/dataChannelFile/events2events.sinks.eSink.type=hdfs // 组件类型,这个Sink将Event写入Hadoop分布式文件系统
events.sinks.eSink.hdfs.fileType=DataStream // 文件格式
events.sinks.eSink.hdfs.filePrefix=events // Flume在HDFS文件夹下创建新文件的固定前缀
events.sinks.eSink.hdfs.fileSuffix=.csv // Flume在HDFS文件夹下创建新文件的后缀
events.sinks.eSink.hdfs.path=hdfs://192.168.233.133:9000/testfile/user/event/%Y-%m-%d // HDFS目录路径,%Y-%m-%d为日期转义符,可根据本地时间显示日期
events.sinks.eSink.hdfs.useLocalTimeStamp=true // 使用日期时间转义符时是否使用本地时间戳
events.sinks.eSink.hdfs.batchSize=640 // 读取并向channel发送数据时单次发送的最大数量
events.sinks.eSink.hdfs.rollInterval=20 // 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件)
events.sinks.eSink.hdfs.rollCount=0 // 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
events.sinks.eSink.hdfs.rollSize=120000000 // 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件)events.sources.eSource.channels=eChannel
events.sinks.eSink.channel=eChannel
- 加载配置文件启动Flume
./bin/flume-ng agent --name events --conf ./conf/ --conf-file ./conf/jobFile/events-flume-logger2.conf -Dflume.root.logger=INFO,console
- 将符合要求的文件名称的文件传入dataSource/events下
cp ./events.csv /opt/bigdata/flume160/conf/jobkb09/dataSourceFile/events2/events_2020-11-30.csv
Spooling Directory Source数据源带有过滤器
- 在jobFile中新建user-flume-hdfs.conf
vi user-flume-hdfs.conf
users.sources=usersSource
users.channels=usersChannel
users.sinks=usersSinkusers.sources.usersSource.type=spooldir
users.sources.usersSource.spoolDir=/opt/flume160/conf/jobFile/dataSourceFile/users
users.sources.usersSource.includePattern=users_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
users.sources.usersSource.deserializer=LINE
users.sources.usersSource.deserializer.maxLineLength=10000
users.sources.usersSource.interceptors=head_filter // 定义过滤器名称
users.sources.usersSource.interceptors.head_filter.type=regex_filter // 过滤器类型,正则过滤拦截器
users.sources.usersSource.interceptors.head_filter.regex=^user_id* // 过滤拦截内容:以user_id开头的文件
users.sources.usersSource.interceptors.head_filter.excludeEvents=true 如果为true,被正则匹配到的Event会被丢弃;如果为false,不被正则匹配到的Event会被丢弃users.channels.usersChannel.type=file
users.channels.usersChannel.checkpointDir=/opt/flume160/conf/jobFile/cheakPointFile/users
users.channels.usersChannel.dataDirs=/opt/flume160/conf/jobFile/dataChannelFile/usersusers.sinks.usersSink.type=hdfs
users.sinks.usersSink.hdfs.fileType=DataStream
users.sinks.usersSink.hdfs.filePrefix=users
users.sinks.usersSink.hdfs.fileSuffix=.csv
users.sinks.usersSink.hdfs.path=hdfs://192.168.233.133:9000/testfile/users/users/%Y-%m-%d
users.sinks.usersSink.hdfs.useLocalTimeStamp=true
users.sinks.usersSink.hdfs.batchSize=640
users.sinks.usersSink.hdfs.rollCount=0
users.sinks.usersSink.hdfs.rollSize=120000000
users.sinks.usersSink.hdfs.rollInterval=20users.sources.usersSource.channels=usersChannel
users.sinks.usersSink.channel=usersChannel
更多参数和使用方法可以参照Flume使用手册:Flume使用手册(英文版)、Flume使用手册(中文版)