GraphX 编程指南
概述
GraphX 是 Spark 中用于图和图并行计算的新组件。 从高层次上讲,GraphX 通过引入新的 Graph 抽象来扩展 Spark RDD:一个有向多重图,每个顶点和边都附加了属性。 为了支持图计算,GraphX 公开了一组基本运算符(例如,subgraph、joinVertices 和 aggregateMessages)以及 Pregel API 的优化变体。 此外,GraphX 还包含一个不断增长的图算法和构建器集合,以简化图分析任务。
入门
要开始使用,您首先需要将 Spark 和 GraphX 导入到您的项目中,如下所示
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
如果您不使用 Spark shell,您还需要一个 SparkContext
。 要了解有关 Spark 入门的更多信息,请参阅Spark 快速入门指南。
属性图
属性图是一个有向多重图,每个顶点和边都附加了用户定义的对象。 有向多重图是一个有向图,可能具有共享相同源和目标顶点的多个平行边。 支持平行边的能力简化了可以在相同顶点之间存在多个关系(例如,同事和朋友)的建模场景。 每个顶点都由唯一的 64 位长整型标识符 (VertexId
) 键控。 GraphX 不对顶点标识符施加任何排序约束。 类似地,边具有相应的源和目标顶点标识符。
属性图通过顶点 (VD
) 和边 (ED
) 类型进行参数化。 这些分别是与每个顶点和边关联的对象类型。
当顶点和边类型是原始数据类型(例如,int、double 等)时,GraphX 会优化顶点和边类型的表示,通过将它们存储在专用数组中来减少内存占用。
在某些情况下,可能需要在同一图中具有具有不同属性类型的顶点。 这可以通过继承来实现。 例如,要将用户和产品建模为二分图,我们可以执行以下操作
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
与 RDD 类似,属性图是不可变的、分布式的和容错的。 通过生成具有所需更改的新图来完成对图的值或结构的更改。 请注意,原始图的很大一部分(即,未受影响的结构、属性和索引)在新图中被重用,从而降低了这种固有功能数据结构的成本。 该图使用一系列顶点分区启发式方法在执行器之间进行分区。 与 RDD 类似,如果发生故障,可以不同机器上重新创建图的每个分区。
从逻辑上讲,属性图对应于一对类型化的集合 (RDD),用于编码每个顶点和边的属性。 因此,图类包含访问图的顶点和边的成员
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}
类 VertexRDD[VD]
和 EdgeRDD[ED]
扩展了 RDD[(VertexId, VD)]
和 RDD[Edge[ED]]
并且是它们的优化版本。 VertexRDD[VD]
和 EdgeRDD[ED]
都提供了围绕图计算构建的附加功能,并利用了内部优化。 我们将在有关 顶点和边 RDD 的部分中更详细地讨论 VertexRDD
VertexRDD 和 EdgeRDD
EdgeRDD API,但目前可以将它们简单地视为以下形式的 RDD:RDD[(VertexId, VD)]
和 RDD[Edge[ED]]
。
属性图示例
假设我们要构建一个包含 GraphX 项目的各种协作者的属性图。 顶点属性可能包含用户名和职业。 我们可以用描述协作者之间关系的字符串来注释边
生成的图将具有类型签名
val userGraph: Graph[(String, String), String]
有很多方法可以从原始文件、RDD 甚至合成生成器构造属性图,这些将在图构建器部分中进行更详细的讨论。 可能最通用的方法是使用 Graph 对象。 例如,以下代码从 RDD 的集合构造一个图
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
在上面的示例中,我们使用了 Edge
case 类。 边具有与源和目标顶点标识符相对应的 srcId
和 dstId
。 此外,Edge
类具有存储边属性的 attr
成员。
我们可以分别使用 graph.vertices
和 graph.edges
成员将图解构为各自的顶点和边视图。
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
请注意,
graph.vertices
返回一个VertexRDD[(String, String)]
,它扩展了RDD[(VertexId, (String, String))]
,因此我们使用 scalacase
表达式来解构元组。 另一方面,graph.edges
返回一个包含Edge[String]
对象的EdgeRDD
。 我们也可以使用 case 类类型构造函数,如下所示
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
除了属性图的顶点和边视图之外,GraphX 还公开了一个三元组视图。 三元组视图在逻辑上连接顶点和边属性,从而生成一个包含 EdgeTriplet
类实例的 RDD[EdgeTriplet[VD, ED]]
。 此连接可以用以下 SQL 表达式表示
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
或以图形方式表示为
EdgeTriplet
类通过添加包含源和目标属性的 srcAttr
和 dstAttr
成员来扩展 Edge
类。 我们可以使用图的三元组视图来呈现描述用户之间关系的字符串集合。
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
graph.triplets.map(triplet =>
triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
图运算符
正如 RDD 具有诸如 map
、filter
和 reduceByKey
之类的基本操作一样,属性图也有一系列基本运算符,这些运算符采用用户定义的函数并生成具有转换属性和结构的新图。 具有优化实现的核心运算符在 Graph
中定义,而表示为核心运算符组合的便捷运算符在 GraphOps
中定义。 但是,由于 Scala 隐式转换,GraphOps
中的运算符自动可用作 Graph
的成员。 例如,我们可以通过以下方式计算每个顶点的入度(在 GraphOps
中定义)
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
区分核心图操作和 GraphOps
的原因是能够在将来支持不同的图表示。 每个图表示都必须提供核心操作的实现,并重用在 GraphOps
中定义的许多有用操作。
运算符摘要列表
以下是 Graph
和 GraphOps
中定义的功能的快速摘要,但为了简单起见,将其表示为 Graph 的成员。 请注意,某些函数签名已简化(例如,删除了默认参数和类型约束),并且删除了一些更高级的功能,因此请查阅 API 文档以获取操作的官方列表。
/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
// Information about the Graph ===================================================================
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
// Change the partitioning heuristic ============================================================
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// Transform vertex and edge attributes ==========================================================
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
: Graph[VD, ED2]
// Modify the graph structure ====================================================================
def reverse: Graph[VD, ED]
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// Join RDDs with the graph ======================================================================
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A]
// Iterative graph-parallel computation ==========================================================
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
// Basic graph algorithms ========================================================================
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
属性运算符
与 RDD map
运算符一样,属性图包含以下内容
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
这些运算符中的每一个都会生成一个新图,该图的顶点或边属性由用户定义的 map
函数修改。
请注意,在每种情况下,图的结构都不受影响。这是这些算子的一个关键特性,允许生成的图重用原始图的结构索引。以下代码片段在逻辑上是等效的,但第一个片段不保留结构索引,因此无法从 GraphX 系统的优化中受益。
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)
相反,使用
mapVertices
来保留索引。
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
这些算子通常用于初始化图以进行特定计算,或去除不必要的属性。例如,给定一个以顶点出度作为顶点属性的图(我们稍后会介绍如何构造这样的图),我们可以将其初始化为 PageRank。
// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
结构运算符
目前,GraphX 仅支持一组常用的结构算子,我们希望将来添加更多。以下是基本结构算子的列表。
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
reverse
算子返回一个所有边方向都反转的新图。例如,在尝试计算反向 PageRank 时,这可能很有用。由于 reverse 操作不修改顶点或边属性,也不改变边的数量,因此可以高效地实现,无需数据移动或复制。
subgraph
算子接受顶点和边谓词,并返回仅包含满足顶点谓词(求值为 true)的顶点以及满足边谓词且连接满足顶点谓词的顶点的图。subgraph
算子可用于多种情况,以将图限制为感兴趣的顶点和边,或消除断开的链接。例如,在下面的代码中,我们删除断开的链接。
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
请注意,在上面的示例中,仅提供了顶点谓词。如果未提供顶点或边谓词,则
subgraph
算子默认为true
。
mask
算子通过返回一个包含也在输入图中找到的顶点和边的图来构造子图。这可以与 subgraph
算子结合使用,以基于另一个相关图中的属性来限制图。例如,我们可以使用具有缺失顶点的图运行连通分量算法,然后将答案限制为有效的子图。
// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
groupEdges
算子合并多重图中的并行边(即,顶点对之间的重复边)。在许多数值应用中,可以将并行边相加(组合它们的权重)成一条边,从而减小图的大小。
连接运算符
在许多情况下,需要将来自外部集合(RDD)的数据与图连接。例如,我们可能有一些额外的用户属性想要与现有的图合并,或者我们可能想要将顶点属性从一个图拉到另一个图中。这些任务可以使用 join 算子完成。下面我们列出了关键的 join 算子。
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
joinVertices
算子将顶点与输入 RDD 连接,并返回一个新图,其顶点属性是通过将用户定义的 map
函数应用于连接的顶点的结果获得的。RDD 中没有匹配值的顶点保留其原始值。
请注意,如果 RDD 包含给定顶点的多个值,则只会使用一个值。因此,建议使用以下方法使输入 RDD 唯一,这也将预索引结果值,以大大加快后续连接。
val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
(id, oldCost, extraCost) => oldCost + extraCost)
更通用的 outerJoinVertices
的行为与 joinVertices
类似,只是用户定义的 map
函数应用于所有顶点,并且可以更改顶点属性类型。由于并非所有顶点在输入 RDD 中都可能具有匹配的值,因此 map
函数采用 Option
类型。例如,我们可以通过使用顶点的 outDegree
初始化顶点属性来设置 PageRank 的图。
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
outDegOpt match {
case Some(outDeg) => outDeg
case None => 0 // No outDegree means zero outDegree
}
}
您可能已经注意到上面示例中使用的多个参数列表(例如,
f(a)(b)
)柯里化函数模式。虽然我们同样可以将f(a)(b)
编写为f(a,b)
,但这将意味着b
上的类型推断不依赖于a
。因此,用户需要为用户定义的函数提供类型注解。
val joinedGraph = graph.joinVertices(uniqueCosts,
(id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
邻域聚合
许多图分析任务中的一个关键步骤是聚合每个顶点邻域的信息。例如,我们可能想知道每个用户拥有的关注者数量,或者每个用户的关注者的平均年龄。许多迭代图算法(例如,PageRank、最短路径和连通分量)重复聚合相邻顶点的属性(例如,当前 PageRank 值、到源的最短路径和最小的可达顶点 ID)。
为了提高性能,主要的聚合算子从
graph.mapReduceTriplets
更改为新的graph.AggregateMessages
。虽然 API 中的更改相对较小,但我们在下面提供了过渡指南。
聚合消息 (aggregateMessages)
GraphX 中的核心聚合操作是 aggregateMessages
。此算子将用户定义的 sendMsg
函数应用于图中的每个边三元组,然后使用 mergeMsg
函数将这些消息聚合到它们的目标顶点。
class Graph[VD, ED] {
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
}
用户定义的 sendMsg
函数采用 EdgeContext
,它公开源和目标属性以及边属性和函数(sendToSrc
和 sendToDst
)以将消息发送到源和目标属性。将 sendMsg
视为 map-reduce 中的 map 函数。用户定义的 mergeMsg
函数采用两个发往同一顶点的消息,并产生一条消息。将 mergeMsg
视为 map-reduce 中的 reduce 函数。aggregateMessages
算子返回一个 VertexRDD[Msg]
,其中包含发往每个顶点的聚合消息(类型为 Msg
)。未收到消息的顶点不包含在返回的 VertexRDD
VertexRDD 中。
此外,aggregateMessages
接受一个可选的 tripletsFields
,它指示在 EdgeContext
中访问哪些数据(即,源顶点属性,但不包括目标顶点属性)。tripletsFields
的可能选项在 TripletFields
中定义,默认值为 TripletFields.All
,它指示用户定义的 sendMsg
函数可以访问 EdgeContext
中的任何字段。tripletFields
参数可用于通知 GraphX 只需要 EdgeContext
的一部分,允许 GraphX 选择优化的连接策略。例如,如果我们要计算每个用户的关注者的平均年龄,我们只需要源字段,因此我们将使用 TripletFields.Src
来指示我们只需要源字段。
在早期版本的 GraphX 中,我们使用字节码检查来推断
TripletFields
,但我们发现字节码检查略微不可靠,因此选择了更明确的用户控制。
在下面的示例中,我们使用 aggregateMessages
算子来计算每个用户的年长关注者的平均年龄。
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst((1, triplet.srcAttr))
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) =>
value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
当消息(以及消息的总和)的大小恒定时,
aggregateMessages
操作表现最佳(例如,浮点数和加法,而不是列表和连接)。
Map Reduce Triplets 迁移指南 (旧版)
在早期版本的 GraphX 中,邻域聚合是使用 mapReduceTriplets
算子完成的。
class Graph[VD, ED] {
def mapReduceTriplets[Msg](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
reduce: (Msg, Msg) => Msg)
: VertexRDD[Msg]
}
mapReduceTriplets
操作符接受一个用户定义的 map 函数,该函数应用于每个三元组,并且可以产生 *消息*,这些消息使用用户定义的 reduce
函数进行聚合。然而,我们发现返回的迭代器的使用成本很高,并且它阻碍了我们应用额外的优化(例如,本地顶点重编号)。在 aggregateMessages
中,我们引入了 EdgeContext,它公开了三元组字段,以及显式地向源顶点和目标顶点发送消息的函数。此外,我们移除了字节码检查,而是要求用户指明三元组中实际需要的字段。
以下代码块使用 mapReduceTriplets
val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
可以使用 aggregateMessages
重写为
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
计算度信息
一个常见的聚合任务是计算每个顶点的度:与每个顶点相邻的边的数量。在有向图的上下文中,通常需要知道每个顶点的入度、出度和总度。GraphOps
类包含一组用于计算每个顶点度的操作符。例如,在下面我们计算最大入度、出度和总度
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
收集邻居
在某些情况下,通过在每个顶点收集相邻顶点及其属性来表达计算可能更容易。可以使用 collectNeighborIds
和 collectNeighbors
操作符轻松完成此操作。
class GraphOps[VD, ED] {
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}
这些操作符的代价可能非常高,因为它们会复制信息并需要大量的通信。如果可能,请尝试使用
aggregateMessages
操作符直接表达相同的计算。
缓存和取消缓存
在 Spark 中,RDD 默认不持久化在内存中。为了避免重新计算,在多次使用它们时必须显式地缓存它们(参见 Spark 编程指南)。GraphX 中的图的行为方式相同。**当多次使用一个图时,请务必首先调用 Graph.cache()
。**
在迭代计算中,*取消缓存* 对于获得最佳性能也是必要的。默认情况下,缓存的 RDD 和图将保留在内存中,直到内存压力迫使它们以 LRU 顺序被逐出。对于迭代计算,先前迭代的中间结果将填满缓存。尽管它们最终会被逐出,但存储在内存中的不必要数据会减慢垃圾回收的速度。立即取消缓存不再需要的中间结果会更有效。这涉及在每次迭代中物化(缓存和强制执行)一个图或 RDD,取消缓存所有其他数据集,并且仅在未来的迭代中使用物化的数据集。但是,由于图由多个 RDD 组成,因此很难正确地取消持久化它们。**对于迭代计算,我们建议使用 Pregel API,它会正确地取消持久化中间结果。**
Pregel API
图本质上是递归数据结构,因为顶点的属性取决于其邻居的属性,而邻居的属性又取决于 *其* 邻居的属性。因此,许多重要的图算法会迭代地重新计算每个顶点的属性,直到达到固定点条件。已经提出了各种图并行抽象来表达这些迭代算法。GraphX 公开了一个 Pregel API 的变体。
在较高的层次上,GraphX 中的 Pregel 操作符是一种批量同步并行消息传递抽象,*约束到图的拓扑*。Pregel 操作符在一系列超级步骤中执行,其中顶点接收来自前一个超级步骤的入站消息的 *总和*,计算顶点属性的新值,然后在下一个超级步骤中将消息发送到相邻顶点。与 Pregel 不同,消息是作为边三元组的函数并行计算的,并且消息计算可以访问源顶点和目标顶点属性。在超级步骤中跳过未收到消息的顶点。当没有剩余消息时,Pregel 操作符终止迭代并返回最终图。
请注意,与更标准的 Pregel 实现不同,GraphX 中的顶点只能将消息发送到相邻顶点,并且消息构建是使用用户定义的消息传递函数并行完成的。这些约束允许在 GraphX 中进行额外的优化。
以下是 Pregel 操作符 的类型签名以及其实现的 *草图* (注意:为了避免由于长 lineage 链引起的 stackOverflowError,pregel 支持通过将 “spark.graphx.pregel.checkpointInterval” 设置为正数(例如 10)来定期检查点图和消息。并使用 SparkContext.setCheckpointDir(directory: String) 设置检查点目录)
class GraphOps[VD, ED] {
def pregel[A]
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// Receive the initial message at each vertex
var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
// compute the messages
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop until no messages remain or maxIterations is achieved
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
activeMessages = messages.count()
i += 1
}
g
}
}
请注意,Pregel 接受两个参数列表(即,graph.pregel(list1)(list2)
)。第一个参数列表包含配置参数,包括初始消息、最大迭代次数和发送消息的边方向(默认为沿着出边)。第二个参数列表包含用户定义的用于接收消息(顶点程序 vprog
)、计算消息 (sendMsg
) 和合并消息 mergeMsg
的函数。
我们可以使用 Pregel 操作符来表达诸如以下示例中的单源最短路径之类的计算。
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
图构建器
GraphX 提供了几种从 RDD 或磁盘中的顶点和边的集合构建图的方法。默认情况下,没有一个图构建器会重新分区图的边;而是将边保留在其默认分区中(例如,它们在 HDFS 中的原始块)。Graph.groupEdges
需要重新分区图,因为它假定相同的边将位于同一分区上,因此你必须在调用 groupEdges
之前调用 Graph.partitionBy
。
object GraphLoader {
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1)
: Graph[Int, Int]
}
GraphLoader.edgeListFile
提供了一种从磁盘上的边列表中加载图的方法。它解析以下形式的(源顶点 ID,目标顶点 ID)对的邻接表,跳过以 #
开头的注释行
# This is a comment
2 1
4 1
1 2
它从指定的边创建一个 Graph
,自动创建边提到的任何顶点。所有顶点和边属性默认为 1。 canonicalOrientation
参数允许在正方向上重新定向边 (srcId < dstId
),这是 connected components 算法所必需的。 minEdgePartitions
参数指定要生成的最小边分区数;如果 HDFS 文件有更多块,则可能存在比指定的更多的边分区。
object Graph {
def apply[VD, ED](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null)
: Graph[VD, ED]
def fromEdges[VD, ED](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED]
def fromEdgeTuples[VD](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
}
Graph.apply
允许从顶点和边的 RDD 创建图。重复的顶点被任意选择,并且在边 RDD 中找到但不在顶点 RDD 中的顶点被分配默认属性。
Graph.fromEdges
允许仅从边的 RDD 创建图,自动创建边提到的任何顶点并为其分配默认值。
Graph.fromEdgeTuples
允许仅从边元组的 RDD 创建图,为边分配值 1,并自动创建边提到的任何顶点并为其分配默认值。它还支持删除重复的边;要删除重复项,请传递 PartitionStrategy
的 Some
作为 uniqueEdges
参数(例如,uniqueEdges = Some(PartitionStrategy.RandomVertexCut)
)。分区策略对于将相同的边放置在同一分区上以便可以删除重复项是必要的。
顶点和边 RDD
GraphX 公开了存储在图中的顶点和边的 RDD
视图。但是,由于 GraphX 将顶点和边维护在优化的数据结构中,并且这些数据结构提供了额外的功能,因此顶点和边分别作为 VertexRDD
VertexRDD 和 EdgeRDD
EdgeRDD 返回。在本节中,我们将回顾这些类型中的一些额外的有用功能。请注意,这只是一个不完整的列表,请参阅 API 文档以获取操作的官方列表。
VertexRDD
VertexRDD[A]
扩展了 RDD[(VertexId, A)]
,并添加了每个 VertexId
仅出现 *一次* 的额外约束。此外,VertexRDD[A]
表示一个顶点 *集合*,每个顶点都具有类型 A
的属性。在内部,这是通过将顶点属性存储在可重用的哈希映射数据结构中来实现的。因此,如果两个 VertexRDD
是从同一个基本 VertexRDD
VertexRDD 派生的(例如,通过 filter
或 mapValues
),它们可以在恒定时间内连接,而无需哈希评估。为了利用这种索引的数据结构,VertexRDD
VertexRDD 公开了以下附加功能
class VertexRDD[VD] extends RDD[(VertexId, VD)] {
// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Show only vertices unique to this set based on their VertexId's
def minus(other: RDD[(VertexId, VD)])
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
例如,请注意 filter
算子如何返回一个 VertexRDD
VertexRDD。 实际上,Filter 使用 BitSet
实现,从而重用索引并保留与其他 VertexRDD
进行快速连接的能力。 同样,mapValues
算子不允许 map
函数更改 VertexId
,从而能够重用相同的 HashMap
数据结构。leftJoin
和 innerJoin
都能识别何时连接来自同一 HashMap
的两个 VertexRDD
,并通过线性扫描而不是代价高昂的点查找来实现连接。
aggregateUsingIndex
算子对于从 RDD[(VertexId, A)]
有效地构造新的 VertexRDD
VertexRDD 非常有用。 从概念上讲,如果我在一组顶点上构造了一个 VertexRDD[B]
,*它是某个 RDD[(VertexId, A)]
中顶点的超集*,那么我可以重用该索引来聚合,然后对 RDD[(VertexId, A)]
进行索引。 例如
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
EdgeRDD
EdgeRDD[ED]
扩展了 RDD[Edge[ED]]
,它使用 PartitionStrategy
中定义的各种分区策略之一,以块的形式组织边。 在每个分区中,边属性和邻接结构分别存储,从而在更改属性值时实现最大程度的重用。
EdgeRDD
EdgeRDD 公开的三个附加函数是
// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
在大多数应用程序中,我们发现对 EdgeRDD
EdgeRDD 的操作是通过图算子完成的,或者依赖于基本 RDD
类中定义的操作。
优化表示
虽然对 GraphX 分布式图表示中使用的优化进行详细描述超出了本指南的范围,但一些高级理解可能有助于设计可扩展的算法以及优化 API 的使用。 GraphX 采用顶点切割方法进行分布式图分区
GraphX 不是沿边分割图,而是沿顶点对图进行分区,这可以减少通信和存储开销。 从逻辑上讲,这对应于将边分配给机器,并允许顶点跨越多台机器。 分配边的确切方法取决于 PartitionStrategy
,并且各种启发式方法都有一些权衡。 用户可以通过使用 Graph.partitionBy
算子对图进行重新分区,从而在不同的策略之间进行选择。 默认分区策略是使用图构造时提供的边的初始分区。 但是,用户可以轻松切换到 2D 分区或 GraphX 中包含的其他启发式方法。
一旦对边进行了分区,高效图并行计算的关键挑战就是有效地将顶点属性与边连接起来。 因为现实世界的图中通常边的数量多于顶点的数量,所以我们将顶点属性移动到边。 因为并非所有分区都将包含与所有顶点相邻的边,所以我们在内部维护一个路由表,该表标识在实现诸如 triplets
和 aggregateMessages
之类的操作所需的连接时,在哪里广播顶点。
图算法
GraphX 包括一组图算法,以简化分析任务。 这些算法包含在 org.apache.spark.graphx.lib
包中,并且可以通过 GraphOps
作为 Graph
上的方法直接访问。 本节介绍这些算法以及如何使用它们。
PageRank
PageRank 衡量图中每个顶点的重要性,假设从 *u* 到 *v* 的边表示 *u* 对 *v* 的重要性的认可。 例如,如果一个 Twitter 用户被许多其他人关注,则该用户将被排名很高。
GraphX 带有 PageRank 的静态和动态实现,作为 PageRank
对象上的方法。 静态 PageRank 运行固定次数的迭代,而动态 PageRank 运行直到排名收敛(即,停止更改超过指定的容差)。 GraphOps
允许直接在 Graph
上调用这些算法作为方法。
GraphX 还包括一个示例社交网络数据集,我们可以在其上运行 PageRank。 一组用户在 data/graphx/users.txt
中给出,一组用户之间的关系在 data/graphx/followers.txt
中给出。 我们按如下方式计算每个用户的 PageRank
import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
连通分量
连接组件算法用其编号最低的顶点的 ID 标记图的每个连接组件。 例如,在社交网络中,连接的组件可以近似于集群。 GraphX 在 ConnectedComponents
对象中包含该算法的实现,并且我们按如下方式计算来自 PageRank 部分的示例社交网络数据集的连接组件
import org.apache.spark.graphx.GraphLoader
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
三角形计数
当一个顶点具有两个相邻顶点,并且它们之间存在一条边时,该顶点就是三角形的一部分。 GraphX 在 TriangleCount
对象中实现了一个三角形计数算法,该算法确定通过每个顶点的三角形的数量,从而提供了一种聚类度量。 我们计算来自 PageRank 部分的社交网络数据集的三角形计数。 *请注意,TriangleCount
要求边处于规范方向 (srcId < dstId
),并且该图使用 Graph.partitionBy
进行分区。*
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
.partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
(username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
示例
假设我想从一些文本文件中构建一个图,将该图限制为重要的关系和用户,在子图上运行 page-rank,然后最终返回与顶级用户关联的属性。 我可以使用 GraphX 只需几行代码即可完成所有这些操作
import org.apache.spark.graphx.GraphLoader
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
case (uid, deg, Some(attrList)) => attrList
// Some users may not have attributes so we set them as empty
case (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
case (uid, attrList, None) => (0.0, attrList.toList)
}
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))