大数据-227 离线数仓 - Flume 自定义拦截器(续接上节) 采集启动日志和事件日志

news/2024/11/22 14:34:15/

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • Flume 自定义拦截器
  • 拦截原理 拦截器实现 Java

在这里插入图片描述

自定义拦截器

(续接上节,上节已经到了打包的部分)

上传结果

将刚才的打包上传到这个目录下:
我拷贝的是带依赖的:“flume-test-1.0-SNAPSHOT-jar-with-dependencies.jar”

/opt/servers/apache-flume-1.9.0-bin/lib/

测试效果

我们创建刚才说的conf文件:

vim /opt/wzk/flume-conf/flumetest1.conf

编写的内容如下图所示:
在这里插入图片描述
启动进行测试:

flume-ng agent --conf-file /opt/wzk/flume-conf/flumetest1.conf -name a1 -Dflume.roog.logger=INFO,console

启动结果如下图所示:
z

我们启动 telnet 来传入数据:

telnet h122.wzk.icu 9999

启动之后输入数据:

2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

此时控制台的数据内容部分为:

4/08/27 15:31:31 INFO source.NetcatSource: Source starting
24/08/27 15:31:31 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.1.130:9999]
24/08/27 15:32:09 INFO sink.LoggerSink: Event: { headers:{logtime=2020-07-30} body: 32 30 32 30 2D 30 37 2D 33 30 20 31 34 3A 31 38 2020-07-30 14:18 }

对应的截图如下图所示:
在这里插入图片描述

采集启动日志(使用自定义拦截器)

配置文件

新建一个配置文件:

vim /opt flume-log2hdfs2.conf

写入的内容如下所示:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# 使用本地时间
# a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

内容的截图如下所示:
在这里插入图片描述
修改的内容如下:

  • 给source增加自定义拦截器
  • 去掉时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true
  • 根据header中的logtime写文件

测试运行

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs2.conf -name a1 -Dflume.roog.logger=INFO,console

拷贝日志

修改日志的内容:

vim /opt/wzk/logs/start/test.log

继续写入如下的内容:

2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}

写入内容如下图所示:
在这里插入图片描述

测试效果

可以看到HDFS上,已经有了读出日志中的时间的内容:
在这里插入图片描述

采集启动日志和事件日志

本系统中要采集两种日志:

  • 启动日志
  • 事件日志
    不同的日志放置在不同的目录下,要想一次拿到全部日志需要监控多个目录。
    在这里插入图片描述

总体思路

  • taildir 监控多个目录
  • 修改自定义拦截器,不同来源的数据加上不同标志
  • HDFS、Sink 根据标志写文件

Agent 介绍

Flume 是一个分布式、高可靠、可用来收集、聚合和传输大量日志数据的系统。在 Flume 的体系结构中,Agent 是一个关键的组件。每个 Agent 是一个独立的 JVM 进程,负责从数据源获取数据并将其传递到下游(如文件系统、数据库、或者另一个 Agent)

Agent 的核心组成部分

Flume Agent 的架构是高度模块化的,它由以下三个核心组件构成:

Source (源)

Source 是数据流的起点,负责接收外部数据。它支持多种数据传输协议和格式,能够从日志文件、网络端口、消息队列等数据源中接收事件。

支持的 Source 类型:

  • Avro Source:接收来自其他 Flume Agent 的 Avro 格式数据。
  • Syslog Source:处理 Syslog 消息。
  • Exec Source:从本地命令或脚本的输出中读取数据。
  • HTTP Source:通过 HTTP 接口接收数据。
  • Spooling Directory Source:监控特定目录中的文件并读取内容。

Channel (通道)

Channel 是 Agent 的数据缓冲区域,用于在 Source 和 Sink 之间暂存事件。Channel 的设计保证了在数据流动中断(如网络故障)时,数据不会丢失。

常见的 Channel 类型:

  • Memory Channel:将事件存储在内存中,速度快,但可能会丢失数据(如果 Agent 崩溃)。
  • File Channel:将事件存储在磁盘文件中,提供高可靠性但性能较低。
  • Kafka Channel:使用 Kafka 作为中转通道,适合分布式场景。

Sink (接收器)

Sink 是数据流的终点,负责将事件传递到下游存储或处理系统。

支持的 Sink 类型:

  • HDFS Sink:将事件写入 Hadoop HDFS。
  • Kafka Sink:将事件发送到 Kafka。
  • Elasticsearch Sink:将事件写入 Elasticsearch。
  • Logger Sink:将事件输出到日志。
  • Avro Sink:将事件传递到另一个 Flume Agent 的 Avro Source。

Agent配置

vim /opt/wzk/flume-conf/flume-log2hdfs3.conf

写入的内容如下图所示:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%
{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • filegroups:指定filegroups,可以有多个,以空格分割(taildir source可以同时监控多个目录中的文件)
  • headers.filegroups.headerKey

给Event增加header Key,不同的filegroup,可配置不同的value。


http://www.ppmy.cn/news/1549058.html

相关文章

Redis Pipeline 管道 ✨

✨探索Redis Pipeline 管道✨ Redis Pipeline 管道批量处理:谨防传输命令过多 在 Redis 操作中,Redis Pipeline 管道批量处理是个很实用的功能,但使用时需格外留意,不要在一次批处理中传输太多命令,下面来详细说说原因…

【LeetCode面试150】——202快乐数

博客昵称:沈小农学编程 作者简介:一名在读硕士,定期更新相关算法面试题,欢迎关注小弟! PS:哈喽!各位CSDN的uu们,我是你的小弟沈小农,希望我的文章能帮助到你。欢迎大家在…

力扣 LeetCode 98. 验证二叉搜索树(Day9:二叉树)

解题思路&#xff1a; 方法一&#xff1a;直接中序遍历放入数组&#xff08;非常直观的想法&#xff09; 需要注意用for循环进行重复值的判断&#xff0c;二叉搜索树不能有值相等的节点 class Solution {List<Integer> list new ArrayList<>();public boolean …

Redis 集群主要有以下几种类型

Redis 集群主要有以下几种类型&#xff1a; 主从复制模式&#xff1a; 这种模式包含一个主数据库实例&#xff08;master&#xff09;与一个或多个从数据库实例&#xff08;slave&#xff09;。客户端可以对主数据库进行读写操作&#xff0c;对从数据库进行读操作&#xff0c;主…

Kafka Stream实战教程

Kafka Stream实战教程 1. Kafka Streams 基础入门 1.1 什么是 Kafka Streams Kafka Streams 是 Kafka 生态中用于 处理实时流数据 的一款轻量级流处理库。它利用 Kafka 作为数据来源和数据输出&#xff0c;可以让开发者轻松地对实时数据进行处理&#xff0c;比如计数、聚合、…

2024-11-18-sklearn学习(1)-线性回归(2)

文章目录 sklearn学习&#xff08;1&#xff09;-线性回归(2)3.Lasso&#xff08;拟合系数稀疏的线性模型&#xff09;3.1 Lasso基础3.2 Lasso的正则化参数设置3.2.1 使用交叉验证3.2.1 基于信息标准的模型选择3.2.1 与 SVM&#xff08;支持向量机&#xff09; 的正则化参数的比…

【halcon技巧】如何扩大背景

背景介绍 我需要将大量零散的区域聚合到一起,所以会用到膨胀,将分散的区域粘到一起。 形成一个整体之后还需要恢复到之前的大小!于是就会用到腐蚀。这样就能恢复到和之前一样的大小。 但是理想很饱满,现实很意外。现在出现的情况是,膨胀时 图片右边膨胀的区域大小超出的…

springboot基于微信小程序的食堂预约点餐系统

摘 要 基于微信小程序的食堂预约点餐系统是一种服务于学校和企事业单位食堂的智能化解决方案&#xff0c;旨在提高食堂就餐的效率、缓解排队压力&#xff0c;并优化用户的就餐体验。系统作为一种现代化的解决方案&#xff0c;为食堂管理和用户就餐提供了便捷高效的途径。它不仅…