频繁模式挖掘
挖掘频繁项、项集、子序列或其他子结构通常是分析大规模数据集的第一步,多年来这在数据挖掘领域一直是一个活跃的研究课题。有关更多信息,请参阅维基百科的关联规则学习。
目录
FP-Growth
FP-growth 算法在论文Han 等人,《不生成候选模式的频繁模式挖掘》中有所描述,其中“FP”代表频繁模式(frequent pattern)。给定一个事务数据集,FP-growth 的第一步是计算项的频率并识别频繁项。与为相同目的设计的Apriori 族算法不同,FP-growth 的第二步使用后缀树 (FP-tree) 结构来编码事务,而无需显式生成候选集,因为生成候选集通常成本高昂。第二步之后,可以从 FP-tree 中提取频繁项集。在 spark.mllib
中,我们实现了一个并行版本的 FP-growth,称为 PFP,如Li 等人,《PFP: 用于查询推荐的并行 FP-growth》中所述。PFP 基于事务的后缀分配增长 FP-tree 的工作,因此比单机实现更具可伸缩性。有关更多详细信息,请参阅相关论文。
FP-growth 作用于项集。项集是唯一项的无序集合。Spark 没有集合类型,因此项集表示为数组。
spark.ml
的 FP-growth 实现采用以下(超)参数:
minSupport
: 项集被识别为频繁项集的最小支持度。例如,如果一个项在 5 个事务中出现 3 次,则其支持度为 3/5=0.6。minConfidence
: 生成关联规则的最小置信度。置信度表示关联规则被发现为真的频率。例如,如果在事务中项集X
出现 4 次,X
和Y
共同出现只有 2 次,那么规则X => Y
的置信度就是 2/4 = 0.5。此参数不会影响频繁项集的挖掘,但会指定从频繁项集生成关联规则的最小置信度。numPartitions
: 用于分布式工作的分区数。默认情况下,此参数未设置,将使用输入数据集的分区数。
The FPGrowthModel
提供
freqItemsets
: 频繁项集,以 DataFrame 格式表示,包含以下列:items: array
: 给定的项集。freq: long
: 在给定配置模型参数的情况下,此项集被看到的次数计数。
associationRules
: 置信度高于minConfidence
生成的关联规则,以 DataFrame 格式表示,包含以下列:antecedent: array
: 作为关联规则假设的项集。consequent: array
: 一个项集,它总是包含一个元素,表示关联规则的结论。confidence: double
:confidence
的定义请参见上文的minConfidence
。lift: double
: 衡量前件对后件预测能力的指标,计算公式为support(antecedent U consequent) / (support(antecedent) x support(consequent))
support: double
:support
的定义请参见上文的minSupport
。
transform
: 对于itemsCol
中的每个事务,transform
方法会将其项与每条关联规则的前件进行比较。如果记录包含特定关联规则的所有前件,则该规则将被视为适用,其后件将添加到预测结果中。transform
方法会将所有适用规则的后件汇总为预测。预测列与itemsCol
具有相同的数据类型,并且不包含itemsCol
中的现有项。
示例
有关更多详细信息,请参阅 Python API 文档。
from pyspark.ml.fpm import FPGrowth
df = spark.createDataFrame([
(0, [1, 2, 5]),
(1, [1, 2, 3, 5]),
(2, [1, 2])
], ["id", "items"])
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df)
# Display frequent itemsets.
model.freqItemsets.show()
# Display generated association rules.
model.associationRules.show()
# transform examines the input items against all the association rules and summarize the
# consequents as prediction
model.transform(df).show()
有关更多详细信息,请参阅 Scala API 文档。
import org.apache.spark.ml.fpm.FPGrowth
val dataset = spark.createDataset(Seq(
"1 2 5",
"1 2 3 5",
"1 2")
).map(t => t.split(" ")).toDF("items")
val fpgrowth = new FPGrowth().setItemsCol("items").setMinSupport(0.5).setMinConfidence(0.6)
val model = fpgrowth.fit(dataset)
// Display frequent itemsets.
model.freqItemsets.show()
// Display generated association rules.
model.associationRules.show()
// transform examines the input items against all the association rules and summarize the
// consequents as prediction
model.transform(dataset).show()
有关更多详细信息,请参阅 Java API 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.fpm.FPGrowth;
import org.apache.spark.ml.fpm.FPGrowthModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("1 2 5".split(" "))),
RowFactory.create(Arrays.asList("1 2 3 5".split(" "))),
RowFactory.create(Arrays.asList("1 2".split(" ")))
);
StructType schema = new StructType(new StructField[]{ new StructField(
"items", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> itemsDF = spark.createDataFrame(data, schema);
FPGrowthModel model = new FPGrowth()
.setItemsCol("items")
.setMinSupport(0.5)
.setMinConfidence(0.6)
.fit(itemsDF);
// Display frequent itemsets.
model.freqItemsets().show();
// Display generated association rules.
model.associationRules().show();
// transform examines the input items against all the association rules and summarize the
// consequents as prediction
model.transform(itemsDF).show();
有关更多详细信息,请参阅 R API 文档。
# Load training data
df <- selectExpr(createDataFrame(data.frame(rawItems = c(
"1,2,5", "1,2,3,5", "1,2"
))), "split(rawItems, ',') AS items")
fpm <- spark.fpGrowth(df, itemsCol="items", minSupport=0.5, minConfidence=0.6)
# Extracting frequent itemsets
spark.freqItemsets(fpm)
# Extracting association rules
spark.associationRules(fpm)
# Predict uses association rules to and combines possible consequents
predict(fpm, df)
PrefixSpan
PrefixSpan 是一种序列模式挖掘算法,在Pei 等人,《通过模式增长挖掘序列模式:PrefixSpan 方法》中有所描述。有关序列模式挖掘问题的形式化,请参阅引用的论文。
spark.ml
的 PrefixSpan 实现采用以下参数:
minSupport
: 被视为频繁序列模式所需的最小支持度。maxPatternLength
: 频繁序列模式的最大长度。任何超过此长度的频繁模式都不会包含在结果中。maxLocalProjDBSize
: 在对投影数据库进行本地迭代处理之前,前缀投影数据库中允许的最大项数。应根据执行器的大小调整此参数。sequenceCol
: 数据集中序列列的名称(默认值为“sequence”),此列中包含 null 的行将被忽略。
示例
有关更多详细信息,请参阅 Python API 文档。
from pyspark.ml.fpm import PrefixSpan
df = sc.parallelize([Row(sequence=[[1, 2], [3]]),
Row(sequence=[[1], [3, 2], [1, 2]]),
Row(sequence=[[1, 2], [5]]),
Row(sequence=[[6]])]).toDF()
prefixSpan = PrefixSpan(minSupport=0.5, maxPatternLength=5,
maxLocalProjDBSize=32000000)
# Find frequent sequential patterns.
prefixSpan.findFrequentSequentialPatterns(df).show()
有关更多详细信息,请参阅 Scala API 文档。
import org.apache.spark.ml.fpm.PrefixSpan
val smallTestData = Seq(
Seq(Seq(1, 2), Seq(3)),
Seq(Seq(1), Seq(3, 2), Seq(1, 2)),
Seq(Seq(1, 2), Seq(5)),
Seq(Seq(6)))
val df = smallTestData.toDF("sequence")
val result = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5)
.setMaxLocalProjDBSize(32000000)
.findFrequentSequentialPatterns(df)
.show()
有关更多详细信息,请参阅 Java API 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.fpm.PrefixSpan;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3))),
RowFactory.create(Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1,2))),
RowFactory.create(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5))),
RowFactory.create(Arrays.asList(Arrays.asList(6)))
);
StructType schema = new StructType(new StructField[]{ new StructField(
"sequence", new ArrayType(new ArrayType(DataTypes.IntegerType, true), true),
false, Metadata.empty())
});
Dataset<Row> sequenceDF = spark.createDataFrame(data, schema);
PrefixSpan prefixSpan = new PrefixSpan().setMinSupport(0.5).setMaxPatternLength(5);
// Finding frequent sequential patterns
prefixSpan.findFrequentSequentialPatterns(sequenceDF).show();
有关更多详细信息,请参阅 R API 文档。
# Load training data
df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
list(list(list(1L), list(3L, 2L), list(1L, 2L))),
list(list(list(1L, 2L), list(5L))),
list(list(list(6L)))),
schema = c("sequence"))
# Finding frequent sequential patterns
frequency <- spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L,
maxLocalProjDBSize = 32000000L)
showDF(frequency)