Flume实战:Kafka Channel的使用配置场景

embedded/2025/3/22 9:22:04/

概述

使用Flume采集数据时,我们可能会遇到各种场景,一个数据采集任务的标准配置都是Source->Channel->Sink。对于Channel组件的选择常用的有Memory Channel、File Channel。而我们都知道,Kafka组件在大数据平台的使用过程中是一个非常重要的角色,如果涉及到Flume和Kafka的交互大致也可以分为如下几种场景:

  • Kafka作为数据源,将Kafka中的数据同步到其他组件中
  • Kafka作为目标端,将其他如文件中的数据采集到Kafka的Topic中
  • Kafka作为中转,将数据从Source采集到Sink中

对于以上3种场景,在配置Flume的job时,可能就涉及到不同的组件配置模板。是否每一种场景都需要配置Source->Channel->Sink呢?答案肯定是否定的。下文将分别介绍这几种场景的架构配置。

场景一:kafka作为数据源,将数据同步到hdfs

任务逻辑架构图

在这种场景中,因为数据事先就已经在kafka中,所以就无需配置source,直接采用Kakfa作为Channel,将数据写入到HDFS中。

Flume任务模板

## 组件
a1.channels=c2
a1.sinks=k2## Channel配置
a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.topic=channel_topic
a1.channels.c2.kafka.bootstrap.servers=192.168.0.1:9092
a1.channels.c2.kafka.consumer.group.id=channel_group
a1.channels.c2.parseAsFlumeEvent=false
a1.channels.c2.kafka.producer.security.protocol=SASL_PLAINTEXT
a1.channels.c2.kafka.producer.sasl.mechanism=GSSAPI
a1.channels.c2.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.c2.kafka.producer.acks=1# configure sinks
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://nameservice1/user/house/data/flume/test_hdfs_data/%Y-%m-%d/
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 24
a1.sinks.k2.hdfs.roundUnit = hour
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 20480000
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.callTimeout=600000
# kerberos配置
a1.sinks.k2.hdfs.kerberosPrincipal = XX@XXX
a1.sinks.k2.hdfs.kerberosKeytab = /data/keytab/XXX.keytab# 绑定Chennel和Sink
a1.sinks.k2.channel = c2

场景二:kafka作为目标端,将数据从文件采集到kafka

任务逻辑架构图

Flume任务模板

## 组件
a1.sources=s2
a1.channels=c2## Source 配置
a1.sources.s2.type = TAILDIR
a1.sources.s2.positionFile = /data/flume/positionFile/xxl_log_to_kafka.json
a1.sources.s2.filegroups = f1
a1.sources.s2.filegroups.f1 = /data/logs/access.log## Channel配置
a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.topic=channel_topic
a1.channels.c2.kafka.bootstrap.servers=192.168.0.1:9092
a1.channels.c2.kafka.consumer.group.id=channel_group
a1.channels.c2.parseAsFlumeEvent=false
a1.channels.c2.kafka.producer.security.protocol=SASL_PLAINTEXT
a1.channels.c2.kafka.producer.sasl.mechanism=GSSAPI
a1.channels.c2.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.c2.kafka.producer.acks=1# 绑定Source和Channel
a1.sources.s2.channels = c2

场景三:将数据从日志文件中采集到HDFS,kafka作为中转

任务逻辑架构图

Flume任务模板

## 组件
a1.sources=s2
a1.channels=c2
a1.sinks=k2## Source 配置
a1.sources.s2.type = TAILDIR
a1.sources.s2.positionFile = /data/flume/positionFile/xxl_log_to_kafka.json
a1.sources.s2.filegroups = f1
a1.sources.s2.filegroups.f1 = /data/logs/access.log## Channel配置
a1.channels.c2.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.topic=channel_topic
a1.channels.c2.kafka.bootstrap.servers=192.168.0.1:9092
a1.channels.c2.kafka.consumer.group.id=channel_group
a1.channels.c2.parseAsFlumeEvent=false
a1.channels.c2.kafka.producer.security.protocol=SASL_PLAINTEXT
a1.channels.c2.kafka.producer.sasl.mechanism=GSSAPI
a1.channels.c2.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.c2.kafka.producer.acks=1# configure sinks
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://nameservice1/user/house/data/flume/test_hdfs_data/%Y-%m-%d/
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 24
a1.sinks.k2.hdfs.roundUnit = hour
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 20480000
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.callTimeout=600000
# kerberos配置
a1.sinks.k2.hdfs.kerberosPrincipal = XX@XXX
a1.sinks.k2.hdfs.kerberosKeytab = /data/keytab/XXX.keytab# 绑定Source和Channel
a1.sources.s2.channels = c2# 绑定Sink和Channel
a1.sinks.k2.channel = c2

总结

在使用Flume采集数据时,使用Kafka Channel有如下优点:

  • 减少Flume对内存资源的消耗,使用默认的内存Channel时,数据存储在内存中,如果下游写入较慢,数据会一直堆积产生反压,甚至会导致内存过高而崩溃。
  • 增加系统的可靠性,没有数据丢失风险。使用内存Channel时,如果Flume发生崩溃,内存Channel中没有写入到Sink端的数据会丢失。
  • 性能远优于文件Channel,同时兼顾系统的可靠性,另外在配置Flume任务时可以依照上述架构方案减少Flume组件,缩短数据的处理流程。

http://www.ppmy.cn/embedded/174650.html

相关文章

【JavaEE进阶】Linux常用命令

目录 🍃前言 🌴pwd 与 ls 🚩pwd 🚩ls 🎍cd 🌲mkdir与touch 🚩mkdir 🚩touch 🍀cat与rm 🚩cat 🚩rm 🎋vim 🚩…

C++ 各种map对比

文章目录 特点比较1. std::map2. std::unordered_map3. std::multimap4. std::unordered_multimap5. hash_map(SGI STL 扩展) C 示例代码代码解释 特点比较 1. std::map 底层实现:基于红黑树(一种自平衡的二叉搜索树&#xff09…

C# 集合(Collection)详解以及区别

C# 集合(Collection)是用于数据存储和检索的核心数据结构,支持动态内存管理、多种数据组织形式及高效操作。以下是其核心特性和分类: 一、集合的核心作用 ‌1、动态扩展‌ 集合可动态调整容量(如 List)&am…

为什么从另一个电脑复制项目文件过来后,QT 在自己电脑上登录界面登不上,Shadow build 被选中原因

### 为什么从另一个电脑复制项目文件过来后,QT 在自己电脑上登录界面登不上,Shadow build 被选中原因 #### 1. **Shadow build 的作用** Shadow build 是 Qt Creator 提供的一种构建模式,将编译生成的中间文件和可执行文件存放在源代码目录之…

Java基础面试题学习

转换成自已的语言来回答,来源小林coding、沉默王二以及其它资源和自已改编。 1、概念 1、说一下Java的特点 我认为Java有很多特点 首先是平台无关性:Java可以实现一次编译到处运行,因为Java的编译器将源代码编译成字节码,使得该…

【搜索页】- 功能流程

【搜索页】- 功能流程 【搜索组件】- 改造搜索组件HdSearch src/main/ets/common/components/HdSearch.ets 课程目标 直接将搜索关键字写死在keywords数组中:keywords:string[][html,css,js,vue,react]使用setInterval实现每隔3秒完成题目分类数据的切换使用rout…

JAVA 中的 HashMap 工作原理

‌1. 底层数据结构‌ ‌数组 链表/红黑树‌&#xff1a; HashMap 内部维护一个 ‌桶数组&#xff08;Node[] table&#xff09;‌&#xff0c;每个桶&#xff08;Bucket&#xff09;存储链表或红黑树的头节点。 transient Node<K,V>[] table; // 桶数组 static class N…

C++基础 [十二] - 继承与派生

目录 前言 什么是继承 继承的概念 继承的定义 基类与派生类对象的赋值转换 继承的作用域 派生类中的默认成员函数 默认成员函数的调用 构造函数与析构函数 拷贝构造 赋值运算符重载 显示成员函数的调用 构造函数 拷贝构造 赋值运算符重载 析构函数 继承与…