Flink usage demos


1. Adding different data sources

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.WordCount;
import com.baidu.keyue.deepsight.memory.WordCountData;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvDemo {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Choose streaming or batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // Read from one of several possible sources
//        DataStream<String> text = env.fromElements(WordCountData.WORDS);
//        DataStream<String> text = env.fromCollection(Arrays.asList("hello world", "hello flink"));
//        DataStream<String> text = env.fromSource(createFileSource(), WatermarkStrategy.noWatermarks(), "file source");
//        DataStream<String> text = env.fromSource(createKafkaSource(), WatermarkStrategy.noWatermarks(), "kafka source");
        DataStream<String> text = env.fromSource(createDataGeneratorSource(),
                WatermarkStrategy.noWatermarks(), "datagen source");

        // Processing logic: tokenize, group by word, sum the counts
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new WordCount.Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
        counts.print();
        env.execute("WordCount");
    }

    public static FileSource<String> createFileSource() {
        return FileSource.forRecordStreamFormat(
                new TextLineInputFormat(),
                new Path("input/word.txt")).build();
    }

    public static KafkaSource<String> createKafkaSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("ip-port")
                .setTopics("topic")
                .setGroupId("groupId")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    public static DataGeneratorSource<String> createDataGeneratorSource() {
        return new DataGeneratorSource<>(
                (GeneratorFunction<Long, String>) value -> "hello" + value,
                100,                              // 100 records in total
                RateLimiterStrategy.perSecond(5), // 5 records per second
                Types.STRING);
    }
}
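The demo references WordCount.Tokenizer without showing it. Below is a minimal, hypothetical sketch of what such a tokenizer typically looks like; the actual class in the com.baidu.keyue.deepsight.memory package may differ.

// Hypothetical sketch of the Tokenizer used above: splits each line into
// lowercase words and emits (word, 1) pairs.
// Needs: org.apache.flink.api.common.functions.FlatMapFunction,
//        org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.util.Collector
public static final class Tokenizer
        implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        for (String word : value.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1)); // emit (word, 1) for each token
            }
        }
    }
}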

2. Data processing

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DataProcessDemo {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Choose streaming or batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // Read input data
        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3));

        // map: transforms every element, one in, one out
        SingleOutputStreamOperator<String> map = sensorDs.map(new MapFunction<WaterSensor, String>() {
            @Override
            public String map(WaterSensor waterSensor) throws Exception {
                return waterSensor.getId() + " : " + waterSensor.getVc();
            }
        });
//        map.print();

        // filter: keeps only the elements for which the predicate returns true
        SingleOutputStreamOperator<WaterSensor> filter = sensorDs.filter(new FilterFunction<WaterSensor>() {
            @Override
            public boolean filter(WaterSensor waterSensor) throws Exception {
                return waterSensor.getVc() > 1; // keep only elements with vc > 1
            }
        });
//        filter.print();

        // flatMap: one input element may emit zero or more outputs via the Collector
        SingleOutputStreamOperator<String> flatMap = sensorDs.flatMap(new FlatMapFunction<WaterSensor, String>() {
            @Override
            public void flatMap(WaterSensor waterSensor, Collector<String> collector) throws Exception {
                // everything handed to the collector is emitted downstream
                if ("s1".equals(waterSensor.getId())) {
                    collector.collect(waterSensor.getId());
                } else {
                    collector.collect(waterSensor.getId());
                    collector.collect(waterSensor.getVc().toString());
                }
            }
        });
//        flatMap.print();

        // keyBy: routes elements with the same key to the same partition. It does not
        // transform the data, it only partitions it, which is what makes efficient
        // aggregation over large streams possible.
        KeyedStream<WaterSensor, String> keyBy = sensorDs.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor waterSensor) throws Exception {
                return waterSensor.getId(); // group by id
            }
        });
//        keyBy.print();

        // After keyBy, aggregation operators such as sum/max/min become available
//        keyBy.sum("vc").print();   // pass the POJO field name
//        keyBy.maxBy("vc").print(); // pass the POJO field name

        // reduce: pairwise aggregation; only available on a keyed stream
        keyBy.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor t1, WaterSensor t2) throws Exception {
                System.out.println("t1=" + t1);
                System.out.println("t2=" + t2);
                return new WaterSensor(t1.getId(), t1.getTs() + t2.getTs(), t1.getVc() + t2.getVc());
            }
        }).print();

        env.execute("DataProcessDemo");
    }
}
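These demos all rely on the WaterSensor bean, which the post never shows. Inferring from the constructor calls and getters used above, a minimal sketch follows; the actual class in com.baidu.keyue.deepsight.memory.bean may carry more.

// Minimal sketch of the WaterSensor bean, inferred from its usage above.
// Flink's POJO rules want a public no-arg constructor plus public
// getters/setters so the type can be analyzed and serialized efficiently.
public class WaterSensor {
    private String id;   // sensor id
    private Long ts;     // timestamp
    private Integer vc;  // water level reading

    public WaterSensor() {}

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }
}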

3. Splitting and merging streams

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FenliuDemo {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Choose streaming or batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // Read input data
        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3));

        // Split the stream by filtering it twice: even vc vs. odd vc
        SingleOutputStreamOperator<WaterSensor> oushu = sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 0);
        SingleOutputStreamOperator<WaterSensor> jishu = sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 1);
        oushu.print("even stream");
        jishu.print("odd stream");

        // Merge the even and odd streams back together
        oushu.union(jishu).print("merged stream");

        env.execute("FenliuDemo");
    }
}
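Filtering the same stream twice works, but every element is evaluated once per branch. Flink's side outputs split a stream in a single pass instead. A sketch under the same WaterSensor setup (the tag name "odd" and variable names are illustrative); it needs org.apache.flink.streaming.api.functions.ProcessFunction and org.apache.flink.util.OutputTag.

// Single-pass split via a ProcessFunction with a side output (sketch).
// Even elements go to the main output, odd elements to the "odd" side output.
OutputTag<WaterSensor> oddTag = new OutputTag<WaterSensor>("odd") {}; // anonymous subclass keeps type info

SingleOutputStreamOperator<WaterSensor> evenStream = sensorDs.process(
        new ProcessFunction<WaterSensor, WaterSensor>() {
            @Override
            public void processElement(WaterSensor value, Context ctx,
                                       Collector<WaterSensor> out) {
                if (value.getVc() % 2 == 0) {
                    out.collect(value);        // main output: even
                } else {
                    ctx.output(oddTag, value); // side output: odd
                }
            }
        });

DataStream<WaterSensor> oddStream = evenStream.getSideOutput(oddTag);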

4. Writing output with sinks

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkDemo {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // Choose streaming or batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        // Read input data
        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3));

        // Row-format file sink: each record is encoded as one line of text
        FileSink<WaterSensor> fileSink = FileSink.<WaterSensor>forRowFormat(
                new Path("/Users/chengyong03/Downloads/output/flink"),
                new SimpleStringEncoder<>("UTF-8")).build();

        sensorDs.sinkTo(fileSink);
        env.execute("SinkDemo");
    }
}
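A row-format FileSink rolls part files according to DefaultRollingPolicy's built-in thresholds, and in-progress files are only finalized when checkpointing is enabled; tuning the policy explicitly is common. A sketch, assuming Flink 1.15+ where the builder takes Duration and MemorySize arguments (the thresholds below are illustrative).

// Sketch: FileSink with an explicit rolling policy (assumes Flink 1.15+).
// Needs: org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy,
//        org.apache.flink.configuration.MemorySize, java.time.Duration
FileSink<WaterSensor> rollingSink = FileSink
        .<WaterSensor>forRowFormat(
                new Path("/Users/chengyong03/Downloads/output/flink"),
                new SimpleStringEncoder<>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(Duration.ofMinutes(15))  // roll at least every 15 min
                .withInactivityInterval(Duration.ofMinutes(5)) // roll after 5 min without writes
                .withMaxPartSize(MemorySize.ofMebiBytes(128))  // roll once a part file reaches 128 MiB
                .build())
        .build();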

5. Converting between streams and tables, Flink SQL

package com.baidu.keyue.deepsight.memory.test;

import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3));
        sensorDs.print();

        // 2. Convert the stream to a table
        Table sensorTable = tableEnv.fromDataStream(sensorDs);
        // 3. Register a temporary view so the table can be queried by name
        tableEnv.createTemporaryView("sensorTable", sensorTable);
        Table resultTable = tableEnv.sqlQuery("select * from sensorTable where vc > 10");

        // 4. Convert the result table back to a stream
        DataStream<WaterSensor> waterSensorDataStream = tableEnv.toDataStream(resultTable, WaterSensor.class);
        waterSensorDataStream.print();

        env.execute();
    }
}
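toDataStream works here because a filter produces insert-only rows. An aggregating query instead produces updating rows, which toDataStream rejects; toChangelogStream handles them. A short sketch against the same registered view (the column alias vcSum is illustrative):

// Sketch: an aggregation yields an updating table, so convert it with
// toChangelogStream (toDataStream would throw for non-insert-only results).
Table aggTable = tableEnv.sqlQuery(
        "select id, sum(vc) as vcSum from sensorTable group by id");
tableEnv.toChangelogStream(aggTable).print(); // emits +I/-U/+U changelog rows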
