Flume 与 Kafka 整合实战

ops/2024/11/27 16:53:16/

目录

kafka%E4%B8%AD%EF%BC%8C%E6%8A%BD%E5%8F%96%E5%87%BA%E6%9D%A5%E3%80%91-toc" style="margin-left:40px;">一、Kafka 作为 Source【数据进入到kafka中,抽取出来】

(一)环境准备与配置文件创建

(二)创建主题

(三)测试步骤

kafka%E9%87%8C%E9%9D%A2%E3%80%91-toc" style="margin-left:40px;">二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】

(一)编写配置脚本

(二)创建 topic

(三)测试过程

三、应用场景示例 

四、总结


        在大数据处理的生态系统中,Flume 和 Kafka 都是非常重要的组件。Flume 擅长收集、聚合和传输大量的日志数据等,而 Kafka 则是一个高性能的分布式消息队列,能够处理海量的实时数据。将 Flume 和 Kafka 进行整合,可以构建强大的数据处理管道,实现数据的高效采集、传输和处理。本文将详细介绍 Flume 和 Kafka 整合的两种常见方式:Kafka 作为 Source 和 Kafka 作为 Sink。

kafka%E4%B8%AD%EF%BC%8C%E6%8A%BD%E5%8F%96%E5%87%BA%E6%9D%A5%E3%80%91" style="background-color:transparent;">一、Kafka 作为 Source【数据进入到kafka中,抽取出来】

 

(一)环境准备与配置文件创建

        在 Flume 的 conf 文件夹下,创建一个名为 kafka - memory - logger.conf 的脚本文件。这里需要注意,在实际操作中可能会遇到错误,例如 kafka 的每一批次的读取数量大于了 channel 的容量。这种情况下的解决方案是要么降低 kafka 的每一批次读取的容量,要么提高 channel 的容量。

https://flume.liyifeng.org/#kafka-source

kafka-memory-logger.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = five
a1.sources.r1.kafka.consumer.group.id = qiaodaohu# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 128

 

(二)创建主题

        接着创建一个 topic,名字可以叫做 kafka - flume,当然也可以直接使用以前创建好的主题。

kafka-topics.sh --create --topic kafka-flume --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

 

(三)测试步骤

首先启动一个消息生产者,向 topic 中发送消息。

kafka-console-producer.sh --topic kafka-flume --bootstrap-server bigdata01:9092

然后启动 Flume,接收消息并查看 log 日志,这样就可以验证数据是否能够从 Kafka 成功抽取到 Flume 中并进行后续处理。

flumeflumeconf 文件夹下

flume-ng agent -n a1 -c ../conf -f ./kafka-memory-logger.conf -Dflume.root.logger=INFO,console

kafka%E9%87%8C%E9%9D%A2%E3%80%91" style="background-color:transparent;">二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】

 

 

(一)编写配置脚本

编写一个名为 flume - kafka - sink.conf 的脚本,内容如下:

##a1就是flume agent的名称
## source r1
## channel c1
## sink k1
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092
a1.sinks.k1.kafka.topic = five
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里的流程是 netcat(模拟数据源)→ memory(内存通道)→ kafka

 

(二)创建 topic

使用以下命令创建 topic(flume - kafka):

kafka-topics.sh --create --topic flume-kafka --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

 

(三)测试过程

启动 Flume:

flume-ng agent -n a1 -c conf -f $FLUME_HOME/job/flume-kafka-sink.conf -Dflume.root.logger=INFO,console

使用 telnet 命令,向端口发送消息:

yum -y install telnettelnet bigdata01 44444

在窗口不断地发送文本数据,数据就会被抽取到 Kafka 中。


使用消费者获取 Kafka 数据:

kafka-console-consumer.sh --topic flume-kafka --bootstrap-server bigdata01:9092 --from-beginning

 

三、应用场景示例 

        假定有这样一个场景:Flume 可以抽取不断产生的日志,抽取到的日志数据,发送给 Kafka,Kafka 经过处理,可以展示在页面上,或者进行汇总统计。这样就实现了一定的实时效果,在实际的大数据处理流程中,这种整合方式能够有效地处理海量的实时数据,提高数据处理的效率和可靠性。

四、总结

        通过 Flume 和 Kafka 的整合,我们能够构建更加灵活、高效的数据处理架构,满足不同场景下的大数据处理需求,为后续的数据挖掘、分析等提供坚实的数据基础。


http://www.ppmy.cn/ops/137118.html

相关文章

HTML的自动定义倒计时,这个配色存一下

<!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>自定义倒计时</title><style>* {mar…

前端 Vue 3 后端 Node.js 和Express 结合cursor常见提示词结构

cursor 提示词 后端提示词 请为我开发一个基于 Node.js 和Express 框架的 Todo List 后端项目。项目需要实现以下四个 RESTful API 接口&#xff1a; 查询所有待办事项 接口名: GET /api/get-todo功能: 从数据库的’list’集合中查询并返回所有待办事项参数: 无返回: 包含所…

Rust sqlx包访问sqlite数据库

如果您正在钻研Rust并希望使用数据库&#xff0c;那么SQLx是一个极好的选择。在本教程中&#xff0c;我们将探索将SQLx与SQLite&#xff08;一种轻量级嵌入式SQL数据库引擎&#xff09;结合使用的基础知识。SQLx crate是一个异步纯Rust SQL crate&#xff0c;具有编译时检查查询…

25A物联网微型断路器 智慧空开1P 2P 3P 4P-安科瑞黄安南

微型断路器&#xff0c;作为现代电气系统中不可或缺的重要组件&#xff0c;在保障电路安全与稳定运行方面发挥着关键作用。从其工作原理来看&#xff0c;微型断路器通过感知电流的异常变化来迅速作出响应。当电路中的电流超过预设的安全阈值时&#xff0c;其内部的电磁感应装置…

windows中idea选择bash作为控制台指令集,但是系统环境变量未在其中生效处理

1. 引言 在windows系统中安装node 以及npm时配置其环境&#xff0c;使用window环境变量的配置方式在系统环境变量设置的地方设置了环境变量如下图1-1&#xff0c;设置后在idea中的控制台通过 echo $PATH 查看环境变量发先跟系统中配置的不一致&#xff0c;而且node -v npm -v指…

用el-scrollbar实现滚动条,拖动滚动条可以滚动,但是通过鼠标滑轮却无效

问题&#xff1a; 用elementplus实现的滚动条的页面中&#xff0c;滑动滚动条可以滚动&#xff0c;但是通过鼠标滑轮却无效&#xff0c;鼠标没有问题。 解决&#xff1a; 在开发者工具中&#xff0c; 元素->事件监听器中发现当我移除网页中祖先元素的滚动事件&#xff0c;该…

【后端面试总结】MySQL索引

数据库索引不只一种实现方法&#xff0c;但是其中最具代表性&#xff0c;也是我们面试中遇到最多的无疑是B树。 索引为什么选择B树 数据量很大的查找&#xff0c;是不能直接放入内存的&#xff0c;而是需要什么数据就通过磁盘IO去获得。 红黑树&#xff0c;AVL树等二叉查找树…

Vue.js基础——贼简单易懂!!(响应式 ref 和 reactive、v-on、v-show 和 v-if、v-for、v-bind)

Vue.js是一个渐进式JavaScript框架&#xff0c;用于构建用户界面。它专门设计用于Web应用程序&#xff0c;并专注于视图层。Vue允许开发人员创建可重用的组件&#xff0c;并轻松管理状态和数据绑定。它还提供了一个虚拟DOM系统&#xff0c;用于高效地渲染和重新渲染组件。Vue以…