GraphX 编程指南

GraphX

概述

GraphX 是 Spark 中用于图和图并行计算的新组件。从高层次来看,GraphX 通过引入新的 抽象来扩展 Spark RDD:一个有向多图,每个顶点和边都附带属性。为了支持图计算,GraphX 公开了许多基本操作符(例如,子图joinVerticesaggregateMessages),以及 Pregel API 的优化变体。此外,GraphX 还包含不断增长的图 算法构建器 集合,以简化图分析任务。

入门

要开始使用,您首先需要将 Spark 和 GraphX 导入您的项目,如下所示

import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

如果您没有使用 Spark shell,您还需要一个 SparkContext。要了解有关使用 Spark 入门的更多信息,请参阅 Spark 快速入门指南

属性图

属性图 是一个有向多图,每个顶点和边都附带用户定义的对象。有向多图是有向图,可能有多个平行边共享相同的源顶点和目标顶点。支持平行边的能力简化了对可能存在相同顶点之间多个关系(例如,同事和朋友)的建模场景。每个顶点都由一个唯一的 64 位长标识符 (VertexId) 作为键。GraphX 不会对顶点标识符施加任何排序约束。类似地,边具有相应的源顶点和目标顶点标识符。

属性图根据顶点 (VD) 和边 (ED) 类型进行参数化。这些分别是与每个顶点和边关联的对象的类型。

当顶点和边类型是基本数据类型(例如,int、double 等)时,GraphX 会优化其表示,通过将它们存储在专门的数组中来减少内存占用。

在某些情况下,可能希望在同一个图中具有不同属性类型的顶点。这可以通过继承来实现。例如,要将用户和产品建模为二部图,我们可以执行以下操作

class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null

与 RDD 类似,属性图是不可变的、分布式的和容错的。对图的值或结构的更改是通过生成具有所需更改的新图来完成的。请注意,原始图的很大一部分(即不受影响的结构、属性和索引)将在新图中重复使用,从而降低了这种固有函数式数据结构的成本。图使用一系列顶点分区启发式方法在执行器之间进行分区。与 RDD 一样,图的每个分区都可以在发生故障时在不同的机器上重新创建。

从逻辑上讲,属性图对应于一对类型化的集合(RDD),它们对每个顶点和边的属性进行编码。因此,图类包含用于访问图的顶点和边的成员

class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

VertexRDD[VD]EdgeRDD[ED] 扩展了 RDD[(VertexId, VD)]RDD[Edge[ED]],并且是它们的优化版本。VertexRDD[VD]EdgeRDD[ED] 都提供了围绕图计算构建的额外功能,并利用了内部优化。我们在关于 顶点和边 RDD 的部分中更详细地讨论了 VertexRDDVertexRDDEdgeRDDEdgeRDD API,但现在可以简单地将它们视为以下形式的 RDD:RDD[(VertexId, VD)]RDD[Edge[ED]]

示例属性图

假设我们想要构建一个属性图,其中包含 GraphX 项目的各种合作者。顶点属性可能包含用户名和职业。我们可以用描述合作者之间关系的字符串来注释边

The Property Graph

生成的图将具有类型签名

val userGraph: Graph[(String, String), String]

从原始文件、RDD 甚至合成生成器构建属性图的方法有很多,这些将在关于 图构建器 的部分中更详细地讨论。可能最通用的方法是使用 Graph 对象。例如,以下代码从 RDD 集合构建一个图

// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

在上面的示例中,我们使用了 Edge 案例类。边具有一个 srcId 和一个 dstId,分别对应于源顶点和目标顶点标识符。此外,Edge 类有一个 attr 成员,用于存储边属性。

我们可以分别使用 graph.verticesgraph.edges 成员将图分解为相应的顶点和边视图。

val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count

请注意,graph.vertices 返回一个 VertexRDD[(String, String)],它扩展了 RDD[(VertexId, (String, String))],因此我们使用 scala case 表达式来分解元组。另一方面,graph.edges 返回一个包含 Edge[String] 对象的 EdgeRDD。我们也可以使用以下案例类类型构造函数

graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

除了属性图的顶点和边视图之外,GraphX 还公开了三元组视图。三元组视图从逻辑上连接顶点和边属性,生成一个包含 EdgeTriplet 类实例的 RDD[EdgeTriplet[VD, ED]]。这种连接可以用以下 SQL 表达式表示

SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id

或以图形方式表示为

Edge Triplet

EdgeTriplet 类扩展了 Edge 类,并添加了 srcAttrdstAttr 成员,它们分别包含源属性和目标属性。我们可以使用图的三元组视图来呈现描述用户之间关系的字符串集合。

val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))

图操作符

正如 RDD 具有 mapfilterreduceByKey 等基本操作一样,属性图也有一组基本操作符,它们接受用户定义的函数并生成具有转换后的属性和结构的新图。在 Graph 中定义了具有优化实现的核心操作符,在 GraphOps 中定义了作为核心操作符组合表示的便捷操作符。但是,由于 Scala 隐式,GraphOps 中的操作符会自动作为 Graph 的成员提供。例如,我们可以通过以下方式计算每个顶点的入度(在 GraphOps 中定义)

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

区分核心图操作和 GraphOps 的原因是能够在将来支持不同的图表示。每个图表示都必须提供核心操作的实现,并重复使用在 GraphOps 中定义的许多有用操作。

操作符汇总列表

以下是 GraphGraphOps 中定义的功能的简要总结,为了简单起见,这些功能被呈现为 Graph 的成员。请注意,一些函数签名已被简化(例如,默认参数和类型约束已删除),一些更高级的功能已被删除,因此请参阅 API 文档以获取操作的官方列表。

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}

属性操作符

与 RDD 的 map 操作符类似,属性图包含以下内容

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

这些操作符中的每一个都生成一个新的图,其中顶点或边属性由用户定义的 map 函数修改。

请注意,在每种情况下,图结构都不会受到影响。这是这些操作符的一个关键特性,它允许生成的图重用原始图的结构索引。以下代码片段在逻辑上是等效的,但第一个代码片段不保留结构索引,并且不会从 GraphX 系统优化中受益

val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)

相反,使用 mapVertices 来保留索引

val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

这些操作符通常用于为特定计算初始化图或投影掉不必要的属性。例如,给定一个以出度作为顶点属性的图(我们将在后面描述如何构建这样的图),我们为 PageRank 初始化它

// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

结构操作符

目前,GraphX 只支持一组简单的常用结构操作符,我们预计将来会添加更多。以下是基本结构操作符的列表。

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

The reverse 操作符返回一个新的图,其中所有边的方向都反转。这在尝试计算逆 PageRank 时很有用。由于反转操作不会修改顶点或边属性,也不会改变边的数量,因此它可以在没有数据移动或复制的情况下有效地实现。

The subgraph 操作符接受顶点和边谓词,并返回一个图,该图只包含满足顶点谓词(评估为真)的顶点和满足边谓词的边,并且连接满足顶点谓词的顶点。The subgraph 操作符可以在许多情况下用于将图限制在感兴趣的顶点和边上,或消除断开的链接。例如,在以下代码中,我们删除了断开的链接

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))

请注意,在上面的示例中,只提供了顶点谓词。如果未提供顶点或边谓词,则 subgraph 操作符默认为 true

The mask 操作符通过返回一个包含在输入图中也找到的顶点和边的图来构建一个子图。这可以与 subgraph 操作符结合使用,以根据另一个相关图中的属性限制图。例如,我们可能会使用具有缺失顶点的图运行连通分量,然后将答案限制在有效的子图中。

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

The groupEdges 操作符合并多图中的平行边(即,一对顶点之间的重复边)。在许多数值应用中,平行边可以添加(它们的权重组合)到一条边中,从而减小图的大小。

连接操作符

在许多情况下,需要将来自外部集合(RDD)的数据与图连接起来。例如,我们可能有一些额外的用户属性,我们希望将其与现有图合并,或者我们可能希望将顶点属性从一个图拉到另一个图。这些任务可以使用连接操作符来完成。下面列出了关键的连接操作符

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

The joinVertices 操作符将顶点与输入 RDD 连接起来,并返回一个新的图,其中顶点属性是通过将用户定义的 map 函数应用于连接顶点的结果而获得的。没有在 RDD 中找到匹配值的顶点保留其原始值。

请注意,如果 RDD 为给定顶点包含多个值,则只使用其中一个。因此,建议使用以下方法使输入 RDD 唯一,这也会预先索引生成的值,从而大大加快随后的连接。

val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)

更通用的 outerJoinVertices 的行为类似于 joinVertices,除了用户定义的 map 函数应用于所有顶点,并且可以更改顶点属性类型。由于并非所有顶点都可能在输入 RDD 中找到匹配值,因此 map 函数接受一个 Option 类型。例如,我们可以通过使用其 outDegree 初始化顶点属性来设置 PageRank 的图。

val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
  outDegOpt match {
    case Some(outDeg) => outDeg
    case None => 0 // No outDegree means zero outDegree
  }
}

你可能已经注意到在上面的示例中使用的多个参数列表(例如,f(a)(b))柯里化函数模式。虽然我们可以同样地将 f(a)(b) 写成 f(a,b),但这意味着 b 上的类型推断将不依赖于 a。因此,用户需要为用户定义的函数提供类型注释

val joinedGraph = graph.joinVertices(uniqueCosts,
  (id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)

邻域聚合

许多图分析任务中的一个关键步骤是聚合有关每个顶点邻域的信息。例如,我们可能想知道每个用户有多少粉丝,或者每个用户的粉丝的平均年龄。许多迭代图算法(例如,PageRank、最短路径和连通分量)反复聚合相邻顶点的属性(例如,当前 PageRank 值、到源的最短路径和可达到的最小顶点 ID)。

为了提高性能,主要聚合操作符已从 graph.mapReduceTriplets 更改为新的 graph.AggregateMessages。虽然 API 中的更改相对较小,但我们提供了以下过渡指南。

聚合消息 (aggregateMessages)

GraphX 中的核心聚合操作是 aggregateMessages。此操作符将用户定义的 sendMsg 函数应用于图中的每个边三元组,然后使用 mergeMsg 函数在其目标顶点处聚合这些消息。

class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}

用户定义的 sendMsg 函数接受一个 EdgeContext,它公开源和目标属性以及边属性和函数(sendToSrcsendToDst)来向源和目标属性发送消息。将 sendMsg 视为 map-reduce 中的map 函数。用户定义的 mergeMsg 函数接受两个发送到同一顶点的消息,并生成一条消息。将 mergeMsg 视为 map-reduce 中的reduce 函数。The aggregateMessages 操作符返回一个 VertexRDD[Msg],其中包含发送到每个顶点的聚合消息(类型为 Msg)。没有收到消息的顶点不包含在返回的 VertexRDDVertexRDD 中。

此外,aggregateMessages 接受一个可选的 tripletsFields,它指示在 EdgeContext 中访问了哪些数据(即,源顶点属性,但不是目标顶点属性)。tripletsFields 的可能选项在 TripletFields 中定义,默认值为 TripletFields.All,它表示用户定义的 sendMsg 函数可以访问 EdgeContext 中的任何字段。The tripletsFields 参数可用于通知 GraphX 只需要 EdgeContext 的一部分,从而允许 GraphX 选择优化的连接策略。例如,如果我们正在计算每个用户的粉丝的平均年龄,我们只需要源字段,因此我们将使用 TripletFields.Src 来指示我们只需要源字段

在 GraphX 的早期版本中,我们使用字节码检查来推断 TripletFields,但我们发现字节码检查略有不稳定,因此选择更明确的用户控制。

在以下示例中,我们使用 aggregateMessages 操作符来计算每个用户的资历更高的粉丝的平均年龄。

import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators

// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) =>
    value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
在 Spark 仓库的“examples/src/main/scala/org/apache/spark/examples/graphx/AggregateMessagesExample.scala”中找到完整的示例代码。

当消息(以及消息的总和)大小恒定(例如,浮点数和加法而不是列表和串联)时,aggregateMessages 操作的性能最佳。

Map Reduce 三元组转换指南(遗留)

在 GraphX 的早期版本中,邻域聚合是使用 mapReduceTriplets 操作符完成的

class Graph[VD, ED] {
  def mapReduceTriplets[Msg](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
      reduce: (Msg, Msg) => Msg)
    : VertexRDD[Msg]
}

mapReduceTriplets 运算符接受一个用户定义的映射函数,该函数应用于每个三元组,并可以生成消息,这些消息使用用户定义的 reduce 函数进行聚合。但是,我们发现返回的迭代器的用户使用成本很高,并且阻碍了我们应用其他优化(例如,本地顶点重新编号)。在 aggregateMessages 中,我们引入了 EdgeContext,它公开了三元组字段,以及显式向源顶点和目标顶点发送消息的函数。此外,我们还删除了字节码检查,而是要求用户指示三元组中实际需要的字段。

以下代码块使用 mapReduceTriplets

val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
  Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)

可以使用 aggregateMessages 重写为

val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
  triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)

计算度信息

一个常见的聚合任务是计算每个顶点的度数:与每个顶点相邻的边的数量。在有向图的上下文中,通常需要知道每个顶点的入度、出度和总度数。GraphOps 类包含一组运算符来计算每个顶点的度数。例如,在以下示例中,我们计算最大入度、出度和总度数

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

收集邻居

在某些情况下,通过收集每个顶点的相邻顶点及其属性来表达计算可能更容易。这可以使用 collectNeighborIdscollectNeighbors 运算符轻松完成。

class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}

这些运算符可能非常昂贵,因为它们会复制信息并需要大量通信。如果可能,请尝试使用 aggregateMessages 运算符直接表达相同的计算。

缓存和取消缓存

在 Spark 中,RDD 默认情况下不会在内存中持久化。为了避免重新计算,在多次使用它们时必须显式缓存它们(请参阅 Spark 编程指南)。GraphX 中的图的行为方式相同。在多次使用图时,请确保先调用 Graph.cache()

在迭代计算中,取消缓存也可能对性能至关重要。默认情况下,缓存的 RDD 和图将保留在内存中,直到内存压力迫使它们以 LRU 顺序被逐出。对于迭代计算,来自先前迭代的中间结果将填满缓存。虽然它们最终会被逐出,但存储在内存中的不必要数据会减慢垃圾回收速度。取消缓存中间结果,只要它们不再需要,就会更有效率。这涉及在每次迭代时物化(缓存和强制)图或 RDD,取消缓存所有其他数据集,并且只在将来的迭代中使用物化的数据集。但是,由于图由多个 RDD 组成,因此可能难以正确地取消持久化它们。对于迭代计算,我们建议使用 Pregel API,它可以正确地取消持久化中间结果。

Pregel API

图本质上是递归数据结构,因为顶点的属性取决于其邻居的属性,而邻居的属性又取决于邻居的属性。因此,许多重要的图算法迭代地重新计算每个顶点的属性,直到达到不动点条件。已经提出了一系列图并行抽象来表达这些迭代算法。GraphX 公开了 Pregel API 的变体。

从高层次上讲,GraphX 中的 Pregel 运算符是受图拓扑约束的批量同步并行消息传递抽象。Pregel 运算符在一系列超级步骤中执行,在这些超级步骤中,顶点接收来自前一个超级步骤的入站消息的总和,计算顶点属性的新值,然后在下一个超级步骤中向相邻顶点发送消息。与 Pregel 不同,消息是作为边缘三元组的函数并行计算的,消息计算可以访问源顶点和目标顶点属性。在超级步骤中,不接收消息的顶点将被跳过。当没有剩余消息时,Pregel 运算符将终止迭代并返回最终图。

请注意,与更标准的 Pregel 实现不同,GraphX 中的顶点只能向相邻顶点发送消息,并且消息构造是使用用户定义的消息传递函数并行完成的。这些约束允许 GraphX 中进行额外的优化。

以下是 Pregel 运算符 的类型签名以及其实现的草图(注意:为了避免由于长血统链导致的 stackOverflowError,pregel 支持通过将“spark.graphx.pregel.checkpointInterval”设置为正数(例如 10)来定期检查点图和消息。并使用 SparkContext.setCheckpointDir(directory: String) 设置检查点目录)

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()

    // compute the messages
    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      g = g.joinVertices(messages)(vprog).cache()
      val oldMessages = messages
      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = GraphXUtils.mapReduceTriplets(
        g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

请注意,Pregel 接受两个参数列表(即 graph.pregel(list1)(list2))。第一个参数列表包含配置参数,包括初始消息、最大迭代次数以及发送消息的边缘方向(默认情况下沿着出边)。第二个参数列表包含用于接收消息(顶点程序 vprog)、计算消息(sendMsg)和合并消息 mergeMsg 的用户定义函数。

我们可以使用 Pregel 运算符来表达计算,例如以下示例中的单源最短路径。

import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators

// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
在 Spark 存储库中的“examples/src/main/scala/org/apache/spark/examples/graphx/SSSPExample.scala”中找到完整的示例代码。

图构建器

GraphX 提供了几种方法来从 RDD 中的顶点和边的集合或磁盘上构建图。默认情况下,没有一个图构建器会对图的边进行重新分区;相反,边会保留在它们的默认分区中(例如,它们在 HDFS 中的原始块)。Graph.groupEdges 需要对图进行重新分区,因为它假设相同的边将位于同一个分区上,因此您必须在调用 groupEdges 之前调用 Graph.partitionBy

object GraphLoader {
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]
}

GraphLoader.edgeListFile 提供了一种从磁盘上的边列表加载图的方法。它解析以下形式的(源顶点 ID,目标顶点 ID)对的邻接列表,跳过以 # 开头的注释行

# This is a comment
2 1
4 1
1 2

它从指定的边创建 Graph,自动创建边中提到的任何顶点。所有顶点和边属性默认值为 1。canonicalOrientation 参数允许将边重新定向到正方向(srcId < dstId),这是 连通分量 算法所必需的。minEdgePartitions 参数指定要生成的最小边分区数;如果例如 HDFS 文件有更多块,则可能会有比指定更多的边分区。

object Graph {
  def apply[VD, ED](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null)
    : Graph[VD, ED]

  def fromEdges[VD, ED](
      edges: RDD[Edge[ED]],
      defaultValue: VD): Graph[VD, ED]

  def fromEdgeTuples[VD](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]

}

Graph.apply 允许从顶点和边的 RDD 创建图。重复的顶点将被任意选择,在边 RDD 中找到但在顶点 RDD 中未找到的顶点将被分配默认属性。

Graph.fromEdges 允许仅从边的 RDD 创建图,自动创建边中提到的任何顶点,并为它们分配默认值。

Graph.fromEdgeTuples 允许仅从边元组的 RDD 创建图,将边分配值为 1,并自动创建边中提到的任何顶点,并为它们分配默认值。它还支持对边进行去重;要进行去重,请将 SomePartitionStrategy 作为 uniqueEdges 参数传递(例如,uniqueEdges = Some(PartitionStrategy.RandomVertexCut))。分区策略对于将相同的边放在同一个分区上是必要的,以便可以对它们进行去重。

顶点和边 RDD

GraphX 公开了图中存储的顶点和边的 RDD 视图。但是,由于 GraphX 将顶点和边保存在优化的数据结构中,并且这些数据结构提供了额外的功能,因此顶点和边将分别作为 VertexRDDEdgeRDD 返回。在本节中,我们将回顾这些类型中一些额外的有用功能。请注意,这只是一个不完整的列表,有关操作的官方列表,请参阅 API 文档。

VertexRDD

VertexRDD[A] 扩展了 RDD[(VertexId, A)],并添加了额外的约束,即每个 VertexId 仅出现一次。此外,VertexRDD[A] 表示一组顶点,每个顶点都有一个类型为 A 的属性。在内部,这是通过将顶点属性存储在可重用的哈希映射数据结构中来实现的。因此,如果两个 VertexRDD 来自同一个基本 VertexRDD(例如,通过 filtermapValues),它们可以在恒定时间内连接,而无需哈希评估。为了利用此索引数据结构,VertexRDD 公开了以下额外功能

class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  // Filter the vertex set but preserves the internal index
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  // Transform the values without changing the ids (preserves the internal index)
  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
  // Show only vertices unique to this set based on their VertexId's
  def minus(other: RDD[(VertexId, VD)])
  // Remove vertices from this set that appear in the other set
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
  // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}

例如,请注意 filter 运算符如何返回 VertexRDD。实际上,过滤器是使用 BitSet 实现的,从而重新使用索引并保留与其他 VertexRDD 进行快速连接的能力。同样,mapValues 运算符不允许 map 函数更改 VertexId,从而使相同的 HashMap 数据结构可以重复使用。leftJoininnerJoin 都能够识别何时连接两个来自同一个 HashMapVertexRDD,并通过线性扫描而不是代价高昂的点查找来实现连接。

操作符 aggregateUsingIndex 可用于从 RDD[(VertexId, A)] 高效构建新的 VertexRDDVertexRDD。从概念上讲,如果我已在顶点集上构建了 VertexRDD[B]它是一个超集,包含某些 RDD[(VertexId, A)] 中的顶点,那么我可以重用索引来聚合和随后索引 RDD[(VertexId, A)]。例如

val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

EdgeRDD

扩展 RDD[Edge[ED]]EdgeRDD[ED] 将边组织成块,这些块使用 PartitionStrategy 中定义的各种分区策略之一进行分区。在每个分区内,边属性和邻接结构分别存储,在更改属性值时可以最大程度地重复使用。

EdgeRDDEdgeRDD 公开的三个附加函数是

// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

在大多数应用程序中,我们发现对 EdgeRDDEdgeRDD 的操作是通过图操作完成的,或者依赖于基类 RDD 类中定义的操作。

优化表示

虽然本指南不包含对 GraphX 分布式图表示中使用的优化的详细描述,但对一些高级理解可能有助于设计可扩展算法以及 API 的最佳使用。GraphX 采用顶点切割方法进行分布式图分区

Edge Cut vs. Vertex Cut

GraphX 不是沿着边分割图,而是沿着顶点分割图,这可以减少通信和存储开销。从逻辑上讲,这对应于将边分配给机器,并允许顶点跨越多个机器。边分配的具体方法取决于 PartitionStrategy,并且各种启发式方法之间存在一些权衡。用户可以通过使用 Graph.partitionBy 操作符重新分区图来选择不同的策略。默认分区策略是使用在图构建时提供的边的初始分区。但是,用户可以轻松切换到 2D 分区或 GraphX 中包含的其他启发式方法。

RDD Graph Representation

一旦边被分区,高效图并行计算的关键挑战是有效地将顶点属性与边连接起来。由于现实世界中的图通常比顶点具有更多边,因此我们将顶点属性移动到边。由于并非所有分区都包含与所有顶点相邻的边,因此我们在内部维护一个路由表,该路由表标识在实现诸如 tripletsaggregateMessages 之类的操作所需的连接时,将顶点广播到何处。

图算法

GraphX 包含一组图算法,以简化分析任务。这些算法包含在 org.apache.spark.graphx.lib 包中,可以通过 GraphOps 直接作为 Graph 上的方法访问。本节介绍这些算法及其使用方法。

PageRank

PageRank 衡量图中每个顶点的重要性,假设从 uv 的边表示 uv 重要性的认可。例如,如果一个 Twitter 用户被许多其他人关注,那么该用户将排名很高。

GraphX 在 PageRank 对象 上的方法中提供了 PageRank 的静态和动态实现。静态 PageRank 运行固定次数的迭代,而动态 PageRank 运行直到排名收敛(即,变化量小于指定的容差)。GraphOps 允许直接将这些算法作为 Graph 上的方法调用。

GraphX 还包含一个示例社交网络数据集,我们可以在其上运行 PageRank。一组用户在 data/graphx/users.txt 中给出,一组用户之间的关系在 data/graphx/followers.txt 中给出。我们计算每个用户的 PageRank,如下所示

import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
在 Spark 存储库中的“examples/src/main/scala/org/apache/spark/examples/graphx/PageRankExample.scala”中找到完整的示例代码。

连通分量

连通分量算法使用其编号最低的顶点的 ID 对图的每个连通分量进行标记。例如,在社交网络中,连通分量可以近似地表示集群。GraphX 在 ConnectedComponents 对象 中包含该算法的实现,我们计算来自 PageRank 部分 的示例社交网络数据集的连通分量,如下所示

import org.apache.spark.graphx.GraphLoader

// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
  case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
在 Spark 存储库中的“examples/src/main/scala/org/apache/spark/examples/graphx/ConnectedComponentsExample.scala”中找到完整的示例代码。

三角形计数

当一个顶点有两个相邻顶点之间存在边时,该顶点就是三角形的一部分。GraphX 在 TriangleCount 对象 中实现了一个三角形计数算法,该算法确定通过每个顶点的三角形数量,提供聚类的度量。我们计算来自 PageRank 部分 的社交网络数据集的三角形计数。请注意,TriangleCount 要求边处于规范方向 (srcId < dstId),并且图使用 Graph.partitionBy 进行分区。

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
  .partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
  (username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
在 Spark 存储库中的“examples/src/main/scala/org/apache/spark/examples/graphx/TriangleCountingExample.scala”中找到完整的示例代码。

示例

假设我想从一些文本文件中构建一个图,将图限制在重要的关系和用户,在子图上运行 PageRank,最后返回与顶级用户相关的属性。我可以用 GraphX 在几行代码中完成所有这些操作

import org.apache.spark.graphx.GraphLoader

// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
  .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
  case (uid, deg, Some(attrList)) => attrList
  // Some users may not have attributes so we set them as empty
  case (uid, deg, None) => Array.empty[String]
}

// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)

// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
  case (uid, attrList, None) => (0.0, attrList.toList)
}

println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
在 Spark 存储库中的“examples/src/main/scala/org/apache/spark/examples/graphx/ComprehensiveExample.scala”中找到完整的示例代码。