特征提取和转换 - 基于 RDD 的 API
TF-IDF
注意 我们推荐使用基于 DataFrame 的 API,详细信息请参见 ML 用户指南中的 TF-IDF 部分。
词频-逆文档频率 (TF-IDF) 是一种在文本挖掘中广泛使用的特征向量化方法,用于反映一个词语在语料库中对文档的重要性。将词语表示为 $t$
,文档表示为 $d$
,语料库表示为 $D$
。词频 $TF(t, d)$
是词语 $t$
在文档 $d$
中出现的次数,而文档频率 $DF(t, D)$
是包含词语 $t$
的文档数量。如果我们只使用词频来衡量重要性,就很容易过度强调那些出现频繁但对文档信息量很少的词语,例如“a”、“the”和“of”。如果一个词语在整个语料库中出现得很频繁,这意味着它对特定文档不具有特殊信息。逆文档频率是一个数值度量,表示一个词语提供的信息量:\[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \]
其中 $|D|$
是语料库中文档的总数。由于使用了对数,如果一个词语出现在所有文档中,其 IDF 值将变为 0。注意,应用了一个平滑项,以避免对语料库以外的词语进行除零操作。TF-IDF 度量简单地是 TF 和 IDF 的乘积:\[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \]
词频和文档频率的定义有多种变体。在 spark.mllib
中,我们将 TF 和 IDF 分开,以使其更加灵活。
我们的词频实现利用了哈希技巧。原始特征通过应用哈希函数被映射到一个索引(词语)。然后根据映射的索引计算词频。这种方法避免了计算全局词语到索引映射的需要,这对于大型语料库来说可能成本很高,但它存在潜在的哈希冲突问题,即不同的原始特征在哈希后可能变成相同的词语。为了减少冲突的可能性,我们可以增加目标特征维度,即哈希表的桶数量。默认的特征维度是 $2^{20} = 1,048,576$
。
注意: spark.mllib
不提供文本分词工具。我们建议用户参考斯坦福大学 NLP 小组和scalanlp/chalk。
TF 和 IDF 在 HashingTF 和 IDF 中实现。HashingTF
接受一个列表 RDD 作为输入。每个记录可以是一个字符串或其他类型的可迭代对象。
有关 API 的详细信息,请参见 HashingTF
Python 文档。
from pyspark.mllib.feature import HashingTF, IDF
# Load documents (one per line).
documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" "))
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)
TF 和 IDF 在 HashingTF 和 IDF 中实现。HashingTF
接受一个 RDD[Iterable[_]]
作为输入。每个记录可以是一个字符串或其他类型的可迭代对象。
有关 API 的详细信息,请参见 HashingTF
Scala 文档。
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt")
.map(_.split(" ").toSeq)
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
// While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
// First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
// spark.mllib IDF implementation provides an option for ignoring terms which occur in less than
// a minimum number of documents. In such cases, the IDF for these terms is set to 0.
// This feature can be used by passing the minDocFreq value to the IDF constructor.
val idfIgnore = new IDF(minDocFreq = 2).fit(tf)
val tfidfIgnore: RDD[Vector] = idfIgnore.transform(tf)
Word2Vec
Word2Vec 计算词语的分布式向量表示。分布式表示的主要优点是相似的词语在向量空间中距离相近,这使得对新模式的泛化更容易,模型估计更稳健。分布式向量表示在许多自然语言处理应用中显示出其有用性,例如命名实体识别、消歧、解析、标注和机器翻译。
模型
在我们的 Word2Vec 实现中,我们使用了 skip-gram 模型。skip-gram 的训练目标是学习能够很好地预测同一句子中词语上下文的词向量表示。数学上,给定一个训练词序列 $w_1, w_2, \dots, w_T$
,skip-gram 模型的目标是最大化平均对数似然 \[ \frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t) \]
,其中 $k$ 是训练窗口的大小。
在 skip-gram 模型中,每个词 $w$ 都关联着两个向量 $u_w$ 和 $v_w$,它们分别是 $w$ 作为词语和上下文的向量表示。在给定词 $w_j$ 的情况下正确预测词 $w_i$ 的概率由 softmax 模型确定,即 \[ p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})} \]
,其中 $V$ 是词汇表大小。
带有 softmax 的 skip-gram 模型计算成本很高,因为计算 $\log p(w_i | w_j)$ 的成本与 $V$ 成正比,而 $V$ 可以轻易达到数百万。为了加速 Word2Vec 的训练,我们使用了分层 softmax,这将计算 $\log p(w_i | w_j)$ 的复杂性降低到 $O(\log(V))$。
示例
以下示例演示了如何加载文本文件,将其解析为 Seq[String]
的 RDD,构建 Word2Vec
实例,然后使用输入数据拟合 Word2VecModel
。最后,我们显示指定词语的 top 40 个同义词。要运行此示例,请首先下载 text8 数据并将其解压到您偏好的目录。这里我们假设解压后的文件是 text8
,并且与您运行 spark shell 的目录相同。
有关 API 的更多详细信息,请参见 Word2Vec
Python 文档。
from pyspark.mllib.feature import Word2Vec
inp = sc.textFile("data/mllib/sample_lda_data.txt").map(lambda row: row.split(" "))
word2vec = Word2Vec()
model = word2vec.fit(inp)
synonyms = model.findSynonyms('1', 5)
for word, cosine_distance in synonyms:
print("{}: {}".format(word, cosine_distance))
有关 API 的详细信息,请参见 Word2Vec
Scala 文档。
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
val input = sc.textFile("data/mllib/sample_lda_data.txt").map(line => line.split(" ").toSeq)
val word2vec = new Word2Vec()
val model = word2vec.fit(input)
val synonyms = model.findSynonyms("1", 5)
for ((synonym, cosineSimilarity) <- synonyms) {
println(s"$synonym $cosineSimilarity")
}
// Save and load model
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")
StandardScaler
通过对训练集中的样本使用列汇总统计信息,将特征缩放至单位方差和/或移除均值以进行标准化。这是非常常见的预处理步骤。
例如,支持向量机的 RBF 核或 L1 和 L2 正则化线性模型通常在所有特征具有单位方差和/或零均值时表现更好。
标准化可以提高优化过程中的收敛速度,并防止具有非常大方差的特征在模型训练中施加过大的影响。
模型拟合
StandardScaler
在构造函数中具有以下参数:
withMean
默认值为 False。在缩放之前,使用均值对数据进行中心化。它会生成密集输出,因此在应用于稀疏输入时请注意。withStd
默认值为 True。将数据缩放至单位标准差。
我们在 StandardScaler
中提供了一个 fit
方法,它可以接受一个 RDD[Vector]
作为输入,学习汇总统计信息,然后返回一个模型,该模型可以根据我们如何配置 StandardScaler
,将输入数据集转换为单位标准差和/或零均值的特征。
该模型实现了 VectorTransformer
,它可以对 Vector
应用标准化以生成转换后的 Vector
,或对 RDD[Vector]
应用标准化以生成转换后的 RDD[Vector]
。
请注意,如果特征的方差为零,则该特征在 Vector
中将返回默认值 0.0
。
示例
以下示例演示了如何加载 libsvm 格式的数据集,并标准化特征,使新特征具有单位标准差和/或零均值。
有关 API 的更多详细信息,请参见 StandardScaler
Python 文档。
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
# data1 will be unit variance.
data1 = label.zip(scaler1.transform(features))
# data2 will be unit variance and zero mean.
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
有关 API 的详细信息,请参见 StandardScaler
Scala 文档。
import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val scaler1 = new StandardScaler().fit(data.map(x => x.features))
val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features))
// scaler3 is an identical model to scaler2, and will produce identical transformations
val scaler3 = new StandardScalerModel(scaler2.std, scaler2.mean)
// data1 will be unit variance.
val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
// data2 will be unit variance and zero mean.
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
Normalizer
Normalizer 将单个样本缩放至单位 $L^p$ 范数。这在文本分类或聚类中是一种常见的操作。例如,两个 $L^2$ 归一化 TF-IDF 向量的点积就是向量的余弦相似度。
Normalizer
在构造函数中具有以下参数:
p
为 $L^p$ 空间中的归一化,默认值为 $p = 2$。
Normalizer
实现了 VectorTransformer
,它可以对 Vector
应用归一化以生成转换后的 Vector
,或对 RDD[Vector]
应用归一化以生成转换后的 RDD[Vector]
。
请注意,如果输入的范数为零,它将返回输入向量。
示例
以下示例演示了如何加载 libsvm 格式的数据集,并使用 $L^2$ 范数和 $L^\infty$ 范数对特征进行归一化。
有关 API 的更多详细信息,请参见 Normalizer
Python 文档。
from pyspark.mllib.feature import Normalizer
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))
# Each sample in data1 will be normalized using $L^2$ norm.
data1 = labels.zip(normalizer1.transform(features))
# Each sample in data2 will be normalized using $L^\infty$ norm.
data2 = labels.zip(normalizer2.transform(features))
有关 API 的详细信息,请参见 Normalizer
Scala 文档。
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val normalizer1 = new Normalizer()
val normalizer2 = new Normalizer(p = Double.PositiveInfinity)
// Each sample in data1 will be normalized using $L^2$ norm.
val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
// Each sample in data2 will be normalized using $L^\infty$ norm.
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
ChiSqSelector
特征选择旨在识别用于模型构建的相关特征。它减少了特征空间的维度,可以同时提高速度和统计学习行为。
ChiSqSelector
实现了卡方特征选择。它对带有类别特征的标注数据进行操作。ChiSqSelector 使用卡方独立性检验来决定选择哪些特征。它支持五种选择方法:numTopFeatures
(选择前 N 个特征),percentile
(百分位数),fpr
(误报率),fdr
(错误发现率),fwe
(家族错误率)。
numTopFeatures
根据卡方检验选择固定数量的顶部特征。这类似于选择具有最强预测能力的特征。percentile
与numTopFeatures
类似,但选择所有特征的一个分数(比例)而不是固定数量。fpr
选择所有 p 值低于阈值的特征,从而控制选择的误报率。fdr
使用Benjamini-Hochberg 过程来选择所有错误发现率低于阈值的特征。fwe
选择所有 p 值低于阈值的特征。阈值按 1/特征数量 进行缩放,从而控制选择的家族错误率。
默认情况下,选择方法是 numTopFeatures
,默认的顶部特征数量设置为 50。用户可以使用 setSelectorType
选择一种选择方法。
要选择的特征数量可以使用保留验证集进行调整。
模型拟合
fit
方法接受一个带有类别特征的 RDD[LabeledPoint]
作为输入,学习汇总统计信息,然后返回一个 ChiSqSelectorModel
,该模型可以将输入数据集转换为降维后的特征空间。ChiSqSelectorModel
可以应用于 Vector
以生成降维后的 Vector
,或应用于 RDD[Vector]
以生成降维后的 RDD[Vector]
。
请注意,用户也可以通过提供一个选定特征索引数组(必须按升序排序)来手动构建 ChiSqSelectorModel
。
示例
以下示例展示了 ChiSqSelector 的基本用法。使用的数据集具有一个特征矩阵,其中每个特征的灰度值范围从 0 到 255。
有关 API 的详细信息,请参见 ChiSqSelector
Scala 文档。
import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// Load some data in libsvm format
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Even though features are doubles, the ChiSqSelector treats each unique value as a category
val discretizedData = data.map { lp =>
LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x / 16).floor }))
}
// Create ChiSqSelector that will select top 50 of 692 features
val selector = new ChiSqSelector(50)
// Create ChiSqSelector model (selecting features)
val transformer = selector.fit(discretizedData)
// Filter the top 50 features from each feature vector
val filteredData = discretizedData.map { lp =>
LabeledPoint(lp.label, transformer.transform(lp.features))
}
有关 API 的详细信息,请参见 ChiSqSelector
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ChiSqSelector;
import org.apache.spark.mllib.feature.ChiSqSelectorModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
JavaRDD<LabeledPoint> points = MLUtils.loadLibSVMFile(jsc.sc(),
"data/mllib/sample_libsvm_data.txt").toJavaRDD().cache();
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Although features are doubles, the ChiSqSelector treats each unique value as a category
JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
double[] discretizedFeatures = new double[lp.features().size()];
for (int i = 0; i < lp.features().size(); ++i) {
discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
}
return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
});
// Create ChiSqSelector that will select top 50 of 692 features
ChiSqSelector selector = new ChiSqSelector(50);
// Create ChiSqSelector model (selecting features)
ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
// Filter the top 50 features from each feature vector
JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
new LabeledPoint(lp.label(), transformer.transform(lp.features())));
ElementwiseProduct
ElementwiseProduct
使用逐元素乘法,将每个输入向量乘以一个提供的“权重”向量。换句话说,它通过一个标量乘数来缩放数据集的每一列。这表示输入向量 v
和转换向量 scalingVec
之间的阿达玛积,从而得到结果向量。
将 scalingVec
表示为“w
”,此转换可以写为:
\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]
ElementwiseProduct
在构造函数中具有以下参数:
scalingVec
:转换向量。
ElementwiseProduct
实现了 VectorTransformer
,它可以对 Vector
应用加权以生成转换后的 Vector
,或对 RDD[Vector]
应用加权以生成转换后的 RDD[Vector]
。
示例
以下示例演示了如何使用转换向量值来转换向量。
有关 API 的更多详细信息,请参见 ElementwiseProduct
Python 文档。
from pyspark.mllib.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda x: [float(t) for t in x.split(" ")])
# Create weight vector.
transformingVector = Vectors.dense([0.0, 1.0, 2.0])
transformer = ElementwiseProduct(transformingVector)
# Batch transform
transformedData = transformer.transform(parsedData)
# Single-row transform
transformedData2 = transformer.transform(parsedData.first())
有关 API 的详细信息,请参见 ElementwiseProduct
Scala 文档。
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors
// Create some vector data; also works for sparse vectors
val data = sc.parallelize(Seq(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)))
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct(transformingVector)
// Batch transform and per-row transform give the same results:
val transformedData = transformer.transform(data)
val transformedData2 = data.map(x => transformer.transform(x))
有关 API 的详细信息,请参见 ElementwiseProduct
Java 文档。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Create some vector data; also works for sparse vectors
JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
// Batch transform and per-row transform give the same results:
JavaRDD<Vector> transformedData = transformer.transform(data);
JavaRDD<Vector> transformedData2 = data.map(transformer::transform);
PCA
一个使用 PCA 将向量投影到低维空间的特征转换器。详细信息请参见降维。