kafka:使用flume自定义拦截器,将json文件抽取到kafka的消息队列(topic)中,再从topic中将数据抽取到hdfs上

server/2024/11/19 11:49:03/

抽取trans_info.json的数据到kafka上,对其中的tr_flag=0的数据进行过滤抛弃,只保留正常的状态数据: 将此json文件放在集群中的 /home/zidingyi/trans_info.json 目录下

首先先在java代码中自定义拦截器:

1):创建maven项目,在pom文件中导入相关依赖

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.48</version></dependency>
</dependencies><!--可以使用maven中的某些打包插件,不仅可以帮助我们打包代码还可以打包所依赖的jar包--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- 禁止生成 dependency-reduced-pom.xml--><createDependencyReducedPom>false</createDependencyReducedPom></configuration><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><relocations><relocation><!-- 解决包冲突 进行转换--><pattern>com.google.protobuf</pattern><shadedPattern>shaded.com.google.protobuf</shadedPattern></relocation></relocations><artifactSet><excludes><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude></excludes></filter></filters><transformers><!-- 某些jar包含具有相同文件名的其他资源(例如属性文件)。 为避免覆盖,您可以选择通过将它们的内容附加到一个文件中来合并它们--><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>mainclass</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>

自定义拦截器代码:

package com.bigdata;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;
import java.util.List;public class zidingyi implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {String body = new String(event.getBody());JSONObject jsonObject = JSONObject.parseObject(body);//获取json中的值int trFlag = jsonObject.getInteger("tr_flag");// 如果tr_flag中的值为0,就返回空if (trFlag == 0){return null;}return event;}@Overridepublic List<Event> intercept(List<Event> list) {ArrayList<Event> filterEvents = new ArrayList<>();for (Event event : list) {Event intercept = intercept(event);if (intercept != null){filterEvents.add(intercept);}}return filterEvents;}@Overridepublic void close() {}public static class BuilderEvent implements Builder{@Overridepublic Interceptor build() {return new zidingyi();}@Overridepublic void configure(Context context) {}}}

使用maven打包,生成jar包后上传到flume下的lib目录下

2):上传好jar包后,在flume下的conf中创建了一个myconf文件,创建一个zidinfyi.conf文件,编写flume的conf文件即可(记得使用自定义拦截器)

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/zidingyi/trans_info.json#使用自定义拦截器
a1.sources.s1.interceptors = i1
# type指的是编写java代码所在目录的路径名(我的是在com.bigdata.zidingyi下)
a1.sources.s1.interceptors.i1.type = com.bigdata.zidingyi$BuilderEvent# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092
a1.sinks.k1.kafka.topic = zidingyi
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

执行之前,先在kafka中创建消息队列(topic)中创建一个topic :zidingyi 数据将会导入到这个topic中

创建好后执行conf文件即可

flume-ng agent -c ./ -f zidingyi.conf -n a1 -Dflume.root.logger=INFO,console

可以使用

kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --from-beginning --topic zidingyi

把主题中所有的数据都读取出来(包括历史数据)并且还可以接收来自生产者的新数据

3):将topic中的数据抽取到hdfs

里面的group.id随便指定即可

执行此conf文件即可

flume-ng agent -c ./ -f zidingyi2.conf -n a1 -Dflume.root.logger=INFO,console
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = zidingyi
a1.sources.r1.kafka.consumer.group.id = donghu# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /zidingyi/ods/clearDate/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp=truea1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollInterval = 0a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text

数据抽取成功


http://www.ppmy.cn/server/143175.html

相关文章

django从入门到实战(三)——CBV视图介绍

在 Django 中&#xff0c;不同类型的视图&#xff08;如数据显示视图、数据操作视图和日期筛选视图&#xff09;都有各自的方法和参数。以下是对这些视图及其方法的详细介绍。 1. 数据显示视图 1.1 重定向视图 方法&#xff1a; redirect(): 用于重定向到另一个 URL。 使用…

无人机+无人车+机器狗:城市巷战突破技术详解

在城市巷战中&#xff0c;无人机、无人车和机器狗的组合可以形成一种全新的战场突破技术&#xff0c;这种组合能够大幅提升作战效率&#xff0c;减少人员伤亡。以下是对这一技术的详细解析&#xff1a; 一、无人机的作用 1.空中侦察&#xff1a;无人机能够提供高空视角&#x…

Acwing342

这个代码实现了一种结合 连通块分解、拓扑排序 和 Dijkstra 算法 的复杂图的最短路径计算方法&#xff0c;适用于含有两类边的图结构&#xff1a;普通边&#xff08;在连通块内&#xff09;和特殊边&#xff08;跨连通块&#xff09;。 以下是详细的代码讲解&#xff0c;逐步解…

IDEA旗舰版编辑器器快速⼊门(笔记)

简介&#xff1a;javaweb开发必备软件之IDEA期间版介绍 DEA编辑器器版本介绍 官⽹网&#xff1a;https://www.jetbrains.com/地址&#xff1a;https://www.jetbrains.com/idea/download/#sectionmac DEA 分社区版(Community) 和 旗舰版(Ultimate)&#xff0c;我们做JavaWeb开…

C++中的观察者模式:通俗易懂的讲解与实现

什么是观察者模式&#xff1f; 观察者模式是一种常见的设计模式&#xff0c;它解决了这样一个问题&#xff1a;当某个对象的状态发生变化时&#xff0c;如何通知依赖它的其他对象&#xff1f; 用通俗的话说&#xff0c;观察者模式就像我们日常的“订阅-通知”机制&#xff1a…

【青牛科技】汽车收音机调频中频放大器——D1145

性能优势 内置多种电路&#xff1a;内置芯片中频计数缓冲电路及 ETR 微处理控制开关电路&#xff0c;这些电路的集成有助于提升整体性能和功能的集成度&#xff0c;减少外部电路的复杂性和成本1.线性输出信号好&#xff1a;能够输出质量较高的线性信号&#xff0c;这对于保证收…

为什么VScode不能连服务器,MobaXterm可以连

VSCode无法连接服务器但MobaXterm可以连接的原因可能有以下几种‌&#xff1a; ‌SSH协议问题‌&#xff1a;首先检查SSH协议是否正常工作。可以尝试使用其他终端工具&#xff08;如Xshell或MobaXterm&#xff09;连接服务器&#xff0c;如果这些工具也无法连接&#xff0c;说…

2022数学分析【南昌大学】

2022 数学分析 利用极限定义证明: lim ⁡ n → ∞ 4 n 3 + n − 2 2 n 3 − 10 = 2 \mathop {\lim }\limits_{n \to \infty } \frac{{4{n^3} + n - 2}}{{2{n^3} - 10}} = 2 n→∞lim​2n3−104n3+n−2​=2 ∀ ε > 0 \forall \varepsilon>0 ∀ε>0 要使不等式成立,…