flume对kafka中数据的导入导出、datax对mysql数据库数据的抽取

embedded/2024/11/22 2:10:22/

目录

flume%E8%87%AA%E5%AE%9A%E4%B9%89%E6%8B%A6%E6%88%AA%E5%99%A8-toc" style="margin-left:120px;">1、flume自定义拦截器

flume%E5%B0%86%E6%95%B0%E6%8D%AE%E6%8A%BD%E5%8F%96%E5%88%B0%E8%AF%A5%E4%B8%BB%E9%A2%98%E7%9A%84kafka%E4%B8%AD-toc" style="margin-left:120px;">2、创建topic为yuekao的主题,并使用flume将数据抽取到该主题的kafka

kafka%E4%B8%AD%E7%9A%84%E6%95%B0%E6%8D%AE%E6%94%BE%E5%85%A5%E5%88%B0hdfs%E4%B8%8A%EF%BC%8C%E7%9B%AE%E5%BD%95%E4%B8%BA%EF%BC%9A%2Fyuekao%2Fods%2Fzhuanzhang%C2%A0-toc" style="margin-left:120px;">3、将kafka中的数据放入到hdfs上,目录为:/yuekao/ods/zhuanzhang 

 4、 通过datax,对MySQL数据库中的表进行抽取,落入hdfs指定的目录中: /yuekao/ods/user_info

 要求: 

flume%E8%87%AA%E5%AE%9A%E4%B9%89%E6%8B%A6%E6%88%AA%E5%99%A8" style="text-align:justify;">1、flume自定义拦截器

抽取trans_info.json的数据到kafka上,对其中的tr_flag=0的数据进行过滤抛弃,只保留正常的状态数据

在pom.xml中放入依赖包: 

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.5</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.6.5</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.5</version></dependency><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version></dependency><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-sdk</artifactId><version>1.9.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.48</version></dependency>

使用java代码,自定义拦截器: 

package com.bigdata.yuekao04;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;public class DemoInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {try {// 获取事件体中的数据(假设数据是JSON格式存储在事件体中)String data = new String(event.getBody());// 使用Jackson将JSON字符串解析为JsonNode对象ObjectMapper objectMapper = new ObjectMapper();JsonNode jsonNode = objectMapper.readTree(data);// 获取tr_flag的值int trFlag = jsonNode.get("tr_flag").asInt();// 如果tr_flag不等于0,保留该事件if (trFlag!= 0) {return event;}} catch (IOException e) {e.printStackTrace();}// 如果tr_flag等于0,返回null,表示过滤掉该事件return null;}@Overridepublic List<Event> intercept(List<Event> list) {return null;}@Overridepublic void close() {}public static class BuilderEvent implements Builder{@Overridepublic Interceptor build() {return new DemoInterceptor();}@Overridepublic void configure(Context context) {}}
}

打包java代码,放入/flume/lib下面

flume%E5%B0%86%E6%95%B0%E6%8D%AE%E6%8A%BD%E5%8F%96%E5%88%B0%E8%AF%A5%E4%B8%BB%E9%A2%98%E7%9A%84kafka%E4%B8%AD" style="text-align:justify;">2、创建topic为yuekao的主题,并使用flume将数据抽取到该主题的kafka

 编写conf文件(yuekao04.conf),将数据抽取到kafka新创建的主题中:

# 定义Flume agent名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 配置source
a1.sources.r1.type = TAILDIR
#以空格分隔的文件组列表。每个文件组表示要跟踪的一组文件
a1.sources.s1.filegroups = f1
#文件组的绝对路径
a1.sources.s1.filegroups.f1=/home/trans_info1.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.bigdata.DemoInterceptor$Builder# 配置channel
a1.channels.c1.type = file# 配置sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = yuekao04
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092
a1.sinks.k1.channel = c1
kafka%E4%B8%AD%E7%9A%84%E6%95%B0%E6%8D%AE%E6%94%BE%E5%85%A5%E5%88%B0hdfs%E4%B8%8A%EF%BC%8C%E7%9B%AE%E5%BD%95%E4%B8%BA%EF%BC%9A%2Fyuekao%2Fods%2Fzhuanzhang%C2%A0" style="text-align:justify;">3、将kafka中的数据放入到hdfs上,目录为:/yuekao/ods/zhuanzhang 

编写conf文件,然后执行该文件,将kafka中的数据放入hdfs中: 

a1.sources = r1
a1.channels = c1
a1.sinks=k1a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = yuekao04
a1.sources.r1.kafka.consumer.group.id =yuekao
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /yuekao/ods/zhuanzhang/%y-%m-%d
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream

 结果展示:

 

 4、 通过datax,对MySQL数据库中的表进行抽取,落入hdfs指定的目录中: /yuekao/ods/user_info

先在mysql中建表,然后将user_info.sql表中数据插入:

CREATE TABLE `user_info` (
`name` VARCHAR (255) ,
phone_num VARCHAR (255) ,
email VARCHAR (255) ,
addr_info VARCHAR (255) ,
gender VARCHAR (255) ,
idno VARCHAR (255) ,
create_time VARCHAR (255) ,
user_id int
);

编写json文件(demo.json),然后执行,将数据库中的数据放入hdfs中:

{"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"writeMode": "insert","username": "root","password": "123456","column": ["name","phone_num","email","addr_info","gender","idno","create_time","user_id"],"splitPk": "user_id","connection": [{"table": ["user_info"],"jdbcUrl": ["jdbc:mysql://bigdata01:3306/yuekao"]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://bigdata01:9820","fileType": "text","path": "/yuekao/ods/user_info","fileName": "user_info.txt","column": [{"name": "name","type": "string"},{"name": "phone_num","type": "string"},{"name": "email","type": "string"},{"name": "addr_info","type": "string"},{"name": "gender","type": "string"},{"name": "idno","type": "string"},{"name": "create_time","type": "string"},{"name": "user_id","type": "int"}],"writeMode": "append","fieldDelimiter": ","}}}]}
}

执行json文件: 

datax.py demo.json

 结果展示:

 数据放不进来,有需要的小伙伴可以私我!!!

 


http://www.ppmy.cn/embedded/139490.html

相关文章

Java数据库连接(Java Database Connectivity,JDBC)

1.JDBC介绍 Java数据库连接&#xff08;Java Database Connectivity&#xff0c;JDBC&#xff09;是SUN公司为了简化、统一对数据库的操作&#xff0c;定义的一套Java操作数据库的规范&#xff08;接口&#xff09;。这套接口由数据库厂商去实现&#xff0c;这样&#xff0c;开…

【软考】系统架构设计师-信息安全技术基础

信息安全核心知识点 信息安全5要素&#xff1a;机密性、完整性、可用性、可控性、审查性 信息安全范围&#xff1a;设备安全、数据安全、内容安全、行为安全 网络安全 网络安全的隐患体现在&#xff1a;物理安全性、软件安全漏洞、不兼容使用安全漏洞、选择合适的安全哲理 …

模拟实现STL中的list

目录 1.设计list的结点 2.设计list的迭代器 3.list类的设计总览 4.list类的迭代器操作 5.list类的四个特殊的默认成员函数 无参的默认构造函数 拷贝构造函数 赋值运算符重载函数 析构函数 6.list类的插入操作 7.list类的删除操作 8.list.hpp源代码 1.设计list的结点…

淘宝 NPM 镜像源

npm i vant/weapp -S --production npm config set registry https://registry.npmmirror.com 要在淘宝 NPM 镜像站下载项目或依赖&#xff0c;你可以按照以下步骤操作&#xff1a; 1. 设置淘宝 NPM 镜像源 首先&#xff0c;你需要设置淘宝 NPM 镜像源以加速下载。可以通过…

多用户Markdown笔记应用Many Notes

什么是 Many Notes &#xff1f; Many Notes 是一款专为简约设计的 Markdown 笔记应用&#xff01;您可以轻松创建或导入您的笔记库&#xff0c;并立即整理您的想法。 软件特点&#xff1a; 多用户支持每个用户多个笔记库文件搜索树形视图浏览器&#xff0c;便于快速导航导入/…

ElasticSearch-全文检索(一)基本介绍

简介 Elasticsearch&#xff1a;官方分布式搜索和分析引擎 | Elastic 全文搜索属于最常见的需求&#xff0c;开源的Elasticsearch是目前全文搜索引擎的首选。 它可以快速地储存、搜索和分析海量数据。维基百科、StackOverflow、Github都采用它 Elastic的底层是开源库Lucene。但…

深度学习实验十一 卷积神经网络(2)——基于LeNet实现手写体数字识别实验

目录 一、数据 二、模型构建 三、模型训练及评价 四、打印参数量和计算量 五、模型预测 附&#xff1a;完整可运行代码 实验大致步骤&#xff1a; 一、数据 下载网站&#xff1a;MNIST数据集 之前的官网不能下载数据集了&#xff0c;403了&#xff0c;所以找到一个类似…

容器运行时 AND Docker

容器运行时 and Docker 什么是Docker Docker 使用 Google 公司推出的 Go 语言 进行开发实现&#xff0c;基于 Linux 内核的 cgroup&#xff0c;namespace&#xff0c;以及 AUFS 类的 Union FS 等技术&#xff0c;对进程进行封装隔离&#xff0c;属于 操作系统层面的虚拟化技术…