Flink快速上手

devtools/2024/9/20 1:26:29/ 标签: flink, c#, 大数据

Flink快速上手

  • 批处理
    • Maven配置pom文件
    • java编写wordcount代码
  • 有界流处理
  • 无界流处理

批处理

Maven配置pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>com.atguigu</artifactId><version>1.0-SNAPSHOT</version><properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies></project>

java编写wordcount代码

基于DataSet API(过时的,不推荐)
之后用 DataStream API

package com.atguigu.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class WordCountBatchDemo {public static void main(String[] args) throws Exception {//1.创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2.读取数据,从文件中读取DataSource<String> lineDS = env.readTextFile("input/word.txt");//3.切分、转换(word,1)FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//Todo3.1 按照空格 切分单词String[] words = value.split(" ");//Todo3.2 将单词转换为(word,1)for (String word : words) {Tuple2<String, Integer> wordTuple2 = Tuple2.of(word, 1);//Todo3.3 调用采集器collector 向下游发送数据out.collect(wordTuple2);}}});//4.按照word分组UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroupBy = wordAndOne.groupBy(0);//5.各分组内聚合AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroupBy.sum(1);//6.输出sum.print();}
}

在这里插入图片描述

有界流处理

package com.atguigu.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountStreamDemo {public static void main(String[] args) throws Exception {//TODO 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//TODO 2. 读取数据DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");//TODO 3. 处理数据:切分/转换/分组/聚合//TODO 3.1 处理数据:切分/转换SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//切分String[] words = value.split(" ");for (String word : words) {//转换为二元组(word,1)Tuple2<String, Integer> wordAndOne = Tuple2.of(word, 1);//通过采集器 向下游发送数据out.collect(wordAndOne);}}});//TODO 3.2 处理数据:分组KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});//TODO 3.3 处理数据:聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);//TODO 4. 输出数据sumDS.print();//TODO 5. 执行:sparkstreaming 最后 ssc.start()env.execute();}
}

在这里插入图片描述

无界流处理

事件驱动

package com.atguigu.wc;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountSocketStream {public static void main(String[] args) throws Exception {//TODO 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {//切分String[] words = value.split(" ");for (String word : words) {//转换为二元组(word,1)//通过采集器 向下游发送数据out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);sum.print();env.execute();}
}

事件触发
来一个处理一个
在这里插入图片描述

在这里插入图片描述


http://www.ppmy.cn/devtools/111485.html

相关文章

数据结构(6)哈希表和算法

一、哈希表 哈希表的基本概念 哈希函数&#xff1a; 哈希函数是将输入&#xff08;键&#xff09;转换为固定大小的输出&#xff08;哈希值&#xff09;的函数。这个输出通常是一个整数&#xff0c;表示在哈希表中的索引位置。理想的哈希函数应该能够均匀分布输入&#xff0c;…

华普微特殊“射频芯片”-隔离芯片品质卓越

HOPERF隔离器技术含量“超标”&#xff01; 作为信号链芯片中的重要一环&#xff0c;隔离芯片的主要作用就是在电力配电系统、工业自动化、医疗设备、可再生能源系统以及通信与数据传输等存在弱电控制强电的场景中实现电气隔离、信号传输与安全防护。 在弱电控制强电的过程中…

SprinBoot+Vue民宿预约微信小程序的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 Controller3.2 Service3.3 Dao3.4 application.yml3.5 SpringbootApplication3.5 Vue 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平台Java领域优质…

springboot 的共享session方案?

问&#xff1a;springboot 的共享session方案&#xff1f; 参考&#xff1a; https://juejin.cn/post/7195227930077691963分布式之session共享问题 4种解决方案及spring session的使用_分布式session共享方案-CSDN博客 什么是 Session &#xff1f; 答&#xff1a;因为Http协…

基于开源WQ装备数据的知识图谱全流程构建

随着大数据和人工智能技术的快速发展&#xff0c;构建领域特定的知识图谱已成为信息管理和决策支持的重要手段。武器装备知识图谱不仅能够对复杂的武器系统进行结构化展示&#xff0c;还可以通过关系推理揭示武器与装备之间的潜在联系。 1、技术路线 本文将详细介绍如何基于开…

数据库系列之GaussDB数据库中逻辑对象关系简析

初次接触openGauss或GaussDB数据库的逻辑对象&#xff0c;被其中的表空间、数据库、schema和用户之间的关系&#xff0c;以及授权管理困惑住了&#xff0c;与熟悉的MySQL数据库的逻辑对象又有明显的不同。本文旨在简要梳理下GaussDB数据库逻辑对象之间的关系&#xff0c;以加深…

python 函数 封装

封装 函数的参数是&#xff1a;变量 def 函数(参数):print(参数)if __name__ __main__:函数(参数)函数(参数2)函数的参数是&#xff1a; 字典 import requests# 定义一个字典 data {} 地址 "https://webdriveruniversity.com/" 请求方法 getdata["url"…

测试-Gatling 与性能测试

Gatling 与性能测试详解 一、什么是性能测试&#xff1f; 性能测试是一种软件测试类型&#xff0c;旨在评估系统在负载下的响应时间、吞吐量和资源利用率等性能指标。通过性能测试&#xff0c;开发者和运维团队能够识别出系统的瓶颈、优化系统性能&#xff0c;并确保其在实际…

JSON报文根据正则过滤消息

有时候业务系统在接收外部传过来的JSON报文&#xff0c;可能需要根据某个标识来判断是否是自己系统的消息&#xff0c;不是需要过滤。正常我们可能是先将JSON反序列化为具体实体类(例: A a JSON.parseObject(body,A.class))&#xff0c;然后获取具体字段来判断。此方法面对接收…

从0开始学习RocketMQ:领域模型

队列是先进先出&#xff08;FIFO&#xff09;的线性表&#xff0c;通常用链表或者数组来实现。队列只允许在后端&#xff08;称为 rear&#xff09;进行插入操作&#xff0c;在前端&#xff08;称为 front&#xff09;进行删除操作。 主流的消息中间件的传输模型主要为队列模型…

ARCGIS PRO DSK MapTool

MapTool用于自定义地图操作工具&#xff0c;使用户能够在ArcGIS Pro中执行特定的地图交互操作。添加 打开MapTool1.vb文件&#xff0c;可以看到系统已经放出MapTool1类&#xff1a; Public Sub New()将 IsSketchTool 设置为 true 以使此属性生效IsSketchTool TrueSketchTyp…

MySQL中的约束

约束概述 1.1 为什么需要约束 数据完整性&#xff08;Data Integrity&#xff09;是指数据的精确性&#xff08;Accuracy&#xff09;和可靠性&#xff08;Reliability&#xff09;。它是防止数据库中存在不符合语义规定的数据和防止因错误信息的输入输出造成无效操作或错误信…

Python自带日志库实现springboot彩色效果

整体目标 涉及的库均为Python3自带库实现 loggingsysenum 终端显示彩色基本原理参考&#x1f449;Terminal里的颜色的那些事 Python打印日志可以直接借用logging自带的库实现&#xff0c;但是默认的打印实在太丑了&#xff0c;长下面这样 这只是一条日志看着还好比较清爽&…

elementUI中el-form 嵌套el-from 如何进行表单校验?

在el-form中嵌套另一个el-form进行表单校验和添加规则&#xff0c;首先&#xff0c;需要确保每个嵌套的el-form都有自己的model、rules和ref。 以下是一个简化的示例&#xff1a; <template><el-form :model"parentForm" :rules"parentRules" r…

Tomcat目录及测试

Tomcat目录及测试 C:\Program4java\apache-tomcat-10.1.7 这个目录下直接包含Tomcat的bin目录&#xff0c;conf目录等&#xff0c;我们称之为Tomcat的安装目录或根目录。 bin&#xff1a;该目录下存放的是二进制可执行文件&#xff0c;如果是安装版&#xff0c;那么这个目录下会…

Zabbix企业级应用案列

随着业务的越发复杂&#xff0c;对软件系统的要求越来越高&#xff0c;这意味着我们需要随时掌控系统的运行情况。因此&#xff0c;对系统的实时监控以及可视化展示&#xff0c;就成了基础架构的必须能力。 一、zabbix可视化 1.Grafana 简介 Grafana 是一个开源的指标量监测和…

Java XML

1、XML文件介绍 配置文件&#xff1a;用来保存设置的一些东西。 拿IDEA来举例&#xff0c;比如设置的背景图片&#xff0c;字体信息&#xff0c;字号信息和主题信息等等。 &#xff08;1&#xff09;以前是用txt保存的&#xff0c;没有任何优点&#xff0c;而且不利于阅读&a…

burp suite professional 产品介绍

Burp Suite Professional 是一款强大的网络安全工具&#xff0c;专为渗透测试和 Web 应用程序安全测试而设计。它由 PortSwigger 开发&#xff0c;并广泛应用于安全专家、开发人员和网络安全从业者。Burp Suite 提供了多种功能来帮助发现和修复 Web 应用程序中的漏洞&#xff0…

【网络安全 | 甲方建设】SaaS平台、Jira工具及Jenkins服务器

原创文章,不得转载。 文章目录 SaaS平台友好性Jira友好性Jenkins友好性SaaS平台 SaaS,全称为 “Software as a Service”(软件即服务),是一种基于云计算的软件交付模型。在这种模型中,软件不需要用户在本地安装和维护,而是通过互联网访问和使用。软件通常由服务提供商托…

通义灵码获得国产 AI 编码工具最高成绩丨阿里云云原生 8 月产品月报

云原生月度动态 云原生是企业数字创新的最短路径。 《阿里云云原生每月动态》&#xff0c;从趋势热点、产品新功能、服务客户、开源与开发者动态等方面&#xff0c;为企业提供数字化的路径与指南。 趋势热点 &#x1f947; Gartner 首次发布 AI 代码助手魔力象限&#xff0…