5_SparkGraphX讲解

devtools/2024/12/28 19:46:58/

SparkGraphX讲解

1、为何使用SparkGraphiX图处理?

许多大数据大规模图网络的形式呈现,尤其是许多的非图结构的大数据,常会被转换为图模型进行分析。
图数据结构能够很好地表达数据之间的关联性

2、图——基本术语认知

概念:图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种网状数据结构,可以对事物之间的关系建模。通常表示为二元组:Graph =(V,E)

图中基本知识点

  • 图可以分为有向图无向图有环图无环图等。
  • 度:指与这个顶点相关联的边的数量

    对于有向图而言:

    • 出度:指从当前顶点指向其他顶点的边的数量
      入度:其他顶点指向当前顶点的边的数量
    • 度 = 出度+入度

3、SparkGraphX简介

3.1:基本概念

Spark GraphX 是一个分布式图处理框架,为图计算和图挖掘提供了简洁易用且丰富多彩的接口。

3.2:Graph X 特点

  • 基于内存实现了数据的复用与快速读取

  • Resilient Distributed Property Graph(弹性分布式属性图)

    通过弹性分布式属性图统一了图视图(Graph)与表视图(Table)

    • GraphX的核心抽象是Resilient Distributed Property Graph(弹性分布式属性图),是一种顶点和边都带属性的有向多重图。

    • 其扩展了Spark RDD的抽象,一份物理存储,两种视图(Table和Graph)

    • 两种视图都有自己独有的操作符,从而获得了灵活操作和执行效率。

  • GraphX能与Spark Streaming、Spark sql 和 Spark MLib等无缝衔接

3.3:Graph X中有重要的概念

1Vertices(顶点):对应的RDD名称为VertexRDD【RDD[Vertex]】,包括顶点ID(VertexId)和顶点属性(VD,Vertex Data)

2Edges(边):对应的RDD名称为EdgeRDD【RDD[Edge[(String,Int)]]】,包括源顶点ID(srcId)、目标顶点ID(dstId)和边属性(ED,Edge Data)

3Triplets(三元组):对应的是关系RDD【RDD[EdgeTriplet]】,包括源顶点ID【srcId:VertexId】、源顶点属性【srcAttr:(String,Int,String)】、边属性【attr:(String,Int)】、目标顶点ID【dstId:VertexId】、目标顶点属性【dstAttr:(String,Int,String)】

4度(Degree):包括inDegrees(每个顶点入度)、outDegrees(每个顶点的出度)、degrees(每个顶点的度)

3.4:存储模式——分区切割策略

分区切割策略(PartitionStrategy):1、EdgeCut => 保证点在同一分区EdgePartition1D => 同点边(出入度)同分区EdgePartition2D => 邻接矩阵:与顶点关联的边最多被分配到(2 * sqrt(分区总数))个分区中2、VertexCut => 保证边在同一分区(Spark优选切割策略,因为边较为复杂)RandomVertexCut => 两同点同方向(同边)同分区CanonicalRandomVertexCut => 两同点边同分区

4、常用API

class Graph[VD,ED]{// 图信息val numEdges: Long					//   边数val numVertices: Long					//   点数val inDegrees: VertexRDD[Int]			//   入度val outDegrees: VertexRDD[Int]		//   出度val degrees: VertexRDD[Int]			//   度数// 属性算子def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]// 结构算子def reverse: Graph[VD, ED]def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,vpred: (VertexId, VD) => Boolean): Graph[VD, ED]// 关联算子def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
}class GraphOps[VD, ED](graph : org.apache.spark.graphx.Graph[VD, ED]){// 👉同分区👈内相同边(顶点ID顺序值都相同)合并def groupEdges(merge : scala.Function2[ED, ED, ED]) : org.apache.spark.graphx.Graph[VD, ED]
}

5、算法

页面等级(Page Rank)

用于评估网页链接的质量和数量,以确定该网页的重要性和权威性的相对分数,范围为0到10
从本质上讲,Page Rank是找出图中顶点(网页链接)的重要性(值越大越重要)

过程简单讲解:尽可能的在分散的点中选取几个,然后这几个点进行拉取周围的点形成多个圈,在这几个圈内部会去寻找绝对中心点。依次类推,达到tol阈值时会停止迭代。

def pageRank(
tol : scala.Double, 		// 收敛最小误差,越小越好,确定迭代是否结束的参数resetProb : scala.Double	// 随机【重复概率】
) : org.apache.spark.graphx.Graph[scala.Double, scala.Double]

案列:顶点质量

graph.pageRank(0.0001,0.2).vertices.foreach(println)
-----------------------------
(1,0.6533816355047241)
(4,0.5437786042062481)
(6,0.8615213520717165)
(3,1.2889169066237742)
(2,1.0530712359694594)
(5,1.5993302656240773)
-----------------------------
三角数量

计算经过每个顶点的三角形数量
用于评估:社区、团体、机构、组织内部关系的紧密程度或稳定性,三角形越少越松散

def triangleCount() 
: org.apache.spark.graphx.Graph[scala.Int, ED]

案例:稳定性

graph.triangleCount().vertices.foreach(println) // (顶点id,参与的三角形数量)
-------------------------
(3,5)
(1,7)
(5,5)
(6,3)
(2,3)
(4,7)
-------------------------
连通分量

连通分量是一个子图,其中任何两个顶点通过一条边或一系列边相互连接,其顶点是原始图顶点集的子集,其边是原始图边集的子集

用于反应图中顶点间的连通性

def connectedComponents() : org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId, ED]
def stronglyConnectedComponents(numIter : scala.Int) : org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId, ED]

案例

// 强连通图(每两个顶点之间都存在路径)
graph.connectedComponents().vertices.foreach(println)
-----------------------------
(5,1) // 顶点 5 属于连通分量 1
(4,1) // 顶点 4 属于连通分量 1
(1,1) // 顶点 1 属于连通分量 1
(2,1) // 顶点 2 属于连通分量 1
(3,1) // 顶点 3 属于连通分量 1
(6,1) // 顶点 6 属于连通分量 1
-----------------------------
PREGEL

是Google提出的用于大规模分布式图计算框架

  • 图遍历(BFS)
  • 单源最短路径(SSSP)
  • Page Rank计算

Pregel的计算由一系列迭代组成,称为 super steps

  • 每个顶点从上一个 super step 接收入站消息
  • 计算顶点新的属性值
  • 在下一个 super step 中向相邻的顶点发送消息
  • 当没有剩余消息时,迭代结束
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection
)(vprog: (VertexID, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],mergeMsg: (A, A) => A
) : Graph[VD, ED]

案例:求最短路径(srcId为1L,到其他点的最短路径)

// 1、初始化操作
val srcId = 1L
// 初始化(点):对顶点重新定义(VertexId,Distance),保留边
// 除了 srcId本身点是(1,0),其他点为(VertexId,2100000000)
val initialVertices: Graph[Long, Int] = graph.mapVertices({// 点重新定义,边依旧是原来的边// 偏函数+模式匹配case (vertexId, t2) if (vertexId == srcId) => 0L // 若指向自己,则为0case _ => Long.MaxValue // 指向其他人距离为最大值
})// 2、生成结果图
val vertexCount: Int = initialVertices.vertices.count().toInt
val rstGraph: Graph[Long, Int] = initialVertices.pregel[Long](Int.MaxValue, // 初始化值vertexCount, // 最大迭代次数(按照顶点数量)EdgeDirection.Out
)(// 初始化:每一轮迭代初始化节点值// VD:上一轮计算的值, A:新值(VertexId, VD, A) => {val min: Long = Math.min(VD, A)println(s"$VertexId ($VD,$A) = $min")min},// 发送消息:EdgeTriplet(srcId,srcAttr,attr,dstId,dstAttr)e => {// (VertexId,距离)// srcAttr:前一个顶点的距离 -- attr:边长 --> dstAttr:后一个顶点的距离// srcAttr + attr < dstAttr 则发消息进行替换,否则不发送消息val distance: Long = e.srcAttr + e.attrif (distance < e.dstAttr) {// 发送消息进行替换println(s"sendMsg from ${e.srcId} to ${e.dstId} for ($distance,${e.dstAttr}) = $distance")Iterator((e.dstId, distance))} else {// 不发消息Iterator.empty}},// 合并(a, b) => {val min: Long = Math.min(a, b)println(s"merge($a,$b),最终选择$min")min}
)
rstGraph.vertices.foreach(println) // (到xx点,距离)
----------------------------------
(2,1)
(4,3)
(5,5)
(6,2)
(1,0)
(3,3)
----------------------------------

6、应用场景

在地图应用中寻找最短路径
社交网络关系
网页间超链接关系

7、小型案例

7.1.准备工作

数据模拟
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Set
import scala.util.Random
import scala.util.control.Breaks.{breakable,break}val buffer:ArrayBuffer[Edge[Int]] = ArrayBuffer()
val rand = new Random()
val array = Array(1L,2L,3L,4L,5L,6L) // 六个点
val set:Set[(Long,Long)] = Set() // 去重
var cnt = 0while(cnt<21){array.foreach(one=>{ // 点1breakable({while (true){val two = array(rand.nextInt(array.size)) // 点2if(one != two){val relation = (one,two) // 构成关系if(!set.contains(relation)){buffer.append(Edge(relation._1,relation._2,rand.nextInt(6)))set.add(relation)cnt += 1break() // 跳出循环}}}})})
}
------------- 输出结果形式 ------------
Edge(1,5,0)
Edge(2,1,2)
Edge(3,6,3)
Edge(4,2,1)
Edge(5,2,4)
-------------------------------------
创建图对象
// 边数据:描述点与点间关系【有方向】
val es = Seq(Edge(1L,6L,4),Edge(1L,6L,4),Edge(2L,5L,4),Edge(3L,6L,4),Edge(4L,5L,3),Edge(5L,1L,5),Edge(6L,2L,5),//Edge(5L,1L,5),//Edge(6L,2L,5),Edge(1L,5L,1),Edge(2L,3L,2),Edge(3L,4L,0),Edge(4L,2L,4),Edge(5L,2L,4),//Edge(2L,3L,2),Edge(6L,5L,3),Edge(1L,2L,1),Edge(2L,1L,0),Edge(3L,5L,3),//Edge(6L,5L,3),Edge(4L,6L,0),//Edge(2L,1L,0),Edge(5L,3L,0),Edge(6L,3L,4),Edge(1L,3L,5),Edge(2L,6L,1),Edge(3L,1L,1),//Edge(2L,6L,1),//Edge(3L,1L,1),Edge(4L,1L,5),Edge(5L,6L,3),//Edge(5L,6L,3),Edge(6L,1L,3)
)
// 点数据
val vs = Seq((1L,("jack",22)),(2L,("pola",12)),(3L,("mike",26)),(4L,("wang",17)),(5L,("liu",19)),(6L,("frimiku",18)),
)val conf: SparkConf = new SparkConf().setAppName("graphx-01").setMaster("local[4]")
val sc = new SparkContext(conf)// 点:描述的是人信息(L来表示点)
val vertex: RDD[(Long, (String, Int))] = sc.makeRDD(vs)
// 边
val edge: RDD[Edge[Int]] = sc.makeRDD(es)
// 构建图 (源码 => 基础:Graph,高阶:GraphOps)
val graph: Graph[(String, Int), Int] = Graph(vertex, edge)

7.2.实际演示

遍历
// 1、查看所有顶点
graph.vertices.foreach(println)
-----------------------------
(1,(jack,22))
(2,(pola,12))
(4,(wang,17))
(3,(mike,26))
(5,(liu,19))
(6,(frimiku,18))
-----------------------------// 2、查看所有的边【简略】
graph.edges.foreach(println)
-----------------------------
Edge(4,6,0)
Edge(4,1,5)
Edge(1,2,1)
Edge(1,3,5)
-----------------------------// 3、查看所有的边(将点带入)【完整】
graph.triplets.foreach(println)
------------------------------------
((5,(liu,19)),(6,(frimiku,18)),3)
((4,(wang,17)),(5,(liu,19)),3)
((3,(mike,26)),(4,(wang,17)),0)
((4,(wang,17)),(2,(pola,12)),4)
------------------------------------// 4、入度:指向自己的个数
graph.inDegrees.foreach(println) // (id,入度)
---------------------------
(4,1)
(3,4)
(2,4)
(5,5)
(6,5)
(1,5)
---------------------------// 5、出度:指向别人的个数
graph.outDegrees.foreach(println) // (id,出度)
---------------------------
(6,4)
(5,4)
(3,4)
(4,4)
(1,4)
(2,4)
---------------------------// 4,5组合:将入度和出度一起显示
graph.inDegrees.join(graph.outDegrees).foreach(println) // (id,(入度,出度))
---------------------------
(4,(1,4))
(3,(4,4))
(2,(4,4))
(1,(5,4))
(6,(5,4))
(5,(5,4))
---------------------------// 6、度:入度+出度
graph.degrees.foreach(println)
---------------------------
(3,8)
(4,5)
(5,9)
(6,9)
(2,8)
(1,9)
---------------------------
案例一:长一岁
// 长一岁:每个顶点中的人物的年龄长一岁
graph.mapVertices((id,t2)=>(id,(t2._1,t2._2+1))) // (id,(名字,年龄)).vertices.foreach(println)
--------------------------
(5,(5,(liu,20)))
(4,(4,(wang,18)))
(6,(6,(frimiku,19)))
(1,(1,(jack,23)))
(2,(2,(pola,13)))
(3,(3,(mike,27)))
--------------------------
案例二:找好友
val old: RDD[((VertexId, VertexId), Int)] = graph.edges.map(e => ((e.srcId, e.dstId), e.attr))
val cur: RDD[((VertexId, VertexId), Int)] = graph.edges.map(e => ((e.dstId, e.srcId), e.attr))
old.join(cur).filter(t=>{val a1: Int = t._2._1val a2: Int = t._2._2a1>=3 && a2>=3 && Math.abs(a1-a2)<=1})// ((源顶点(srcId),目标顶点(dstId)),(srcId至dstId的关系,dstId至srcId的关系)).foreach(println)
----------------------------
((6,3),(4,4))
((1,6),(4,3))
((6,1),(3,4))
((5,2),(4,4))
----------------------------
案例三:合并重复边(限同分区内)
// 合并重复边:【同分区】内相同边(顶点ID顺序值都相同【如:(1,2),(1,3)】)合并
graph.partitionBy(PartitionStrategy.EdgePartition2D,1).groupEdges((a, b)=>{val c: Int = a + bprintln(s"$a + $b = $c")c
}).triplets.foreach(println)
---------------------------
原有:Edge(1L,6L,4),Edge(1L,6L,4),
合并后:Edge(1,6,8)
---------------------------

http://www.ppmy.cn/devtools/146189.html

相关文章

《操作系统真象还原》第十章(二)—— 键盘驱动程序的编写与输入系统

章节任务介绍 在上一节中&#xff0c;我们介绍了操作系统的同步机制互斥锁的内容&#xff0c;并手动实现了互斥锁&#xff0c;同时实现了线程安全的屏幕打印。 至此&#xff0c;我们算是基本完成了操作系统的“输出”功能&#xff0c;但目前为止我们的输入仍旧依赖于程序&…

Bash 脚本教程

注&#xff1a;本文为 “Bash 脚本编写” 相关文章合辑。 BASH 脚本编写教程 as good as well于 2017-08-04 22:04:28 发布 这里有个老 American 写的 BASH 脚本编写教程&#xff0c;非常不错&#xff0c;至少没接触过 BASH 的也能看懂&#xff01; 建立一个脚本 Linux 中有…

面试知识点汇总_03

解释一下同步电路和异步电路 同步电路和异步电路是指同步时序电路和异步时序电路。由于存储电路中触发器的动作特点不同,因此可以把时序电路分为同步时序电路和异步时序电路两种。同步时序电路所有的触发器状态的变化都是在同一时钟信号操作下同时发生的;而在异步时序电路中…

FPGA多路MIPI转FPD-Link视频缩放拼接显示,基于IMX327+FPD953架构,提供2套工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐本博主所有FPGA工程项目-->汇总目录我这里已有的 MIPI 编解码方案我这里已有的FPGA图像缩放方案本博已有的已有的FPGA视频拼接叠加融合方案 3、本 MIPI CSI-RX IP 介绍4、详细设计方案设计原理框图IMX327 及其配置FPD-Link视频…

JVM - JVM调优

JVM - JVM调优 文章目录 JVM - JVM调优一&#xff1a;JVM参数调优1&#xff1a;JVM选项规则2&#xff1a;JVM参数2.1&#xff1a;最常用的四个参数2.2&#xff1a;其他参数 3&#xff1a;补充说明和生产经验4&#xff1a;垃圾回收4.1&#xff1a;垃圾回收算法4.2&#xff1a;GC…

向bash shell脚本传参

例子&#xff1a; ~ script % touch parameter.sh ~ script % chmod 755 parameter.sh ~ % vim parameter.shparameter.sh: #!/usr/bin/env bashecho the name of current script is $0echo the first parameter is $1echo the second parameter is $2echo all parameters: $…

线性代数期末总复习的点点滴滴(1)

一、可逆矩阵、行列式、秩的关系 1.行列式与可逆矩阵的关系 所以&#xff0c;不难看出矩阵可逆的充分必要条件是该矩阵的行列式不为0。 2.接着来看&#xff0c;满秩和矩阵行列式的关系 不难看出满秩和行列式不为0是等价的。 3.再来看&#xff0c;满秩和矩阵可逆的关系 说明了…

大数据-258 离线数仓 - Griffin架构 配置安装 Livy 架构设计 解压配置 Hadoop Hive

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; Java篇开始了&#xff01; 目前开始更新 MyBatis&#xff0c;一起深入浅出&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff0…