【Flume实操】实时监听 NetCat 端口和本地文件数据到 HDFS 案例分析

server/2024/11/15 8:33:25/

聚合:实时监听 NetCat 端口和本地文件数据到 HDFS 案例分析

案例需求:假设有一个生产场景,Flume1 在实时产生日志数据,日志类型为 flume.log。Flume2 在持续监控一个 netcat 端口的数据流。先需要将 Flume1、Flume2产生的数据采集汇总到 Flume3 上,并统一收集上传到 HDFS 上保存。

需求分析:该实训是,Flume1 使用 Exec Source 监控 /root/software/apache-flume-1.9.0-bin/logs/flume.log 文件,Flume2 监听本机 4141 端口的数据流。Flume1 与 Flume2 将数据发送给 Flume3,Flume3 将数据收集并保存到 HDFS。
过程如下图所示:
image.png

Flume1

要求:配置 Exec Source 用于监控 /root/software/apache-flume-1.9.0-bin/logs/flume.log 日志文件,配置 Avro Sink 输出数据到下一级 Flume。

  1. 创建 Agent 配置文件(Exec Source + Memory Channel + Avro Sink)
    进入 /root/software/apache-flume-1.9.0-bin/conf 目录,使用 touch 命令创建一个名为 flume1_exec_avro.conf 的配置文件。

  2. 查看端口
    使用 netstat -nlp | grep 6666 命令查看 6666 端口是否被占用。

  3. 编辑配置文件 flume1_exec_avro.conf
    使用 vim 命令打开 flume1_exec_avro.conf 文件,在里面添加如下配置:

(1)配置 Flume Agent——ExecAgent

首先,我们需要为 ExecAgent 命名/列出组件,将数据源(Source)、缓冲通道(Channel)和接收器(Sink)分别命名为 execSource、 memoryChannel 和 avroSink。

## Name the components on this agent
# execSource为 ExecAgent的 Source的名称
ExecAgent.sources = execSource
# memoryChanne1为 ExecAgent的 Channel的名称
ExecAgent.channels = memoryChannel
# avroSink为ExecAgent的 Sink的 名称
ExecAgent.sinks = avroSink

(2)描述和配置 Exec Source

Exec Source 在启动时运行给定的 Unix 命令,并期望该进程在标准输出上连续生成数据。如果进程因任何原因退出,则 Source 也会退出并且不会继续生成数据。这意味着诸如 cat [named pipe] 或 tail -F [file] 之类的命令会产生所需的结果,而 date 这种命令可能不会,因为前两个命令(tail 和 cat)能产生持续的数据流,而后者(date 这种命令)只会产生单个 Event 并退出。

我们需要为 execSource 设置以下属性:

属性名称描述设置值
channels与 Source 绑定的 ChannelmemoryChannel
type数据源的类型exec
command所使用的系统命令,一般是 cat 或者 tailtail -F /root/software/apache-flume-1.9.0-bin/logs/flume.log
## Describe/configure the source
# 与 Source绑定的 Channel
ExecAgent.sources.execSource.channels = memoryChannel
# 数据源的类型为 exec类型
ExecAgent.sources.execSource.type = exec
# 实时监控单个追加文件
ExecAgent.sources.execSource.command = tail -F /root/software/apache-flume-1.9.0-bin/logs/flume.log

(3)描述和配置 Avro Sink

Avro Sink 可以作为 Flume 分层收集特性的下半部分。发送到此 Sink 的 Flume Event 将转换为 Avro Event 发送到配置的主机/端口上。Event 从配置的 Channel 中批量获取,数量根据配置的 batch-size 而定。

我们需要为 avroSink 设置以下属性:

属性名称描述设置值
channel与 Sink 绑定的 ChannelmemoryChannel
type接收器的类型avro
hostname要监听的主机名或 IP 地址0.0.0.0(代表所有不清楚的主机和目的网络,即表示整个网络,也就是网络中的所有主机)
port要监听的服务端口6666(任意可用端口)
## Describe the sink
#与 Sink绑定的 Channel
ExecAgent.sinks.avroSink.channel = memoryChannel
# 接收器要监听的主机名或 IP 地址
ExecAgent.sinks.avroSink.type = avro
ExecAgent.sinks.avroSink.hostname = 0.0.0.0
# 接收器要监听的服务端口
ExecAgent.sinks.avroSink.port = 6666

(4)描述和配置 Memory Channel

Memory Channel 是把 Event 队列存储到内存上,队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景,但也是有代价的,当发生故障的时候会丢失当时内存中的所有 Event。

我们需要为 memoryChannel 设置以下属性:

属性名称描述设置值
type缓冲通道的类型memory
capacity存储在 Channel 中的最大 Event 数,默认值1001000
transactionCapacityChannel 将从 Source 接收或向 Sink 传递的每一个事务中的最大 Event 数(capacity>= transactionCapacity),默认值100100
## Use a channel which buffers events inmemory
#缓冲通道的类型为memory内存型
ExecAgent.channels.memoryChannel.type = memory
# capacity为最大容量,transactioncapacity为 Channe1每次提交的Event的最大数量,capacity>= transactionCapacity
ExecAgent.channels.memoryChannel.capacitv=1000
ExecAgent.channels.memoryChannel.transactionCapacity=100
  1. 启动 Flume1 Agent ExecAgent
    在 $FLUME_HOME 目录下使用如下命令启动 ExecAgent:
flume-ng agent -c conf/ -f conf/flume1_exec_avro.conf -n ExecAgent -Dflume.root.logger=INFO,console

Flume2

flume1_exec_avro.conf

  1. 编辑配置文件 flume2_netcat_avro.conf
    使用 vim 命令打开 flume2_netcat_avro.conf 文件,在里面添加如下配置:

(1)配置 Flume Agent——NetCatAgent

首先,我们需要为 NetCatAgent 命名/列出组件,将数据源(Source)、缓冲通道(Channel)和接收器(Sink)分别命名为 netcatSource、 memoryChannel 和 avroSink。

## Name the components on this agent
# netcatSource为 NetCatAgent的 Source的名称
NetCatAgent.sources = netcatSource
# memoryChanne1为 NetCatAgent的 Channel的名称
NetCatAgent.channels = memoryChannel
# avroSink为NetCatAgent的 Sink的 名称
NetCatAgent.sinks = avroSink

(2)描述和配置 NetCat TCP Source

NetCat TCP Source 侦听给定端口并将每一行文本转换为一个事件(Event),也就是数据是基于换行符分隔。它的工作就像命令 nc -k -l [host] [port]。换句话说,它打开一个指定端口并监听数据。期望提供的数据是换行符分隔的文本。每一行文本都会变成一个 Flume 事件,并通过连接的通道(Channel)发送。

我们需要为 netcatSource 设置以下属性:

属性名称描述设置值
channels与 Source 绑定的 ChannelmemoryChannel
type数据源的类型netcat
bind要监听的主机名或 IP 地址localhost(本机)
port要监听的服务端口4141(任意可用端口)
## Describe/configure the source
# 与 Source绑定的 Channel
NetCatAgent.sources.netcatSource.channels = memoryChannel
# 数据源的类型为 exec类型
NetCatAgent.sources.netcatSource.type = netcat
# 实时监控单个追加文件
NetCatAgent.sources.netcatSource.bind = localhost
NetCatAgent.sources.netcatSource.port = 4141

(3)描述和配置 Avro Sink

Avro Sink 可以作为 Flume 分层收集特性的下半部分。发送到此 Sink 的 Flume Event 将转换为 Avro Event 发送到配置的主机/端口上。Event 从配置的 Channel 中批量获取,数量根据配置的 batch-size 而定。

我们需要为 avroSink 设置以下属性:

属性名称描述设置值
channel与 Sink 绑定的 ChannelmemoryChannel
type接收器的类型avro
hostname要监听的主机名或 IP 地址0.0.0.0(代表所有不清楚的主机和目的网络,即表示整个网络,也就是网络中的所有主机)
port要监听的服务端口6666(任意可用端口)
## Describe the sink
#与 Sink绑定的 Channel
NetCatAgent.sinks.avroSink.channel = memoryChannel
# 接收器要监听的主机名或 IP 地址
NetCatAgent.sinks.avroSink.type = avro
NetCatAgent.sinks.avroSink.hostname = 0.0.0.0
# 接收器要监听的服务端口
NetCatAgent.sinks.avroSink.port = 6666

(4)描述和配置 Memory Channel

## Use a channel which buffers events inmemory
#缓冲通道的类型为memory内存型
NetCatAgent.channels.memoryChannel.type = memory
# capacity为最大容量,transactioncapacity为 Channe1每次提交的Event的最大数量,capacity>= transactionCapacity
NetCatAgent.channels.memoryChannel.capacitv=1000
NetCatAgent.channels.memoryChannel.transactionCapacity=100
  1. 启动 Flume2 Agent NetCatAgent
flume-ng agent -c conf/ -f conf/flume2_netcat_avro.conf -n NetCatAgent -Dflume.root.logger=INFO,console

Flume3

(1)配置 Flume Agent——AvroAgent

首先,我们需要为 AvroAgent 命名/列出组件,将数据源(Source)、缓冲通道(Channel)和接收器(Sink)分别命名为 avroSource、 memoryChannel 和 HDFSSink。

## Name the components on this agent
# avroSource为 AvroAgent的 Source的名称
AvroAgent.sources = avroSource
# memoryChanne1为 AvroAgent的 Channel的名称
AvroAgent.channels = memoryChannel
# HDFSSink为AvroAgent的 Sink的 名称
AvroAgent.sinks = HDFSSink

(2)描述和配置 Avro Source

## Describe/configure the source
# 与 Source绑定的 Channel
AvroAgent.sources.avroSource.channels = memoryChannel
# 数据源的类型为 avro类型
AvroAgent.sources.avroSource.type = avro
# 接收器要监听的主机名或 IP 地址
AvroAgent.sources.avroSource.bind = 0.0.0.0
# 接收器要监听的服务端口
AvroAgent.sources.avroSource.port = 6666

(3)描述和配置 HDFS Sink

属性名称描述设置值
channel与 Sink 绑定的 ChannelmemoryChannel
type接收器的类型hdfs
hdfs.pathHDFS 目录路径hdfs://localhost:9000/flumedata/%Y-%m-%d
hdfs.filePrefixFlume 在 HDFS 文件夹下创建新文件的固定前缀,默认值为 FlumeDataflumelogs
hdfs.rollInterval滚动当前文件之前等待的秒数(0=从不基于时间间隔滚动),单位:秒,默认值:3060
hdfs.rollSize触发滚动的文件大小(0=永不基于文件大小滚动),单位:字节,默认值:1024134217728 (128M)
hdfs.rollCount在滚动之前写入文件的 Event 数(0=从不基于 Event 数滚动),默认值:100
hdfs.fileType文件格式。目前支持: SequenceFile、DataStream 、CompressedStream。 其中,DataStream 不会压缩文件,不需要设置 hdfs.codeC,而 CompressedStream 必须设置 hdfs.codeC 参数。默认值:SequenceFileDataStream
hdfs.writeFormat文件写入格式。可选值:Text 和 Writable,默认值:Writable。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala 或 Apache Hive 无法读取这些文件Text
hdfs.useLocalTimeStamp在替换转义序列时使用本地时间(而不是使用 Event headers 中的时间戳),默认值:falsetrue
## Describe the sink
#与 Sink绑定的 Channel
AvroAgent.sinks.HDFSSink.channel = memoryChannel
# 接收器的类型为 hdfs类型,输出目的地是HDFS
AvroAgent.sinks.HDFSSink.type = hdfs
# 数据存放在HDFS上的目录
AvroAgent.sinks.HDFSSink.hdfs.path = hdfs://localhost:9000/flumedata/%Y-%m-%d
# 文件的固定前缀为 hivelogs-
AvroAgent.sinks.HDFSSink.hdfs.filePrefix = flumelogs-
# 按时间间隔滚动文件,默认30s,此处设置为 60s
AvroAgent.sinks.HDFSSink.hdfs.rollInterval = 60
# 按文件大小滚动文件,默认1024字节,此处设置为5242880字节 ( 5M)
AvroAgent.sinks.HDFsSink.hdfs.rollSize = 134217728
# 当Event个数达到该数量时,将临时文件滚动成目标文件,默认是10,0表示文件的滚动与Event数量无关
AvroAgent.sinks.HDFSSink.hdfs.rollCount =0
# 文件格式,默认为SequenceFile,但里面的内容无法直接打开浏览,所以此处设置为DataStream
AvroAgent.sinks.HDFSSink.hdfs.fileType = DataStream
# 文件写入格式,默认为Writable,此处设置为Text
AvroAgent.sinks.HDFSSink.hdfs.writeFormat = Text
# HDFS Sink是否使用本地时间,默认为false,此处设置为true
AvroAgent.sinks.HDFSSink.hdfs.useLocalTimeStamp = true

(4)描述和配置 Memory Channel

## Use a channel which buffers events inmemory
#缓冲通道的类型为memory内存型
AvroAgent.channels.memoryChannel.type = memory
# capacity为最大容量,transactioncapacity为 Channe1每次提交的Event的最大数量,capacity>= transactionCapacity
AvroAgent.channels.memoryChannel.capacitv=1000
AvroAgent.channels.memoryChannel.transactionCapacity=100
  1. 启动 Flume3 Agent AvroAgent
    在 $FLUME_HOME 目录下使用如下命令启动 AvroAgent:
flume-ng agent -c conf/ -f conf/flume3_avro_hdfs.conf -n AvroAgent -Dflume.root.logger=INFO,console

2. Flume 采集数据测试

为了验证并查看 Flume 采集数据的效果,可以在本机 4141 端口模拟生成 netcat 数据。打开另外一个终端,使用 Netcat 工具向本机的 4141 端口发送数据。命令如下:

nc localhost 4141
上述指令的作用就是使用 Netcat 工具连接到本机 4141 端口,用来持续发送信息作为 Flume 将要采集的源数据。执行指令后的效果如下图所示:
image.png

从上图可以看出, Netcat 工具正式启动成功,并且光标处于闪烁状态,等待用户在该端口输入数据信息。在 Netcat 工具测试界面,随意输入数据信息,如 hello flume 并按下【Enter】键,查看效果如下图所示:
image.png

  1. 查看采集结果
    (1)通过本机的浏览器访问 http://localhost:50070 或 http://本机IP地址:50070进入 HDFS Web UI 界面。效果如下图所示:
    在这里插入图片描述

(2)查看 /flumedata/2022-04-13/flumelogs.1649836480055 文件内容。如下图所示:
在这里插入图片描述

从上图可以看出,Flume 已经准确监听并采集到了监听应用发送的 netcat 数据。


http://www.ppmy.cn/server/141190.html

相关文章

关于word 页眉页脚的一些小问题

去掉页眉底纹: 对文档的段落边框和底纹进行设置,也是页眉横线怎么删除的一种解决方式,具体操作如下: 选中页眉中的横线文本; 点击【开始】选项卡,在【段落】组中点击【边框】按钮的下拉箭头; …

FreeRTOS 23:事件组EventGroup创建、删除、置位操作

FreeRTOS 提供了事件标志组的一些相关操作函数,如下表所示: 创建一个事件组 xEventGroupCreate() 动态方式创建事件标志组 xEventGroupCreate()用于创建一个事件组,并返回对应的句柄。 要想使用该函数必须 在头文件 FreeRTOSConfig.h 定义宏…

Linux安装Mysql详细教程(两种安装方法)

Linux之Mysql安装配置 第一种:Linux离线安装Mysql(提前手动下载好tar.gz包)第二种:通过yum安装配置Mysql(服务器有网络) 第一种:tar.gz包安装 1、 查看是否已经安装 Mysql rpm -qa | grep m…

AMD-OLMo:在 AMD Instinct MI250 GPU 上训练的新一代大型语言模型。

AMD-OLMo是一系列10亿参数语言模型,由AMD公司在AMD Instinct MI250 GPU上进行训练,AMD Instinct MI250 GPU是一个功能强大的图形处理器集群,它利用了OLMo这一公司开发的尖端语言模型。AMD 创建 OLMo 是为了突出其 Instinct GPU 在运行 “具有…

计算机网络常见面试题(一):TCP/IP五层模型、TCP三次握手、四次挥手,TCP传输可靠性保障、ARQ协议

文章目录 一、TCP/IP五层模型(重要)二、应用层常见的协议三、TCP与UDP3.1 TCP、UDP的区别(重要)3.2 运行于TCP、UDP上的协议3.3 TCP的三次握手、四次挥手3.3.1 TCP的三次握手3.3.2 TCP的四次挥手3.3.3 随机生成序列号的原因 四、T…

【学习笔记】SAP ABAP——OPEN SQL(一)【INTO语句】

【INTO语句】 结构体插入(插入一条语句时) SELECT...INTO [CORRESPONDING FIELDS OF] <wa> FROM <db> WHERE <condition>.内表插入(插入多条语句时) SELECT...INTO|APPENDING [CORRESPONDING FIELDS OF] TABLE <itab>FROM <db> WHERE <con…

阿里云文本内容安全处理

1、什么是内容安全 内容安全是一款基于AI算法和云计算技术&#xff0c;对多媒体内容的不宜或违规内容提供识别和标注的产品。该产品&#xff0c;支持对各行业及业务场景下的图片、视频、文本、语音等对象进行检测&#xff0c;可以帮助您提高内容审核效率、提高平台内容质量和用…

Oracle 第24章:云数据库服务

在《Oracle 第24章&#xff1a;云数据库服务》中&#xff0c;我们将探讨Oracle提供的云数据库服务&#xff0c;包括如何在云端部署与管理Oracle数据库。这部分内容将涵盖Oracle Cloud服务的概览&#xff0c;并通过具体的案例来说明如何利用这些服务进行高效的数据库管理和开发。…