flink sink kafka

ops/2024/12/28 19:18:04/

接上文:一文说清flink从编码到部署上线
之前写了kafka source,现在补充kafka sink。完善kafka相关操作。

环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401;kafka_2.12-2.5.0。

kafka__topic_5">1. kafka 创建 topic

topic:rv-test-sink
在这里插入图片描述

2.添加依赖

<!--flink cdc kafka 相关依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency>

3.创建运行环境

package com.zl.utils;import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;/*** EnvUtil* @description:*/
public class EnvUtil {/*** 设置flink执行环境* @param parallelism 并行度*/public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {// System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为rootSystem.setProperty("HADOOP_USER_NAME", "root");Configuration conf = new Configuration();conf.setInteger("rest.port", 1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);if (parallelism >0 ){//设置并行度env.setParallelism(parallelism);} else {env.setParallelism(1);// 默认1}// 添加重启机制
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));// 没有这个配置,会导致“Flink 任务没报错,但是无法同步数据到doris”。// 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);//rocksdb状态后端,启用增量checkpointenv.setStateBackend(new EmbeddedRocksDBStateBackend(true));//设置checkpoint路径CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 同一时间只允许一个 checkpoint 进行(默认)checkpointConfig.setMaxConcurrentCheckpoints(1);//最小间隔,10*60*1000=60000checkpointConfig.setMinPauseBetweenCheckpoints(60000);// 取消任务后,checkpoint仍然保存checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//checkpoint容忍失败的次数checkpointConfig.setTolerableCheckpointFailureNumber(5);//checkpoint超时时间 默认10分钟checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));//禁用operator chain(方便排查反压)env.disableOperatorChaining();return env;}public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//设置时区 东八tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));Configuration configuration = tenv.getConfig().getConfiguration();// 开启miniBatchconfiguration.setString("table.exec.mini-batch.enabled", "true");// 批量输出的间隔时间configuration.setString("table.exec.mini-batch.allow-latency", "5 s");// 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条configuration.setString("table.exec.mini-batch.size", "20000");// 开启LocalGlobalconfiguration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");//设置TTL API指定tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));return tenv;}}

4.核心代码

package com.zl.kafka;import com.alibaba.fastjson.JSONObject;
import com.zl.utils.EnvUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;public class KafkaExampleSink {public static void main(String[] args) throws Exception {// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExampleSink");/// ===== 构造kafka sink =====// 相关参数配置可以参考下面这两个文档:①https://cloud.tencent.com/developer/article/2089393// ②https://www.bilibili.com/opus/819228616166473783// kafka配置Properties prop = new Properties();prop.setProperty("bootstrap.servers", "10.86.97.21:9092,10.86.97.21:9093,10.86.97.21:9094");// 当设置为“true”时,生产者将确保流中只写入每条消息的一个副本。prop.setProperty("enable.idempotence", "true");// 指定了生产者在接收到服务器相应之前可以发送多个消息,值越高,占用的内存越大,// 当然也可以提升吞吐量,发生错误时,可能会造成数据的发送顺序改变,其默认值是5.prop.setProperty("max.in.flight.requests.per.connection", "5");prop.setProperty("acks", "all");// 在kafka中消息发送失败时,指定生产者可以重发消息的次数,默认情况下,// 生产者在每次重试之间默认等待100ms,可以通过参数retey.backoff.ms参数来改变这个时间间隔。retries的缺省值:0.prop.setProperty("retries", "5");// 事务超时时间prop.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");String topic = "rv-test-sink";FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<String>(topic,// topicnew KafkaSerializationSchema<String>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {return new ProducerRecord<>(topic, s.getBytes(StandardCharsets.UTF_8));}},prop,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);/// ===== 构造模拟数据 =====JSONObject rvJsonObject = new JSONObject();rvJsonObject.put("dt","2024-12-20");// 日期取当天rvJsonObject.put("uuid","data-stream-1");rvJsonObject.put("report_time",1733881971621L);String mockJson = JSONObject.toJSONString(rvJsonObject);/// ===== sink kafka =====env.fromElements(mockJson).addSink(flinkKafkaProducer).setParallelism(3).name("kafka-sink").uid("kafka-sink");env.execute("kafka-sink-job");}// main}

5.运行

由于不是持续输入流,运行完会结束。
在这里插入图片描述
sinkkafka的数据如下:
在这里插入图片描述

6.完整代码

完整代码见:完整代码


http://www.ppmy.cn/ops/144613.html

相关文章

计算材料学和分子动力学(MD)

文章目录 1. 计算材料学1. 什么是计算材料学2. 计算材料学尺度1. 纳观尺度2. 微观尺度3. 介观尺度4. 宏观尺度 2.分子动力学1.什么是分子动力学1. 历史上第一个分子动力学模拟2.第一个连续势场的分子动力学模拟3.第一个Lennard-Jones势分子动力学模拟 2.分子动力学的并行3.常用…

.net core sdk 项目多版本切换

使用global.json文件指定项目要使用的sdk版本&#xff1a; 在项目根目录下执行cmd命令&#xff08;sdk的版本默认为当前使用的最新的sdk的版本&#xff09; 默认sdk&#xff1a;dotnet new globaljson指定sdk&#xff1a;dotnet new globaljson --sdk-version <version>…

【Git】-- 版本说明

Alpha&#xff1a;是内部测试版,一般不向外部发布,会有很多 Bug .一般只有测试人员使用。Beta&#xff1a;也是测试版&#xff0c;这个阶段的版本会一直加入新的功能。在 Alpha 版之后推出。RC&#xff1a;(Release Candidate) 顾名思义么 ! 用在软件上就是候选版本。系统平台…

QT信号槽

目录 概念 函数原型 实现 3.1 自带信号→自带槽 3.2 自带信号→自定义槽 3.3 自定义信号 信号槽传参 对应关系 5.1 一对多 5.2 多对一 信号槽的优势 信号槽的注意事项 概念 信号和槽是Qt框架在C语言基础上扩展的一种机制&#xff0c;用于对象之间的通信。这一机制类…

SSM 架构下 Vue 电脑测评系统:为电脑性能评估赋能

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常适…

如何在电脑上控制手机?

在现代生活中&#xff0c;通过电脑控制手机已经成为一种高效的工作和娱乐方式。Total Control 是一款实用的电脑端软件&#xff0c;通过USB或Wi-Fi连接&#xff0c;用户可以在电脑上直接操作多台手机,通过电脑键盘输入文字&#xff0c;提高操作效率。特别适合需要大屏操作的用户…

MySQL篇之对MySQL进行参数优化,提高MySQL性能

1. MySQL参数优化说明 MySQL 参数调优是提高数据库性能的重要手段之一。通过调整 MySQL 的配置参数&#xff0c;可以优化查询速度、提升并发处理能力、减少资源消耗等。 MySQL 的性能优化涉及到多个方面&#xff0c;包括内存管理、磁盘 I/O、查询优化、连接管理、复制配置等。…

Apache Tomcat 漏洞CVE-2024-50379条件竞争文件上传漏洞 servlet readonly spring boot 修复方式

1&#xff0c;关于漏洞 Apache Tomcat是一个流行的开源 Web 服务器和 Java Servlet 容器。 二、 漏洞描述 Apache Tomcat中修复了个 TOCTOU 竞争条件远程代码执行漏洞 (CVE-2024-50379)&#xff0c;该漏洞的 CVSS 评分为 9.8。Apache Tomcat 中 JSP 编译期间存在检查时间使用时…