GraphX 编程指南
概述
GraphX 是 Spark 中用于图和图并行计算的新组件。从高层次来看,GraphX 通过引入新的 图 抽象来扩展 Spark RDD:一个有向多图,每个顶点和边都附带属性。为了支持图计算,GraphX 公开了许多基本操作符(例如,子图、joinVertices 和 aggregateMessages),以及 Pregel API 的优化变体。此外,GraphX 还包含不断增长的图 算法 和 构建器 集合,以简化图分析任务。
入门
要开始使用,您首先需要将 Spark 和 GraphX 导入您的项目,如下所示
如果您没有使用 Spark shell,您还需要一个 SparkContext
。要了解有关使用 Spark 入门的更多信息,请参阅 Spark 快速入门指南。
属性图
属性图 是一个有向多图,每个顶点和边都附带用户定义的对象。有向多图是有向图,可能有多个平行边共享相同的源顶点和目标顶点。支持平行边的能力简化了对可能存在相同顶点之间多个关系(例如,同事和朋友)的建模场景。每个顶点都由一个唯一的 64 位长标识符 (VertexId
) 作为键。GraphX 不会对顶点标识符施加任何排序约束。类似地,边具有相应的源顶点和目标顶点标识符。
属性图根据顶点 (VD
) 和边 (ED
) 类型进行参数化。这些分别是与每个顶点和边关联的对象的类型。
当顶点和边类型是基本数据类型(例如,int、double 等)时,GraphX 会优化其表示,通过将它们存储在专门的数组中来减少内存占用。
在某些情况下,可能希望在同一个图中具有不同属性类型的顶点。这可以通过继承来实现。例如,要将用户和产品建模为二部图,我们可以执行以下操作
与 RDD 类似,属性图是不可变的、分布式的和容错的。对图的值或结构的更改是通过生成具有所需更改的新图来完成的。请注意,原始图的很大一部分(即不受影响的结构、属性和索引)将在新图中重复使用,从而降低了这种固有函数式数据结构的成本。图使用一系列顶点分区启发式方法在执行器之间进行分区。与 RDD 一样,图的每个分区都可以在发生故障时在不同的机器上重新创建。
从逻辑上讲,属性图对应于一对类型化的集合(RDD),它们对每个顶点和边的属性进行编码。因此,图类包含用于访问图的顶点和边的成员
类 VertexRDD[VD]
和 EdgeRDD[ED]
扩展了 RDD[(VertexId, VD)]
和 RDD[Edge[ED]]
,并且是它们的优化版本。VertexRDD[VD]
和 EdgeRDD[ED]
都提供了围绕图计算构建的额外功能,并利用了内部优化。我们在关于 顶点和边 RDD 的部分中更详细地讨论了 VertexRDD
VertexRDD 和 EdgeRDD
EdgeRDD API,但现在可以简单地将它们视为以下形式的 RDD:RDD[(VertexId, VD)]
和 RDD[Edge[ED]]
。
示例属性图
假设我们想要构建一个属性图,其中包含 GraphX 项目的各种合作者。顶点属性可能包含用户名和职业。我们可以用描述合作者之间关系的字符串来注释边
生成的图将具有类型签名
从原始文件、RDD 甚至合成生成器构建属性图的方法有很多,这些将在关于 图构建器 的部分中更详细地讨论。可能最通用的方法是使用 Graph 对象。例如,以下代码从 RDD 集合构建一个图
在上面的示例中,我们使用了 Edge
案例类。边具有一个 srcId
和一个 dstId
,分别对应于源顶点和目标顶点标识符。此外,Edge
类有一个 attr
成员,用于存储边属性。
我们可以分别使用 graph.vertices
和 graph.edges
成员将图分解为相应的顶点和边视图。
请注意,
graph.vertices
返回一个VertexRDD[(String, String)]
,它扩展了RDD[(VertexId, (String, String))]
,因此我们使用 scalacase
表达式来分解元组。另一方面,graph.edges
返回一个包含Edge[String]
对象的EdgeRDD
。我们也可以使用以下案例类类型构造函数
除了属性图的顶点和边视图之外,GraphX 还公开了三元组视图。三元组视图从逻辑上连接顶点和边属性,生成一个包含 EdgeTriplet
类实例的 RDD[EdgeTriplet[VD, ED]]
。这种连接可以用以下 SQL 表达式表示
或以图形方式表示为
EdgeTriplet
类扩展了 Edge
类,并添加了 srcAttr
和 dstAttr
成员,它们分别包含源属性和目标属性。我们可以使用图的三元组视图来呈现描述用户之间关系的字符串集合。
图操作符
正如 RDD 具有 map
、filter
和 reduceByKey
等基本操作一样,属性图也有一组基本操作符,它们接受用户定义的函数并生成具有转换后的属性和结构的新图。在 Graph
中定义了具有优化实现的核心操作符,在 GraphOps
中定义了作为核心操作符组合表示的便捷操作符。但是,由于 Scala 隐式,GraphOps
中的操作符会自动作为 Graph
的成员提供。例如,我们可以通过以下方式计算每个顶点的入度(在 GraphOps
中定义)
区分核心图操作和 GraphOps
的原因是能够在将来支持不同的图表示。每个图表示都必须提供核心操作的实现,并重复使用在 GraphOps
中定义的许多有用操作。
操作符汇总列表
以下是 Graph
和 GraphOps
中定义的功能的简要总结,为了简单起见,这些功能被呈现为 Graph 的成员。请注意,一些函数签名已被简化(例如,默认参数和类型约束已删除),一些更高级的功能已被删除,因此请参阅 API 文档以获取操作的官方列表。
属性操作符
与 RDD 的 map
操作符类似,属性图包含以下内容
这些操作符中的每一个都生成一个新的图,其中顶点或边属性由用户定义的 map
函数修改。
请注意,在每种情况下,图结构都不会受到影响。这是这些操作符的一个关键特性,它允许生成的图重用原始图的结构索引。以下代码片段在逻辑上是等效的,但第一个代码片段不保留结构索引,并且不会从 GraphX 系统优化中受益
相反,使用
mapVertices
来保留索引
这些操作符通常用于为特定计算初始化图或投影掉不必要的属性。例如,给定一个以出度作为顶点属性的图(我们将在后面描述如何构建这样的图),我们为 PageRank 初始化它
结构操作符
目前,GraphX 只支持一组简单的常用结构操作符,我们预计将来会添加更多。以下是基本结构操作符的列表。
The reverse
操作符返回一个新的图,其中所有边的方向都反转。这在尝试计算逆 PageRank 时很有用。由于反转操作不会修改顶点或边属性,也不会改变边的数量,因此它可以在没有数据移动或复制的情况下有效地实现。
The subgraph
操作符接受顶点和边谓词,并返回一个图,该图只包含满足顶点谓词(评估为真)的顶点和满足边谓词的边,并且连接满足顶点谓词的顶点。The subgraph
操作符可以在许多情况下用于将图限制在感兴趣的顶点和边上,或消除断开的链接。例如,在以下代码中,我们删除了断开的链接
请注意,在上面的示例中,只提供了顶点谓词。如果未提供顶点或边谓词,则
subgraph
操作符默认为true
。
The mask
操作符通过返回一个包含在输入图中也找到的顶点和边的图来构建一个子图。这可以与 subgraph
操作符结合使用,以根据另一个相关图中的属性限制图。例如,我们可能会使用具有缺失顶点的图运行连通分量,然后将答案限制在有效的子图中。
The groupEdges
操作符合并多图中的平行边(即,一对顶点之间的重复边)。在许多数值应用中,平行边可以添加(它们的权重组合)到一条边中,从而减小图的大小。
连接操作符
在许多情况下,需要将来自外部集合(RDD)的数据与图连接起来。例如,我们可能有一些额外的用户属性,我们希望将其与现有图合并,或者我们可能希望将顶点属性从一个图拉到另一个图。这些任务可以使用连接操作符来完成。下面列出了关键的连接操作符
The joinVertices
操作符将顶点与输入 RDD 连接起来,并返回一个新的图,其中顶点属性是通过将用户定义的 map
函数应用于连接顶点的结果而获得的。没有在 RDD 中找到匹配值的顶点保留其原始值。
请注意,如果 RDD 为给定顶点包含多个值,则只使用其中一个。因此,建议使用以下方法使输入 RDD 唯一,这也会预先索引生成的值,从而大大加快随后的连接。
更通用的 outerJoinVertices
的行为类似于 joinVertices
,除了用户定义的 map
函数应用于所有顶点,并且可以更改顶点属性类型。由于并非所有顶点都可能在输入 RDD 中找到匹配值,因此 map
函数接受一个 Option
类型。例如,我们可以通过使用其 outDegree
初始化顶点属性来设置 PageRank 的图。
你可能已经注意到在上面的示例中使用的多个参数列表(例如,
f(a)(b)
)柯里化函数模式。虽然我们可以同样地将f(a)(b)
写成f(a,b)
,但这意味着b
上的类型推断将不依赖于a
。因此,用户需要为用户定义的函数提供类型注释
邻域聚合
许多图分析任务中的一个关键步骤是聚合有关每个顶点邻域的信息。例如,我们可能想知道每个用户有多少粉丝,或者每个用户的粉丝的平均年龄。许多迭代图算法(例如,PageRank、最短路径和连通分量)反复聚合相邻顶点的属性(例如,当前 PageRank 值、到源的最短路径和可达到的最小顶点 ID)。
为了提高性能,主要聚合操作符已从
graph.mapReduceTriplets
更改为新的graph.AggregateMessages
。虽然 API 中的更改相对较小,但我们提供了以下过渡指南。
聚合消息 (aggregateMessages)
GraphX 中的核心聚合操作是 aggregateMessages
。此操作符将用户定义的 sendMsg
函数应用于图中的每个边三元组,然后使用 mergeMsg
函数在其目标顶点处聚合这些消息。
用户定义的 sendMsg
函数接受一个 EdgeContext
,它公开源和目标属性以及边属性和函数(sendToSrc
和 sendToDst
)来向源和目标属性发送消息。将 sendMsg
视为 map-reduce 中的map 函数。用户定义的 mergeMsg
函数接受两个发送到同一顶点的消息,并生成一条消息。将 mergeMsg
视为 map-reduce 中的reduce 函数。The aggregateMessages
操作符返回一个 VertexRDD[Msg]
,其中包含发送到每个顶点的聚合消息(类型为 Msg
)。没有收到消息的顶点不包含在返回的 VertexRDD
VertexRDD 中。
此外,aggregateMessages
接受一个可选的 tripletsFields
,它指示在 EdgeContext
中访问了哪些数据(即,源顶点属性,但不是目标顶点属性)。tripletsFields
的可能选项在 TripletFields
中定义,默认值为 TripletFields.All
,它表示用户定义的 sendMsg
函数可以访问 EdgeContext
中的任何字段。The tripletsFields
参数可用于通知 GraphX 只需要 EdgeContext
的一部分,从而允许 GraphX 选择优化的连接策略。例如,如果我们正在计算每个用户的粉丝的平均年龄,我们只需要源字段,因此我们将使用 TripletFields.Src
来指示我们只需要源字段
在 GraphX 的早期版本中,我们使用字节码检查来推断
TripletFields
,但我们发现字节码检查略有不稳定,因此选择更明确的用户控制。
在以下示例中,我们使用 aggregateMessages
操作符来计算每个用户的资历更高的粉丝的平均年龄。
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst((1, triplet.srcAttr))
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) =>
value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
当消息(以及消息的总和)大小恒定(例如,浮点数和加法而不是列表和串联)时,
aggregateMessages
操作的性能最佳。
Map Reduce 三元组转换指南(遗留)
在 GraphX 的早期版本中,邻域聚合是使用 mapReduceTriplets
操作符完成的
mapReduceTriplets
运算符接受一个用户定义的映射函数,该函数应用于每个三元组,并可以生成消息,这些消息使用用户定义的 reduce
函数进行聚合。但是,我们发现返回的迭代器的用户使用成本很高,并且阻碍了我们应用其他优化(例如,本地顶点重新编号)。在 aggregateMessages
中,我们引入了 EdgeContext,它公开了三元组字段,以及显式向源顶点和目标顶点发送消息的函数。此外,我们还删除了字节码检查,而是要求用户指示三元组中实际需要的字段。
以下代码块使用 mapReduceTriplets
可以使用 aggregateMessages
重写为
计算度信息
一个常见的聚合任务是计算每个顶点的度数:与每个顶点相邻的边的数量。在有向图的上下文中,通常需要知道每个顶点的入度、出度和总度数。GraphOps
类包含一组运算符来计算每个顶点的度数。例如,在以下示例中,我们计算最大入度、出度和总度数
收集邻居
在某些情况下,通过收集每个顶点的相邻顶点及其属性来表达计算可能更容易。这可以使用 collectNeighborIds
和 collectNeighbors
运算符轻松完成。
这些运算符可能非常昂贵,因为它们会复制信息并需要大量通信。如果可能,请尝试使用
aggregateMessages
运算符直接表达相同的计算。
缓存和取消缓存
在 Spark 中,RDD 默认情况下不会在内存中持久化。为了避免重新计算,在多次使用它们时必须显式缓存它们(请参阅 Spark 编程指南)。GraphX 中的图的行为方式相同。在多次使用图时,请确保先调用 Graph.cache()
。
在迭代计算中,取消缓存也可能对性能至关重要。默认情况下,缓存的 RDD 和图将保留在内存中,直到内存压力迫使它们以 LRU 顺序被逐出。对于迭代计算,来自先前迭代的中间结果将填满缓存。虽然它们最终会被逐出,但存储在内存中的不必要数据会减慢垃圾回收速度。取消缓存中间结果,只要它们不再需要,就会更有效率。这涉及在每次迭代时物化(缓存和强制)图或 RDD,取消缓存所有其他数据集,并且只在将来的迭代中使用物化的数据集。但是,由于图由多个 RDD 组成,因此可能难以正确地取消持久化它们。对于迭代计算,我们建议使用 Pregel API,它可以正确地取消持久化中间结果。
Pregel API
图本质上是递归数据结构,因为顶点的属性取决于其邻居的属性,而邻居的属性又取决于其邻居的属性。因此,许多重要的图算法迭代地重新计算每个顶点的属性,直到达到不动点条件。已经提出了一系列图并行抽象来表达这些迭代算法。GraphX 公开了 Pregel API 的变体。
从高层次上讲,GraphX 中的 Pregel 运算符是受图拓扑约束的批量同步并行消息传递抽象。Pregel 运算符在一系列超级步骤中执行,在这些超级步骤中,顶点接收来自前一个超级步骤的入站消息的总和,计算顶点属性的新值,然后在下一个超级步骤中向相邻顶点发送消息。与 Pregel 不同,消息是作为边缘三元组的函数并行计算的,消息计算可以访问源顶点和目标顶点属性。在超级步骤中,不接收消息的顶点将被跳过。当没有剩余消息时,Pregel 运算符将终止迭代并返回最终图。
请注意,与更标准的 Pregel 实现不同,GraphX 中的顶点只能向相邻顶点发送消息,并且消息构造是使用用户定义的消息传递函数并行完成的。这些约束允许 GraphX 中进行额外的优化。
以下是 Pregel 运算符 的类型签名以及其实现的草图(注意:为了避免由于长血统链导致的 stackOverflowError,pregel 支持通过将“spark.graphx.pregel.checkpointInterval”设置为正数(例如 10)来定期检查点图和消息。并使用 SparkContext.setCheckpointDir(directory: String) 设置检查点目录)
请注意,Pregel 接受两个参数列表(即 graph.pregel(list1)(list2)
)。第一个参数列表包含配置参数,包括初始消息、最大迭代次数以及发送消息的边缘方向(默认情况下沿着出边)。第二个参数列表包含用于接收消息(顶点程序 vprog
)、计算消息(sendMsg
)和合并消息 mergeMsg
的用户定义函数。
我们可以使用 Pregel 运算符来表达计算,例如以下示例中的单源最短路径。
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
图构建器
GraphX 提供了几种方法来从 RDD 中的顶点和边的集合或磁盘上构建图。默认情况下,没有一个图构建器会对图的边进行重新分区;相反,边会保留在它们的默认分区中(例如,它们在 HDFS 中的原始块)。Graph.groupEdges
需要对图进行重新分区,因为它假设相同的边将位于同一个分区上,因此您必须在调用 groupEdges
之前调用 Graph.partitionBy
。
GraphLoader.edgeListFile
提供了一种从磁盘上的边列表加载图的方法。它解析以下形式的(源顶点 ID,目标顶点 ID)对的邻接列表,跳过以 #
开头的注释行
# This is a comment
2 1
4 1
1 2
它从指定的边创建 Graph
,自动创建边中提到的任何顶点。所有顶点和边属性默认值为 1。canonicalOrientation
参数允许将边重新定向到正方向(srcId < dstId
),这是 连通分量 算法所必需的。minEdgePartitions
参数指定要生成的最小边分区数;如果例如 HDFS 文件有更多块,则可能会有比指定更多的边分区。
Graph.apply
允许从顶点和边的 RDD 创建图。重复的顶点将被任意选择,在边 RDD 中找到但在顶点 RDD 中未找到的顶点将被分配默认属性。
Graph.fromEdges
允许仅从边的 RDD 创建图,自动创建边中提到的任何顶点,并为它们分配默认值。
Graph.fromEdgeTuples
允许仅从边元组的 RDD 创建图,将边分配值为 1,并自动创建边中提到的任何顶点,并为它们分配默认值。它还支持对边进行去重;要进行去重,请将 Some
的 PartitionStrategy
作为 uniqueEdges
参数传递(例如,uniqueEdges = Some(PartitionStrategy.RandomVertexCut)
)。分区策略对于将相同的边放在同一个分区上是必要的,以便可以对它们进行去重。
顶点和边 RDD
GraphX 公开了图中存储的顶点和边的 RDD
视图。但是,由于 GraphX 将顶点和边保存在优化的数据结构中,并且这些数据结构提供了额外的功能,因此顶点和边将分别作为 VertexRDD
和 EdgeRDD
返回。在本节中,我们将回顾这些类型中一些额外的有用功能。请注意,这只是一个不完整的列表,有关操作的官方列表,请参阅 API 文档。
VertexRDD
VertexRDD[A]
扩展了 RDD[(VertexId, A)]
,并添加了额外的约束,即每个 VertexId
仅出现一次。此外,VertexRDD[A]
表示一组顶点,每个顶点都有一个类型为 A
的属性。在内部,这是通过将顶点属性存储在可重用的哈希映射数据结构中来实现的。因此,如果两个 VertexRDD
来自同一个基本 VertexRDD
(例如,通过 filter
或 mapValues
),它们可以在恒定时间内连接,而无需哈希评估。为了利用此索引数据结构,VertexRDD
公开了以下额外功能
例如,请注意 filter
运算符如何返回 VertexRDD
。实际上,过滤器是使用 BitSet
实现的,从而重新使用索引并保留与其他 VertexRDD
进行快速连接的能力。同样,mapValues
运算符不允许 map
函数更改 VertexId
,从而使相同的 HashMap
数据结构可以重复使用。leftJoin
和 innerJoin
都能够识别何时连接两个来自同一个 HashMap
的 VertexRDD
,并通过线性扫描而不是代价高昂的点查找来实现连接。
操作符 aggregateUsingIndex
可用于从 RDD[(VertexId, A)]
高效构建新的 VertexRDD
VertexRDD。从概念上讲,如果我已在顶点集上构建了 VertexRDD[B]
,它是一个超集,包含某些 RDD[(VertexId, A)]
中的顶点,那么我可以重用索引来聚合和随后索引 RDD[(VertexId, A)]
。例如
EdgeRDD
扩展 RDD[Edge[ED]]
的 EdgeRDD[ED]
将边组织成块,这些块使用 PartitionStrategy
中定义的各种分区策略之一进行分区。在每个分区内,边属性和邻接结构分别存储,在更改属性值时可以最大程度地重复使用。
EdgeRDD
EdgeRDD 公开的三个附加函数是
在大多数应用程序中,我们发现对 EdgeRDD
EdgeRDD 的操作是通过图操作完成的,或者依赖于基类 RDD
类中定义的操作。
优化表示
虽然本指南不包含对 GraphX 分布式图表示中使用的优化的详细描述,但对一些高级理解可能有助于设计可扩展算法以及 API 的最佳使用。GraphX 采用顶点切割方法进行分布式图分区
GraphX 不是沿着边分割图,而是沿着顶点分割图,这可以减少通信和存储开销。从逻辑上讲,这对应于将边分配给机器,并允许顶点跨越多个机器。边分配的具体方法取决于 PartitionStrategy
,并且各种启发式方法之间存在一些权衡。用户可以通过使用 Graph.partitionBy
操作符重新分区图来选择不同的策略。默认分区策略是使用在图构建时提供的边的初始分区。但是,用户可以轻松切换到 2D 分区或 GraphX 中包含的其他启发式方法。
一旦边被分区,高效图并行计算的关键挑战是有效地将顶点属性与边连接起来。由于现实世界中的图通常比顶点具有更多边,因此我们将顶点属性移动到边。由于并非所有分区都包含与所有顶点相邻的边,因此我们在内部维护一个路由表,该路由表标识在实现诸如 triplets
和 aggregateMessages
之类的操作所需的连接时,将顶点广播到何处。
图算法
GraphX 包含一组图算法,以简化分析任务。这些算法包含在 org.apache.spark.graphx.lib
包中,可以通过 GraphOps
直接作为 Graph
上的方法访问。本节介绍这些算法及其使用方法。
PageRank
PageRank 衡量图中每个顶点的重要性,假设从 u 到 v 的边表示 u 对 v 重要性的认可。例如,如果一个 Twitter 用户被许多其他人关注,那么该用户将排名很高。
GraphX 在 PageRank
对象 上的方法中提供了 PageRank 的静态和动态实现。静态 PageRank 运行固定次数的迭代,而动态 PageRank 运行直到排名收敛(即,变化量小于指定的容差)。GraphOps
允许直接将这些算法作为 Graph
上的方法调用。
GraphX 还包含一个示例社交网络数据集,我们可以在其上运行 PageRank。一组用户在 data/graphx/users.txt
中给出,一组用户之间的关系在 data/graphx/followers.txt
中给出。我们计算每个用户的 PageRank,如下所示
import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
连通分量
连通分量算法使用其编号最低的顶点的 ID 对图的每个连通分量进行标记。例如,在社交网络中,连通分量可以近似地表示集群。GraphX 在 ConnectedComponents
对象 中包含该算法的实现,我们计算来自 PageRank 部分 的示例社交网络数据集的连通分量,如下所示
import org.apache.spark.graphx.GraphLoader
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
三角形计数
当一个顶点有两个相邻顶点之间存在边时,该顶点就是三角形的一部分。GraphX 在 TriangleCount
对象 中实现了一个三角形计数算法,该算法确定通过每个顶点的三角形数量,提供聚类的度量。我们计算来自 PageRank 部分 的社交网络数据集的三角形计数。请注意,TriangleCount
要求边处于规范方向 (srcId < dstId
),并且图使用 Graph.partitionBy
进行分区。
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
.partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
(username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
示例
假设我想从一些文本文件中构建一个图,将图限制在重要的关系和用户,在子图上运行 PageRank,最后返回与顶级用户相关的属性。我可以用 GraphX 在几行代码中完成所有这些操作
import org.apache.spark.graphx.GraphLoader
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
case (uid, deg, Some(attrList)) => attrList
// Some users may not have attributes so we set them as empty
case (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
case (uid, attrList, None) => (0.0, attrList.toList)
}
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))