特征提取和转换 - 基于 RDD 的 API
TF-IDF
注意 我们建议使用基于 DataFrame 的 API,这在 关于 TF-IDF 的 ML 用户指南 中有详细介绍。
词频-逆文档频率 (TF-IDF) 是一种广泛用于文本挖掘的特征向量化方法,用于反映语料库中某个词语对文档的重要性。用 $t$
表示词语,用 $d$
表示文档,用 $D$
表示语料库。词频 $TF(t, d)$
是词语 $t$
在文档 $d$
中出现的次数,而文档频率 $DF(t, D)$
是包含词语 $t$
的文档数量。如果我们仅使用词频来衡量重要性,很容易过分强调那些出现频率很高但对文档信息量很小的词语,例如“a”、“the”和“of”。如果一个词语在整个语料库中出现频率很高,则意味着它没有携带关于特定文档的特殊信息。逆文档频率是衡量词语提供多少信息的数值指标:\[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \]
其中 $|D|$
是语料库中的文档总数。由于使用了对数,如果一个词语出现在所有文档中,则其 IDF 值变为 0。请注意,应用了一个平滑项以避免对语料库之外的词语除以零。TF-IDF 度量只是 TF 和 IDF 的乘积:\[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \]
词频和文档频率的定义有多种变体。在 spark.mllib
中,我们将 TF 和 IDF 分开以使其更加灵活。
我们对词频的实现利用了 哈希技巧。原始特征通过应用哈希函数映射到索引(词语)。然后根据映射的索引计算词频。这种方法避免了计算全局词语到索引映射的需要,这对于大型语料库来说可能很昂贵,但它会受到潜在哈希冲突的影响,其中不同的原始特征在哈希后可能会变成相同的词语。为了减少冲突的可能性,我们可以增加目标特征维度,即哈希表的桶数。默认特征维度为 $2^{20} = 1,048,576$
。
注意: spark.mllib
不提供文本分词工具。我们建议用户参考 斯坦福 NLP 组 和 scalanlp/chalk。
TF 和 IDF 在 HashingTF 和 IDF 中实现。HashingTF
将列表的 RDD 作为输入。每条记录可以是字符串或其他类型的可迭代对象。
有关 API 的详细信息,请参阅 HashingTF
Python 文档。
from pyspark.mllib.feature import HashingTF, IDF
# Load documents (one per line).
documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" "))
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)
TF 和 IDF 在 HashingTF 和 IDF 中实现。HashingTF
将 RDD[Iterable[_]]
作为输入。每条记录可以是字符串或其他类型的可迭代对象。
有关 API 的详细信息,请参阅 HashingTF
Scala 文档。
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt")
.map(_.split(" ").toSeq)
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
// While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
// First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
// spark.mllib IDF implementation provides an option for ignoring terms which occur in less than
// a minimum number of documents. In such cases, the IDF for these terms is set to 0.
// This feature can be used by passing the minDocFreq value to the IDF constructor.
val idfIgnore = new IDF(minDocFreq = 2).fit(tf)
val tfidfIgnore: RDD[Vector] = idfIgnore.transform(tf)
Word2Vec
Word2Vec 计算单词的分布式向量表示。分布式表示的主要优点是,相似的单词在向量空间中彼此靠近,这使得对新模式的泛化更容易,模型估计更稳健。分布式向量表示在许多自然语言处理应用中都很有用,例如命名实体识别、消歧、解析、标记和机器翻译。
模型
在我们对 Word2Vec 的实现中,我们使用了 skip-gram 模型。skip-gram 的训练目标是学习能够很好地预测同一句话中上下文环境的单词向量表示。从数学上讲,给定一系列训练单词 $w_1, w_2, \dots, w_T$
,skip-gram 模型的目标是最大化平均对数似然 \[ \frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t) \]
其中 $k$ 是训练窗口的大小。
在 skip-gram 模型中,每个单词 $w$ 都与两个向量 $u_w$ 和 $v_w$ 相关联,它们分别是 $w$ 作为单词和上下文环境的向量表示。正确预测单词 $w_j$ 给定单词 $w_i$ 的概率由 softmax 模型确定,即 \[ p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})} \]
其中 $V$ 是词汇量大小。
使用 softmax 的 skip-gram 模型很昂贵,因为计算 $\log p(w_i | w_j)$ 的成本与 $V$ 成正比,而 $V$ 很容易达到数百万的量级。为了加快 Word2Vec 的训练速度,我们使用了分层 softmax,这将计算 $\log p(w_i | w_j)$ 的复杂度降低到 $O(\log(V))$
示例
下面的示例演示了如何加载文本文件,将其解析为 Seq[String]
的 RDD,构造 Word2Vec
实例,然后使用输入数据拟合 Word2VecModel
。最后,我们显示指定单词的前 40 个同义词。要运行该示例,请先下载 text8 数据并将其解压缩到您首选的目录。这里我们假设解压缩后的文件名为 text8
,并且与您运行 spark shell 的目录相同。
有关 API 的更多详细信息,请参阅 Word2Vec
Python 文档。
from pyspark.mllib.feature import Word2Vec
inp = sc.textFile("data/mllib/sample_lda_data.txt").map(lambda row: row.split(" "))
word2vec = Word2Vec()
model = word2vec.fit(inp)
synonyms = model.findSynonyms('1', 5)
for word, cosine_distance in synonyms:
print("{}: {}".format(word, cosine_distance))
有关 API 的详细信息,请参阅 Word2Vec
Scala 文档。
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
val input = sc.textFile("data/mllib/sample_lda_data.txt").map(line => line.split(" ").toSeq)
val word2vec = new Word2Vec()
val model = word2vec.fit(input)
val synonyms = model.findSynonyms("1", 5)
for((synonym, cosineSimilarity) <- synonyms) {
println(s"$synonym $cosineSimilarity")
}
// Save and load model
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")
StandardScaler
通过使用训练集中样本的列汇总统计信息缩放到单位方差和/或移除均值来标准化特征。这是一个非常常见的预处理步骤。
例如,当所有特征都具有单位方差和/或零均值时,支持向量机的 RBF 核或 L1 和 L2 正则化线性模型通常效果更好。
标准化可以提高优化过程中的收敛速度,还可以防止方差非常大的特征在模型训练过程中产生过大的影响。
模型拟合
StandardScaler
在构造函数中具有以下参数
withMean
默认值为 False。在缩放之前,使用均值对数据进行中心化。它将构建一个密集的输出,因此在应用于稀疏输入时要小心。withStd
默认值为 True。将数据缩放到单位标准偏差。
我们在 StandardScaler
中提供了一个 fit
方法,它可以接受 RDD[Vector]
的输入,学习汇总统计信息,然后返回一个模型,该模型可以将输入数据集转换为单位标准偏差和/或零均值特征,具体取决于我们如何配置 StandardScaler
。
此模型实现了 VectorTransformer
,它可以对 Vector
应用标准化以生成转换后的 Vector
,或者对 RDD[Vector]
应用标准化以生成转换后的 RDD[Vector]
。
注意,如果某一特征的方差为零,则该特征在 Vector
中将返回默认值 0.0
。
示例
以下示例演示了如何加载 libsvm 格式的数据集,并对特征进行标准化,以便新特征具有单位标准差和/或零均值。
有关 API 的更多详细信息,请参阅 StandardScaler
Python 文档。
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
# data1 will be unit variance.
data1 = label.zip(scaler1.transform(features))
# data2 will be unit variance and zero mean.
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
有关 API 的详细信息,请参阅 StandardScaler
Scala 文档。
import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val scaler1 = new StandardScaler().fit(data.map(x => x.features))
val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features))
// scaler3 is an identical model to scaler2, and will produce identical transformations
val scaler3 = new StandardScalerModel(scaler2.std, scaler2.mean)
// data1 will be unit variance.
val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
// data2 will be unit variance and zero mean.
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
Normalizer
Normalizer 将各个样本缩放为单位 $L^p$ 范数。 这是文本分类或聚类的常见操作。 例如,两个 $L^2$ 归一化 TF-IDF 向量的点积是向量的余弦相似度。
Normalizer
在构造函数中具有以下参数
p
$L^p$ 空间中的归一化,默认情况下 $p = 2$。
Normalizer
实现了 VectorTransformer
,它可以将归一化应用于 Vector
以生成转换后的 Vector
,或应用于 RDD[Vector]
以生成转换后的 RDD[Vector]
。
请注意,如果输入的范数为零,它将返回输入向量。
示例
以下示例演示了如何加载 libsvm 格式的数据集,并使用 $L^2$ 范数和 $L^\infty$ 范数对特征进行归一化。
有关 API 的更多详细信息,请参阅 Normalizer
Python 文档。
from pyspark.mllib.feature import Normalizer
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))
# Each sample in data1 will be normalized using $L^2$ norm.
data1 = labels.zip(normalizer1.transform(features))
# Each sample in data2 will be normalized using $L^\infty$ norm.
data2 = labels.zip(normalizer2.transform(features))
有关 API 的详细信息,请参阅 Normalizer
Scala 文档。
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val normalizer1 = new Normalizer()
val normalizer2 = new Normalizer(p = Double.PositiveInfinity)
// Each sample in data1 will be normalized using $L^2$ norm.
val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
// Each sample in data2 will be normalized using $L^\infty$ norm.
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
ChiSqSelector
特征选择 尝试识别用于模型构建的相关特征。 它减少了特征空间的大小,这可以提高速度和统计学习行为。
ChiSqSelector
实现了卡方特征选择。 它对具有分类特征的标记数据进行操作。 ChiSqSelector 使用 卡方独立性检验 来决定选择哪些特征。 它支持五种选择方法:numTopFeatures
、percentile
、fpr
、fdr
、fwe
numTopFeatures
根据卡方检验选择固定数量的顶部特征。 这类似于产生具有最大预测能力的特征。percentile
类似于numTopFeatures
,但选择所有特征的一部分而不是固定数量。fpr
选择 p 值低于阈值的所有特征,从而控制选择的误报率。fdr
使用 Benjamini-Hochberg 程序 来选择错误发现率低于阈值的所有特征。fwe
选择 p 值低于阈值的所有特征。 阈值按 1/numFeatures 进行缩放,从而控制选择的家庭错误率。
默认情况下,选择方法是 numTopFeatures
,默认的顶部特征数量设置为 50。用户可以使用 setSelectorType
选择选择方法。
可以使用留出的验证集来调整要选择的特征数量。
模型拟合
fit
方法接受具有分类特征的 RDD[LabeledPoint]
的输入,学习汇总统计信息,然后返回一个 ChiSqSelectorModel
,它可以将输入数据集转换为缩减的特征空间。 ChiSqSelectorModel
可以应用于 Vector
以生成缩减的 Vector
,或应用于 RDD[Vector]
以生成缩减的 RDD[Vector]
。
请注意,用户还可以通过提供选定特征索引数组(必须按升序排序)来手动构建 ChiSqSelectorModel
。
示例
以下示例显示了 ChiSqSelector 的基本用法。 使用的数据集具有一个特征矩阵,该矩阵由每个特征的灰度值组成,范围从 0 到 255。
有关 API 的详细信息,请参阅 ChiSqSelector
Scala 文档。
import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// Load some data in libsvm format
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Even though features are doubles, the ChiSqSelector treats each unique value as a category
val discretizedData = data.map { lp =>
LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x / 16).floor }))
}
// Create ChiSqSelector that will select top 50 of 692 features
val selector = new ChiSqSelector(50)
// Create ChiSqSelector model (selecting features)
val transformer = selector.fit(discretizedData)
// Filter the top 50 features from each feature vector
val filteredData = discretizedData.map { lp =>
LabeledPoint(lp.label, transformer.transform(lp.features))
}
有关 API 的详细信息,请参阅 ChiSqSelector
Java 文档。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ChiSqSelector;
import org.apache.spark.mllib.feature.ChiSqSelectorModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
JavaRDD<LabeledPoint> points = MLUtils.loadLibSVMFile(jsc.sc(),
"data/mllib/sample_libsvm_data.txt").toJavaRDD().cache();
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Although features are doubles, the ChiSqSelector treats each unique value as a category
JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
double[] discretizedFeatures = new double[lp.features().size()];
for (int i = 0; i < lp.features().size(); ++i) {
discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
}
return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
});
// Create ChiSqSelector that will select top 50 of 692 features
ChiSqSelector selector = new ChiSqSelector(50);
// Create ChiSqSelector model (selecting features)
ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
// Filter the top 50 features from each feature vector
JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
new LabeledPoint(lp.label(), transformer.transform(lp.features())));
ElementwiseProduct
ElementwiseProduct
使用元素乘法将每个输入向量乘以提供的“权重”向量。 换句话说,它通过标量乘数缩放数据集的每一列。 这表示输入向量 v
和变换向量 scalingVec
之间的 哈达玛积,以产生结果向量。
将 scalingVec
表示为“w
”,此变换可以写为
\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]
ElementwiseProduct
在构造函数中具有以下参数
scalingVec
:变换向量。
ElementwiseProduct
实现了 VectorTransformer
,它可以将加权应用于 Vector
以生成转换后的 Vector
,或应用于 RDD[Vector]
以生成转换后的 RDD[Vector]
。
示例
以下示例演示了如何使用变换向量值变换向量。
有关 API 的更多详细信息,请参阅 ElementwiseProduct
Python 文档。
from pyspark.mllib.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda x: [float(t) for t in x.split(" ")])
# Create weight vector.
transformingVector = Vectors.dense([0.0, 1.0, 2.0])
transformer = ElementwiseProduct(transformingVector)
# Batch transform
transformedData = transformer.transform(parsedData)
# Single-row transform
transformedData2 = transformer.transform(parsedData.first())
有关 API 的详细信息,请参阅 ElementwiseProduct
Scala 文档。
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors
// Create some vector data; also works for sparse vectors
val data = sc.parallelize(Seq(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)))
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct(transformingVector)
// Batch transform and per-row transform give the same results:
val transformedData = transformer.transform(data)
val transformedData2 = data.map(x => transformer.transform(x))
有关 API 的详细信息,请参阅 ElementwiseProduct
Java 文档。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Create some vector data; also works for sparse vectors
JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
// Batch transform and per-row transform give the same results:
JavaRDD<Vector> transformedData = transformer.transform(data);
JavaRDD<Vector> transformedData2 = data.map(transformer::transform);
PCA
一种特征转换器,使用 PCA 将向量投影到低维空间。 您可以阅读 降维 了解更多详细信息。