大数据技术之Flume应用案例(2)

embedded/2024/10/16 2:31:30/

目录

 监控端口数据官方案例

步骤 1: 准备环境

步骤 2: 配置 Flume Agent

步骤 3: 启动 Flume Agent

步骤 4: 发送数据到 Flume

步骤 5: 查看 HDFS 中的数据

注意事项

示例说明

实时监控单个追加文件案例

需求分析

实现步骤

(1)确保环境变量配置正确

flume-file-hdfs.conf%20%E6%96%87%E4%BB%B6-toc" style="margin-left:160px;">(2)创建 flume-file-hdfs.conf 文件

(3)运行 Flume

(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志

(5)在 HDFS 上查看文件

实时监控目录下多个新文件案例

需求分析

实现步骤

flume-dir-hdfs.conf%20%E6%96%87%E4%BB%B6-toc" style="margin-left:160px;">(1)创建 flume-dir-hdfs.conf 文件

(2)启动监控文件夹命令

(3)向 upload 文件夹中添加文件

(4)查看 HDFS 上的数据

实时监控目录下的多个追加文件案例

需求分析

实现步骤

flume-taildir-hdfs.conf%20%E6%96%87%E4%BB%B6-toc" style="margin-left:120px;">(1)创建 flume-taildir-hdfs.conf 文件

(2)启动监控文件夹命令

(3)向 files 文件夹中追加内容

(4)查看 HDFS 上的数据

Taildir Source 说明


 监控端口数据官方案例

Flume 可以用来监控网络端口数据,这对于收集来自不同系统的日志或数据非常有用。下面是一个使用 Flume 监控网络端口数据的官方示例,我们将使用 Flume 的 netcat source 来接收数据,并将其写入到 HDFS 中。

步骤 1: 准备环境

确保已经安装并配置好了 Flume 和 Hadoop。这里假设你已经在上一步中完成了 Flume 的安装。

步骤 2: 配置 Flume Agent

创建一个名为 flume-conf.properties 的配置文件,该文件将定义一个 Flume Agent 的配置。

配置文件 flume-conf.properties

# 定义 agent 名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# 配置 sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume
a1.sinks.k1.hdfs.filePrefix = flume-logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 512
a1.sinks.k1.hdfs.rollCount = 20
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true# 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 配置 agent 的 source、channel 和 sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

步骤 3: 启动 Flume Agent

使用以下命令启动 Flume Agent:

$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file ./flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

这里 $FLUME_HOME 是 Flume 的安装目录。

步骤 4: 发送数据到 Flume

你可以使用 netcat 工具或其他类似工具发送数据到 Flume 监听的端口。例如,如果你在另一台机器上或同一台机器的不同终端窗口中,可以使用 netcat 发送数据:

echo "This is a test message" | nc localhost 44444

步骤 5: 查看 HDFS 中的数据

一旦数据被发送到 Flume,Flume 将其写入到 HDFS 中。你可以使用 Hadoop 命令来查看数据:

hadoop fs -ls /flume
hadoop fs -cat /flume/flume-logs-*

注意事项

  • 确保 Hadoop 的 hdfs-site.xml 和 core-site.xml 配置文件已经正确配置。
  • 如果你的 Hadoop 集群使用了安全模式,确保你已经配置了正确的 Kerberos 凭证。
  • 如果你使用的是分布式 Flume,确保所有的 Flume 节点都能够访问 HDFS。

示例说明

  • Netcat Source (a1.sources.r1):配置了 netcat source 来监听 localhost 的 44444 端口。
  • HDFS Sink (a1.sinks.k1):配置了 HDFS sink 将数据写入到 HDFS 的 /flume 目录下。
  • Memory Channel (a1.channels.c1):使用内存 channel 作为 source 和 sink 之间的缓冲区。

实时监控单个追加文件案例

需求分析

  • 实时读取本地文件到HDFS案例
  • Hive日志文件位于 /opt/module/hive/logs/hive.log
  • Flume监控该文件
  • 数据最终存储到HDFS

实现步骤

(1)确保环境变量配置正确

确认 /etc/profile.d/my_env.sh 文件中包含以下内容:

JAVA_HOME=/opt/module/jdk1.8.0_212
HADOOP_HOME=/opt/module/ha/hadoop-3.1.3
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export PATH JAVA_HOME HADOOP_HOME
flume-file-hdfs.conf%20%E6%96%87%E4%BB%B6">(2)创建 flume-file-hdfs.conf 文件

创建文件 flume-file-hdfs.conf,并添加如下内容:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2# Configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c# Configure the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop12:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 60
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0# Configure the channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
(3)运行 Flume
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
[lzl@hadoop12 hadoop-2.7.2]$ sbin/start-dfs.sh
[lzl@hadoop13 hadoop-2.7.2]$ sbin/start-yarn.sh
[lzl@hadoop12 hive]$ bin/hive
(5)在 HDFS 上查看文件
hadoop fs -ls /flume

实时监控目录下多个新文件案例

需求分析

  • 使用 Flume 监听整个目录的文件,并上传至 HDFS
  • 被监控的目录位于 /opt/module/flume/upload

实现步骤

flume-dir-hdfs.conf%20%E6%96%87%E4%BB%B6">(1)创建 flume-dir-hdfs.conf 文件

创建文件 flume-dir-hdfs.conf,并添加如下内容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
(3)向 upload 文件夹中添加文件
[lzl@hadoop12 flume]$ mkdir upload
[lzl@hadoop12 upload]$ touch lzl.txt
[lzl@hadoop12 upload]$ touch lzl.tmp
[lzl@hadoop12 upload]$ touch lzl.log
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload

 

实时监控目录下的多个追加文件案例

需求分析

  • 使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
  • 被监控的目录位于 /opt/module/flume/files

实现步骤

flume-taildir-hdfs.conf%20%E6%96%87%E4%BB%B6">(1)创建 flume-taildir-hdfs.conf 文件

创建文件 flume-taildir-hdfs.conf,并添加如下内容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
(3)向 files 文件夹中追加内容
  • 在 /opt/module/flume 目录下创建 files 文件夹
[lzl@hadoop12 flume]$ mkdir files
  • 向 files 文件夹中添加文件
[lzl@hadoop12 files]$ echo hello >> file1.txt
[lzl@hadoop12 files]$ echo lzl>> file2.txt
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload2
Taildir Source 说明
  • Position File: Taildir Source 维护了一个 JSON 格式的 positionFile,它会定期地往 positionFile 中更新每个文件读取到的最新位置,因此能够实现断点续传。
  • Position File 格式:
    {"inode": 2496272,"pos": 12,"file": "/opt/module/flume/files/file1.txt"
    }
    {"inode": 2496275,"pos": 12,"file": "/opt/module/flume/files/file2.txt"
    }
  • Note: Linux 中存储文件元数据的区域称为 inode,每个 inode 都有一个编号,操作系统用 inode 编号来识别不同的文件。Unix/Linux 系统内部不使用文件名,而是使用 inode 编号来识别文件。

http://www.ppmy.cn/embedded/101497.html

相关文章

19c库启动报ORA-600 kcbzib_kcrsds_1---惜分飞

一套19c的库由于某种情况,发现异常,当时的技术使用隐含参数强制拉库,导致数据库启动报ORA-00704 ORA-600 kcbzib_kcrsds_1错误 2024-08-24T06:11:25.49430408:00 ALTER DATABASE OPEN 2024-08-24T06:11:25.49437008:00 TMI: adbdrv open database BEGIN 2024-08-24 06:11:25.49…

搭建FTP服务器,通过浏览器访问FTP服务器,测试终端上传的音频文件。

文章目录 引言I 搭建FTP服务器II 浏览器访问FTP文件PC端浏览器访问iphone-safari浏览器访问FTP设置Mac-Safari浏览器访问FTP设置III FTP基础知识FTP客户端数据连接: 被动模式(PASV)引言 需求: 通过浏览器访问,测试终端通过FTP上传的语音文件,支持直接播放语音文件。 建议…

微信小程序怎样进行本地存储的读、写、删、清?同步及异步两种类型

微信小程序提供了本地存储的API,允许开发者在页面上保存用户数据,以便在用户的会话或跨会话中持久化数据。本地存储支持同步和异步两种方式来进行读、写、删、清操作。 同步方式 微信小程序从基础库版本 2.10.0 开始,逐步废弃了同步的本地存…

在CentOS 7上安装MariaDB的方法

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 简介 MariaDB 是一个开源的数据库管理系统,通常作为流行的 LEMP(Linux、Nginx、MySQL/MariaDB、PHP/Python/Per…

0基础学习spark

零、pyspark模板 import os from pyspark import SparkContext, SparkConf os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DIRVER_PYTHON] /root/anaconda3/bin/python3 if __name__ __main__:…

【Rust光年纪】探索Rust语言中的WebAssembly利器:核心功能、安装配置与API概览

构建现代Web应用:掌握Rust语言的前端开发利器 前言 在现代的Web开发中,使用Rust语言进行前端和图形开发变得越来越流行。本文将介绍一些用于Rust语言的WebAssembly、WebGPU库,以及现代Web框架和与JavaScript互操作的库,帮助读者…

Android 模拟器的简单操作

Android 模拟器的简单操作 一、adb 命令查看模拟器基础信息 1、查看屏幕的尺寸adb shell wm size2、查看屏幕密度adb shell wm density二、常用几个 adb 命令 返回到主页,模拟按下设备的 “Home” 按钮 adb shell input keyevent 3adb shell input keyeve…

架构师面试题系列之Spring MVC面试专题及答案(31题)

目录 1、什么是 SpringMvc?说一下你对它的理解2、SpringMVC 的优点 :3、SpringMVC 工作原理?4、SpringMVC 的主要组件?5、讲下 SpringMvc 的执行流程6、SpingMvc 中的控制器的注解一般用那个,有没有别的注解可以替代?7、如果在拦截请求中,想拦截 get 方式提交的方法,怎么…