频繁模式挖掘 - 基于 RDD 的 API
挖掘频繁项、项集、子序列或其他子结构通常是分析大规模数据集的第一步,这在数据挖掘领域一直是活跃的研究课题。我们建议用户查阅维基百科的关联规则学习以获取更多信息。spark.mllib
提供了 FP-growth 的并行实现,FP-growth 是一种流行的频繁项集挖掘算法。
FP-growth
FP-growth 算法在论文 Han 等人,无需候选项集生成的频繁模式挖掘 中有所描述,其中“FP”代表频繁模式。给定一个事务数据集,FP-growth 的第一步是计算项的频率并识别频繁项。与旨在实现相同目的的 Apriori 类算法不同,FP-growth 的第二步使用后缀树(FP-tree)结构来编码事务,而无需显式生成候选项集,候选项集通常生成成本很高。第二步之后,可以从 FP-tree 中提取频繁项集。在 spark.mllib
中,我们实现了 FP-growth 的并行版本,称为 PFP,如 Li 等人,PFP:用于查询推荐的并行 FP-growth 中所述。PFP 根据事务的后缀分配 FP-tree 的增长工作,因此比单机实现更具可伸缩性。我们建议用户查阅论文以获取更多详细信息。
spark.mllib
的 FP-growth 实现接受以下(超)参数
minSupport
:项集被识别为频繁项集的最小支持度。例如,如果一个项在 5 个事务中出现 3 次,则其支持度为 3/5=0.6。numPartitions
:用于分配工作的分区数量。
示例
FPGrowth
实现了 FP-growth 算法。它接受一个事务 RDD
,其中每个事务是通用类型项的 List
。调用 FPGrowth.train
并传入事务会返回一个 FPGrowthModel
,它存储了频繁项集及其频率。
有关 API 的更多详细信息,请参阅 FPGrowth
Python 文档。
from pyspark.mllib.fpm import FPGrowth
data = sc.textFile("data/mllib/sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
print(fi)
FPGrowth
实现了 FP-growth 算法。它接受一个事务 RDD
,其中每个事务是通用类型项的 Array
。调用 FPGrowth.run
并传入事务会返回一个 FPGrowthModel
,它存储了频繁项集及其频率。以下示例说明了如何从 transactions
中挖掘频繁项集和关联规则(有关详细信息,请参阅关联规则)。
有关 API 的详细信息,请参阅 FPGrowth
Scala 文档。
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
val data = sc.textFile("data/mllib/sample_fpgrowth.txt")
val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))
val fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10)
val model = fpg.run(transactions)
model.freqItemsets.collect().foreach { itemset =>
println(s"${itemset.items.mkString("[", ",", "]")},${itemset.freq}")
}
val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
println(s"${rule.antecedent.mkString("[", ",", "]")}=> " +
s"${rule.consequent .mkString("[", ",", "]")},${rule.confidence}")
}
FPGrowth
实现了 FP-growth 算法。它接受一个事务 JavaRDD
,其中每个事务是通用类型项的 Iterable
。调用 FPGrowth.run
并传入事务会返回一个 FPGrowthModel
,它存储了频繁项集及其频率。以下示例说明了如何从 transactions
中挖掘频繁项集和关联规则(有关详细信息,请参阅关联规则)。
有关 API 的详细信息,请参阅 FPGrowth
Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;
JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt");
JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));
FPGrowth fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10);
FPGrowthModel<String> model = fpg.run(transactions);
for (FPGrowth.FreqItemset<String> itemset: model.freqItemsets().toJavaRDD().collect()) {
System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
}
double minConfidence = 0.8;
for (AssociationRules.Rule<String> rule
: model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
System.out.println(
rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
关联规则
AssociationRules 实现了一种并行规则生成算法,用于构造以单个项作为结果的规则。
有关 API 的详细信息,请参阅 AssociationRules
Scala 文档。
import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
val freqItemsets = sc.parallelize(Seq(
new FreqItemset(Array("a"), 15L),
new FreqItemset(Array("b"), 35L),
new FreqItemset(Array("a", "b"), 12L)
))
val ar = new AssociationRules()
.setMinConfidence(0.8)
val results = ar.run(freqItemsets)
results.collect().foreach { rule =>
println(s"[${rule.antecedent.mkString(",")}=>${rule.consequent.mkString(",")} ]" +
s" ${rule.confidence}")
}
AssociationRules 实现了一种并行规则生成算法,用于构造以单个项作为结果的规则。
有关 API 的详细信息,请参阅 AssociationRules
Java 文档。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset;
JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList(
new FreqItemset<>(new String[] {"a"}, 15L),
new FreqItemset<>(new String[] {"b"}, 35L),
new FreqItemset<>(new String[] {"a", "b"}, 12L)
));
AssociationRules arules = new AssociationRules()
.setMinConfidence(0.8);
JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets);
for (AssociationRules.Rule<String> rule : results.collect()) {
System.out.println(
rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
PrefixSpan
PrefixSpan 是一种序列模式挖掘算法,在 Pei 等人,通过模式增长挖掘序列模式:PrefixSpan 方法 中有所描述。我们建议读者查阅参考论文以了解序列模式挖掘问题的形式化定义。
spark.mllib
的 PrefixSpan 实现接受以下参数
minSupport
:被视为频繁序列模式所需的最小支持度。maxPatternLength
:频繁序列模式的最大长度。任何超过此长度的频繁模式将不包含在结果中。maxLocalProjDBSize
:在开始对投影数据库进行局部迭代处理之前,前缀投影数据库中允许的最大项数。此参数应根据执行器的大小进行调整。
示例
以下示例演示了 PrefixSpan 在序列上运行的情况(使用与 Pei 等人相同的表示法)
<(12)3>
<1(32)(12)>
<(12)5>
<6>
PrefixSpan
实现了 PrefixSpan 算法。调用 PrefixSpan.run
返回一个 PrefixSpanModel
,它存储了频繁序列及其频率。
有关 API 的详细信息,请参阅 PrefixSpan
Scala 文档 和 PrefixSpanModel
Scala 文档。
import org.apache.spark.mllib.fpm.PrefixSpan
val sequences = sc.parallelize(Seq(
Array(Array(1, 2), Array(3)),
Array(Array(1), Array(3, 2), Array(1, 2)),
Array(Array(1, 2), Array(5)),
Array(Array(6))
), 2).cache()
val prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5)
val model = prefixSpan.run(sequences)
model.freqSequences.collect().foreach { freqSequence =>
println(
s"${freqSequence.sequence.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]")}," +
s" ${freqSequence.freq}")
}
PrefixSpan
实现了 PrefixSpan 算法。调用 PrefixSpan.run
返回一个 PrefixSpanModel
,它存储了频繁序列及其频率。
有关 API 的详细信息,请参阅 PrefixSpan
Java 文档 和 PrefixSpanModel
Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.mllib.fpm.PrefixSpan;
import org.apache.spark.mllib.fpm.PrefixSpanModel;
JavaRDD<List<List<Integer>>> sequences = sc.parallelize(Arrays.asList(
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)),
Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)),
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
Arrays.asList(Arrays.asList(6))
), 2);
PrefixSpan prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5);
PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
for (PrefixSpan.FreqSequence<Integer> freqSeq: model.freqSequences().toJavaRDD().collect()) {
System.out.println(freqSeq.javaSequence() + ", " + freqSeq.freq());
}