flink--会话模式与应用模式

news/2024/9/18 15:04:12/ 标签: zookeeper, kafka, flink, hdfs, 大数据

flink-会话模式部署

会话情况:

添加依赖

<properties><flink.version>1.17.2</flink.version>
</properties>
​
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
​<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
​<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>
​<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.13.6</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.2-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency>
​
</dependencies>
​
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.apache.hadoop:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>

java代码

package com.demo.day1;
​
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
​
​
import static org.apache.flink.connector.kafka.sink.KafkaSink.builder;
​
public class Demo1_WordCount {
​public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);
​KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("hadoop11:9092,hadoop12:9092,hadoop13:9092").setTopics("topic1").setValueOnlyDeserializer(new SimpleStringSchema()).setGroupId("g1").setStartingOffsets(OffsetsInitializer.latest()).build();
​DataStream<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(),"source");ds.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
​String[] arr = value.split(",");for(String s1:arr){out.collect(Tuple2.of(s1,1));}}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).sum("f1").print();
​env.execute();}
​
}

测试:

查看(数据同步):

申请一个yarn会话:

 

打架包:

上传架包:

运行:

测试:

 

修改并行度为3进行测试:

 打架包并上传:

查看:

生产者向topica(3个分区)发送数据:

向topica发送2000条数据

package com.kafka;
​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
​
import java.util.HashMap;
import java.util.Map;
​
public class producer{public static void main(String[] args) throws Exception{Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092,hadoop12:9092,hadoop13:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
​//设置自定义分区configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafka.MyPartitioner");
​
​KafkaProducer<String,String> producer = new KafkaProducer<>(configs);for (int i=0;i<1000;i++){ProducerRecord producerRecord=new ProducerRecord("topica","kafka");producer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null){System.out.println("发送成功:"+recordMetadata.partition());System.out.println("发送成功:"+recordMetadata.topic());System.out.println("发送成功:"+recordMetadata.offset());}}});}
​producer.close();}
}

发送分区随机:

查看数据:

向topica发送1000000条数据:

package com.kafka;
​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
​
import java.util.HashMap;
import java.util.Map;
​
public class producer{public static void main(String[] args) throws Exception{Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop11:9092,hadoop12:9092,hadoop13:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
​//设置自定义分区configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafka.MyPartitioner");
​
​KafkaProducer<String,String> producer = new KafkaProducer<>(configs);for (int i=0;i<1000000;i++){ProducerRecord producerRecord=new ProducerRecord("topica",i%3,null,"kafka");producer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null){System.out.println("发送成功:"+recordMetadata.partition());System.out.println("发送成功:"+recordMetadata.topic());System.out.println("发送成功:"+recordMetadata.offset());}}});}
​producer.close();}
}

查看数据:

 

flink-应用模式部署

1. 上传flink的lib和plugins到HDFS上
hdfs dfs -mkdir /flink-dist
hdfs dfs -put /opt/installs/flink/lib   /flink-dist
hdfs dfs -put /opt/installs/flink/plugins/ /flink-dist
2. 上传自己的jar包到HDFS
hdfs dfs -mkdir /my-flinkjars
hdfs dfs -put /opt/flinkjob/flink-test-1.0-SNAPSHOT.jar /my-flinkjars
3. 提交作业
flink run-application \
-t yarn-application   \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist"     \
-c com.demo.day1.Demo1_WordCount  \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar

提交作业后查看yarn:

测试:

写数据:

查看数据(不在一个分区,具有随机性):

 

 

 


http://www.ppmy.cn/news/1517041.html

相关文章

CSS属性

一、CSS列表样式 1、list-style-type属性&#xff08;列表项标记&#xff09; CSS列表属性允许我们设置不同的列表项标记。 在HTML中&#xff0c;有​两种类型​的列表&#xff1a; ​无序列表​&#xff08;<ul>&#xff09; - 列表项目用​项目符号​标记​有序列表…

【Linux】自动化构建工具makefile

目录 背景 makefile简单编写 .PHONY makefile中常用选项 makefile的自动推导 背景 会不会写makefile&#xff0c;从一个侧面说明了一个人是否具备完成大型工程的能力 ​ ◉ 一个工程中的源文件不计数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;mak…

开放式耳机怎么戴?佩戴舒适在线的几款开放式耳机分享

开放式耳机的佩戴方式与传统的入耳式耳机有所不同&#xff0c;它采用了一种挂耳式的设计&#xff0c;提供了一种新颖的佩戴体验&#xff0c;以下是开放式耳机的佩戴方式。 1. 开箱及外观&#xff1a;首先&#xff0c;从包装盒中取出耳机及其配件&#xff0c;包括耳机本体、充电…

使用 FinalShell 链接 Centos

1. 安装 FinalShell 下载地址&#xff1a;https://www.hostbuf.com/t/988.html 2. 查看 IP地址。 2.1 通过命令查询IP 输入 ip addr show 查询&#xff0c;输出效果如下截图&#xff0c;其中的 192.168.1.5 就是 IP 地址。 2.2 通过可视化界面查询IP 点击右上角的网络图标…

LoadBalancer负载均衡

一、概述 1.1、Ribbon目前也进入维护模式 Spring Cloud Ribbon是基于Netflix Ribbon实现的一套客户端负载均衡的工具。 简单的说&#xff0c;Ribbon是Netflix发布的开源项目&#xff0c;主要功能是提供客户端的软件负载均衡算法和服务调用。Ribbon客户端组件提供一系列完善的…

企业中需要哪些告警Rules

文章目录 企业中需要哪些告警Rules前言定义告警规则企业中的告警rulesNode.rulesprometheus.ruleswebsite.rulespod.rulesvolume.rulesprocess.rules 总结 企业中需要哪些告警Rules 前言 Prometheus中的告警规则允许你基于PromQL表达式定义告警触发条件&#xff0c;Prometheus…

poi word 添加水印

poi word 添加水印 依赖DocxUtil调用遇到的问题部分客户给的word无法添加水印水印文案 过长会导致字变小变形 超过一定长度就会显示异常。消失等情况 依赖 <!--poi-tl--><dependency><groupId>com.deepoove</groupId><artifactId>poi-tl</art…

捕获神经网络的精髓:深入探索PyTorch的torch.jit.trace方法

标题&#xff1a;捕获神经网络的精髓&#xff1a;深入探索PyTorch的torch.jit.trace方法 在深度学习领域&#xff0c;模型的部署和优化是至关重要的环节。PyTorch作为最受欢迎的深度学习框架之一&#xff0c;提供了多种工具来帮助开发者优化和部署模型。torch.jit.trace是PyTo…

设计模式 10 外观模式

设计模式 10 创建型模式&#xff08;5&#xff09;&#xff1a;工厂方法模式、抽象工厂模式、单例模式、建造者模式、原型模式结构型模式&#xff08;7&#xff09;&#xff1a;适配器模式、桥接模式、组合模式、装饰者模式、外观模式、享元模式、代理模式行为型模式&#xff…

ansible的tags标签

1、tags模块 可以给任务定义标签&#xff0c;可以根据标签来运行指定的任务 2、标签的类型 always&#xff1a;设定了标签名为always&#xff0c;除非指定跳过这个标签&#xff0c;否则该任务将始终会运行&#xff0c;即使指定了标签还会运行never&#xff1a;始终不运行的任…

CPU、MPU、MCU、SOC分别是什么?

CPU、MPU、MCU和SoC都是与微电子和计算机科学相关的术语&#xff0c;它们在功能定位、应用场景以及处理能力等方面有所区别。具体如下&#xff1a; CPU&#xff1a;CPU是中央处理单元的缩写&#xff0c;它通常指计算机内部负责执行程序指令的芯片。CPU是所有类型计算机&#x…

java 读取mysql中的表并按照指定格式导出excel

在Java中读取MySQL中的数据表并将其导出到Excel文件中&#xff0c;你需要以下几个步骤&#xff1a; 连接MySQL数据库&#xff1a;使用JDBC驱动程序连接到MySQL数据库。执行SQL查询&#xff1a;获取表数据。使用Apache POI库生成Excel文件&#xff1a;将数据写入Excel格式。保存…

SpringBoot文档之构建包的阅读笔记

Packaging Spring Boot Applications Efficient Deployments Efficient Deployments 默认情况下&#xff0c;基于SpringBoot框架开发应用时&#xff0c;构建插件spring-boot-maven-plugin将项目打包为fat jar。 执行如下命令&#xff0c;解压构建得到的jar文件。 java -Djarmo…

Python 程序设计基础教程

Python 程序设计基础教程 撰稿人&#xff1a;南星六月雪 第 一 章 变量与简单数据类型 1.1 变量 先来观察以下程序&#xff1a; world "Hello Python!" print(world)world "Hello Python,I love you!" print(world)运行这个程序&#xff0c;将看到两…

0827作业+梳理(c++day01)

一、作业&#xff1a; 1、代码 #include <iostream> using namespace std; int main() {string str;cout<<"请输入一个字符串"<<endl;getline(cin,str);cout<<"str "<<str<<endl;//初始化各类字符个数int size_num …

如何保证Redis与数据库之间的一致性

在现代应用程序架构中&#xff0c;Redis等内存数据库因其高性能和低延迟特性而被广泛用于缓存、会话管理、消息队列等多种场景。然而&#xff0c;当Redis作为数据库&#xff08;如MySQL、PostgreSQL&#xff09;的缓存层时&#xff0c;确保数据在Redis和数据库之间的一致性变得…

jmeter中CSV 数据文件设置用例

1、CSV数据文件的基础使用 线程组->添加->配置远近->CSV数据文件设置 2、多条用例运行CSV数据文件 由于我的csv请求的json数据有“&#xff0c;”所以我这边 分隔符选择了*号 写了两行需要测试的用例&#xff0c;需要添加一个“循环控制器” 线程组->添加-&g…

splunk Enterprise 的HTTP收集器-windows

1.创建HTTP收集器 2.使用HTTP收集器 然后打开全局设置&#xff1a;把ssl给去掉&#xff0c;点保存&#xff08;保存之后&#xff0c;可以看到这些状态全部都是已启用了&#xff09;&#xff1a; 3.测试&#xff1a; curl --location --request POST http://192.168.11.131:808…

List<String> 和 ArrayList<String>的区别

List<String> list new ArrayList<>() 这种形式实际上是一种向上转型&#xff08;upcasting&#xff09;的体现&#xff0c;ArrayList 实现了 List 接口&#xff0c;可以看成是从 List 继承而来&#xff0c;一个子类的对象可以指向它父类。 为什么不是 ArrayList…

国标GB28181视频监控EasyCVR视频汇聚平台国标注册被陌生IP入侵如何处理?

GB28181国标/GA/T1400协议/安防综合管理系统EasyCVR视频汇聚平台能在复杂的网络环境中&#xff0c;将前端设备统一集中接入与汇聚管理。智慧安防/视频存储/视频监控/视频汇聚EasyCVR平台可以提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级…