数据采集-->kafka-->hdfs

server/2024/9/23 14:47:47/

数据采集到kafka

flume:

a1.sources = r1
a1.channels = c1a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/installs/flume1.9/job/a.log
a1.sources.r1.positionFile = /opt/installs/flume1.9/job/taildir-kafka.jsona1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers =hadoop11:9092,hadoop12:9092,hadoop13:9092
a1.channels.c1.kafka.topic = topica
a1.channels.c1.parseAsFlumeEvent = falsea1.sources.r1.channels = c1

执行命令:

flume-ng agent --conf conf  --name a1 --conf-file job/taildir-kafka.conf -Dflume.root.logger=INFO,console

 向a.log添加测试数据: 

消费者:

 数据从kafkahdfs

flume

 

a1.sources = r1
a1.channels = c1 
a1.sinks = k1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize=5000
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers =hadoop11:9092,hadoop12:9092,hadoop13:9092
a1.sources.r1.kafka.topics = topica
a1.sources.r1.kafka.consumer.group.id = g1a1.channels.c1.type = memory
a1.channels.c1.capacity=5000
a1.channels.c1.transactionCapacity=5000a1.sinks.k1.type = hdfs
a1.sinks.k1.batchSize = 5000
a1.sinks.k1.hdfs.path = hdfs://hadoop11:8020/flume/date=%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.rollInterval =0 
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 0a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

执行命令:

flume-ng agent --conf conf  --name a1 --conf-file job/kafka-hdfs.conf -Dflume.root.logger=INFO,console

向a.log添加测试数据: 

消费者:

hdfs

 


http://www.ppmy.cn/server/105055.html

相关文章

【日常总结】阿里云:windows server 过一段时间登录不进去,或提示:出现身份验证错误。 无法连接到本地安全机构

场景 阿里云 : ESC系统:windows server 2022 问题 无法登录,或者登录浸提提示密码已过期 原因 密码设置了过期时间 解决方案 修改密码策略:密码设置永不过期 打开“本地安全策略”编辑器:运行 secpol.msc。 导航至“账户…

【leetcode】 27. 移除元素

题目: 题目链接 给你一个数组 nums 和一个值 val,你需要 原地 移除所有数值等于 val 的元素。元素的顺序可能发生改变。然后返回 nums 中与 val 不同的元素的数量。 假设 nums 中不等于 val 的元素数量为 k,要通过此题,您需要执行…

基于单片机车载酒精浓度的检测系统

摘 要: 为了有效地防止驾驶员酒后驾车的行为,设计了一种基于单片机车载酒精浓度的检测系统 。 该系统由酒精传感器、 A/D 转换器 、 AT89S52 单片机控制器 、 语音报警 、 LCD 液晶显示 、 LED 指示灯 、 车门锁传感器 、 压力传感器和继电器等构成。 当…

单线程环境下,用时间做为单一文件名称全局唯一处理

package org.example.file;import java.util.concurrent.ConcurrentHashMap;public class FileHelper {private FileService fileService;//全局唯一,用于控制文件时间全局唯一//value:0:没被消费,1:被消费public stat…

EasyCVR视频汇聚平台革新播放体验:WebRTC协议赋能H.265视频流畅传输

随着科技的飞速发展和网络技术的不断革新,视频监控已经广泛应用于社会各个领域,成为现代安全管理的重要组成部分。在视频监控领域,视频编码技术的选择尤为重要,它不仅关系到视频的质量,还直接影响到视频的传输效率和兼…

Tensorflow实现深度学习案例7:咖啡豆识别

本文为为🔗365天深度学习训练营内部文章 原作者:K同学啊 一、前期工作 1. 导入数据 from tensorflow import keras from tensorflow.keras import layers,models import numpy as np import matplotlib.pyplot as plt import os,PIL,p…

数学基础 -- 线性代数之矩阵的可逆性

矩阵的可逆性 1. 矩阵可逆的定义 对于一个 n n n \times n nn 的方阵 A A A,如果存在一个矩阵 B B B 使得: A B B A I n A \times B B \times A I_n ABBAIn​ 其中 I n I_n In​ 是 n n n \times n nn 的单位矩阵(对角线上全…

JavaScript类型化数组深度解析:提升二进制数据处理能力

在JavaScript中,类型化数组(Typed Arrays)是一种处理二进制数据的强大机制。它们为开发者提供了一种在JavaScript中存储和操作固定长度的原始二进制数据的方式。与普通数组不同,类型化数组允许你以特定的格式(如整数或…