概述
使用Flume采集数据时,我们可能会遇到各种场景,一个数据采集任务的标准配置都是Source->Channel->Sink。对于Channel组件的选择常用的有Memory Channel、File Channel。而我们都知道,Kafka组件在大数据平台的使用过程中是一个非常重要的角色,如果涉及到Flume和Kafka的交互大致也可以分为如下几种场景:
- Kafka作为数据源,将Kafka中的数据同步到其他组件中
- Kafka作为目标端,将其他如文件中的数据采集到Kafka的Topic中
- Kafka作为中转,将数据从Source采集到Sink中
对于以上3种场景,在配置Flume的job时,可能就涉及到不同的组件配置模板。是否每一种场景都需要配置Source->Channel->Sink呢?答案肯定是否定的。下文将分别介绍这几种场景的架构配置。
场景一:kafka作为数据源,将数据同步到hdfs
任务逻辑架构图
在这种场景中,因为数据事先就已经在kafka中,所以就无需配置source,直接采用Kakfa作为Channel,将数据写入到HDFS中。
Flume任务模板
## 组件
a1.channels=c2
a1.sinks=k2## Channel配置
a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.topic=channel_topic
a1.channels.c2.kafka.bootstrap.servers=192.168.0.1:9092
a1.channels.c2.kafka.consumer.group.id=channel_group
a1.channels.c2.parseAsFlumeEvent=false
a1.channels.c2.kafka.producer.security.protocol=SASL_PLAINTEXT
a1.channels.c2.kafka.producer.sasl.mechanism=GSSAPI
a1.channels.c2.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.c2.kafka.producer.acks=1# configure sinks
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://nameservice1/user/house/data/flume/test_hdfs_data/%Y-%m-%d/
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 24
a1.sinks.k2.hdfs.roundUnit = hour
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 20480000
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.callTimeout=600000
# kerberos配置
a1.sinks.k2.hdfs.kerberosPrincipal = XX@XXX
a1.sinks.k2.hdfs.kerberosKeytab = /data/keytab/XXX.keytab# 绑定Chennel和Sink
a1.sinks.k2.channel = c2
场景二:kafka作为目标端,将数据从文件采集到kafka中
任务逻辑架构图
Flume任务模板
## 组件
a1.sources=s2
a1.channels=c2## Source 配置
a1.sources.s2.type = TAILDIR
a1.sources.s2.positionFile = /data/flume/positionFile/xxl_log_to_kafka.json
a1.sources.s2.filegroups = f1
a1.sources.s2.filegroups.f1 = /data/logs/access.log## Channel配置
a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.topic=channel_topic
a1.channels.c2.kafka.bootstrap.servers=192.168.0.1:9092
a1.channels.c2.kafka.consumer.group.id=channel_group
a1.channels.c2.parseAsFlumeEvent=false
a1.channels.c2.kafka.producer.security.protocol=SASL_PLAINTEXT
a1.channels.c2.kafka.producer.sasl.mechanism=GSSAPI
a1.channels.c2.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.c2.kafka.producer.acks=1# 绑定Source和Channel
a1.sources.s2.channels = c2
场景三:将数据从日志文件中采集到HDFS,kafka作为中转
任务逻辑架构图
Flume任务模板
## 组件
a1.sources=s2
a1.channels=c2
a1.sinks=k2## Source 配置
a1.sources.s2.type = TAILDIR
a1.sources.s2.positionFile = /data/flume/positionFile/xxl_log_to_kafka.json
a1.sources.s2.filegroups = f1
a1.sources.s2.filegroups.f1 = /data/logs/access.log## Channel配置
a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.topic=channel_topic
a1.channels.c2.kafka.bootstrap.servers=192.168.0.1:9092
a1.channels.c2.kafka.consumer.group.id=channel_group
a1.channels.c2.parseAsFlumeEvent=false
a1.channels.c2.kafka.producer.security.protocol=SASL_PLAINTEXT
a1.channels.c2.kafka.producer.sasl.mechanism=GSSAPI
a1.channels.c2.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.c2.kafka.producer.acks=1# configure sinks
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://nameservice1/user/house/data/flume/test_hdfs_data/%Y-%m-%d/
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 24
a1.sinks.k2.hdfs.roundUnit = hour
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 20480000
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.callTimeout=600000
# kerberos配置
a1.sinks.k2.hdfs.kerberosPrincipal = XX@XXX
a1.sinks.k2.hdfs.kerberosKeytab = /data/keytab/XXX.keytab# 绑定Source和Channel
a1.sources.s2.channels = c2# 绑定Sink和Channel
a1.sinks.k2.channel = c2
总结
在使用Flume采集数据时,使用Kafka Channel有如下优点:
- 减少Flume对内存资源的消耗,使用默认的内存Channel时,数据存储在内存中,如果下游写入较慢,数据会一直堆积产生反压,甚至会导致内存过高而崩溃。
- 增加系统的可靠性,没有数据丢失风险。使用内存Channel时,如果Flume发生崩溃,内存Channel中没有写入到Sink端的数据会丢失。
- 性能远优于文件Channel,同时兼顾系统的可靠性,另外在配置Flume任务时可以依照上述架构方案减少Flume组件,缩短数据的处理流程。