特征提取和转换 - 基于 RDD 的 API

TF-IDF

注意 我们建议使用基于 DataFrame 的 API,这在 关于 TF-IDF 的 ML 用户指南 中有详细介绍。

词频-逆文档频率 (TF-IDF) 是一种广泛用于文本挖掘的特征向量化方法,用于反映语料库中某个词语对文档的重要性。用 $t$ 表示词语,用 $d$ 表示文档,用 $D$ 表示语料库。词频 $TF(t, d)$ 是词语 $t$ 在文档 $d$ 中出现的次数,而文档频率 $DF(t, D)$ 是包含词语 $t$ 的文档数量。如果我们仅使用词频来衡量重要性,很容易过分强调那些出现频率很高但对文档信息量很小的词语,例如“a”、“the”和“of”。如果一个词语在整个语料库中出现频率很高,则意味着它没有携带关于特定文档的特殊信息。逆文档频率是衡量词语提供多少信息的数值指标:\[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] 其中 $|D|$ 是语料库中的文档总数。由于使用了对数,如果一个词语出现在所有文档中,则其 IDF 值变为 0。请注意,应用了一个平滑项以避免对语料库之外的词语除以零。TF-IDF 度量只是 TF 和 IDF 的乘积:\[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \] 词频和文档频率的定义有多种变体。在 spark.mllib 中,我们将 TF 和 IDF 分开以使其更加灵活。

我们对词频的实现利用了 哈希技巧。原始特征通过应用哈希函数映射到索引(词语)。然后根据映射的索引计算词频。这种方法避免了计算全局词语到索引映射的需要,这对于大型语料库来说可能很昂贵,但它会受到潜在哈希冲突的影响,其中不同的原始特征在哈希后可能会变成相同的词语。为了减少冲突的可能性,我们可以增加目标特征维度,即哈希表的桶数。默认特征维度为 $2^{20} = 1,048,576$

注意: spark.mllib 不提供文本分词工具。我们建议用户参考 斯坦福 NLP 组scalanlp/chalk

TF 和 IDF 在 HashingTFIDF 中实现。HashingTF 将列表的 RDD 作为输入。每条记录可以是字符串或其他类型的可迭代对象。

有关 API 的详细信息,请参阅 HashingTF Python 文档

from pyspark.mllib.feature import HashingTF, IDF

# Load documents (one per line).
documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" "))

hashingTF = HashingTF()
tf = hashingTF.transform(documents)

# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)
在 Spark 存储库的“examples/src/main/python/mllib/tf_idf_example.py”中查找完整的示例代码。

TF 和 IDF 在 HashingTFIDF 中实现。HashingTFRDD[Iterable[_]] 作为输入。每条记录可以是字符串或其他类型的可迭代对象。

有关 API 的详细信息,请参阅 HashingTF Scala 文档

import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt")
  .map(_.split(" ").toSeq)

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)

// While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
// First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

// spark.mllib IDF implementation provides an option for ignoring terms which occur in less than
// a minimum number of documents. In such cases, the IDF for these terms is set to 0.
// This feature can be used by passing the minDocFreq value to the IDF constructor.
val idfIgnore = new IDF(minDocFreq = 2).fit(tf)
val tfidfIgnore: RDD[Vector] = idfIgnore.transform(tf)
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/mllib/TFIDFExample.scala”中查找完整的示例代码。

Word2Vec

Word2Vec 计算单词的分布式向量表示。分布式表示的主要优点是,相似的单词在向量空间中彼此靠近,这使得对新模式的泛化更容易,模型估计更稳健。分布式向量表示在许多自然语言处理应用中都很有用,例如命名实体识别、消歧、解析、标记和机器翻译。

模型

在我们对 Word2Vec 的实现中,我们使用了 skip-gram 模型。skip-gram 的训练目标是学习能够很好地预测同一句话中上下文环境的单词向量表示。从数学上讲,给定一系列训练单词 $w_1, w_2, \dots, w_T$,skip-gram 模型的目标是最大化平均对数似然 \[ \frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t) \] 其中 $k$ 是训练窗口的大小。

在 skip-gram 模型中,每个单词 $w$ 都与两个向量 $u_w$ 和 $v_w$ 相关联,它们分别是 $w$ 作为单词和上下文环境的向量表示。正确预测单词 $w_j$ 给定单词 $w_i$ 的概率由 softmax 模型确定,即 \[ p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})} \] 其中 $V$ 是词汇量大小。

使用 softmax 的 skip-gram 模型很昂贵,因为计算 $\log p(w_i | w_j)$ 的成本与 $V$ 成正比,而 $V$ 很容易达到数百万的量级。为了加快 Word2Vec 的训练速度,我们使用了分层 softmax,这将计算 $\log p(w_i | w_j)$ 的复杂度降低到 $O(\log(V))$

示例

下面的示例演示了如何加载文本文件,将其解析为 Seq[String] 的 RDD,构造 Word2Vec 实例,然后使用输入数据拟合 Word2VecModel。最后,我们显示指定单词的前 40 个同义词。要运行该示例,请先下载 text8 数据并将其解压缩到您首选的目录。这里我们假设解压缩后的文件名为 text8,并且与您运行 spark shell 的目录相同。

有关 API 的更多详细信息,请参阅 Word2Vec Python 文档

from pyspark.mllib.feature import Word2Vec

inp = sc.textFile("data/mllib/sample_lda_data.txt").map(lambda row: row.split(" "))

word2vec = Word2Vec()
model = word2vec.fit(inp)

synonyms = model.findSynonyms('1', 5)

for word, cosine_distance in synonyms:
    print("{}: {}".format(word, cosine_distance))
在 Spark 存储库的“examples/src/main/python/mllib/word2vec_example.py”中查找完整的示例代码。

有关 API 的详细信息,请参阅 Word2Vec Scala 文档

import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val input = sc.textFile("data/mllib/sample_lda_data.txt").map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()

val model = word2vec.fit(input)

val synonyms = model.findSynonyms("1", 5)

for((synonym, cosineSimilarity) <- synonyms) {
  println(s"$synonym $cosineSimilarity")
}

// Save and load model
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/mllib/Word2VecExample.scala”中查找完整的示例代码。

StandardScaler

通过使用训练集中样本的列汇总统计信息缩放到单位方差和/或移除均值来标准化特征。这是一个非常常见的预处理步骤。

例如,当所有特征都具有单位方差和/或零均值时,支持向量机的 RBF 核或 L1 和 L2 正则化线性模型通常效果更好。

标准化可以提高优化过程中的收敛速度,还可以防止方差非常大的特征在模型训练过程中产生过大的影响。

模型拟合

StandardScaler 在构造函数中具有以下参数

我们在 StandardScaler 中提供了一个 fit 方法,它可以接受 RDD[Vector] 的输入,学习汇总统计信息,然后返回一个模型,该模型可以将输入数据集转换为单位标准偏差和/或零均值特征,具体取决于我们如何配置 StandardScaler

此模型实现了 VectorTransformer,它可以对 Vector 应用标准化以生成转换后的 Vector,或者对 RDD[Vector] 应用标准化以生成转换后的 RDD[Vector]

注意,如果某一特征的方差为零,则该特征在 Vector 中将返回默认值 0.0

示例

以下示例演示了如何加载 libsvm 格式的数据集,并对特征进行标准化,以便新特征具有单位标准差和/或零均值。

有关 API 的更多详细信息,请参阅 StandardScaler Python 文档

from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)

# data1 will be unit variance.
data1 = label.zip(scaler1.transform(features))

# data2 will be unit variance and zero mean.
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
在 Spark 代码库的 “examples/src/main/python/mllib/standard_scaler_example.py” 中查找完整的示例代码。

有关 API 的详细信息,请参阅 StandardScaler Scala 文档

import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

val scaler1 = new StandardScaler().fit(data.map(x => x.features))
val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features))
// scaler3 is an identical model to scaler2, and will produce identical transformations
val scaler3 = new StandardScalerModel(scaler2.std, scaler2.mean)

// data1 will be unit variance.
val data1 = data.map(x => (x.label, scaler1.transform(x.features)))

// data2 will be unit variance and zero mean.
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/mllib/StandardScalerExample.scala” 中查找完整的示例代码。

Normalizer

Normalizer 将各个样本缩放为单位 $L^p$ 范数。 这是文本分类或聚类的常见操作。 例如,两个 $L^2$ 归一化 TF-IDF 向量的点积是向量的余弦相似度。

Normalizer 在构造函数中具有以下参数

Normalizer 实现了 VectorTransformer,它可以将归一化应用于 Vector 以生成转换后的 Vector,或应用于 RDD[Vector] 以生成转换后的 RDD[Vector]

请注意,如果输入的范数为零,它将返回输入向量。

示例

以下示例演示了如何加载 libsvm 格式的数据集,并使用 $L^2$ 范数和 $L^\infty$ 范数对特征进行归一化。

有关 API 的更多详细信息,请参阅 Normalizer Python 文档

from pyspark.mllib.feature import Normalizer
from pyspark.mllib.util import MLUtils

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))

# Each sample in data1 will be normalized using $L^2$ norm.
data1 = labels.zip(normalizer1.transform(features))

# Each sample in data2 will be normalized using $L^\infty$ norm.
data2 = labels.zip(normalizer2.transform(features))
在 Spark 代码库的 “examples/src/main/python/mllib/normalizer_example.py” 中查找完整的示例代码。

有关 API 的详细信息,请参阅 Normalizer Scala 文档

import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

val normalizer1 = new Normalizer()
val normalizer2 = new Normalizer(p = Double.PositiveInfinity)

// Each sample in data1 will be normalized using $L^2$ norm.
val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))

// Each sample in data2 will be normalized using $L^\infty$ norm.
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/mllib/NormalizerExample.scala” 中查找完整的示例代码。

ChiSqSelector

特征选择 尝试识别用于模型构建的相关特征。 它减少了特征空间的大小,这可以提高速度和统计学习行为。

ChiSqSelector 实现了卡方特征选择。 它对具有分类特征的标记数据进行操作。 ChiSqSelector 使用 卡方独立性检验 来决定选择哪些特征。 它支持五种选择方法:numTopFeaturespercentilefprfdrfwe

默认情况下,选择方法是 numTopFeatures,默认的顶部特征数量设置为 50。用户可以使用 setSelectorType 选择选择方法。

可以使用留出的验证集来调整要选择的特征数量。

模型拟合

fit 方法接受具有分类特征的 RDD[LabeledPoint] 的输入,学习汇总统计信息,然后返回一个 ChiSqSelectorModel,它可以将输入数据集转换为缩减的特征空间。 ChiSqSelectorModel 可以应用于 Vector 以生成缩减的 Vector,或应用于 RDD[Vector] 以生成缩减的 RDD[Vector]

请注意,用户还可以通过提供选定特征索引数组(必须按升序排序)来手动构建 ChiSqSelectorModel

示例

以下示例显示了 ChiSqSelector 的基本用法。 使用的数据集具有一个特征矩阵,该矩阵由每个特征的灰度值组成,范围从 0 到 255。

有关 API 的详细信息,请参阅 ChiSqSelector Scala 文档

import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

// Load some data in libsvm format
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Even though features are doubles, the ChiSqSelector treats each unique value as a category
val discretizedData = data.map { lp =>
  LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x / 16).floor }))
}
// Create ChiSqSelector that will select top 50 of 692 features
val selector = new ChiSqSelector(50)
// Create ChiSqSelector model (selecting features)
val transformer = selector.fit(discretizedData)
// Filter the top 50 features from each feature vector
val filteredData = discretizedData.map { lp =>
  LabeledPoint(lp.label, transformer.transform(lp.features))
}
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/mllib/ChiSqSelectorExample.scala” 中查找完整的示例代码。

有关 API 的详细信息,请参阅 ChiSqSelector Java 文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ChiSqSelector;
import org.apache.spark.mllib.feature.ChiSqSelectorModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

JavaRDD<LabeledPoint> points = MLUtils.loadLibSVMFile(jsc.sc(),
  "data/mllib/sample_libsvm_data.txt").toJavaRDD().cache();

// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Although features are doubles, the ChiSqSelector treats each unique value as a category
JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
  double[] discretizedFeatures = new double[lp.features().size()];
  for (int i = 0; i < lp.features().size(); ++i) {
    discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
  }
  return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
});

// Create ChiSqSelector that will select top 50 of 692 features
ChiSqSelector selector = new ChiSqSelector(50);
// Create ChiSqSelector model (selecting features)
ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
// Filter the top 50 features from each feature vector
JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
  new LabeledPoint(lp.label(), transformer.transform(lp.features())));
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java” 中查找完整的示例代码。

ElementwiseProduct

ElementwiseProduct 使用元素乘法将每个输入向量乘以提供的“权重”向量。 换句话说,它通过标量乘数缩放数据集的每一列。 这表示输入向量 v 和变换向量 scalingVec 之间的 哈达玛积,以产生结果向量。

scalingVec 表示为“w”,此变换可以写为

\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]

ElementwiseProduct 在构造函数中具有以下参数

ElementwiseProduct 实现了 VectorTransformer,它可以将加权应用于 Vector 以生成转换后的 Vector,或应用于 RDD[Vector] 以生成转换后的 RDD[Vector]

示例

以下示例演示了如何使用变换向量值变换向量。

有关 API 的更多详细信息,请参阅 ElementwiseProduct Python 文档

from pyspark.mllib.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors

data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda x: [float(t) for t in x.split(" ")])

# Create weight vector.
transformingVector = Vectors.dense([0.0, 1.0, 2.0])
transformer = ElementwiseProduct(transformingVector)

# Batch transform
transformedData = transformer.transform(parsedData)
# Single-row transform
transformedData2 = transformer.transform(parsedData.first())
在 Spark 代码库的 “examples/src/main/python/mllib/elementwise_product_example.py” 中查找完整的示例代码。

有关 API 的详细信息,请参阅 ElementwiseProduct Scala 文档

import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors

// Create some vector data; also works for sparse vectors
val data = sc.parallelize(Seq(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)))

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct(transformingVector)

// Batch transform and per-row transform give the same results:
val transformedData = transformer.transform(data)
val transformedData2 = data.map(x => transformer.transform(x))
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/mllib/ElementwiseProductExample.scala” 中查找完整的示例代码。

有关 API 的详细信息,请参阅 ElementwiseProduct Java 文档

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Create some vector data; also works for sparse vectors
JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
  Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);

// Batch transform and per-row transform give the same results:
JavaRDD<Vector> transformedData = transformer.transform(data);
JavaRDD<Vector> transformedData2 = data.map(transformer::transform);
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java” 中查找完整的示例代码。

PCA

一种特征转换器,使用 PCA 将向量投影到低维空间。 您可以阅读 降维 了解更多详细信息。