【大数据学习 | flume】flume之常见的channel组件

news/2024/11/17 15:16:49/

Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件,Flume对于Channel,则提供了Memory Channel、JDBC Chanel、File Channel。

MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。

FileChannel保证数据的完整性与一致性。

​ Spillable Memory Channel基于内存和磁盘,内存不够时将数据存储在磁盘中,数据出错恢复时,只恢复磁盘中的数据,还在测试阶段不建议在生产环境用。

1. file channel

# file channel#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=11.90.214.80
a1.sources.r1.port=44444#定义channel
a1.channels.c1.type=file
a1.channels.c1.dataDirs = /root/filedata#定义sink
a1.sinks.k1.type=logger
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

创建数据输出目录

mkdir -p /root/filedata

启动flume agent a1 服务端

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./fileroll.agent -Dflume.root.logger=INFO,console

2. Kafka Channel

将数据存储到kafka中,kafka数据也是存储在磁盘中,并且kafka提供了高可用的功能,数据不会丢失。

重新启动镜像并需要添加kafka的组件。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=11.90.214.80
a1.sources.r1.port=44444#定义channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.c1.kafka.topic = hainiu
a1.channels.c1.kafka.consumer.group.id = flume-consumer#定义sink
a1.sinks.k1.type=logger
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动flume agent a1 服务端

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./kafkachannel.agent -Dflume.root.logger=INFO,console

测试kafka中是否存储flume收集过来的数据:

启动kafka消费者消费指定分区的数据

#创建主题
./kafka-topics.sh --zookeeper11.99.16.105:2181 --create --topic hainiu --replication-factor 1 --partitions 1
#生产者生产数据
./kafka-console-producer.sh --broker-list 11.99.16.105:9092--topic hainiu
#消费者消费数据
kafka-console-consumer.sh --bootstrap-server 11.99.16.105:9092 --topic hainiu

通过telnet向flume监听的端口发数据

flume logger sink将数据打印在控制台


http://www.ppmy.cn/news/1547746.html

相关文章

【自学笔记】神经网络(2) -- 模型评估和优化

文章目录 划分数据集训练集和测试集训练误差 测试误差交叉验证测试集 偏差与方差偏差方差表现基准 学习曲线error - train set sizeerror - degree of polynomial 数据增强迁移学习项目的完整周期样本不平衡问题精确率和召回率精确率与召回率的平衡 划分数据集 我们当然希望把所…

Linux 批量配置互信

批量配置SSH互信脚本 #!/bin/bash# 定义目标机器列表 machines( "192.168.122.87" "192.168.122.89" "192.168.122.90" ) set -o errexit # 设置默认的用户名和密码 default_username"root" default_password"111111"# 读取…

接口压力测试、性能测试工具

接口压力测试、性能测试工具 文章说明核心源码1.0版本--采用浏览器发送ajax请求进行性能测试2.0版本--结合Java模拟压力测试功能 运行截图源码下载 文章说明 使用jmeter有些地方我觉得有点小复杂,我写了一个小工具来进行接口的简单性能和压力测试 核心源码 1.0版本…

python习题练习

python习题 编写一个简单的工资管理程序系统可以管理以下四类人:工人(worker)、销售员(salesman)、经理(manager)、销售经理(salemanger)所有的员工都具有员工号,工资等属性,有设置姓名,获取姓名,获取员工号,计算工资等…

怎样遵守编程规范,减少和控制C++编程中出现的bug?

遵守编程规范和最佳实践是减少和控制 C 编程中出现 bug 的重要手段。以下是一些具体的建议和策略,帮助你编写更健壮、更易于维护的 C 代码。 1. 遵循 C 标准和最佳实践 使用现代 C 特性:尽可能使用 C11 及之后的标准,避免使用过时的特性和库…

SpringBoot整合FreeMarker生成word表格文件

SpringBoot整合FreeMarker生成word表格文件(使用FTL模板)_freemarker ftl模板-CSDN博客 Freemarker基本指令语法和集合指令语法SpringBoot整合FreeMarker生成word表格文件(使用FTL模板)_freemarker ftl模板-CSDN博客https://zhua…

【MySQL】MySQL中的函数之JSON_REPLACE

在 MySQL 中,JSON_REPLACE() 函数用于在 JSON 文档中替换现有的值。如果指定的路径不存在,则 JSON_REPLACE() 不会修改 JSON 文档。如果需要添加新的键值对,可以使用 JSON_SET() 函数。 基本语法 JSON_REPLACE(json_doc, path, val[, path,…

数据结构(单向链表——c语言实现)

链式存储的优缺点: 优点: 1、动态分配内存: 链式存储不需要在数据插入之前分配固定大小的数组或内存块,因此它更适合存储动态变化的数据 2、高效的插入和删除操作: 在链表中插入或删除元素只需要调整相邻节点的指…