Flink keyBy算子的分区规则

ops/2024/12/14 6:37:58/

demo代码

String worlds = "flink,spark,hadoop,zk,kafka";streamSource.flatMap(new RichFlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> collector) throws Exception {String[] worlds = value.split(",");for (String world : worlds) {collector.collect(world);}}}).keyBy(new KeySelector<String, String>() {@Overridepublic String getKey(String key) throws Exception {return key;}}).process(new KeyedProcessFunction<String, String, String>() {@Overridepublic void processElement(String key,KeyedProcessFunction<String, String, String>.Context ctx,Collector<String> out) throws Exception {out.collect(key);}}).print("->");env.execute();

运行结果:
在这里插入图片描述

源码查看
我们进去keyBy算子,看看计算逻辑是怎样的:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
根据调用逻辑,我们抽象出分区数据的计算逻辑:

(MathUtils.murmurHash(key.hashCode()) % maxParallelism) * parallelism / maxParallelism ;

其中:

  • maxParallelism:默认128
  • parallelism:并行度数
  • key:分区键

验证
我们把抽出来的逻辑加到计算结果中查看

process(new KeyedProcessFunction<String, String, String>() {@Overridepublic void processElement(String key,KeyedProcessFunction<String, String, String>.Context ctx,Collector<String> out) throws Exception {String currentKey = ctx.getCurrentKey();int i = (MathUtils.murmurHash(key.hashCode()) % 128) * parallelism / 128 ;out.collect(key + "_" +i);out.collect(key);}

处理结果:
在这里插入图片描述


http://www.ppmy.cn/ops/141744.html

相关文章

Compose TimePicker

预览 Composable import androidx.compose.foundation.background import androidx.compose.foundation.layout.Column import androidx.compose.foundation.layout.Row import androidx.compose.foundation.layout.fillMaxWidth import androidx.compose.foundation.layout.p…

深入源码层面:在 Spring Boot 和 Spring MVC 项目中实现全面请求记录与异常处理的拦截器与监听器分析

在现代 Web 应用开发中&#xff0c;了解请求处理和异常管理的底层实现是提升应用稳定性与可维护性的关键。本文将从源码层面深入分析 Spring Boot 和 Spring MVC 中的拦截器&#xff08;Interceptor&#xff09;与监听器&#xff08;Listener&#xff09;&#xff0c;探讨如何通…

Java 环境配置 + IntelliJ IDEA 使用指南

文章目录 一、Java 程序的运行必须经过3 个步骤&#xff1a;编写、编译、运行&#xff08;1&#xff09;Java 和 JavaScript 的区别&#xff08;2&#xff09;JDK、JRE、JVM 的关系&#xff08;3&#xff09;是否需要 Maven&#xff1f; 二、软件下载2.1、JDK下载与安装 —— 是…

ASP.NET|日常开发中连接Sqlite数据库详解

ASP.NET&#xff5c;日常开发中连接Sqlite数据库详解 前言一、安装和引用相关库1.1 安装 SQLite 驱动1.2 引用命名空间 二、配置连接字符串2.1 连接字符串的基本格式 三、建立数据库连接3.1 创建连接对象并打开连接 四、执行数据库操作4.1 创建表&#xff08;以简单的用户表为例…

用线程池,注意避坑

前言 线程池是 Java 中处理多线程的强大工具&#xff0c;但它不仅仅是“直接用就完事”的工具。 很多小伙伴在用线程池时&#xff0c;因为配置不当或忽略细节&#xff0c;踩过许多坑。 今天跟大家一起聊聊线程池中容易踩的 10 个坑&#xff0c;以及如何避免这些坑&#xff0…

【C++初阶】第8课—标准模板库STL(string_2)

文章目录 1. string类对象遍历操作1.1 标准库中的成员函数begin( )和end( )1.2 标准库中的成员函数rbegin( )和rend( )1.3 C11引入的4个标准库中的成员函数 2. string类对象的访问2.1 operator[ ]运算符重载访问字符串字符2.2 公有成员函数at访问字符2.3 公有成员函数back()和f…

基于 Python 的机器学习模型部署到 Flask Web 应用:从训练到部署的完整指南

目录 引言 技术栈 步骤一&#xff1a;数据预处理 步骤二&#xff1a;训练机器学习模型 步骤三&#xff1a;创建 Flask Web 应用 步骤四&#xff1a;测试 Web 应用 步骤五&#xff1a;模型的保存与加载 保存模型 加载模型并在 Flask 中使用 步骤六&#xff1a;Web 应用…

UE5 C+、C++、C# 构造方法区别示例

我们对比一下UE C、C 、C#的构造方法&#xff1a; 1. UE4 C例子&#xff1a; // 声明和构造合并在一起static ConstructorHelpers::FObjectFinder<UTexture2D> CrosshairTexObj(TEXT("/Game/Path"));// 使用加载的资源UTexture2D* Texture CrosshairTexObj.…