Demo code
String worlds = "flink,spark,hadoop,zk,kafka";
// assumed: streamSource is created from the demo string above
DataStreamSource<String> streamSource = env.fromElements(worlds);
streamSource.flatMap(new RichFlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> collector) throws Exception {
        String[] worlds = value.split(",");
        for (String world : worlds) {
            collector.collect(world);
        }
    }
}).keyBy(new KeySelector<String, String>() {
    @Override
    public String getKey(String key) throws Exception {
        return key;
    }
}).process(new KeyedProcessFunction<String, String, String>() {
    @Override
    public void processElement(String key,
                               KeyedProcessFunction<String, String, String>.Context ctx,
                               Collector<String> out) throws Exception {
        out.collect(key);
    }
}).print("->");
env.execute();
Run result:
Source code walkthrough
Let's step into the keyBy operator and see how the partition is computed:
Following the call chain, we can abstract the partition-index computation as:
(MathUtils.murmurHash(key.hashCode()) % maxParallelism) * parallelism / maxParallelism ;
where:
- maxParallelism: the maximum parallelism, 128 by default
- parallelism: the operator's parallelism
- key: the partition key
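To make the two-step mapping concrete (key hash → key group → subtask index), here is a small self-contained Java sketch. Note the assumptions: `murmurHash` below is a stand-in bit mixer, not Flink's exact `MathUtils.murmurHash`, so the absolute indices it produces may differ from Flink's; a parallelism of 4 is also assumed. The structure of the computation is what it illustrates.

```java
public class KeyGroupDemo {
    // Stand-in mixer (assumption): Flink uses MathUtils.murmurHash here.
    // Any function that spreads the key hash evenly works for illustration.
    static int murmurHash(int code) {
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        return code & 0x7fffffff; // keep the result non-negative
    }

    // Step 1: key hash -> key group in [0, maxParallelism)
    static int keyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Step 2: key group -> operator (subtask) index in [0, parallelism)
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default
        int parallelism = 4;      // assumed operator parallelism
        for (String key : new String[]{"flink", "spark", "hadoop", "zk", "kafka"}) {
            int kg = keyGroup(key, maxParallelism);
            int idx = operatorIndex(kg, maxParallelism, parallelism);
            System.out.println(key + " -> keyGroup " + kg + " -> subtask " + idx);
        }
    }
}
```

Because the subtask index is derived from the key group rather than from the key directly, Flink can later rescale the job: key groups are fixed (up to maxParallelism) and are simply redistributed across the new set of subtasks.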
Verification
Let's add the extracted logic to the output so we can check it:
process(new KeyedProcessFunction<String, String, String>() {
    @Override
    public void processElement(String key,
                               KeyedProcessFunction<String, String, String>.Context ctx,
                               Collector<String> out) throws Exception {
        String currentKey = ctx.getCurrentKey();
        // look up this operator's actual parallelism at runtime
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        int i = (MathUtils.murmurHash(key.hashCode()) % 128) * parallelism / 128;
        out.collect(key + "_" + i);
    }
})
Processing result: