9、Flink 用户自定义 Functions 及 累加器详解

devtools/2024/10/19 12:37:30/
1)用户自定义函数
1.实现接口

最基本的方法是实现提供的接口。

# 根据提供的接口创建自定义函数
class MyMapFunction implements MapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}# 调用创建的自定义函数
data.map(new MyMapFunction());
2.匿名类

可以将 function 当做匿名类传递。

data.map(new MapFunction<String, Integer> () {public Integer map(String value) { return Integer.parseInt(value); }
});
3.Java 8 Lambdas

Flink 在 Java API 中还支持 Java 8 Lambdas 表达式。

data.filter(s -> s.startsWith("http://"));data.reduce((i1,i2) -> i1 + i2);
4.Rich functions

所有需要用户自定义 function 的转化操作都可以将 rich function 作为参数。

class MyMapFunction implements MapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}

替换成

class MyMapFunction extends RichMapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}

并将 function 照常传递给 map transformation。

data.map(new MyMapFunction());

Rich functions 也可以定义成匿名类:

data.map (new RichMapFunction<String, Integer>() {public Integer map(String value) { return Integer.parseInt(value); }
});
2)累加器 和 计数器
1.概述

累加器是具有加法运算最终累加结果的一种简单结构,可在作业结束后使用

最简单的累加器是计数器: 可以使用 Accumulator.add(V value) 方法将其递增。

在作业结束时,Flink 会汇总(合并)所有部分的结果并将其发送给客户端,Flink 目前有如下内置累加器,每个都实现了累加器接口。

  • IntCounter , LongCounterDoubleCounter
  • Histogram(直方图): 离散数量的柱状直方图实现;在内部,它只是整形到整形的映射,可以使用它来计算值的分布,例如,单词计数程序的每行单词的分布情况【详见 Metrics】。
2.使用累加器

首先,在需要使用累加器的用户自定义的转换 function 中创建一个累加器对象(此处是计数器)。

private IntCounter numLines = new IntCounter();

其次,必须在 rich function 的 open() 方法中注册累加器对象,也可以在此处定义累加器的名称。

getRuntimeContext().addAccumulator("num-lines", this.numLines);

在操作 function 中的任何位置(包括 open()close() 方法中)使用累加器。

this.numLines.add(1);

最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中(当前只有等待作业完成后执行才起作用)。

myJobExecutionResult.getAccumulatorResult("num-lines");

单个作业的所有累加器共享一个命名空间,因此可以在不同的操作 function 里面使用同一个累加器;Flink 会在内部将所有具有相同名称的累加器合并起来。

关于累加器和迭代的注意事项:当前累加器的结果只有在整个作业结束后才可用;Flink 计划在下一次迭代中提供上一次的迭代结果;可以使用 聚合器 来计算每次迭代的统计信息,并基于此类统计信息来终止迭代。

3.定制累加器

自定义累加器只需要实现累加器接口,可以选择实现 Accumulator 或 SimpleAccumulator。

Accumulator 的实现十分灵活: 它定义了将要添加的值类型 V,并定义了最终的结果类型 R;例如,对于直方图,V 是一个数字且 R 是一个直方图。

SimpleAccumulator 适用于两种类型都相同的情况,例如计数器。

4.总结
1.通过调用 execute() 方法返回的 JobExecutionResult 对象获得累加器结果(只有等待作业完成后执行才起作用)。2.单个作业的所有累加器共享一个命名空间,可以在不同的操作 function 里面使用同一个累加器;Flink 会在内部将所有具有相同名称的累加器合并起来。


http://www.ppmy.cn/devtools/27381.html

相关文章

【C++】模板初阶

&#x1f525;个人主页&#xff1a; Forcible Bug Maker &#x1f525;专栏&#xff1a; C 目录 前言泛型编程模板函数模板概念及简单使用函数模板的原理函数模板的实例化模板参数的匹配原则 类模板概念及简单使用类模板的实例化 结语 前言 本篇博客主要内容&#xff1a;初步接…

3D头模加载

目录 mesh参数 psbody加载 psbody示例 trimesh加载 加载动画&#xff1a; openmesh mesh参数 5023个顶点 9976个面。 from psbody.mesh import Mesh选择相机参数&#xff1a; python Copy code if template_type "flame":camera_params {c: np.array([400…

vscode连接远程Linux服务器时,没有权限新建文件夹或者文件

参考链接&#xff1a; VS code 保存或新建文件没有权限的问题 vscode连接远程Linux服务器时&#xff0c;没有权限新建文件夹或者文件&#xff1a; 用一条命令解决&#xff1a; sudo chown -R myuser /path/to/foldermyuser是当前用户名&#xff0c; /path/to/folder是 需要操…

排序-八大排序FollowUp

FollowUp 1.插入排序 (1).直接插入排序 时间复杂度:最坏情况下:0(n^2) 最好情况下:0(n)当数据越有序 排序越快 适用于: 待排序序列 已经基本上趋于有序了! 空间复杂度:0(1) 稳定性:稳定的 public static void insertSort(int[] array){for (int i 1; i < array.length; i…

区块链 | IPFS 工作原理入门

&#x1f98a;原文&#xff1a;What is the InterPlanetary File System (IPFS), and how does it work? &#x1f98a;写在前面&#xff1a;本文属于搬运博客&#xff0c;自己留存学习。 1 去中心化互联网 尽管万维网是一个全球性的网络&#xff0c;但在数据存储方面&#…

MT3608B 航天民芯代理 1.2Mhz 24V输入 升压转换器

深圳市润泽芯电子有限公司为航天民芯一级代理商 技术支持欢迎试样~Tel&#xff1a;18028786817 简述 MT3608B是恒定频率的6针SOT23电流模式升压转换器&#xff0c;用于小型、低功耗应用。MT3608B开关频率为1.2MHz&#xff0c;允许使用微小、低电平成本电容器和电感器高度不…

Web前后端交互

前端与后端之间的交互是Web应用程序中至关重要的组成部分&#xff0c;它们通过一系列技术和协议进行数据交换&#xff0c;以实现用户界面与服务器端业务逻辑及数据存储的协同工作。以下是一些常见的交互方式&#xff1a; ### 1. HTTP请求&#xff08;主要包括AJAX、Fetch API&…

flutter开发实战-混淆minifyEnabled及shrinkResources

flutter开发实战-混淆minifyEnabled及shrinkResources 最近开发中&#xff0c;出现了在Debug模式下完全正常&#xff0c;打包build后出现插件代码调用提示未实现。 No implementation found for method login on channel app_plugin 经过查找发现在build apk时候出现了混淆的问…