Flume配置案例@Source:Kafka,Channel:File,Sink:HDFS

news/2025/2/16 0:54:31/

创建flume配置文件

[atguigu@hadoop104 flume]$ vim job/kafka_to_hdfs_log.conf

配置内容如下:

---------------------

#定义组件

a1.sources=r1

a1.channels=c1

a1.sinks=k1

#配置source1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.batchSize = 5000

a1.sources.r1.batchDurationMillis = 2000

a1.sources.r1.kafka.bootstrap.servers= = hadoop102:9092,hadoop103:9092,hadoop104:9092

# 消费的kafka的主题是topic_log

a1.sources.r1.kafka.topics=topic_log

a1.sources.r1.interceptors = i1

# 下面写的这个拦截器是要解决零点飘移问题

a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimestampInterceptor$Builder

com.atguigu.flume.interceptor. 这是包名 ;$Builder 这是里面的内部类

#配置channel

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1

a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1

a1.channels.c1.maxFileSize = 2146435071

a1.channels.c1.capacity = 1000000

a1.channels.c1.keep-alive = 6

#配置sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d

/origin_data/gmall/log/topic_log/%Y-%m-%d 这个是要存的HDFS的路径,可以提前不存在

a1.sinks.k1.hdfs.filePrefix = log

a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10

a1.sinks.k1.hdfs.rollSize = 134217728

a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = gzip

#组装 

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1


http://www.ppmy.cn/news/1418714.html

相关文章

hive metastore使用mysql作为backend db遇到的问题

文章目录 问题解决 问题 hms使用mysql作为Backend metadata database, 但是启动爆如下错误. Underlying cause: com.mysql.cj.jdbc.exceptions.CommunicationsException : Communications link failureThe last packet sent successfully to the server was 0 milliseconds a…

【保姆级讲解Element UI】

🌈个人主页: 程序员不想敲代码啊 🏆CSDN优质创作者,CSDN实力新星,CSDN博客专家 👍点赞⭐评论⭐收藏 🤝希望本文对您有所裨益,如有不足之处,欢迎在评论区提出指正,让我们共…

数据结构-堆详解

堆 图片: 二叉堆的父节点为这个子树的最值。 如何维护它。 我们发现它是一棵二叉树,那就自然满足若父节点为 x x x 则左儿子节点为 x 2 x\times2 x2 右儿子为 x 2 1 x\times 2 1 x21 这是显然的,但如果写成指针或结构体就太麻烦了&…

云计算:Linux 部署 OVN 集群

目录 一、实验 1.环境 2.Linux 部署 OVN 集群(中心端) 3.Linux 部署 OVN 集群(业务端1) 4.Linux 部署 OVN 集群(业务端2) 4.OVN 中心端 连接数据库 5.OVN 业务端1 加⼊控制器 6.OVN 业务端2 加⼊控…

Adobe Photoshop 2024 v25.6 (macOS, Windows) - 照片和设计软件

Adobe Photoshop 2024 v25.6 (macOS, Windows) - 照片和设计软件 Acrobat、After Effects、Animate、Audition、Bridge、Character Animator、Dimension、Dreamweaver、Illustrator、InCopy、InDesign、Lightroom Classic、Media Encoder、Photoshop、Premiere Pro、Adobe XD …

Spring Boot 经典面试题(七)

1. Spring Boot中如何集成Swagger生成API文档 在Spring Boot中集成Swagger生成API文档非常简单&#xff0c;以下是基本的步骤&#xff1a; 首先&#xff0c;在pom.xml文件中添加Swagger依赖&#xff1a; <dependency><groupId>io.springfox</groupId><…

zustand状态库在react类组件中使用

如果想在React类组件中使用zustand状态管理库&#xff0c;可以在类组件中调用create函数创建一个状态store&#xff0c;并使用useStore钩子来访问和更新状态。虽然zustand通常与函数式组件一起使用&#xff0c;但也可以在类组件中使用。 以下是一个简单的示例&#xff0c;展示…

pyqt和opencv结合01:读取图像、显示

在这里插入图片描述 1 、opencv读取图像用于pyqt显示 # image cv2.imread(file_path)image cv2.cvtColor(image, cv2.COLOR_BGR2RGB)# 将图像转换为 Qt 可接受的格式height, width, channel image.shapebytes_per_line 3 * widthq_image QImage(image.data, width, hei…