大数据技术之Flume应用案例(2)

news/2024/9/18 7:49:56/ 标签: 大数据, flume

目录

 监控端口数据官方案例

步骤 1: 准备环境

步骤 2: 配置 Flume Agent

步骤 3: 启动 Flume Agent

步骤 4: 发送数据到 Flume

步骤 5: 查看 HDFS 中的数据

注意事项

示例说明

实时监控单个追加文件案例

需求分析

实现步骤

(1)确保环境变量配置正确

flume-file-hdfs.conf%20%E6%96%87%E4%BB%B6-toc" style="margin-left:160px;">(2)创建 flume-file-hdfs.conf 文件

(3)运行 Flume

(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志

(5)在 HDFS 上查看文件

实时监控目录下多个新文件案例

需求分析

实现步骤

flume-dir-hdfs.conf%20%E6%96%87%E4%BB%B6-toc" style="margin-left:160px;">(1)创建 flume-dir-hdfs.conf 文件

(2)启动监控文件夹命令

(3)向 upload 文件夹中添加文件

(4)查看 HDFS 上的数据

实时监控目录下的多个追加文件案例

需求分析

实现步骤

flume-taildir-hdfs.conf%20%E6%96%87%E4%BB%B6-toc" style="margin-left:120px;">(1)创建 flume-taildir-hdfs.conf 文件

(2)启动监控文件夹命令

(3)向 files 文件夹中追加内容

(4)查看 HDFS 上的数据

Taildir Source 说明


 监控端口数据官方案例

Flume 可以用来监控网络端口数据,这对于收集来自不同系统的日志或数据非常有用。下面是一个使用 Flume 监控网络端口数据的官方示例,我们将使用 Flume 的 netcat source 来接收数据,并将其写入到 HDFS 中。

步骤 1: 准备环境

确保已经安装并配置好了 Flume 和 Hadoop。这里假设你已经在上一步中完成了 Flume 的安装。

步骤 2: 配置 Flume Agent

创建一个名为 flume-conf.properties 的配置文件,该文件将定义一个 Flume Agent 的配置。

配置文件 flume-conf.properties

# 定义 agent 名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# 配置 sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/flume
a1.sinks.k1.hdfs.filePrefix = flume-logs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.rollSize = 512
a1.sinks.k1.hdfs.rollCount = 20
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true# 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 配置 agent 的 source、channel 和 sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

步骤 3: 启动 Flume Agent

使用以下命令启动 Flume Agent:

$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file ./flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

这里 $FLUME_HOME 是 Flume 的安装目录。

步骤 4: 发送数据到 Flume

你可以使用 netcat 工具或其他类似工具发送数据到 Flume 监听的端口。例如,如果你在另一台机器上或同一台机器的不同终端窗口中,可以使用 netcat 发送数据:

echo "This is a test message" | nc localhost 44444

步骤 5: 查看 HDFS 中的数据

一旦数据被发送到 Flume,Flume 将其写入到 HDFS 中。你可以使用 Hadoop 命令来查看数据:

hadoop fs -ls /flume
hadoop fs -cat /flume/flume-logs-*

注意事项

  • 确保 Hadoop 的 hdfs-site.xml 和 core-site.xml 配置文件已经正确配置。
  • 如果你的 Hadoop 集群使用了安全模式,确保你已经配置了正确的 Kerberos 凭证。
  • 如果你使用的是分布式 Flume,确保所有的 Flume 节点都能够访问 HDFS。

示例说明

  • Netcat Source (a1.sources.r1):配置了 netcat source 来监听 localhost 的 44444 端口。
  • HDFS Sink (a1.sinks.k1):配置了 HDFS sink 将数据写入到 HDFS 的 /flume 目录下。
  • Memory Channel (a1.channels.c1):使用内存 channel 作为 source 和 sink 之间的缓冲区。

实时监控单个追加文件案例

需求分析

  • 实时读取本地文件到HDFS案例
  • Hive日志文件位于 /opt/module/hive/logs/hive.log
  • Flume监控该文件
  • 数据最终存储到HDFS

实现步骤

(1)确保环境变量配置正确

确认 /etc/profile.d/my_env.sh 文件中包含以下内容:

JAVA_HOME=/opt/module/jdk1.8.0_212
HADOOP_HOME=/opt/module/ha/hadoop-3.1.3
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export PATH JAVA_HOME HADOOP_HOME
flume-file-hdfs.conf%20%E6%96%87%E4%BB%B6">(2)创建 flume-file-hdfs.conf 文件

创建文件 flume-file-hdfs.conf,并添加如下内容:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2# Configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c# Configure the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop12:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 60
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0# Configure the channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
(3)运行 Flume
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
[lzl@hadoop12 hadoop-2.7.2]$ sbin/start-dfs.sh
[lzl@hadoop13 hadoop-2.7.2]$ sbin/start-yarn.sh
[lzl@hadoop12 hive]$ bin/hive
(5)在 HDFS 上查看文件
hadoop fs -ls /flume

实时监控目录下多个新文件案例

需求分析

  • 使用 Flume 监听整个目录的文件,并上传至 HDFS
  • 被监控的目录位于 /opt/module/flume/upload

实现步骤

flume-dir-hdfs.conf%20%E6%96%87%E4%BB%B6">(1)创建 flume-dir-hdfs.conf 文件

创建文件 flume-dir-hdfs.conf,并添加如下内容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
(3)向 upload 文件夹中添加文件
[lzl@hadoop12 flume]$ mkdir upload
[lzl@hadoop12 upload]$ touch lzl.txt
[lzl@hadoop12 upload]$ touch lzl.tmp
[lzl@hadoop12 upload]$ touch lzl.log
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload

 

实时监控目录下的多个追加文件案例

需求分析

  • 使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS
  • 被监控的目录位于 /opt/module/flume/files

实现步骤

flume-taildir-hdfs.conf%20%E6%96%87%E4%BB%B6">(1)创建 flume-taildir-hdfs.conf 文件

创建文件 flume-taildir-hdfs.conf,并添加如下内容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3# Configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*# Configure the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop12:9000/flume/upload2/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0# Configure the channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
(2)启动监控文件夹命令
[lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
(3)向 files 文件夹中追加内容
  • 在 /opt/module/flume 目录下创建 files 文件夹
[lzl@hadoop12 flume]$ mkdir files
  • 向 files 文件夹中添加文件
[lzl@hadoop12 files]$ echo hello >> file1.txt
[lzl@hadoop12 files]$ echo lzl>> file2.txt
(4)查看 HDFS 上的数据
hadoop fs -ls /flume/upload2
Taildir Source 说明
  • Position File: Taildir Source 维护了一个 JSON 格式的 positionFile,它会定期地往 positionFile 中更新每个文件读取到的最新位置,因此能够实现断点续传。
  • Position File 格式:
    {"inode": 2496272,"pos": 12,"file": "/opt/module/flume/files/file1.txt"
    }
    {"inode": 2496275,"pos": 12,"file": "/opt/module/flume/files/file2.txt"
    }
  • Note: Linux 中存储文件元数据的区域称为 inode,每个 inode 都有一个编号,操作系统用 inode 编号来识别不同的文件。Unix/Linux 系统内部不使用文件名,而是使用 inode 编号来识别文件。

http://www.ppmy.cn/news/1517142.html

相关文章

WMI (Windows Management Instrumentation)类集合

WMI (Windows Management Instrumentation) 是一个强大的系统管理工具,包含大量的类,用于检索和操作系统信息。WMI 类的数量庞大,涵盖从操作系统到硬件设备的各种信息。 WMI 类通常以 Win32_ 或 CIM_ 前缀开头。Win32_ 类主要用于Windows系统…

AI绘画工具 Stable Diffusion【插画转绘】:建筑 | 风景| 人像照片的插画转绘制作教程,照片秒变插画风格图片!

大家好,我是画画的小强 关于Stable Diffusion 的插画转绘,今天给大家分享一种制作方法。我们先看一下效果图。 一. 图片转插画的制作方法 本期教程我们将使用AI绘画工具Stable Diffusion,关于SD的安装和入门使用可以看看我的往期入门教程…

uniapp u--input实现select下拉列表 input点击事件

背景&#xff1a; 技术框架&#xff1a; uniapp框架(vue2语法)uView组件库。 通过form表单实现数据列表的“查询”功能。注意&#xff1a; 1、<u--form>内部嵌套<u-form-item>&#xff0c;<u-form-item>内部嵌套<u--input>表单组件。 2、H5浏览器端&am…

HTTP 414错误问题

问题描述&#xff1a; 在一次前端编辑报表完成&#xff0c;打开审核人选择弹出框的时候&#xff0c;layer直接报414错误。 问题分析&#xff1a; HTTP 414是HTTP协议中的一个状态码&#xff0c;表示请求的URI&#xff08;Uniform Resource Identifier&#xff09;过长&#…

海睿思通过华东江苏大数据交易中心数商认证,提供高质量数据治理服务!

近日&#xff0c;中新赛克海睿思成功通过华东江苏大数据交易中心的数商认证&#xff0c;获得华东江苏大数据交易中心颁发的“数据治理服务商”证书。 华东数交是在实施“国家大数据战略”大背景下&#xff0c;经国家批准的华东地区首个省级特色数据要素交易平台&#xff0c;致力…

美客多卖家如何借助自养号测评提升销量

在美客多这一电商平台上&#xff0c;尽管当前多数卖家尚未充分利用测评技术&#xff0c;但其作为低成本、高回报的推广方式&#xff0c;无疑蕴藏着巨大的市场潜力。面对竞争相对缓和的市场环境及卖家对测评概念的普遍忽视&#xff0c;以下是对测评技术重要性的强调及其实施策略…

Adobe After Effects的插件--------CC Ball Action

CC Ball Action是粒子效果器,其将2D图层变为一个个由3D小球构成的图层。它是AE内置的3D插件。 使用条件 使用该插件的图层需是2D图层。 我们以一张图片素材为例: 给图片图层添加CC Ball Action效果控件,然后新建一个摄像机(利用摄像机旋转、平移、推拉工具,方便在各个角…

Spark MLlib 特征工程系列—特征转换VectorSizeHint

Spark MLlib 特征工程系列—特征转换VectorSizeHint VectorSizeHint 是 Spark 提供的一个特征转换器,用于指定向量列的大小(即维度)。在一些特征转换和建模过程中,要求输入的向量必须有固定的大小。当数据中包含不同大小的向量时,Spark 可能无法自动推断出向量的正确大小…

基于Vue3和Node.js的完整增删改查项目实现教程:从后端封装到前端调用

在 Node.js 中封装一个增删改查&#xff08;CRUD&#xff09;接口&#xff0c;并在 Vue 3 前端调用这些接口。整个过程包括后端 API 的创建和前端的调用。 一、安装 Node.js 和 Express 脚手架 1. 安装 Node.js 首先&#xff0c;你需要安装 Node.js。你可以通过以下步骤进行安…

零基础入门转录组数据分析——预后模型之多因素cox模型

零基础入门转录组数据分析——预后模型之多因素cox模型 目录 零基础入门转录组数据分析——预后模型之多因素cox模型1. 预后模型和多因素cox模型基础知识2. 多因素cox预后模型&#xff08;Rstudio&#xff09;——代码实操2. 1 数据处理2. 2 构建多因素cox模型&#xff08;用输…

如何构建社区康养管理系统?实现老年人服务管理全攻略【Java SpringBoot】

✍✍计算机毕业编程指导师 ⭐⭐个人介绍&#xff1a;自己非常喜欢研究技术问题&#xff01;专业做Java、Python、微信小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。 ⛽⛽实战项目&#xff1a;有源码或者技术上的问题欢迎在评论区一起讨论交流&#xff01; ⚡⚡ Java、…

【Go语言成长之路】多模块工作区入门

文章目录 【Go语言成长之路】多模块工作区入门前提条件一、创建一个模块二、创建工作空间三、创建第二个模块四、更多关于workspace 【Go语言成长之路】多模块工作区入门 ​ 多模块工作区(muti-module workspaces)可以使得开发者在多个模块中构建并且运行代码&#xff0c;相互…

在浏览器上使用transformers.js运行(WebGPU)RMBG-1.4进行抠图(背景移除)

在浏览器上使用transformers.js运行&#xff08;WebGPU&#xff09;RMBG-1.4进行抠图&#xff08;背景移除&#xff09; 说明&#xff1a; 首次发表日期&#xff1a;2024-08-28官方Github仓库地址&#xff1a; https://github.com/xenova/transformers.js/tree/main/examples…

《深入浅出WPF》读书笔记.8路由事件

《深入浅出WPF》读书笔记.8路由事件 背景 路由事件是直接响应事件的变种。直接响应事件&#xff0c;事件触发者和事件响应者必须显示订阅。而路由事件的触发者和事件响应者之间的没有显示订阅&#xff0c;事件触发后&#xff0c;事件响应者安装事件监听器&#xff0c;当事件传…

SpringBoot -在Axis2中,RPCServiceClient调用WebService

在 Axis2 中,RPCServiceClient 是一种用于调用 WebService 的客户端实现。下面是如何将它们 结合起来使用的一个示例: 步骤 1: 添加依赖 首先,在 pom.xml 文件中添加 Axis2 的相关依赖。 <dependencies><!-- 其他依赖 --><dependency><groupId>…

拓扑排序-

基本原理 就是存在一个入度为0的点和一个出度为0的点 然后图中所有点都是指向同一个方向&#xff1b; /* 拓扑序列&#xff1a; 特点&#xff1a;有向无环图 判断&#xff1a;判断所有的点是否入度为0 换句话 就是入度为0的点个数是否满点的总数 过程&#xff1a;建图、入度数…

2024HarmonyOS应用开发者高级认证最新整理题库和答案(已收录182道 )

更新截止2024-08-27,完整题库一共182道题,足够覆盖90%考题,如有新题和遗漏我会持续补充 所有题目的选项都是打乱顺序的,记答案不要记序号 完整题库请在我的网盘下载或查看在线文档 完整题库在线文档预览 单选(已收录102道) 1 . 以下哪个装饰器用来表示并发共享对象。(B) A. @…

通过Python绘制不同数据类型适合的可视化图表

在数据可视化中&#xff0c;对于描述数值变量与数值变量之间的关系常见的有散点图和热力图&#xff0c;以及描述数值变量与分类变量之间的关系常见的有条形图&#xff0c;饼图和折线图&#xff0c;可以通过使用Python的matplotlib和seaborn库来绘制图表进行可视化表达&#xff…

超详细!!!uniapp通过unipush全流程实现app消息推送

云风网 云风笔记 云风知识库 一、HBuilder新建APP项目 二、配置推送服务 1、登录Dcloud开发者中心开发者中心&#xff0c;查看我的应用 2、生成云端证书 3、创建平台信息 4、配置推送服务信息 这里需要关联服务空间&#xff0c;可以申请免费服务空间进行测试 三、代码配置 1…