01 UDAF 聚合函数的使用
自定义聚合函数(UDAF),将多条记录聚合成一条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。
需要注意的是:当前聚合函数仅在流模式下的 GroupBy 聚合和 Group Window 聚合中支持通用的用户定义聚合功能;对于批处理模式,当前不支持该模式,需要使用向量化聚合函数。
1.1 UDAF 的处理逻辑
聚合函数的处理过程以累加器 accumulator
的为中心,累加器是一种中间数据结构,用于存储将多行输入计算出的最终聚合结果,即用来存储聚合的中间结果。
围绕累加器 accumulator
,一个聚合任务还需要如下三个方法:
create_accumulator()
:用来初始化自定义的累加其accumulator
,将内部定义的变量赋值为空或者0。accumulate()
:定义根据输入更新accumulator
的逻辑,主要是编写中间的逻辑代码,根据输入变量来更新输出中间变量。get_value()
:定义如何返回accumulator
中存储的中间结果,作为UDAF的最终结果。
一个聚合处理过程如下图所示:
上例中,我们想要计算出饮品价目表中最高的价格,其中饮品价目表包含三个属性 (id, name, price)
和五条数据。
聚合处理过程中:首先,使用 create_accumulator()
为要处理的数据构造一个空累加器;然后,使用 accumulate()
方法根据输入的每条数据更新累加器中存储的中间结果;在所有数据都处理完成后,使用 get_value()
方法计算中间结果中最大值,即要返回的最终结果。
1.2 聚合函数的使用
如果要定义 Python 聚合函数, 可以通过继承 pyflink.table
中的基类 AggregateFunction,并实现 accumulate()
方法。 聚合函数的返回结果类型和累加器类型可以通过两种方式指定:
- 实现
get_result_type()
方法和get_accumulator_type()
方法 - 使用 udaf 装饰器封装函数实例并指明类型参数
result_type
和accumulator_type
一个 UDAF 的使用示例如下:
定义 UDAF
class WeightedAvg(AggregateFunction):# 为了计算加权平均值,累加器需要存储所有已累计数据的加权和和计数。在本示例中,我们使用行对象作为累加器。def create_accumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, accumulator):if accumulator[1] == 0:return Noneelse:return accumulator[0] / accumulator[1]def accumulate(self, accumulator, value, weight):accumulator[0] += value * weightaccumulator[1] += weight# retract() 方法常应用在当前聚合操作之前存在可能生成收回消息的操作,例如组聚合、外部联接。def retract(self, accumulator, value, weight):accumulator[0] -= value * weightaccumulator[1] -= weightdef get_result_type(self):return DataTypes.BIGINT()def get_accumulator_type(self):return DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())])
使用 UDAF
# 创建流处理环境
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)# 注册 UDAF
# 可以在其中指定结果类型,定义中已经实现了`get_result_type()` 和 `get_accumulator_type()` 方法指明了类型,不需要在重复指明
# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), accumulator_type=...)
weighted_avg = udaf(WeightedAvg())t = table_env.from_elements([(1, 2, "Lee"),(3, 4, "Jay"),(5, 6, "Jay"),(7, 8, "Lee")]).alias("value", "count", "name")# 调用 UDAF
result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).to_pandas()
print(result)
其他调用 UDAF 的方式
- Table API 中注册及调用 UDAF
# 注册 UDAF
table_env.create_temporary_function("weighted_avg", WeightedAvg())# Table API 中调用注册的 UDAF
result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).to_pandas()
print(result)
- SQL 中注册及调用 UDAF
# 注册 UDAF
table_env.create_temporary_view("source", t)# SQL 中调用注册的 UDAF
result = table_env.sql_query("SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY name").to_pandas()
print(result)
- 在 GroupBy Window 聚合中使用 Python 聚合函数
tumble_window = Tumble.over(lit(1).hours) \.on(col("rowtime")) \.alias("w")result = t.window(tumble_window) \.group_by(col('w'), col('name')) \.select("w.start, w.end, weighted_avg(value, count)") \.to_pandas()
print(result)
完整代码
from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import call
from pyflink.table.udf import udaf
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumbleclass WeightedAvg(AggregateFunction):def create_accumulator(self):# Row(sum, count)return Row(0, 0)def get_value(self, accumulator):if accumulator[1] == 0:return Noneelse:return accumulator[0] / accumulator[1]def accumulate(self, accumulator, value, weight):accumulator[0] += value * weightaccumulator[1] += weightdef retract(self, accumulator, value, weight):accumulator[0] -= value * weightaccumulator[1] -= weightdef get_result_type(self):return DataTypes.BIGINT()def get_accumulator_type(self):return DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())])env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# the result type and accumulator type can also be specified in the udaf decorator:
# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), accumulator_type=...)
weighted_avg = udaf(WeightedAvg())
t = table_env.from_elements([(1, 2, "Lee"),(3, 4, "Jay"),(5, 6, "Jay"),(7, 8, "Lee")]).alias("value", "count", "name")# call function "inline" without registration in Table API
result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).to_pandas()
print(result)# register function
table_env.create_temporary_function("weighted_avg", WeightedAvg())# call registered function in Table API
result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).to_pandas()
print(result)# register table
table_env.create_temporary_view("source", t)# call registered function in SQL
result = table_env.sql_query("SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY name").to_pandas()
print(result)# use the general Python aggregate function in GroupBy Window Aggregation
tumble_window = Tumble.over(lit(1).hours) \.on(col("rowtime")) \.alias("w")result = t.window(tumble_window) \.group_by(col('w'), col('name')) \.select("w.start, w.end, weighted_avg(value, count)") \.to_pandas()
print(result)
1.3 聚合函数的视图 View
PyFlink 提供了更加高效的列表和字典存储结构 ListView
和 MapView
,可以用于存储更大量的数据。
但是将 ListView
和 MapView
用于聚合操作是,累加器 accumulator
必须是 Row
,且 ListView
和 MapView
必须是被声明在第一层。
使用方法入下所示:
from pyflink.table import ListViewclass ListViewConcatAggregateFunction(AggregateFunction):def get_value(self, accumulator):return accumulator[1].join(accumulator[0])def create_accumulator(self):return Row(ListView(), '')def accumulate(self, accumulator, *args):accumulator[1] = args[1]accumulator[0].add(args[0])def get_accumulator_type(self):return DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.LIST_VIEW(DataTypes.STRING())),DataTypes.FIELD("f1", DataTypes.BIGINT())])def get_result_type(self):return DataTypes.STRING()
1.4 向量化聚合函数
前面我们已经提到当前聚合函数仅在流模式下的 GroupBy 聚合和 Group Window 聚合中支持通用的用户定义聚合功能;对于批处理模式,当前不支持该模式,需要使用向量化聚合函数。
PyFlink 中向量化聚合函数以一个或多个 pandas.Series 类型的参数作为输入,并返回一个标量值作为输出。
向量化聚合函数不支持部分聚合,而且一个组或者窗口内的所有数据, 在执行的过程中,会被同时加载到内存,所以需要确保所配置的内存大小足够容纳这些数据。
如下示例中,展示了如何定义一个自定义向量化聚合函数,并在 GroupBy Aggregation、GroupBy Window Aggregation、Over Window Aggregation
中使用该函数。
定义自定义向量化聚合函数
# func_type="pandas" 输入类型
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def weighted_avg(value):return value.mean()
使用自定义向量化聚合函数
# 创建批处理环境
settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(settings)weighted_avg = udaf(WeightedAvg())my_table = table_env.from_elements([(1, 2, "Lee"),(3, 4, "Jay"),(5, 6, "Jay"),(7, 8, "Lee")]).alias("value", "count", "name")# 在 GroupBy Aggregation 中使用向量化聚合函数
my_table.group_by(my_table.name).select(my_table.name, weighted_avg(add(my_table.value)))# 在 GroupBy Window Aggregation 中使用向量化聚合函数
tumble_window = Tumble.over(expr.lit(1).hours) \.on(expr.col("rowtime")) \.alias("w")my_table.window(tumble_window) \.group_by("w") \.select("w.start, w.end, weighted_avg(value)")# 在 Over Window Aggregation 中使用向量化聚合函数
table_env.create_temporary_function("weighted_avg", weighted_avg)
table_env.sql_query("""SELECT name,weighted_avg(value)over (PARTITION BY a ORDER BY rowtimeROWS BETWEEN UNBOUNDED preceding AND UNBOUNDED FOLLOWING)FROM MyTable""")
02 Kafka 连接器
Flink 的 Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力
2.1 下载依赖包
为了使用Kafka连接器,使用构建自动化工具(如Maven或SBT)的项目和使用SQL JAR包的SQL Client项目都需要下载依赖项 flink-connector-kafka_2.11。
Kafka 连接器目前并不包含在 Flink 的二进制发行版中,请查阅 这里 了解如何在集群运行中引用 Kafka 连接器。
2.2 创建 Kafka 表
作业中加入上述依赖包之后,使用 SQL / Table API 的 Kafka table 可以按如下定义:
CREATE TABLE KafkaTable (`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
)
Kafka 连接器的一些参数与含义如下:
connector
:指定使用什么类型的连接器,这里应该是’kafka’。topic
:Kafka 记录的 Topic 名。properties.bootstrap.servers
:逗号分隔的 Kafka broker 列表。properties.group.id
:Kafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 “KafkaSource-{tableIdentifier}” 作为消费组 ID。scan.startup.mode
:Kafka consumer 的启动模式。有效值为:‘earliest-offset’,‘latest-offset’,‘group-offsets’,‘timestamp’ 和 ‘specific-offsets’。format
:用来序列化或反序列化 Kafka 消息的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘value.format’ 二者必需其一。
03 实时排行榜
本实例使用 Flink 的有状态流处理和滑动窗口,实现实时点击量排行榜。
该实例统计过去 1 分钟内,点击量最高的男女用户各 10 名及其具体的点击数,同时每隔 1 秒(实时)更新统计结果,等到排行榜数据并将结果同步到 kafka 中。
3.1 构建数据模拟器
首先,我们需要模拟实时产生的用户操作数据。
本实例中,我们编写一个 data_producer.py
的脚本,实时随机产生用户操作数据,并批量写入到 Kafka 中
每条写入 kafka 的用户操作数据包含如下字段:
{"ts": "2020-01-01 01:01:01", # 当前时间"name": "刘备", # 从根据性别随机产生的 50 个姓名里随机选择"sex": "男", # 性别,60%概率为“男”,40%概率为“女”"action": "click", # 动作,90%概率为“click”,10%概率为“scroll”"is_delete": 0, # 是否要丢弃,90%概率为“0”(不丢弃),10%概率为1“丢弃”
}
构建候选用户组
我们创建一个用户类,生成候选用户组,并能够随机获取用户信息。
该类用于在向 Kafka 批量写入用户操作数据时随机生成用户信息。
seed = 2020 # 设置随机数种子,保证每次运行的结果都一样
num_users = 50 # 为了使得最后的结果不至于太平均,只初始化了 50 个用户,该 50 个用户有不同的概率来产生上面的数据
fake = Faker(locale='en_US') # fake 第三方库生成随机用户名称
Faker.seed(seed)
random.seed(seed)class UserGroup:def __init__(self):# 为指定数量的用户分配不同的出现概率,每次按概率分布获取用户姓名self.users = [self.gen_male() if random.random() < 0.6 else self.gen_female() for _ in range(num_users)]prob = np.cumsum(np.random.uniform(1, 100, num_users)) # 用户点击次数的累加self.prob = prob / prob.max() # 点击次数归一化,转换成点击率# 静态方法生成男性用户信息和女性用户信息@staticmethoddef gen_male():return {'name': fake.name_male(), 'sex': 'male'}@staticmethoddef gen_female():return {'name': fake.name_female(), 'sex': 'female'}# 获取随机用户信息def get_user(self):r = random.random() # 生成一个 0~1的随机数index = np.searchsorted(self.prob, r)return self.users[index]
生成用户操作数据
使用 Kafka 生产者产生用户操作数据
max_msg_per_second = 20 # 每秒钟的最大消息数
run_seconds = 3600 # 脚本最长运行时间,防止无限写入 kafka
topic = "user_action" # kafka topic
bootstrap_servers = ['localhost:9092']def write_data():group = UserGroup()start_time = datetime.now()# 初始化 kafka 生产者producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda x: dumps(x).encode('utf-8'))# 生产用户操作数据,并发送到 kafkawhile True:# 创建用户操作数据now = datetime.now()user = group.get_user()cur_data = {"ts": now.strftime("%Y-%m-%d %H:%M:%S"),"name": user['name'],"sex": user['sex'],"action": 'click' if random.random() < 0.9 else 'scroll', # 用户的操作"is_delete": 0 if random.random() < 0.9 else 1 # 10% 的概率丢弃这条数据}# 将数据写入 kafka topicproducer.send(topic, value=cur_data)# 终止条件if (now - start_time).seconds > run_seconds:break# 停止时间sleep(1 / max_msg_per_second)
查看用户操作数据
使用 Kafka 消费者查看已经写入的用户操作数据
消费者的初始化参数:
- group_id:在高并发量情况下,则需要有多个消费者协作,此时消费进度由
group_id
统一。例如消费者A与消费者B,在初始化时使用同一个group_id
。在进行消费时,一条消息被消费者A消费后,在kafka中会被标记,如果这条消息被A消费后且正确 commit,则该消息不会再被B消费。 - auto_offset_reset:该参数指定消费者启动的时刻。通常情况下,消息队列中可能会有已经堆积的未消费消息,有时候需求是从上一次未消费的位置开始读(则该参数设置为
earliest
);有时候的需求为从当前时刻开始读之后产生的,之前产生的数据不再消费(则该参数设置为latest
)。
# 读取 kafka 的用户操作数据并打印
def print_data():consumer = KafkaConsumer(topic, # topic的名称group_id= 'group', bootstrap_servers=bootstrap_servers, # 指定kafka服务器auto_offset_reset='latest', )for msg in consumer:print(msg.value.decode('utf-8').encode('utf-8').decode('unicode_escape'))
数据模拟生成完整代码
3.2 根据输入数据和输出结果创建输入输出表
本实例的数据来源于 kafka 并将处理结果也输出到 kafka,所以我们要创建 kafka 表并指定topic, kafka_servers, group_id
等必要参数如下:
kafka_servers = "localhost:9092"
kafka_consumer_group_id = "group1" # group ID
source_topic = "user_action" # 源数据
sink_topic = "click_rank" # 结果
本实例的数据对象就是用户的操作数据,输入数据包含 name:姓名,sex:性别,action:操作,is_delete:删除状态,ts:点击时间
共五个字段,创建源表如下:
source_ddl = """CREATE TABLE source (name VARCHAR, sex VARCHAR, action VARCHAR, is_delete BIGINT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 声明 ts 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark) with ('connector' = 'kafka','topic' = '{source_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','format' = 'json')
"""t_env.execute_sql(source_ddl)
本实例的统计结果包含 male_top10:点击量最高的 10 个男性用户,female_top10:点击量最高的 10 个女性用户,start_time:窗口开始时间,end_time:窗口结束时间
共四个字段,创建结果表如下:
sink_ddl = """CREATE TABLE sink (male_top10 STRING, female_top10 STRING, start_time TIMESTAMP(3), end_time TIMESTAMP(3) ) with ('connector' = 'kafka','topic' = '{sink_topic}','properties.bootstrap.servers' = '{kafka_servers}','properties.group.id' = '{kafka_consumer_group_id}','scan.startup.mode' = 'latest-offset','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','format' = 'json')
"""t_env.execute_sql(sink_ddl)
3.3 编写用户自定义聚合函数 UDAF
在 PyFlink 中定义 UDAF 需要 Flink >= 1.12
,使用 UDAF 可以将多行的标量值映射到新的标量值。
本例子中我们需要使用滑动窗口计算点击率前十的用户,使用向量化的 Python 聚合函数( Pandas UDAF )进行 windows 聚合,即在使用 UDAF 时,指定参数 func_type=“pandas”。
用于统计点击量最多的 10 个男性和女性的向量化聚合函数如下所示:
# 统计点击量最多的 10 个男人(只统计 sex=male、action=click 的数量,忽略 is_delete=1 的数据)
@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def male_click_top10(name, sex):names = name[sex == 'male']return names.value_counts().iloc[:10].to_json()# 统计点击量最多的 10 个女人(只统计 sex=female、action=click 的数量,忽略 is_delete=1 的数据)
@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def female_click_top10(name, sex, action, is_delete):names = name[sex == 'female']return names.value_counts().iloc[:10].to_json()
3.4 流处理完整代码
除了上述源表和结果表的创建,以及定义 UDAF 聚合函数,流处理过程中要需要完成如下任务:
- 创建流处理环境
- 指定 kafka 依赖
- 注册 UDAF
- 使用 UDAF 完成流处理任务
3.5 打印实时排行榜
完成流处理任务之后,实时排行结果被写入到 kafka 的 click_rank
topic 中,我们从该 topic 中读取用户操作数据并打印
"""
读取 kafka 的用户操作数据并打印
"""
from kafka import KafkaConsumer
from reprint import output
import jsontopic = 'click_rank'
bootstrap_servers = ['localhost:9092']
group_id = 'group1'def sink_output():consumer = KafkaConsumer(topic, group_id=group_id, bootstrap_servers=bootstrap_servers, auto_offset_reset='latest', )with output(output_type="list", initial_len=22, interval=0) as output_lines:# 初始化打印行 for i in range(14):if i==0 :output_lines[i] = '=== 窗口时间 ==='elif i==2 :output_lines[i] = '=== 男 ==='elif i == 8 :output_lines[i] = '=== 女 ==='else:output_lines[i] = 'name click'for msg in consumer:# 解析结果data = json.loads(msg.value)male_rank = json.loads(data['male_top10'])female_rank = json.loads(data['female_top10'])start_time = data['start_time']end_time = data['end_time']output_lines[1] = f'开始时间{start_time:6s} 结束时间{end_time}'# 逐行打印for i in range(5):if i < len(male_rank):name = list(male_rank.keys())[i]value = list(male_rank.values())[i]output_lines[i+3] = f'{name:6s} {value}'else:output_lines[i+3] = ''for i in range(5):if i < len(female_rank):name = list(female_rank.keys())[i]value = list(female_rank.values())[i]output_lines[i+9] = f'{name:6s} {value}'else:output_lines[i+9] = ''if __name__ == "__main__":sink_output()
3.6 运行实例
首先我们使用 docker 按照如下容器编排创建一个 kafka,同时构建一个 zookeeper 与 kafka 结合一起使用,用于管理 kafka 的 broker,以及实现负载均衡。
version: "3.5"
services:zookeeper:image: zookeeper:3.6.2ports:- "2181:2181" ## 对外暴露的 zookeeper 端口号container_name: zookeeperkafka:image: wurstmeister/kafka:2.13-2.6.0volumes:- /etc/localtime:/etc/localtime ## kafka 镜像和宿主机器之间时间保持一致ports:- "9092:9092" ## 对外暴露的 kafka 端口号depends_on:- zookeeperenvironment:KAFKA_ADVERTISED_HOST_NAME: localhostKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_PORT: 9092KAFKA_BROKER_ID: 1KAFKA_LOG_RETENTION_HOURS: 120KAFKA_MESSAGE_MAX_BYTES: 10000000KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000KAFKA_NUM_PARTITIONS: 3KAFKA_DELETE_RETENTION_MS: 1000KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1" ## 自动创建 topicscontainer_name: kafka
1 启动容器环境
docker-compose up -d
2 运行数据模拟程序
python data_producer.py
3 运行流处理任务程序
flink run -m localhost:8081 -python ranklist.py
4 运行排行榜打印程序
python data_comsumer.py
5 运行结果
参考资料
Flink 官方文档:向量化聚合函数官方文档
Flink 官方文档:Apache Kafka SQL 连接器
PyFlink 从入门到精通