【大数据学习 | flume】flume之常见的sink组件

server/2024/11/18 9:07:04/

Flume Sink取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、HBase sink,。

​ Flume Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据,在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

1. File_roll Sink

File_roll sink是将收集到的数据存放在本地文件系统中,根据指定的时间生成新的文件用来保存数据。

# file_role sink#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/root/file_role
a1.sinks.k1.sink.rollInterval=60
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

rollInterval=60:每隔60s滚动生成一个文件。

创建数据输出目录

mkdir -p /root/file_role

启动flume agent a1 服务端

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./file_roll.agent -Dflume.root.logger=INFO,console

2. hdfs sink

hdfs sink是将flume收集到的数据写入到hdfs中,方便数据可靠的保存。

其中:

sink 输出到hdfs中,默认每10个event 生成一个hdfs文件,hdfs文件目录会根据hdfs.path 的配置自动创建。

sink hdfs 配置参数描述:

名称描述
hdfs.pathhdfs目录路径
hdfs.filePrefix文件前缀。默认值FlumeData
hdfs.fileSuffix文件后缀
hdfs.rollInterval多久时间后close hdfs文件。单位是秒,默认30秒。设置为0的话表示不根据时间close hdfs文件
hdfs.rollSize文件大小超过一定值后,close文件。默认值1024,单位是字节。设置为0的话表示不基于文件大小
hdfs.rollCount写入了多少个事件后close文件。默认值是10个。设置为0的话表示不基于事件个数
hdfs.fileType文件格式, 有3种格式可选择:SequenceFile(默认), DataStream(不压缩) or CompressedStream(可压缩)
hdfs.batchSize批次数,HDFS Sink每次从Channel中拿的事件个数。默认值100
hdfs.minBlockReplicasHDFS每个块最小的replicas数字,不设置的话会取hadoop中的配置
hdfs.maxOpenFiles允许最多打开的文件数,默认是5000。如果超过了这个值,越早的文件会被关闭
hdfs.callTimeoutHDFS操作允许的时间,比如hdfs文件的open,write,flush,close操作。单位是毫秒,默认值是10000
hdfs.codeC压缩编解码器。以下之一:gzip,bzip2,lzo,lzop,snappy
# hdfs sink
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/data/xinniu/output/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.filePrefix=hainiu-
a1.sinks.k1.hdfs.fileSuffix=.log
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

3. kafka sink

将数据写入到kafka中

# kafka sink
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.topic = hainiu
#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动kafka消费者消费hainiu topic中的数据

启动fluem agent

启动flume agent a1 服务端

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./kafkasink.agent -Dflume.root.logger=INFO,console

kafka保存flume收集到的数据,并通过kafka消费者消费到收集到的数据

4. avro sink

flume收集到的数据通过avro sink序列化出去,通常用于数据跨服服务多级流动。

启动三台机器:

在第一台节点编写agent

a1.sources=r1
a1.sinks=k1
a1.channels=c1a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 55555a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

第二台节点编写agent

a1.sources=r1
a1.sinks=k1
a1.channels=c1a1.sources.r1.type=avro
a1.sources.r1.bind=11.94.204.87
a1.sources.r1.port=55555a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname =11.147.251.96
a1.sinks.k1.port = 55555a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

第三台节点编写agent

a1.sources=r1
a1.sinks=k1
a1.channels=c1a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=loggera1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,第三台节点打印数据到控制台

4.1 扇出操作

还可以通过avro sink 实现扇出操作:即第一台服务器收集数据,将数据发送到第二台和第三台服务器。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 c2a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555a1.sinks.k2.type=avro
a1.sinks.k2.hostname = worke-2
a1.sinks.k2.port = 55555a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

第二台和第三台agent编写如下:

a1.sources=r1
a1.sinks=k1
a1.channels=c1a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=loggera1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,第二台和第三台节点打印数据到控制台

4.2 扇入操作

还可以通过avro sink 实现扇入操作:即第一台和第二台手机数据,将数据发送到第三台服务器。


http://www.ppmy.cn/server/142866.html

相关文章

【windows】05-windows系统级深度隐藏文件方法

文章目录 高级文件隐藏技巧基本用法隐藏文件或文件夹显示隐藏的文件或文件夹 完整语法 高级文件隐藏技巧 使用 attrib 命令可以在 Windows 系统中快速修改文件和文件夹的属性,实现更高级的隐藏。这种方法不仅隐藏文件,而且设置为系统文件,使…

使用vscode+expo+Android夜神模拟器运行react-native项目

1.进入夜神模拟器安装路径下的bin目录 2.输入命令,连接Android Studio 启动夜神模拟器后, 打开安装目录的bin文件夹执行下面的命令,只需执行一次) nox_adb.exe connect 127.0.0.1:62001adb connect 127.0.0.1:62001 3.运行项目…

Springboot整合xxl-job

拉取xxl-job xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。 配置项目 执行sql语句 更改配置 启动 访问 任务调度中心http://127.0.0.1:8081/xxl-job-a…

OTX 架构开发需求分析

(一)应用场景调研 汽车行业应用 深入研究汽车生产线上的整车检测场景,包括对发动机、传动系统、电子控制单元(ECU)等关键部件的功能测试和故障诊断。了解汽车售后维修中,维修人员利用诊断设备对车辆进行故障…

鸿蒙开发应用权限管理

简介 一种允许应用访问系统资源(如:通讯录等)和系统能力(如:访问摄像头、麦克风等)的通用权限访问方式,来保护系统数据(包括用户个人数据)或功能,避免它们被…

React 中如何解析字符串中的 html 结构

React 中解析字符串中的 html 结构 通过 dangerouslySetInnerHTML 属性进行绑定 const htmlStr <h1>Hello, React</h1> <div dangerouslySetInnerHTML{{ __html: htmlStr }}></div>

数据驱动的期货市场决策:民锋科技的量化分析创新

期货市场因其高度的波动性和市场前瞻性&#xff0c;成为各类投资者的关注焦点。民锋科技利用先进的数据分析和量化技术&#xff0c;为期货市场的参与者提供智能决策支持&#xff0c;帮助其在快速变化的环境中洞察市场趋势。本文将详细介绍民锋科技在期货市场中的数据应用优势。…

第一章 Spring Boot快速⼊⻔ —— 构建Spring Boot项目

概览&#xff1a; SpringBoot设计目的是用来简化Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置&#xff0c;从而使开发人员不再需要定义样板化的配置&#xff0c;可以更加快速便捷地开发Spring项目&#xff0c;在开发过程当中可以专注于应用程序本身的功…