Flink 入门案例介绍

server/2024/9/23 0:54:59/

一、工程搭建

  • 在 IDEA 中创建一个 Maven 工程:FlinkTutorial

  • 在 pom 文件中引入依赖:

    <dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><!-- 2.12 是scala版本 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency>
    </dependencies>
    

二、批处理 WordCount 案例

java">package com.app.wc// 批处理 WordCount
public class WordCount {public static void main(String[] args) throws Exception {// 1.创建 flink 执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2.读取文件数据// DataSource 是 Operator 的子类,Operator 是 DataSet 的子类// Flink 的批处理是基于 DataSet 类型的 API 来处理DataSource<String> inputData = env.readTextFile("datas/word.txt");// 3.执行数据处理(按空格分词并转换成 (word, 1) 这样的二元组格式),分组聚合DataSet<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap())  //需要传入FlatMapFunction接口的实现类.groupBy(0)  //可以传入KeySelector实现类或位置索引或字段名.sum(1);  // 传入进行聚合计算的位置索引// 4.输出result.print();}// 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法// Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {@overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按空格分词String[] words = value.split(" ");// 遍历数组并转换为二元组输出for(String word : words) {out.collect(new Tuple2(word, 1));}}}
}

三、有界流处理 WordCount 案例

java">package com.app.wc// 流处理WordCount
public class StreamWordCount {public static void main(String[] args) throws Exception {// 1.创建flink流处理执行环境对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(8); // 设置并发度// 2.读取文件StreamDataSource<String> inputData = env.readTextFile("datas/word.txt");// 3.处理数据(分词,转换结构),并分组聚合DataStream<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()).keyBy(0).sum(1);// 4.输出result.print();// 5.执行任务(流处理是事件触发的)env.execute();}// 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法// Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {@overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按空格分词String[] words = value.split(" ");// 遍历数组并转换为二元组输出for(String word : words) {out.collect(new Tuple2(word, 1));}}}
}

四、无界流处理 WordCount 案例

方便生产环境部署

java">package com.app.wcpublic class StreamWordCount2 {public static void main(String[] args) throws Exception {// 1.创建flink流处理执行环境对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(8); // 设置并发度// 2.监听 7777 端口服务(nc -lk 7777)// 2.1 使用 ParameterTool 类从启动参数中获取配置项ParameterTool tool = ParameterTool.formArgs(args);String hostname = tool.get("hostname");int port = tool.getInt("port");// 2.2 获取数据流DataStream<String> inputData = env.socketTextFile(hostname, port);// 3.处理数据(分词,转换结构),并分组聚合DataStream<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()).keyBy(0).sum(1);// 4.输出result.print();// 5.执行任务(流处理是事件触发的)env.execute();}// 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法// Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {@overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按空格分词String[] words = value.split(" ");// 遍历数组并转换为二元组输出for(String word : words) {out.collect(new Tuple2(word, 1));}}}
}

http://www.ppmy.cn/server/47947.html

相关文章

单列集合--List

方法演示&#xff1a; package exercise;import java.util.ArrayList; import java.util.List;public class ListDemo1 {public static void main(String[] args) {List<String> list new ArrayList<>();list.add("hello");list.add("world"…

门面模式Api网关(SpringCloudGateway)

1. 前言 当前通过Eureka、Nacos解决了服务注册和服务发现问题&#xff0c;使用Spring Cloud LoadBalance解决了负载均衡的需求&#xff0c;同时借助OpenFeign实现了远程调用。然而&#xff0c;现有的微服务接口都直接对外暴露&#xff0c;容易被外部访问。为保障对外服务的安全…

电脑丢失api-ms-win-crt-runtime-l1-1-0.dll的多种修复方法

在计算机使用过程中&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是“api-ms-win-crt-runtime-l1-1-0.dll丢失”。这个错误通常发生在Windows操作系统中&#xff0c;它表示一个动态链接库文件丢失或损坏。这个问题可能会导致某些应用程序无法正常运行&#xf…

Spring Security 注册过滤器关键点与最佳实践

在 Spring Security 框架中&#xff0c;注册过滤器是实现身份验证和授权的关键组件。正确配置和使用注册过滤器对于确保应用程序的安全性至关重要。以下是一些关于 Spring Security 注册过滤器的注意事项和最佳实践。 过滤器链顺序&#xff1a; 注册过滤器通常位于过滤器链的末…

C++模板类与Java泛型类的实战应用及对比分析

C模板类和Java泛型类都是用于实现代码重用和类型安全性的重要工具&#xff0c;但它们在实现方式和应用上有一些明显的区别。下面&#xff0c;我将先分别介绍它们的实战应用&#xff0c;然后进行对比分析。 C模板类的实战应用 C模板类允许你定义一种通用的类&#xff0c;其中类…

Llama改进之——分组查询注意力

引言 今天介绍LLAMA2模型引入的关于注意力的改进——分组查询注意力(Grouped-query attention,GQA)1。 Transformer中的多头注意力在解码阶段来说是一个性能瓶颈。多查询注意力2通过共享单个key和value头&#xff0c;同时不减少query头来提升性能。多查询注意力可能导致质量下…

iOS Hittest 机制和实际应用之一 hittest方法

Hittest 机制原理 hitTest的原理就是&#xff0c;当我们点击的时候&#xff0c;会触发 window的 hittest方法&#xff0c;在该方法中会首先使用point inside方法判断 点击的地方是否在window范围内&#xff0c;如果在的话&#xff0c;就倒序遍历姿子视图&#xff0c;然后将poi…

动手学深度学习(Pytorch版)代码实践 -深度学习基础-03线性回归简洁版

03线性回归简洁版 主要内容 生成数据集&#xff1a;使用给定的权重和偏置&#xff0c;以及一些噪声&#xff0c;生成模拟数据。读取数据集&#xff1a;将数据打乱&#xff0c;并按批次读取数据。初始化模型参数&#xff1a;随机初始化模型的权重和偏置&#xff0c;并启用自动…