Kafka与Flink的整合 -- sink、source

news/2024/12/22 0:50:06/
1、首先导入依赖:
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency>
2、 source:Flink从Kafka中读取数据
public class Demo01KafkaSource {public static void main(String[] args) throws Exception{//构建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//构建kafka source 环境KafkaSource<String> source = KafkaSource.<String>builder()//指定broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定topic.setTopics("bigdata")//指定消费组.setGroupId("my-group")//指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据.setStartingOffsets(OffsetsInitializer.earliest())//读取数据格式:.setValueOnlyDeserializer(new SimpleStringSchema()).build();//使用kafka数据源DataStreamSource<String> kafkaSourceDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");kafkaSourceDS.print();//启动flinkenv.execute();}
}
        启动生产kafka:
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
3、sink:Flink向Kafka中写入数据
public class Demo02KafkaSink {public static void main(String[] args) throws Exception{//构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//读取数据文件:DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.txt");//创建kafka sinkKafkaSink<String> sink = KafkaSink.<String>builder()//指定flink broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定数据的格式:.setRecordSerializer(KafkaRecordSerializationSchema.builder()//指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic.setTopic("student")//指定数据的格式.setValueSerializationSchema(new SimpleStringSchema()).build())//指定数据处理的语义:.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//执行flinkstudentDS.sinkTo(sink);//构建flink环境env.execute();}
}
        启动消费kafka:
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic student


http://www.ppmy.cn/news/1204831.html

相关文章

verilog——移位寄存器

在Verilog中&#xff0c;你可以使用移位寄存器来实现数据的移位操作。移位寄存器是一种常用的数字电路&#xff0c;用于将数据向左或向右移动一个或多个位置。这在数字信号处理、通信系统和其他应用中非常有用。以下是一个使用Verilog实现的简单移位寄存器的示例&#xff1a; m…

ESP32网络开发实例-Web服务器RGB LED调光

Web服务器RGB LED调光 文章目录 Web服务器RGB LED调光1、RGB LED介绍3、软件准备4、硬件准备4、代码实现在本文中,我们将创建一个 RGB LED 控制器网络服务器。 Web 服务器将显示用于设置 RGB LED 颜色的色谱。 颜色将主要分为三种:红色、绿色和蓝色。 用户将从光谱中选择一种…

PDF Expert for mac(苹果电脑专业pdf编辑器)兼容12系统

PDF Expert是macOS平台上的一款优秀的PDF阅读和编辑工具&#xff0c;由Readdle公司开发。它不仅拥有方便、易用的界面&#xff0c;还具备诸多功能&#xff0c;比如编辑PDF文件、添加批注、填写表格、签署文件、合并文档等。安装:PDF Expert for Mac(PDF编辑阅读转换器)v3.5.2中…

为什么HTTP用得很好的,开始普及HTTPS呢?

显而易见&#xff0c;现在的HTTP早已不安全&#xff0c;当我们在浏览各个网站时会发现HTTP前面都会显示不安全&#xff0c;因为HTTP是明文传输&#xff0c;一旦电脑被植入了木马&#xff0c;木马程序就会主动周期性发消息给Internet的控制终端&#xff0c;这样NAT小洞会一直敞开…

星岛专栏|从Web3发展看金融与科技的融合之道

11月起&#xff0c;欧科云链与香港主流媒体星岛集团开设Web3.0安全技术专栏&#xff0c;该专栏主要面向香港从业者、交易机构、监管机构输出专业性的安全合规建议&#xff0c;旨在促进香港Web3.0行业向安全与合规发展。 出品&#xff5c;欧科云链研究院 自2016年首届香港金融…

京东数据分析(京东销量):2023年9月京东投影机行业品牌销售排行榜

鲸参谋监测的京东平台9月份投影机市场销售数据已出炉&#xff01; 根据鲸参谋电商数据分析平台的相关数据数据显示&#xff0c;9月份&#xff0c;京东平台投影机的销量为13万&#xff0c;环比下滑约17%&#xff0c;同比下滑约25%&#xff1b;销售额将近2.6亿&#xff0c;环比下…

8.spark自适应查询-AQE之自适应调整Shuffle分区数量

目录 概述主要功能自适应调整Shuffle分区数量原理默认环境配置修改配置 结束 概述 自适应查询执行&#xff08;AQE&#xff09;是 Spark SQL中的一种优化技术&#xff0c;它利用运行时统计信息来选择最高效的查询执行计划&#xff0c;自Apache Spark 3.2.0以来默认启用该计划。…

MySQL进阶_2.存储引擎

文章目录 第一节、存储引擎简介第二节、设置表的存储引擎2.1、创建表时指定存储引擎2.2、修改表的存储引擎 第三节、存储引擎介绍3.1、InnoDB3.2、MyISAM3.3、InnoDB VS MyISAM 第一节、存储引擎简介 简而言之&#xff0c;存储引擎就是指表的类型。其实存储引擎以前叫做表处理…