GraphX 编程指南

GraphX

概述

GraphX 是 Spark 中用于图和图并行计算的新组件。从高层次看,GraphX 通过引入新的抽象(一种有向多重图,每个顶点和边都附带属性)来扩展 Spark RDD。为了支持图计算,GraphX 暴露了一组基本操作符(例如,子图joinVerticesaggregateMessages),以及 Pregel API 的优化变体。此外,GraphX 还包含不断增长的图算法构建器集合,以简化图分析任务。

入门

要开始使用,您首先需要将 Spark 和 GraphX 导入到您的项目中,如下所示:

import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

如果您不使用 Spark shell,您还需要一个 SparkContext。要了解更多关于 Spark 入门的信息,请参阅Spark 快速入门指南

属性图

属性图是一种有向多重图,每个顶点和边都附带用户定义的对象。有向多重图是一种有向图,可能存在共享相同源顶点和目标顶点的多个并行边。支持并行边的能力简化了建模场景,例如相同顶点之间可能存在多种关系(例如,同事和朋友)。每个顶点都由一个唯一的 64 位长标识符(VertexId)作为键。GraphX 不对顶点标识符施加任何排序约束。类似地,边也有对应的源顶点和目标顶点标识符。

属性图通过顶点(VD)和边(ED)类型进行参数化。这些分别是与每个顶点和边关联的对象的类型。

当顶点和边类型是基本数据类型(例如 int、double 等)时,GraphX 会优化其表示,通过将它们存储在专门的数组中来减少内存占用。

在某些情况下,可能需要同一图中存在具有不同属性类型的顶点。这可以通过继承来实现。例如,要将用户和产品建模为二分图,我们可以执行以下操作:

class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null

与 RDD 类似,属性图是不可变的、分布式的且容错的。对图的值或结构进行更改是通过生成一个包含所需更改的新图来完成的。请注意,原始图的大部分(即未受影响的结构、属性和索引)在新图中被重用,从而降低了这种固有的函数式数据结构的成本。图使用一系列顶点分区启发式方法在执行器之间进行分区。与 RDD 一样,当发生故障时,图的每个分区都可以在不同的机器上重新创建。

逻辑上,属性图对应于一对类型化集合(RDD),编码了每个顶点和边的属性。因此,图类包含用于访问图的顶点和边的成员:

class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

VertexRDD[VD]EdgeRDD[ED] 分别扩展并优化了 RDD[(VertexId, VD)]RDD[Edge[ED]]VertexRDD[VD]EdgeRDD[ED] 都提供了围绕图计算构建的额外功能,并利用了内部优化。我们将在顶点和边 RDD 部分更详细地讨论 VertexRDDVertexRDDEdgeRDDEdgeRDD API,但目前它们可以简单地被视为形式为 RDD[(VertexId, VD)]RDD[Edge[ED]] 的 RDD。

属性图示例

假设我们想构建一个包含 GraphX 项目中各种合作者的属性图。顶点属性可能包含用户名和职业。我们可以用一个描述合作者之间关系的字符串来注释边。

The Property Graph

结果图将具有以下类型签名:

val userGraph: Graph[(String, String), String]

有多种方法可以从原始文件、RDD 甚至合成生成器构建属性图,这些将在图构建器部分更详细地讨论。最通用的方法可能是使用Graph 对象。例如,以下代码从 RDD 集合构建了一个图:

// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

在上面的示例中,我们使用了Edge case class。边具有 srcIddstId,分别对应源顶点和目标顶点标识符。此外,Edge 类有一个 attr 成员,用于存储边属性。

我们可以分别使用 graph.verticesgraph.edges 成员将图分解为相应的顶点视图和边视图。

val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count

请注意,graph.vertices 返回一个 VertexRDD[(String, String)],它扩展了 RDD[(VertexId, (String, String))],因此我们使用 Scala 的 case 表达式来解构元组。另一方面,graph.edges 返回一个包含 Edge[String] 对象的 EdgeRDD。我们也可以使用 case class 类型构造函数,如下所示:

graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

除了属性图的顶点和边视图之外,GraphX 还公开了三元组视图。三元组视图逻辑上连接了顶点和边属性,生成了一个包含EdgeTriplet 类实例的 RDD[EdgeTriplet[VD, ED]]。这个连接可以通过以下 SQL 表达式表示:

SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id

或以图形方式表示为:

Edge Triplet

EdgeTriplet 类通过添加 srcAttrdstAttr 成员来扩展Edge 类,这些成员分别包含源属性和目标属性。我们可以使用图的三元组视图来渲染描述用户之间关系的一系列字符串。

val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))

图操作符

就像 RDD 具有 mapfilterreduceByKey 等基本操作一样,属性图也有一系列基本操作符,它们接受用户定义的函数并生成具有转换后属性和结构的新图。具有优化实现的核​​心操作符在Graph 中定义,而作为核心操作符组合表示的便捷操作符则在GraphOps 中定义。然而,由于 Scala 的隐式转换,GraphOps 中的操作符会自动作为 Graph 的成员可用。例如,我们可以通过以下方式计算每个顶点的入度(在 GraphOps 中定义):

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

区分核心图操作和GraphOps 的原因是为了将来能够支持不同的图表示。每种图表示都必须提供核心操作的实现,并重用GraphOps 中定义的许多有用操作。

操作符摘要列表

以下是GraphGraphOps 中定义的功能的快速摘要,为简单起见,此处以 Graph 成员的形式呈现。请注意,某些函数签名已简化(例如,删除了默认参数和类型约束),并且某些更高级的功能已被移除,因此请查阅 API 文档以获取官方操作列表。

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

属性操作符

与 RDD 的 map 操作符类似,属性图包含以下内容:

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

每个操作符都会生成一个新图,其中顶点或边的属性已由用户定义的 map 函数修改。

请注意,在每种情况下,图结构都未受影响。这是这些操作符的一个关键特性,它允许结果图重用原始图的结构索引。以下代码片段在逻辑上是等效的,但第一个不保留结构索引,并且无法受益于 GraphX 系统优化。

val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)

相反,使用mapVertices 来保留索引。

val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

这些操作符通常用于为特定计算初始化图或移除不必要的属性。例如,给定一个以出度作为顶点属性的图(我们稍后将描述如何构建这样的图),我们将其初始化用于 PageRank。

// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

结构操作符

目前 GraphX 只支持一组常用的简单结构操作符,我们预计将来会添加更多。以下是基本结构操作符的列表。

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

reverse 操作符返回一个所有边方向都颠倒的新图。这在例如尝试计算逆 PageRank 时非常有用。因为反向操作不修改顶点或边属性,也不改变边的数量,所以可以高效实现,无需数据移动或复制。

subgraph 操作符接受顶点和边谓词,并返回仅包含满足顶点谓词(评估为 true)的顶点以及满足边谓词并连接满足顶点谓词的顶点的图。 subgraph 操作符可以在多种情况下使用,以将图限制为感兴趣的顶点和边,或消除断开的链接。例如,在以下代码中,我们移除了断开的链接:

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))

请注意,在上面的示例中仅提供了顶点谓词。如果未提供顶点或边谓词,subgraph 操作符默认为 true

mask 操作符通过返回一个包含输入图中也存在的顶点和边的图来构建子图。这可以与 subgraph 操作符结合使用,以根据另一个相关图中的属性来限制图。例如,我们可以使用带有缺失顶点的图运行连通分量,然后将结果限制为有效子图。

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

groupEdges 操作符合并多重图中的并行边(即,顶点对之间的重复边)。在许多数值应用中,并行边可以被添加(它们的权重合并)到一条边中,从而减小图的大小。

Join 操作符

在许多情况下,需要将外部集合(RDD)中的数据与图连接。例如,我们可能有一些额外的用户属性想要与现有图合并,或者我们可能想将一个图中的顶点属性提取到另一个图。这些任务可以使用join 操作符来完成。下面我们列出了主要的 join 操作符:

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

joinVertices 操作符将顶点与输入 RDD 连接,并返回一个新图,其中顶点属性是通过将用户定义的 map 函数应用于连接后的顶点结果而获得的。在 RDD 中没有匹配值的顶点将保留其原始值。

请注意,如果 RDD 对于给定顶点包含多个值,则只会使用其中一个。因此,建议使用以下方法使输入 RDD 唯一,这还将预索引结果值,以显著加速后续的连接。

val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)

更通用的outerJoinVertices 的行为类似于 joinVertices,不同之处在于用户定义的 map 函数应用于所有顶点,并且可以更改顶点属性类型。由于并非所有顶点在输入 RDD 中都可能具有匹配值,因此 map 函数接受一个 Option 类型。例如,我们可以通过使用顶点的 outDegree 初始化顶点属性来为 PageRank 设置图。

val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
  outDegOpt match {
    case Some(outDeg) => outDeg
    case None => 0 // No outDegree means zero outDegree
  }
}

您可能已经注意到上述示例中使用的多参数列表(例如,f(a)(b))柯里化函数模式。虽然我们也可以将 f(a)(b) 写成 f(a,b),但这将意味着 b 上的类型推断不依赖于 a。因此,用户需要为用户定义的函数提供类型注解。

val joinedGraph = graph.joinVertices(uniqueCosts,
  (id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)

邻居聚合

在许多图分析任务中,关键一步是聚合每个顶点邻域的信息。例如,我们可能想知道每个用户拥有的关注者数量,或者每个用户关注者的平均年龄。许多迭代图算法(例如 PageRank、最短路径和连通分量)会重复聚合相邻顶点的属性(例如,当前的 PageRank 值、到源的最短路径和最小可达顶点 ID)。

为了提高性能,主要的聚合操作符已从 graph.mapReduceTriplets 更改为新的 graph.AggregateMessages。尽管 API 的更改相对较小,我们仍在此提供一份迁移指南。

消息聚合 (aggregateMessages)

GraphX 中的核心聚合操作是aggregateMessages。此操作符将用户定义的 sendMsg 函数应用于图中的每个边三元组,然后使用 mergeMsg 函数将这些消息聚合到它们的目标顶点。

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

用户定义的 sendMsg 函数接受一个EdgeContext,它暴露了源和目标属性以及边属性,还有用于向源和目标属性发送消息的函数(sendToSrcsendToDst)。可以将 sendMsg 看作是 Map-Reduce 中的 map 函数。用户定义的 mergeMsg 函数接受发送到同一顶点的两条消息并生成一条消息。可以将 mergeMsg 看作是 Map-Reduce 中的 reduce 函数。aggregateMessages 操作符返回一个 VertexRDD[Msg],其中包含发送到每个顶点的聚合消息(类型为 Msg)。未收到消息的顶点不包含在返回的 VertexRDDVertexRDD 中。

此外,aggregateMessages 接受一个可选的 tripletsFields 参数,它指示在EdgeContext 中访问哪些数据(即,源顶点属性而非目标顶点属性)。tripletsFields 的可能选项在TripletFields 中定义,默认值为TripletFields.All,表示用户定义的 sendMsg 函数可以访问EdgeContext 中的任何字段。 tripletFields 参数可用于通知 GraphX 只需要EdgeContext 的一部分,从而允许 GraphX 选择优化的连接策略。例如,如果我们计算每个用户关注者的平均年龄,我们只需要源字段,因此我们将使用TripletFields.Src 来指示我们只需要源字段。

在 GraphX 的早期版本中,我们使用字节码检查来推断TripletFields,但我们发现字节码检查略不可靠,因此转而选择更明确的用户控制。

在以下示例中,我们使用aggregateMessages 操作符来计算每个用户较年长的关注者的平均年龄。

import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) =>
    value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect().foreach(println(_))
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala"。

aggregateMessages 操作在消息(和消息的总和)是固定大小(例如,浮点数和加法,而不是列表和连接)时表现最佳。

Map Reduce 三元组迁移指南 (遗留)

在 GraphX 的早期版本中,邻居聚合是通过使用 mapReduceTriplets 操作符完成的。

class Graph[VD, ED] {
  def mapReduceTriplets[Msg](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
      reduce: (Msg, Msg) => Msg)
    : VertexRDD[Msg]
}

mapReduceTriplets 操作符接受一个用户定义的 map 函数,该函数应用于每个三元组,并且可以产生消息,这些消息使用用户定义的 reduce 函数进行聚合。然而,我们发现返回的迭代器的使用成本很高,并且它限制了我们应用额外优化(例如,局部顶点重新编号)的能力。在aggregateMessages 中,我们引入了 EdgeContext,它暴露了三元组字段以及明确将消息发送到源和目标顶点的函数。此外,我们移除了字节码检查,而是要求用户指明三元组中实际需要的字段。

以下使用 mapReduceTriplets 的代码块:

val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
  Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)

可以使用 aggregateMessages 重写为:

val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
  triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)

计算度信息

常见的聚合任务是计算每个顶点的度:即与每个顶点相邻的边的数量。在有向图的上下文中,通常需要知道每个顶点的入度、出度和总度。GraphOps 类包含一系列操作符来计算每个顶点的度。例如,在以下代码中,我们计算了最大入度、出度和总度:

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

收集邻居

在某些情况下,通过在每个顶点收集相邻顶点及其属性来表达计算可能更容易。这可以使用collectNeighborIdscollectNeighbors 操作符轻松完成。

class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}

这些操作符可能成本很高,因为它们会复制信息并需要大量的通信。如果可能,请尝试直接使用aggregateMessages 操作符来表达相同的计算。

缓存与取消缓存

在 Spark 中,RDD 默认不持久化在内存中。为了避免重复计算,多次使用时必须显式缓存它们(请参阅Spark 编程指南)。GraphX 中的图也是如此。当多次使用图时,请务必先在其上调用Graph.cache()

在迭代计算中,取消缓存也可能对于获得最佳性能是必需的。默认情况下,缓存的 RDD 和图将保留在内存中,直到内存压力迫使它们以 LRU 顺序被逐出。对于迭代计算,先前迭代的中间结果将填满缓存。尽管它们最终会被逐出,但内存中存储的不必要数据会减慢垃圾回收。更有效的方法是在中间结果不再需要时立即取消缓存它们。这涉及在每次迭代中具体化(缓存和强制执行)图或 RDD,取消缓存所有其他数据集,并且只在未来迭代中使用具体化的数据集。然而,由于图由多个 RDD 组成,因此很难正确地取消持久化它们。对于迭代计算,我们建议使用 Pregel API,它可以正确地取消持久化中间结果。

Pregel API

图本质上是递归数据结构,因为顶点的属性依赖于其邻居的属性,而邻居的属性又依赖于它们的邻居的属性。因此,许多重要的图算法会迭代地重新计算每个顶点的属性,直到达到一个固定点条件。已经提出了多种图并行抽象来表达这些迭代算法。GraphX 暴露了 Pregel API 的一个变体。

从高层次看,GraphX 中的 Pregel 操作符是一种批量同步并行消息抽象,受限于图的拓扑结构。Pregel 操作符在一系列超级步骤中执行,其中顶点接收来自上一个超级步骤的入站消息的总和,计算顶点属性的新值,然后将消息发送给下一个超级步骤中的相邻顶点。与 Pregel 不同,消息是并行计算的,作为边三元组的函数,并且消息计算可以访问源和目标顶点属性。未收到消息的顶点在超级步骤中会被跳过。当没有剩余消息时,Pregel 操作符终止迭代并返回最终图。

请注意,与更标准的 Pregel 实现不同,GraphX 中的顶点只能向相邻顶点发送消息,并且消息构建是使用用户定义的消息函数并行完成的。这些约束允许 GraphX 进行额外的优化。

以下是Pregel 操作符的类型签名及其实现的草图(注意:为避免因冗长血缘链导致的 stackOverflowError,Pregel 通过将“spark.graphx.pregel.checkpointInterval”设置为正数(例如 10)来定期检查点图和消息。同时使用 SparkContext.setCheckpointDir(directory: String) 设置检查点目录)。

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()

    // compute the messages
    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      g = g.joinVertices(messages)(vprog).cache()
      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = GraphXUtils.mapReduceTriplets(
        g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

请注意,Pregel 接受两个参数列表(即 graph.pregel(list1)(list2))。第一个参数列表包含配置参数,包括初始消息、最大迭代次数以及发送消息的边方向(默认为沿出边)。第二个参数列表包含用户定义的功能,用于接收消息(顶点程序 vprog)、计算消息(sendMsg)和组合消息(mergeMsg)。

在以下示例中,我们可以使用 Pregel 操作符来表达诸如单源最短路径之类的计算。

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect().mkString("\n"))
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala"。

图构建器

GraphX 提供了几种方法,可以从 RDD 或磁盘上的顶点和边集合构建图。默认情况下,所有图构建器都不会对图的边进行重新分区;相反,边保留在其默认分区中(例如它们在 HDFS 中的原始块)。Graph.groupEdges 要求图被重新分区,因为它假定相同的边将位于同一分区上,因此在调用 groupEdges 之前,必须调用Graph.partitionBy

object GraphLoader {
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]
}

GraphLoader.edgeListFile 提供了一种从磁盘上的边列表加载图的方法。它解析以下形式的(源顶点 ID,目标顶点 ID)对的邻接列表,跳过以 # 开头的注释行:

# This is a comment
2 1
4 1
1 2

它从指定的边创建了一个 Graph,自动创建了边中提及的任何顶点。所有顶点和边属性默认为 1。canonicalOrientation 参数允许将边重新定向到正向(srcId < dstId),这是连通分量算法所必需的。minEdgePartitions 参数指定要生成的最小边分区数;如果例如 HDFS 文件有更多块,则边分区可能多于指定数量。

object Graph {
  def apply[VD, ED](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null)
    : Graph[VD, ED]

  def fromEdges[VD, ED](
      edges: RDD[Edge[ED]],
      defaultValue: VD): Graph[VD, ED]

  def fromEdgeTuples[VD](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]

}

Graph.apply 允许从顶点和边的 RDD 创建图。重复的顶点是任意选择的,在边 RDD 中找到但不在顶点 RDD 中的顶点被分配默认属性。

Graph.fromEdges 允许仅从边的 RDD 创建图,自动创建边中提及的任何顶点并为其分配默认值。

Graph.fromEdgeTuples 允许仅从边元组的 RDD 创建图,将边赋值为 1,并自动创建边中提及的任何顶点并为其分配默认值。它还支持去重边;要进行去重,请将PartitionStrategySome 作为 uniqueEdges 参数传递(例如,uniqueEdges = Some(PartitionStrategy.RandomVertexCut))。分区策略对于将相同的边放在同一分区上以便进行去重是必需的。

顶点和边 RDD

GraphX 公开了图中存储的顶点和边的 RDD 视图。然而,由于 GraphX 以优化的数据结构维护顶点和边,并且这些数据结构提供了额外功能,因此顶点和边分别作为 VertexRDDVertexRDDEdgeRDDEdgeRDD 返回。在本节中,我们将回顾这些类型中一些有用的额外功能。请注意,这只是一个不完整的列表,请参阅 API 文档以获取官方操作列表。

VertexRDD

VertexRDD[A] 扩展了 RDD[(VertexId, A)],并增加了每个 VertexId 只出现一次的额外约束。此外,VertexRDD[A] 表示一个顶点集,每个顶点都具有类型 A 的属性。在内部,这是通过将顶点属性存储在可重用的哈希图数据结构中实现的。因此,如果两个 VertexRDDs 派生自相同的基本 VertexRDDVertexRDD(例如,通过 filtermapValues),它们可以在常量时间内连接,而无需进行哈希评估。为了利用这种索引数据结构,VertexRDDVertexRDD 暴露了以下额外功能:

class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  // Filter the vertex set but preserves the internal index
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  // Transform the values without changing the ids (preserves the internal index)
  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
  // Show only vertices unique to this set based on their VertexId's
  def minus(other: RDD[(VertexId, VD)])
  // Remove vertices from this set that appear in the other set
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
  // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}

例如,请注意 filter 操作符如何返回一个 VertexRDDVertexRDD。Filter 实际上是使用 BitSet 实现的,从而重用索引并保留与其他 VertexRDDs 进行快速连接的能力。同样,mapValues 操作符不允许 map 函数更改 VertexId,从而能够重用相同的 HashMap 数据结构。leftJoininnerJoin 都能够识别何时连接来自相同 HashMap 的两个 VertexRDDs,并通过线性扫描而不是昂贵的点查找来实现连接。

aggregateUsingIndex 操作符对于从 RDD[(VertexId, A)] 高效构建新的 VertexRDDVertexRDD 非常有用。从概念上讲,如果我已经在某个顶点集上构建了一个 VertexRDD[B]该顶点集是某个 RDD[(VertexId, A)] 中的顶点集的超集,那么我就可以重用索引来聚合并随后索引 RDD[(VertexId, A)]。例如:

val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

EdgeRDD

扩展了 RDD[Edge[ED]]EdgeRDD[ED] 使用PartitionStrategy 中定义的各种分区策略之一将边组织成块。在每个分区内,边属性和邻接结构分别存储,从而在更改属性值时实现最大程度的重用。

EdgeRDDEdgeRDD 暴露的三个附加函数是:

// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

在大多数应用程序中,我们发现对 EdgeRDDEdgeRDD 的操作通过图操作符完成,或依赖于基本 RDD 类中定义的操作。

优化表示

虽然分布式图的 GraphX 表示中使用的优化细节超出了本指南的范围,但一些高层次的理解可能有助于可伸缩算法的设计以及 API 的优化使用。GraphX 采用顶点切割方法进行分布式图分区。

Edge Cut vs. Vertex Cut

GraphX 不是沿边分割图,而是沿顶点分区图,这可以减少通信和存储开销。逻辑上,这对应于将边分配给机器并允许顶点跨越多个机器。边分配的具体方法取决于PartitionStrategy,各种启发式方法之间存在权衡。用户可以通过使用Graph.partitionBy 操作符对图进行重新分区来选择不同的策略。默认分区策略是使用图构建时提供的边的初始分区。然而,用户可以轻松切换到 2D 分区或 GraphX 中包含的其他启发式方法。

RDD Graph Representation

一旦边被分区,高效图并行计算的关键挑战就是将顶点属性与边高效地连接。由于现实世界中的图通常边多于顶点,我们将顶点属性移动到边上。由于并非所有分区都包含与所有顶点相邻的边,我们在内部维护一个路由表,用于在实现 tripletsaggregateMessages 等操作所需的连接时识别广播顶点的位置。

图算法

GraphX 包含一组图算法来简化分析任务。这些算法包含在 org.apache.spark.graphx.lib 包中,可以通过GraphOps 直接作为 Graph 的方法访问。本节描述了这些算法及其使用方法。

PageRank

PageRank 衡量图中每个顶点的重要性,假设从 uv 的边表示 uv 重要性的认可。例如,如果一个 Twitter 用户被许多其他人关注,则该用户将获得高排名。

GraphX 提供了 PageRank 的静态和动态实现,作为PageRank 对象的方法。静态 PageRank 运行固定次数的迭代,而动态 PageRank 则运行直到排名收敛(即,变化量不超过指定容差)。GraphOps 允许直接将这些算法作为 Graph 的方法调用。

GraphX 还包含一个社交网络数据集示例,我们可以在其上运行 PageRank。一组用户在 data/graphx/users.txt 中给出,用户之间的关系集在 data/graphx/followers.txt 中给出。我们按如下方式计算每个用户的 PageRank:

import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala"。

连通分量

连通分量算法用每个连通分量中编号最小的顶点的 ID 标记该连通分量。例如,在社交网络中,连通分量可以近似地表示集群。GraphX 在ConnectedComponents 对象中包含了该算法的实现,我们从PageRank 部分的社交网络数据集示例中计算连通分量,如下所示:

import org.apache.spark.graphx.GraphLoader

// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
  case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala"。

三角形计数

当一个顶点有两个相邻顶点且它们之间存在边时,该顶点就构成一个三角形的一部分。GraphX 在TriangleCount 对象中实现了一种三角形计数算法,该算法确定通过每个顶点的三角形数量,提供了衡量聚类的指标。我们计算PageRank 部分的社交网络数据集的三角形计数。请注意,TriangleCount 要求边处于规范方向(srcId < dstId)并且图已使用Graph.partitionBy 进行分区。

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
  .partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
  (username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala"。

示例

假设我想从一些文本文件构建一个图,将该图限制为重要的关系和用户,对子图运行 PageRank,然后最终返回与顶部用户相关的属性。我可以用 GraphX 在几行代码中完成所有这些。

import org.apache.spark.graphx.GraphLoader

// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
  .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
  case (uid, deg, Some(attrList)) => attrList
  // Some users may not have attributes so we set them as empty
  case (uid, deg, None) => Array.empty[String]
}

// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)

// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
  case (uid, attrList, None) => (0.0, attrList.toList)
}

println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala"。