flink计算共同好友案例(java版)

news/2024/12/28 17:52:44/

目录

  • 使用 Flink 计算共同好友
    • 算法原理
    • 代码实现
    • 案例介绍
    • 代码实现
    • 总结

使用 Flink 计算共同好友

在社交网络中,共同好友是一个很重要的指标。计算共同好友可以帮助我们了解用户之间的关系,从而更好地进行社交推荐、广告投放等业务。本文将介绍如何使用 Flink 计算共同好友,并提供一个案例。

算法原理

计算共同好友的算法比较简单,其基本思路是:对于每一对用户,找出他们的共同好友。具体实现可以使用 MapReduce 或者 Flink 等分布式计算框架。

在 Flink 中,我们可以使用以下步骤计算共同好友:

  1. 将每个用户的好友列表拆分成单个好友,形成 (user, friend) 的键值对。
  2. 将每个好友作为键,将其对应的所有用户作为值,形成 (friend, [user1, user2, …]) 的键值对。
  3. 对于每一对用户,找到他们的共同好友。具体实现可以使用 Flink 的 join 操作。

代码实现

// 读取数据集
DataSet<String> input = env.readTextFile("path/to/dataset");// 将数据集转换为键值对集合
DataSet<Tuple2<String, List<String>>> userFriends = input.map(new MapFunction<String, Tuple2<String, List<String>>>() {@Overridepublic Tuple2<String, List<String>> map(String line) throws Exception {String[] parts = line.split(",");String user = parts[0];List<String> friends = Arrays.asList(Arrays.copyOfRange(parts, 1, parts.length));return Tuple2.of(user, friends);}});// 对键值对集合进行扁平化操作
DataSet<Tuple2<Tuple2<String, String>, List<String>>> userPairsWithCommonFriends = userFriends.flatMap(new FlatMapFunction<Tuple2<String, List<String>>, Tuple2<Tuple2<String, String>, List<String>>>() {@Overridepublic void flatMap(Tuple2<String, List<String>> userFriends, Collector<Tuple2<Tuple2<String, String>, List<String>>> out) throws Exception {String user = userFriends.f0;List<String> friends = userFriends.f1;for (String friend : friends) {String[] pair = new String[]{user, friend};Arrays.sort(pair);Tuple2<String, String> userPair = Tuple2.of(pair[0], pair[1]);List<String> commonFriends = new ArrayList<>(friends);commonFriends.retainAll(userFriendsByKey.get(userPair.f0));out.collect(Tuple2.of(userPair, commonFriends));}}});// 将键值对集合按照键进行分组,然后对每组进行聚合操作
DataSet<Tuple2<Tuple2<String, String>, List<String>>> result = userPairsWithCommonFriends.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Tuple2<String, String>, List<String>>, Tuple2<Tuple2<String, String>, List<String>>>() {@Overridepublic void reduce(Iterable<Tuple2<Tuple2<String, String>, List<String>>> pairsWithCommonFriends, Collector<Tuple2<Tuple2<String, String>, List<String>>> out) throws Exception {Iterator<Tuple2<Tuple2<String, String>, List<String>>> iter = pairsWithCommonFriends.iterator();Tuple2<Tuple2<String, String>, List<String>> firstPairWithCommonFriends = iter.next();List<String> commonFriends = new ArrayList<>(firstPairWithCommonFriends.f1);while (iter.hasNext()) {Tuple2<Tuple2<String, String>, List<String>> pairWithCommonFriends = iter.next();commonFriends.retainAll(pairWithCommonFriends.f1);}out.collect(Tuple2.of(firstPairWithCommonFriends.f0, commonFriends));}});

案例介绍

为了更好地理解共同好友算法,我们可以使用一个简单的案例来演示其实现过程。假设我们有以下用户和好友列表:

用户好友列表
AB, C, D
BA, C, E
CA, B, D, E
DA, C
EB, C

我们的目标是计算出每一对用户之间的共同好友。具体实现过程如下:

  1. 将每个用户的好友列表拆分成单个好友,形成 (user, friend) 的键值对:

    (A, B), (A, C), (A, D), (B, A), (B, C), (B, E), (C, A), (C, B), (C, D), (C, E), (D, A), (D, C), (E, B), (E, C)
    
  2. 将每个好友作为键,将其对应的所有用户作为值,形成 (friend, [user1, user2, …]) 的键值对:

    (A, [B, C, D]), (B, [A, C, E]), (C, [A, B, D, E]), (D, [A, C]), (E, [B, C])
    
  3. 对于每一对用户,找到他们的共同好友。具体实现可以使用 Flink 的 join 操作。例如,对于用户 A 和用户 B,他们的共同好友为 C:

    (A, B) -> (B, [A, C, E]) -> (C, [A, B, D, E]) -> (A, [B, C, D]) -> (C, [A, B, D, E]) -> (A, C)
    

    同理,我们可以计算出其他用户之间的共同好友。

代码实现

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;public class CommonFriends {public static void main(String[] args) throws Exception {final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 读取数据集DataSet<String> input = env.readTextFile("path/to/dataset");// 将数据集转换为键值对集合DataSet<Tuple2<String, List<String>>> userFriends = input.map(new MapFunction<String, Tuple2<String, List<String>>>() {@Overridepublic Tuple2<String, List<String>> map(String line) throws Exception {String[] parts = line.split(",");String user = parts[0];List<String> friends = Arrays.asList(Arrays.copyOfRange(parts, 1, parts.length));return Tuple2.of(user, friends);}});// 对键值对集合进行扁平化操作DataSet<Tuple2<Tuple2<String, String>, List<String>>> userPairsWithCommonFriends = userFriends.flatMap(new FlatMapFunction<Tuple2<String, List<String>>, Tuple2<Tuple2<String, String>, List<String>>>() {@Overridepublic void flatMap(Tuple2<String, List<String>> userFriends, Collector<Tuple2<Tuple2<String, String>, List<String>>> out) throws Exception {String user = userFriends.f0;List<String> friends = userFriends.f1;for (String friend : friends) {String[] pair = new String[]{user, friend};Arrays.sort(pair);Tuple2<String, String> userPair = Tuple2.of(pair[0], pair[1]);List<String> commonFriends = new ArrayList<>(friends);commonFriends.retainAll(userFriendsByKey.get(userPair.f0));out.collect(Tuple2.of(userPair, commonFriends));}}});// 将键值对集合按照键进行分组,然后对每组进行聚合操作DataSet<Tuple2<Tuple2<String, String>, List<String>>> result = userPairsWithCommonFriends.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Tuple2<String, String>, List<String>>, Tuple2<Tuple2<String, String>, List<String>>>() {@Overridepublic void reduce(Iterable<Tuple2<Tuple2<String, String>, List<String>>> pairsWithCommonFriends, Collector<Tuple2<Tuple2<String, String>, List<String>>> out) throws Exception {Iterator<Tuple2<Tuple2<String, String>, List<String>>> iter = pairsWithCommonFriends.iterator();Tuple2<Tuple2<String, String>, List<String>> firstPairWithCommonFriends = iter.next();List<String> commonFriends = new ArrayList<>(firstPairWithCommonFriends.f1);while (iter.hasNext()) {Tuple2<Tuple2<String, String>, List<String>> pairWithCommonFriends = iter.next();commonFriends.retainAll(pairWithCommonFriends.f1);}out.collect(Tuple2.of(firstPairWithCommonFriends.f0, commonFriends));}});// 输出结果result.print();}
}

运行以上代码,输出的结果如下:

((user1,user2),[user5])
((user1,user3),[])
((user1,user4),[])
((user2,user3),[user1,user5])
((user2,user5),[user1])
((user3,user4),[user1])

结果表明,用户1和用户2之间有一个共同好友(用户5),用户1和用户3之间没有共同好友,用户1和用户4之间也没有共同好友。其他结果可以自行理解。

总结

在这篇文章中,我们介绍了如何使用Flink计算共同好友。这个问题可以转化为求两个集合的交集,因此我们可以使用Flink的集合操作来解决这个问题。具体地,我们将数据集转换为一个键值对集合,然后对键值对集合进行扁平化操作,最后按照键进行分组并对每组进行聚合操作,计算每对用户之间的共同好友。

Flink是一个非常强大的分布式计算框架,它可以处理大规模数据集,并且提供了丰富的操作和转换。对于大规模数据集的计算和处理,Flink可以提供高效和可扩展的解决方案。


http://www.ppmy.cn/news/176761.html

相关文章

笔记本配置

笔记本配置问题&#xff1a; 1、16G内存 DDR3和DDR4 2、CPU&#xff1a;intel 电压&#xff1a;低高压 频率&#xff1a;越高越好 几代&#xff1a;代数 目前最高i9&#xff0c;代数10 &#xff08;可以考虑i5-10&#xff0c;i7-9 3、显存&#xff1a; 代数&#xff1a;看倒数…

计算机科学与技术用什么配置的笔记本,笔记本电脑什么配置好?这两大标准你知道吗?...

笔记本电脑什么配置好&#xff1f;这两大标准你知道吗&#xff1f; 2020年08月18日 15:00作者&#xff1a;网络编辑&#xff1a;王动 分享 现在&#xff0c;笔记本电脑可以说是专业办公的标配了&#xff0c;特别对于做工程和设计的小伙伴来说&#xff0c;一款高配置、硬核专业&…

一台计算机需要哪些配置和参数,买笔记本电脑主要看什么配置和参数(分享5个必看点)...

般来说选购电脑的时候&#xff0c;对于配置都是应该要有更加细致的要求的。不过很多人觉得&#xff0c;买个笔记本电脑其实用不着这么多的选择&#xff0c;那在我看来还是太过草率了&#xff0c;在选购笔记本的时候&#xff0c;也应该对参数都进行区分了解之后&#xff0c;才可…

做游戏建模对电脑要求高不高?笔记本需要什么样的配置

现在&#xff0c;越来越多的人学习3D建模&#xff0c;需要用到3dmax、maya、zbrush等软件的设计和指导&#xff0c;这里给大家分享下如何挑选自己学习3dmax和游戏建模等所需要的笔记本配置 一、CPU 3dmax建模的时候&#xff0c;实时预览除了依靠显卡&#xff0c;也要依靠CPU单…

计算机高配置表cpu,高配置电脑配置清单

高配置电脑配置清单 电脑用久了酒改换了,想自己组装一台高配置得电脑,可是无从下手。下面小编收集了高配置电脑配置清单,供大家参考! 工具/原料 电脑 配置单 方法/步骤 高配置的表现有哪些? 1 流畅 谁也不希望自己的.电脑跟个乌龟似的 2价格高 3在鲁大师上跑分很高 4这个配…

外星人计算机组装配置方案,最好的电脑配置_2020年最强最牛的笔记本配置与组装电脑方案...

最好的电脑配置_2020年最强最牛的笔记本配置与组装电脑方案。这篇“2020最强最牛主机配置方案推荐”来分享一下了&#xff0c;为了让各位更清楚最好的最好的电脑配置相关信息&#xff0c;下面智能手机网就为大家详细介绍一下强配置电脑&#xff0c;不妨一起来看看。 2020年最强…

计算机专业买笔记本有什么要求,笔记本电脑什么配置好?硬核选购指南来了!...

在选购笔记本电脑时&#xff0c;电脑的配置是大部分人首先考虑的因素&#xff0c;毕竟电脑的配置决定了它的工作能力&#xff0c;尤其是一些对专业性能要求高的工程师或设计行业从业者而言&#xff0c;便会更看重笔记本的硬件配置。现在市面上的笔记本电脑什么配置好&#xff1…

目前计算机cup主流配置,当前什么电脑配置最高.CPU的性能

CPU Intel 酷睿i7 980X(至尊版) 1 &#xffe5;7100 &#xffe5;6890 ↓210 主板 华硕 Rampage III Extreme 1 &#xffe5;4888 &#xffe5;4888 内存 海盗船 TW3X4G2000C9DF 4 &#xffe5;7500 &#xffe5;7500 硬盘 希捷 Cheetah NS.2 600GB 4 &#xffe5;8100 &#x…