Flink整合Hive、Mysql、Hbase、Kafka

ops/2024/11/17 15:06:25/

注意:Flink整合Hive后,可以用Hive的库和表,以及Hive中的函数方法,但是Hive不能使用Flink sql 里面的表,因为Hive不能进行流处理

这里Flink整合Hive,是将Flink的元数据保存到Hive中,并使用hive, 而其他的整合都只是使用

一、Flink整合Hive

1、上传jar包,开启hive元数据

# 1、上传jar包到flink的lib目录下
flink-sql-connector-hive-3.1.2_2.12-1.15.4.jar
mysql-connector-java-5.1.49.jar
cp /usr/local/soft/hadoop-3.1.1/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.1.jar /usr/local/soft/flink-1.15.4/lib/#替换flink解析器     --如果不替换,后面有时候查询sql会报错
cp flink-table-planner_2.12-1.15.4.jar ../lib/
mv flink-table-planner-loader-1.15.4.jar ../opt/# 2、重启集群(如果之前已经启动的话)
yarn application -list
yarn application -kill application_1731138432432_0001
yarn-session.sh -d# 3、开启hive元数据服务
nohup  hive --service metastore >> metastore.log 2>&1 &# 4、重新进入sql命令行
sql-client.sh

2、创建hive catalog表

-- 创建catalog
CREATE CATALOG hive_catalog WITH ('type' = 'hive','hive-conf-dir' = '/usr/local/soft/hive-3.1.3/conf'
);
-- 查看所有catalog
show catalogs;-- 切换catalog
use catalog hive_catalog;-- 创建数据库
create database flink;-- 切换数据库
use flink;-- 创建表
-- 在flink中使用flink建表语句床架弄得表,在hive中不能查询的
CREATE TABLE students_json (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'kafka', -- 之前整合了kafka,自己可以换数据源'topic' = 'students','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset', -- 起始消费的位置'format' = 'json' -- 数据的格式,自动解析数据
);select * from hive_catalog.flink.students_json;

3、使用hive function

-- 加载hive的函数LOAD MODULE hive WITH ('hive-version' = '3.1.2');select split('java,spark',',');

二、Flink整合Mysql

1、上传jar包

# 1、上传jar包到flink的lib目录
flink-connector-jdbc-1.15.4.jar
mysql-connector-java-5.1.49.jar# 2、重启集群
yarn application -list
yarn application -kill application_1731138432432_0001
yarn-session.sh -d# 3、重新进入sql命令行
sql-client.sh

2、Mysql Source

-- 有界流
CREATE TABLE students_jdbc (id BIGINT,name STRING,age BIGINT,gender STRING,clazz STRING
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/Test','table-name' = 'students','username' ='root','password' ='123456'
);
select * from students_jdbc;

3、Mysql Sink

-- sink 表
CREATE TABLE clazz_num_mysql (clazz STRING,num BIGINT,PRIMARY KEY (clazz) NOT ENFORCED -- 按照主键进行更新
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/Test','table-name' = 'clazz_num_mysql','username' ='root','password' ='123456'
);-- mysql建表
CREATE TABLE clazz_num_mysql (clazz varchar(255),num BIGINT,PRIMARY KEY (clazz) -- 按照主键进行更新
);-- 将查询结果保存到mysql
insert into clazz_num_mysql
select clazz,count(1) as num
from 
students_text
where clazz is not null
group by clazz;

三、Flink整合Hbase

1、上传jar包

# 1、上传jar包到flink的lib目录
flink-sql-connector-hbase-2.2-1.15.4.jar# 2、重启集群
yarn application -list
yarn application -kill application_1731138432432_0001
yarn-session.sh -d# 3、重新进入sql命令行
sql-client.sh

2、hbase sink

CREATE TABLE students_hbase (id STRING,info ROW<name STRING,age INT,sex STRING,clazz STRING>,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'hbase-2.2','table-name' = 'students_flink','zookeeper.quorum' = 'master:2181,node1:2181,node2:2181'
);
-- hbase创建表
create 'students_flink','info'insert into students_hbase
select 
id,
ROW(name,age,sex,clazz) as info
from 
students_text
where clazz is not null;-- hbase source
select id,info.age from students_hbase;

四、Flink整合Kafka

1、上传jar包

# 上传依赖包到flink的lib目录下
flink-sql-connector-kafka-1.15.4.jar
# 重启flink集群
yarn application -list
yarn application -kill application_1730969357243_0005
yarn-session.sh -d

2、Kafka Source

CREATE TABLE students_text (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'kafka','topic' = 'students_text','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset', -- 起始消费的位置'format' = 'csv', -- 数据的格式,需要按照顺序定义字段'csv.ignore-parse-errors' ='true'
);

3、Kafka Sink

  • 将仅追加的流写入kafka

CREATE TABLE students_sink (id STRING,name STRING,age INT,sex STRING,clazz STRING
) WITH ('connector' = 'kafka','topic' = 'students_sink','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset', -- 起始消费的位置'format' = 'json' -- 数据的格式,需要按照顺序定义字段
);--  将仅追加的流写入kafka
insert into students_sink
select * from 
students_text
where name is not null;-- 查看结果
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink
  • 将更新的流写入kafka

CREATE TABLE clazz_num (clazz STRING,num BIGINT
) WITH ('connector' = 'kafka','topic' = 'clazz_num','properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset', -- 起始消费的位置'format' = 'canal-json'
);-- 普通的json格式不支持保存更新的流
-- canal-json格式支持保存更新更改的数据insert into clazz_num
select clazz,count(1) as num
from 
students_text
where clazz is not null
group by clazz;kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num


http://www.ppmy.cn/ops/134453.html

相关文章

说说软件工程中的“协程”

在软件工程中&#xff0c;协程&#xff08;coroutine&#xff09;是一种程序运行的方式&#xff0c;可以理解成“协作的线程”或“协作的函数”。以下是对协程的详细解释&#xff1a; 一、协程的基本概念 定义&#xff1a;协程是一组序列化的子过程&#xff0c;用户能像指挥家…

数据结构C语言描述3(图文结合)--双链表、循环链表、约瑟夫环问题

前言 这个专栏将会用纯C实现常用的数据结构和简单的算法&#xff1b;有C基础即可跟着学习&#xff0c;代码均可运行&#xff1b;准备考研的也可跟着写&#xff0c;个人感觉&#xff0c;如果时间充裕&#xff0c;手写一遍比看书、刷题管用很多&#xff0c;这也是本人采用纯C语言…

Qt对话框与界面设计——常见的对话框

目录 QMessageBox - 提供不同类型的消息对话框 QFileDialog - 文件选择对话框 QColorDialog - 颜色选择对话框 QFontDialog - 字体选择对话框 QInputDialog - 输入对话框 QPrintDialog - 打印机选择对话框 QProgressDialog - 进度对话框 QMessageBox - 异常类型提示 QF…

IPv6路由基础

前言 IETF组织针对IPv6网络制定了路由协议OSPFv3 OSPFv3 ff02::5是为OSPFv3路由协议预留的IPv6组播地址 OSPFv3中的路由条目下一跳地址时链路本地地址. 运行OSPFv3的路由器使用物理接口的链路本地的单播地址为源地址来发送OSPF报文.相同链路上的路由器互相学习与之相连的其他…

【3D Slicer】的小白入门使用指南九

定量医学影像临床研究与实践 任务 定量成像教程 定量成像是从医学影像中提取定量测量的过程。 本教程基于两个定量成像的例子构建: - 形态学:缓慢生长肿瘤中的小体积变化 - 功能:鳞状细胞癌中的代谢活动 第1部分:使用变化跟踪模块测量脑膜瘤的小体积变化第2部分:使用PET标…

AI 编程编辑器和工具

以下是几款与 Cursor 类似的 AI 编程编辑器和工具&#xff0c;以及它们的主要特点和差异&#xff1a; 如果你指的是 Cursor 作为一个特定的 AI 编程编辑器&#xff0c;确实我在上一条回答中没有提到它。其实&#xff0c;Cursor 也是一款相对较新的 AI 编程编辑器&#xff0c;它…

vue3+element-plus==> el-form输入响应式失效踩坑!!!!!!!!!!

坑&#xff1a; 这个坑我是真没想到&#xff0c;找了半天原因... 一开始我是这样写的 <el-form :model"addForm" label-width"100px" ref"addForm"><!-- 表单内容 --> </el-form> 输入框根本输入不了东西&#xff0c;或者…

GitHub Copilot使用指南:助力开发者加速编程创新

GitHub Copilot使用指南&#xff1a;助力开发者加速编程创新 简介 1. GitHub Copilot的诞生背景 近年来&#xff0c;AI技术在各行各业迅速发展&#xff0c;尤其是在编程和开发领域&#xff0c;通过自然语言处理和机器学习&#xff0c;AI逐渐能够理解人类的需求和语言。GitHub…