Extracting, transforming and selecting features

This section covers algorithms for working with features, roughly divided into these groups: Extraction (extracting features from "raw" data), Transformation (scaling, converting, or modifying features), Selection (selecting a subset from a larger set of features), and Locality Sensitive Hashing (LSH), which combines aspects of feature transformation with other algorithms.

Table of Contents

Feature Extractors

TF-IDF

Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus. Denote a term by $t$, a document by $d$, and the corpus by $D$. Term frequency $TF(t, d)$ is the number of times that term $t$ appears in document $d$, while document frequency $DF(t, D)$ is the number of documents that contain term $t$. If we only use term frequency to measure importance, it is very easy to over-emphasize terms that appear very often but carry little information about the document, e.g. "a", "the", and "of". If a term appears very often across the corpus, it means it doesn't carry special information about a particular document. Inverse document frequency is a numerical measure of how much information a term provides: \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] where $|D|$ is the total number of documents in the corpus. Since the logarithm is used, if a term appears in all documents, its IDF value becomes 0. Note that a smoothing term is applied to avoid dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF: \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \] There are several variants of the definitions of term frequency and document frequency. In MLlib, we separate TF and IDF to make them flexible.
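
As a quick numeric check of these definitions (a made-up three-document corpus, not the example data used below): a term that appears twice in a document and occurs in 2 of the 3 documents gets \[ TFIDF(t, d, D) = 2 \cdot \log\frac{3 + 1}{2 + 1} = 2\log\frac{4}{3} \approx 0.58. \]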

TF: Both HashingTF and CountVectorizer can be used to generate the term frequency vectors.

HashingTF is a Transformer which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a "set of terms" might be a bag of words. HashingTF utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function. The hash function used here is MurmurHash 3. Term frequencies are then calculated based on the mapped indices. This approach avoids the need to compute a global term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing. To reduce the chance of collision, we can increase the target feature dimension, i.e. the number of buckets of the hash table. Since a simple modulo on the hashed value is used to determine the vector index, it is advisable to use a power of two as the feature dimension, otherwise the features will not be mapped evenly to the vector indices. The default feature dimension is $2^{18} = 262{,}144$. An optional binary toggle parameter controls term frequency counts. When set to true, all nonzero frequency counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.
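
To make the "hash modulo dimension" step concrete, here is a minimal pure-Python sketch of the idea (an illustration only: it uses Python's built-in hash() where Spark uses MurmurHash 3, so the resulting indices will not match Spark's):

from collections import Counter

def hashing_tf(terms, num_features=1 << 18):
    # Map each term to a bucket index with hash(term) % num_features,
    # then count how many terms fall into each bucket.
    counts = Counter(hash(t) % num_features for t in terms)
    indices = sorted(counts)
    return num_features, indices, [float(counts[i]) for i in indices]

print(hashing_tf("hi i heard about spark".split(), num_features=20))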

CountVectorizer converts text documents to vectors of term counts. Refer to CountVectorizer for more details.

IDF: IDF is an Estimator which is fit on a dataset and produces an IDFModel. The IDFModel takes feature vectors (generally created from HashingTF or CountVectorizer) and scales each feature. Intuitively, it down-weights features which appear frequently in the corpus.

Note: spark.ml doesn't provide tools for text segmentation. We refer users to the Stanford NLP Group and scalanlp/chalk.

Examples

In the following code segment, we start with a set of sentences. We split each sentence into words using Tokenizer. For each sentence (bag of words), we use HashingTF to hash the sentence into a feature vector. We use IDF to rescale the feature vectors; this generally improves performance when using text as features. Our feature vectors could then be passed to a learning algorithm.

Refer to the HashingTF Python docs and the IDF Python docs for more details on the API.

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
Find full example code at "examples/src/main/python/ml/tf_idf_example.py" in the Spark repo.

Refer to the HashingTF Scala docs and the IDF Scala docs for more details on the API.

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val sentenceData = spark.createDataFrame(Seq(
  (0.0, "Hi I heard about Spark"),
  (0.0, "I wish Java could use case classes"),
  (1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)

val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)

val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)

val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/TfIdfExample.scala" in the Spark repo.

Refer to the HashingTF Java docs and the IDF Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, "Hi I heard about Spark"),
  RowFactory.create(0.0, "I wish Java could use case classes"),
  RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);

int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .setNumFeatures(numFeatures);

Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors

IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);

Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaTfIdfExample.java" in the Spark repo.

Word2Vec

Word2Vec is an Estimator which takes sequences of words representing documents and trains a Word2VecModel. The model maps each word to a unique fixed-size vector. The Word2VecModel transforms each document into a vector using the average of all words in the document; this vector can then be used as features for prediction, document similarity calculations, etc. Please refer to the MLlib user guide on Word2Vec for more details.

Examples

In the following code segment, we start with a set of documents, each of which is represented as a sequence of words. For each document, we transform it into a feature vector. This feature vector could then be passed to a learning algorithm.

Refer to the Word2Vec Python docs for more details on the API.

from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
Find full example code at "examples/src/main/python/ml/word2vec_example.py" in the Spark repo.

Refer to the Word2Vec Scala docs for more details on the API.

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
  "Hi I heard about Spark".split(" "),
  "I wish Java could use case classes".split(" "),
  "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)
val model = word2Vec.fit(documentDF)

val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
  println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/Word2VecExample.scala" in the Spark repo.

Refer to the Word2Vec Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
  RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
  RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);

// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0);

Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);

for (Row row : result.collectAsList()) {
  List<String> text = row.getList(0);
  Vector vector = (Vector) row.get(1);
  System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaWord2VecExample.java" in the Spark repo.

CountVectorizer

CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary, and generates a CountVectorizerModel. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.

During the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus. An optional parameter minDF also affects the fitting process by specifying the minimum number (or fraction, if less than 1.0) of documents a term must appear in to be included in the vocabulary. Another optional binary toggle parameter controls the output vector. If set to true, all nonzero counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.
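
For illustration, a minimal Python sketch of these two options (df is assumed to be a DataFrame with a "words" column, as in the example below): terms must appear in at least half of the documents, and output counts are clipped to 0/1.

from pyspark.ml.feature import CountVectorizer

# minDF given as a fraction (< 1.0) means "at least this fraction of documents";
# binary=True turns the counts into 0/1 indicators.
cv = CountVectorizer(inputCol="words", outputCol="features",
                     vocabSize=1000, minDF=0.5, binary=True)
model = cv.fit(df)
model.transform(df).show(truncate=False)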

Examples

Assume that we have the following DataFrame with columns id and texts:

 id | texts
----|----------
 0  | Array("a", "b", "c")
 1  | Array("a", "b", "b", "c", "a")

Each row in texts is a document of type Array[String]. Invoking fit of CountVectorizer produces a CountVectorizerModel with vocabulary (a, b, c). Then the output column "vector" after transformation contains:

 id | texts                           | vector
----|---------------------------------|---------------
 0  | Array("a", "b", "c")            | (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])

Each vector represents the token counts of the document over the vocabulary.

Refer to the CountVectorizer Python docs and the CountVectorizerModel Python docs for more details on the API.

from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)
Find full example code at "examples/src/main/python/ml/count_vectorizer_example.py" in the Spark repo.

Refer to the CountVectorizer Scala docs and the CountVectorizerModel Scala docs for more details on the API.

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df)

// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
  .setInputCol("words")
  .setOutputCol("features")

cvModel.transform(df).show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala" in the Spark repo.

Refer to the CountVectorizer Java docs and the CountVectorizerModel Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("a", "b", "c")),
  RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
  .setInputCol("text")
  .setOutputCol("feature")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df);

// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
  .setInputCol("text")
  .setOutputCol("feature");

cvModel.transform(df).show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaCountVectorizerExample.java" in the Spark repo.

FeatureHasher

Feature hashing projects a set of categorical or numerical features into a feature vector of specified dimension (typically substantially smaller than that of the original feature space). This is done using the hashing trick to map features to indices in the feature vector.

The FeatureHasher transformer operates on multiple columns. Each column may contain either numeric or categorical features. Behavior and handling of column data types is as follows: for numeric columns, the hash of the column name is used to map the feature value to its index in the feature vector (by default, numeric features are not treated as categorical, even when they are integers; to treat them as categorical, specify the relevant columns in categoricalCols); for string columns, the hash of the string "column_name=value" is used to map to the vector index, with an indicator value of 1.0, so categorical features are "one-hot" encoded (similarly to using OneHotEncoder with dropLast=false); boolean columns are treated in the same way as string columns, i.e. represented as "column_name=true" or "column_name=false", with an indicator value of 1.0.

Null (missing) values are ignored (implicitly zero in the resulting feature vector).

The hash function used here is also the MurmurHash 3 used in HashingTF. Since a simple modulo on the hashed value is used to determine the vector index, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the vector indices.

Examples

Assume that we have a DataFrame with 4 input columns real, bool, stringNum, and string. These different data types as input will illustrate the behavior of the transform to produce a column of feature vectors.

real| bool|stringNum|string
----|-----|---------|------
 2.2| true|        1|   foo
 3.3|false|        2|   bar
 4.4|false|        3|   baz
 5.5|false|        4|   foo

Then the output of FeatureHasher.transform on this DataFrame is:

real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1        |foo   |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2        |bar   |(262144,[6031,  80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3        |baz   |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4        |foo   |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])

The resulting feature vectors could then be passed to a learning algorithm.

Refer to the FeatureHasher Python docs for more details on the API.

from pyspark.ml.feature import FeatureHasher

dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       outputCol="features")

featurized = hasher.transform(dataset)
featurized.show(truncate=False)
Find full example code at "examples/src/main/python/ml/feature_hasher_example.py" in the Spark repo.

Refer to the FeatureHasher Scala docs for more details on the API.

import org.apache.spark.ml.feature.FeatureHasher

val dataset = spark.createDataFrame(Seq(
  (2.2, true, "1", "foo"),
  (3.3, false, "2", "bar"),
  (4.4, false, "3", "baz"),
  (5.5, false, "4", "foo")
)).toDF("real", "bool", "stringNum", "string")

val hasher = new FeatureHasher()
  .setInputCols("real", "bool", "stringNum", "string")
  .setOutputCol("features")

val featurized = hasher.transform(dataset)
featurized.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/FeatureHasherExample.scala" in the Spark repo.

Refer to the FeatureHasher Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.FeatureHasher;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(2.2, true, "1", "foo"),
  RowFactory.create(3.3, false, "2", "bar"),
  RowFactory.create(4.4, false, "3", "baz"),
  RowFactory.create(5.5, false, "4", "foo")
);
StructType schema = new StructType(new StructField[]{
  new StructField("real", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("bool", DataTypes.BooleanType, false, Metadata.empty()),
  new StructField("stringNum", DataTypes.StringType, false, Metadata.empty()),
  new StructField("string", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);

FeatureHasher hasher = new FeatureHasher()
  .setInputCols(new String[]{"real", "bool", "stringNum", "string"})
  .setOutputCol("features");

Dataset<Row> featurized = hasher.transform(dataset);

featurized.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaFeatureHasherExample.java" in the Spark repo.

Feature Transformers

Tokenizer

Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). A simple Tokenizer class provides this functionality. The example below shows how to split sentences into sequences of words.

RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching. By default, the parameter "pattern" (regex, default: "\\s+") is used as delimiters to split the input text. Alternatively, users can set the parameter "gaps" to false, indicating that the regex "pattern" denotes "tokens" rather than splitting gaps, and find all matching occurrences as the tokenization result.

Examples

Refer to the Tokenizer Python docs and the RegexTokenizer Python docs for more details on the API.

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
Find full example code at "examples/src/main/python/ml/tokenizer_example.py" in the Spark repo.

Refer to the Tokenizer Scala docs and the RegexTokenizer Scala docs for more details on the API.

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val countTokens = udf { (words: Seq[String]) => words.length }

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)

val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala" in the Spark repo.

Refer to the Tokenizer Java docs and the RegexTokenizer Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import scala.collection.mutable.Seq;

import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.call_udf;
import static org.apache.spark.sql.functions.col;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(1, "I wish Java could use case classes"),
  RowFactory.create(2, "Logistic,regression,models,are,neat")
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");

RegexTokenizer regexTokenizer = new RegexTokenizer()
    .setInputCol("sentence")
    .setOutputCol("words")
    .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);

spark.udf().register(
  "countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);

Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
    .withColumn("tokens", call_udf("countTokens", col("words")))
    .show(false);

Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
    .withColumn("tokens", call_udf("countTokens", col("words")))
    .show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java" in the Spark repo.

StopWordsRemover

Stop words are words which should be excluded from the input, typically because the words appear frequently and don't carry as much meaning.

StopWordsRemover takes as input a sequence of strings (e.g. the output of a Tokenizer) and drops all the stop words from the input sequences. The list of stopwords is specified by the stopWords parameter. Default stop words for some languages are accessible by calling StopWordsRemover.loadDefaultStopWords(language), for which available options are "danish", "dutch", "english", "finnish", "french", "german", "hungarian", "italian", "norwegian", "portuguese", "russian", "spanish", "swedish" and "turkish". A boolean parameter caseSensitive indicates if the matches should be case sensitive (false by default).
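
A minimal Python sketch of customizing the stop-word list (the extra word "balloon" and the column names are illustrative assumptions, and a running SparkSession is assumed):

from pyspark.ml.feature import StopWordsRemover

# Start from the bundled English list and append a custom stop word.
stop_words = StopWordsRemover.loadDefaultStopWords("english") + ["balloon"]
remover = StopWordsRemover(inputCol="raw", outputCol="filtered",
                           stopWords=stop_words, caseSensitive=False)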

Examples

Assume that we have the following DataFrame with columns id and raw:

 id | raw
----|----------
 0  | [I, saw, the, red, balloon]
 1  | [Mary, had, a, little, lamb]

StopWordsRemover 应用于以 raw 作为输入列和 filtered 作为输出列的 DataFrame,我们应该得到以下结果:

 id | raw                          | filtered
----|------------------------------|---------------------
 0  | [I, saw, the, red, balloon]  | [saw, red, balloon]
 1  | [Mary, had, a, little, lamb] | [Mary, little, lamb]

filtered 中,停用词“I”、“the”、“had”和“a”已被过滤掉。

有关 API 的更多详细信息,请参阅 StopWordsRemover Python 文档

from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
Find full example code at "examples/src/main/python/ml/stopwords_remover_example.py" in the Spark repo.

Refer to the StopWordsRemover Scala docs for more details on the API.

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/StopWordsRemoverExample.scala" in the Spark repo.

Refer to the StopWordsRemover Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StopWordsRemover remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered");

List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
  RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);

StructType schema = new StructType(new StructField[]{
  new StructField(
    "raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaStopWordsRemoverExample.java" in the Spark repo.

$n$-gram

An n-gram is a sequence of $n$ tokens (typically words) for some integer $n$. The NGram class can be used to transform input features into $n$-grams.

NGram takes as input a sequence of strings (e.g. the output of a Tokenizer). The parameter n is used to determine the number of terms in each $n$-gram. The output will consist of a sequence of $n$-grams where each $n$-gram is represented by a space-delimited string of $n$ consecutive words. If the input sequence contains fewer than n strings, no output is produced.
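
As a quick illustration of what the transformer produces (plain Python here, not the NGram class itself): the 2-grams of a token sequence are the space-joined pairs of consecutive tokens.

# 2-grams of a token sequence: join each pair of consecutive tokens.
tokens = ["Hi", "I", "heard", "about", "Spark"]
bigrams = [" ".join(tokens[i:i + 2]) for i in range(len(tokens) - 1)]
print(bigrams)  # ['Hi I', 'I heard', 'heard about', 'about Spark']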

Examples

Refer to the NGram Python docs for more details on the API.

from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
Find full example code at "examples/src/main/python/ml/n_gram_example.py" in the Spark repo.

Refer to the NGram Scala docs for more details on the API.

import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")

val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")

val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/NGramExample.scala" in the Spark repo.

Refer to the NGram Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
  RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
  RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField(
    "words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);

NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");

Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaNGramExample.java" in the Spark repo.

Binarizer

Binarization is the process of thresholding numerical features to binary (0/1) features.

Binarizer takes the common parameters inputCol and outputCol, as well as the threshold for binarization. Feature values greater than the threshold are binarized to 1.0; values equal to or less than the threshold are binarized to 0.0. Both Vector and Double types are supported for inputCol.

Examples

Refer to the Binarizer Python docs for more details on the API.

from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
Find full example code at "examples/src/main/python/ml/binarizer_example.py" in the Spark repo.

Refer to the Binarizer Scala docs for more details on the API.

import scala.collection.immutable

import org.apache.spark.ml.feature.Binarizer

val data = immutable.ArraySeq.unsafeWrapArray(Array((0, 0.1), (1, 0.8), (2, 0.2)))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/BinarizerExample.scala" in the Spark repo.

Refer to the Binarizer Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 0.1),
  RowFactory.create(1, 0.8),
  RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);

Binarizer binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5);

Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);

System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaBinarizerExample.java" in the Spark repo.

PCA

PCA is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. A PCA class trains a model to project vectors to a low-dimensional space using PCA. The example below shows how to project 5-dimensional feature vectors into 3-dimensional principal components.

Examples

Refer to the PCA Python docs for more details on the API.

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
Find full example code at "examples/src/main/python/ml/pca_example.py" in the Spark repo.

Refer to the PCA Scala docs for more details on the API.

import scala.collection.immutable

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data.map(Tuple1.apply)))
  .toDF("features")

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)

val result = pca.transform(df).select("pcaFeatures")
result.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/PCAExample.scala" in the Spark repo.

Refer to the PCA Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
  RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
  RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PCAModel pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df);

Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaPCAExample.java" in the Spark repo.

PolynomialExpansion

Polynomial expansion is the process of expanding your features into a polynomial space, which is formulated by an n-degree combination of original dimensions. A PolynomialExpansion class provides this functionality. The example below shows how to expand your features into a 3-degree polynomial space.
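
For intuition (a small illustration, not part of the example below): expanding a 2-dimensional vector $(x, y)$ with degree 2 produces \[ (x,\; x^2,\; y,\; x y,\; y^2), \] and a degree-3 expansion additionally includes all degree-3 monomials such as $x^3$, $x^2 y$, $x y^2$ and $y^3$.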

Examples

Refer to the PolynomialExpansion Python docs for more details on the API.

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)
Find full example code at "examples/src/main/python/ml/polynomial_expansion_example.py" in the Spark repo.

Refer to the PolynomialExpansion Scala docs for more details on the API.

import scala.collection.immutable

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(2.0, 1.0),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data.map(Tuple1.apply)))
  .toDF("features")

val polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)

val polyDF = polyExpansion.transform(df)
polyDF.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/PolynomialExpansionExample.scala" in the Spark repo.

Refer to the PolynomialExpansion Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

PolynomialExpansion polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(2.0, 1.0)),
  RowFactory.create(Vectors.dense(0.0, 0.0)),
  RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java" in the Spark repo.

Discrete Cosine Transform (DCT)

The Discrete Cosine Transform transforms a length-$N$ real-valued sequence in the time domain into another length-$N$ real-valued sequence in the frequency domain. A DCT class provides this functionality, implementing the DCT-II and scaling the result by $1/\sqrt{2}$ such that the representing matrix for the transform is unitary. No shift is applied to the transformed sequence (e.g. the $0$th element of the transformed sequence is the $0$th DCT coefficient and not the $N/2$th).

Examples

Refer to the DCT Python docs for more details on the API.

from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)
Find full example code at "examples/src/main/python/ml/dct_example.py" in the Spark repo.

Refer to the DCT Scala docs for more details on the API.

import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/DCTExample.scala" in the Spark repo.

Refer to the DCT Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
  RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

DCT dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false);

Dataset<Row> dctDf = dct.transform(df);

dctDf.select("featuresDCT").show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaDCTExample.java" in the Spark repo.

StringIndexer

StringIndexer encodes a string column of labels to a column of label indices. StringIndexer can encode multiple columns. The indices are in [0, numLabels), and four ordering options are supported: "frequencyDesc": descending order by label frequency (most frequent label assigned 0), "frequencyAsc": ascending order by label frequency (least frequent label assigned 0), "alphabetDesc": descending alphabetical order, and "alphabetAsc": ascending alphabetical order (default = "frequencyDesc"). Note that in case of equal frequency under "frequencyDesc"/"frequencyAsc", the strings are further sorted alphabetically.

Unseen labels will be put at index numLabels if the user chooses to keep them. If the input column is numeric, we cast it to string and index the string values. When downstream pipeline components such as Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.

Examples

Assume that we have the following DataFrame with columns id and category:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category is a string column with three labels: "a", "b", and "c". Applying StringIndexer with category as the input column and categoryIndex as the output column, we should get the following:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0

"a" gets index 0 because it is the most frequent, followed by "c" with index 1 and "b" with index 2.

Additionally, there are three strategies regarding how StringIndexer will handle unseen labels when you have fit a StringIndexer on one dataset and then use it to transform another: throw an exception (which is the default), skip the row containing the unseen label entirely, or put unseen labels in a special additional bucket, at index numLabels.

Examples

Let's go back to our previous example, but this time reuse our previously defined StringIndexer on the following dataset:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d
 4  | e

If you've not set how StringIndexer handles unseen labels, or set it to "error", an exception will be thrown. However, if you had called setHandleInvalid("skip"), the following dataset will be generated:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

Notice that the rows containing "d" or "e" do not appear.

If you call setHandleInvalid("keep"), the following dataset will be generated:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | d        | 3.0
 4  | e        | 3.0

Notice that the rows containing "d" or "e" are mapped to index "3.0".
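
A minimal Python sketch of choosing the policy up front (train_df and test_df are hypothetical DataFrames with a category column):

from pyspark.ml.feature import StringIndexer

# handleInvalid: "error" (default, raise), "skip" (drop rows with unseen
# labels), or "keep" (map unseen labels to the extra index numLabels).
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex",
                        handleInvalid="keep")
model = indexer.fit(train_df)       # vocabulary learned from train_df only
indexed = model.transform(test_df)  # unseen labels get the extra index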

Refer to the StringIndexer Python docs for more details on the API.

from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
Find full example code at "examples/src/main/python/ml/string_indexer_example.py" in the Spark repo.

Refer to the StringIndexer Scala docs for more details on the API.

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala" in the Spark repo.

Refer to the StringIndexer Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexer indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex");

Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaStringIndexerExample.java" in the Spark repo.

IndexToString

Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column containing the original labels as strings. A common use case is to produce indices from labels with StringIndexer, train a model with those indices, and retrieve the original labels from the column of predicted indices with IndexToString. However, you are free to supply your own labels.

Examples

Building on the StringIndexer example, let's assume we have the following DataFrame with columns id and categoryIndex:

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

IndexToString 应用于以 categoryIndex 作为输入列和 originalCategory 作为输出列的 DataFrame,我们能够检索原始标签(它们将从列的元数据中推断出来)

 id | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c

Refer to the IndexToString Python docs for more details on the API.

from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
Find full example code at "examples/src/main/python/ml/index_to_string_example.py" in the Spark repo.

Refer to the IndexToString Scala docs for more details on the API.

import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

println(s"Transformed string column '${indexer.getInputCol}' " +
    s"to indexed column '${indexer.getOutputCol}'")
indexed.show()

val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
    s"${Attribute.fromStructField(inputColSchema).toString}\n")

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)

println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
    s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/IndexToStringExample.scala" in the Spark repo.

Refer to the IndexToString Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
    "to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();

StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
    Attribute.fromStructField(inputColSchema).toString() + "\n");

IndexToString converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);

System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
    "original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaIndexToStringExample.java" in the Spark repo.

OneHotEncoder

One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using StringIndexer first.

OneHotEncoder can transform multiple columns, returning a one-hot-encoded output vector column for each input column. It is common to merge these vectors into a single feature vector using VectorAssembler.

OneHotEncoder supports the handleInvalid parameter to choose how to handle invalid input during transforming data. Available options include "keep" (any invalid inputs are assigned to an extra categorical index) and "error" (throw an error).
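
A small Python sketch of the "keep" option (reusing the column names from the example below; this is an illustration, not part of the example code):

from pyspark.ml.feature import OneHotEncoder

# Unseen category indices at transform time go into one extra vector slot
# instead of raising an error.
encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"],
                        handleInvalid="keep")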

Examples

Refer to the OneHotEncoder Python docs for more details on the API.

from pyspark.ml.feature import OneHotEncoder

df = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])

encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
Find full example code at "examples/src/main/python/ml/onehot_encoder_example.py" in the Spark repo.

Refer to the OneHotEncoder Scala docs for more details on the API.

import org.apache.spark.ml.feature.OneHotEncoder

val df = spark.createDataFrame(Seq(
  (0.0, 1.0),
  (1.0, 0.0),
  (2.0, 1.0),
  (0.0, 2.0),
  (0.0, 1.0),
  (2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")

val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex1", "categoryIndex2"))
  .setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)

val encoded = model.transform(df)
encoded.show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala" in the Spark repo.

Refer to the OneHotEncoder Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, 1.0),
  RowFactory.create(1.0, 0.0),
  RowFactory.create(2.0, 1.0),
  RowFactory.create(0.0, 2.0),
  RowFactory.create(0.0, 1.0),
  RowFactory.create(2.0, 0.0)
);

StructType schema = new StructType(new StructField[]{
  new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

OneHotEncoder encoder = new OneHotEncoder()
  .setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
  .setOutputCols(new String[] {"categoryVec1", "categoryVec2"});

OneHotEncoderModel model = encoder.fit(df);
Dataset<Row> encoded = model.transform(df);
encoded.show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java" in the Spark repo.

TargetEncoder

Target encoding is a data-preprocessing technique that transforms high-cardinality categorical features into quasi-continuous scalar attributes suited for use in regression-type models. This paradigm maps individual values of an independent feature to a scalar that represents some estimate of the dependent attribute (meaning categorical values that exhibit similar statistics with respect to the target will have a similar representation).

By leveraging the relationship between categorical features and the target variable, target encoding usually performs better than one-hot encoding, and it does not require a final binary vector encoding, which decreases the overall dimensionality of the dataset.

Users can specify the input and output column names by setting inputCol and outputCol for the single-column use case, or inputCols and outputCols for the multi-column use case (both arrays must have the same size). These columns are expected to contain categorical indices (positive integers); missing values (null) are treated as a separate category. The data type must be any subclass of 'NumericType'. For string type input data, it is common to encode categorical features using StringIndexer first.

Users can specify the target column name by setting labelCol. This column is expected to contain the ground-truth labels from which encodings will be derived. Observations with a missing label (null) are not considered when calculating estimates. The data type must be any subclass of 'NumericType'.

TargetEncoder supports the handleInvalid parameter to choose how to handle invalid input, meaning categories not seen at training time, when encoding new data. Available options include "keep" (any invalid inputs are assigned to an extra categorical index) and "error" (throw an exception).

TargetEncoder supports the targetType parameter to choose the label type when fitting data, which affects how estimates are calculated. Available options include "binary" and "continuous".

When set to "binary", the target attribute $Y$ is expected to be binary, $Y\in\{0,1\}$. The transformation maps an individual value $X_{i}$ to the conditional probability of $Y$ given $X=X_{i}$: $\;\; S_{i}=P(Y\mid X=X_{i})$. This approach is also known as bin-counting.

When set to "continuous", the target attribute $Y$ is expected to be continuous, $Y\in\mathbb{Q}$. The transformation maps an individual value $X_{i}$ to the average of $Y$ given $X=X_{i}$: $\;\; S_{i}=E[Y\mid X=X_{i}]$. This approach is also known as mean encoding.

TargetEncoder supports the smoothing parameter to tune how in-category statistics and overall statistics are blended. High-cardinality categorical features are usually unevenly distributed across all possible values of $X$. Therefore, calculating encodings $S_{i}$ from in-category statistics alone makes these estimates very unreliable, and rarely seen categories will very likely cause overfitting during learning.

Smoothing prevents this behaviour by weighting in-category estimates against overall estimates according to the relative size of the particular category on the whole dataset.

For the binary case: $\;\;\; S_{i}=\lambda(n_{i})\, P(Y\mid X=X_{i})+(1-\lambda(n_{i}))\, P(Y)$

For the continuous case: $\;\;\; S_{i}=\lambda(n_{i})\, E[Y\mid X=X_{i}]+(1-\lambda(n_{i}))\, E[Y]$

where $\lambda(n_{i})$ is a monotonically increasing function of $n_{i}$, bounded between 0 and 1.

Usually $\lambda(n_{i})$ is implemented as the parametric function $\lambda(n_{i})=\frac{n_{i}}{n_{i}+m}$, where $m$ is the smoothing factor, represented by the smoothing parameter in TargetEncoder.
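
As a worked illustration of the blend (the numbers here are hypothetical, not the example data below): with a binary target, smoothing $m = 1$, a category seen $n_{i} = 3$ times with in-category rate $P(Y\mid X=X_{i}) = 1/3$ and global prior $P(Y) = 0.5$, the encoding becomes \[ S_{i} = \frac{3}{3+1}\cdot\frac{1}{3} + \left(1-\frac{3}{3+1}\right)\cdot 0.5 = 0.25 + 0.125 = 0.375. \]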

Examples

Building on the TargetEncoder example, let's assume we have the following DataFrame with columns feature and target (binary and continuous):

 feature | target | target
         | (bin)  | (cont)
 --------|--------|--------
 1       | 0      | 1.3
 1       | 1      | 2.5
 1       | 0      | 1.6
 2       | 1      | 1.8
 2       | 0      | 2.4
 3       | 1      | 3.2

TargetEncoder 应用于目标类型为“binary”、输入列为 feature、标签列为 target (bin)、输出列为 encoded 的数据,我们能够对数据进行模型拟合以学习编码,并根据这些映射转换数据

 feature | target | encoded
         | (bin)  |
 --------|--------|--------
 1       | 0      | 0.333
 1       | 1      | 0.333
 1       | 0      | 0.333
 2       | 1      | 0.5
 2       | 0      | 0.5
 3       | 1      | 1.0

TargetEncoder 应用于目标类型为“continuous”、输入列为 feature、标签列为 target (cont)、输出列为 encoded 的数据,我们能够对数据进行模型拟合以学习编码,并根据这些映射转换数据

 feature | target | encoded
         | (cont) |
 --------|--------|--------
 1       | 1.3    | 1.8
 1       | 2.5    | 1.8
 1       | 1.6    | 1.8
 2       | 1.8    | 2.1
 2       | 2.4    | 2.1
 3       | 3.2    | 3.2

Refer to the TargetEncoder Python docs for more details on the API.

from pyspark.ml.feature import TargetEncoder


df = spark.createDataFrame(
    [
        (0.0, 1.0, 0, 10.0),
        (1.0, 0.0, 1, 20.0),
        (2.0, 1.0, 0, 30.0),
        (0.0, 2.0, 1, 40.0),
        (0.0, 1.0, 0, 50.0),
        (2.0, 0.0, 1, 60.0),
    ],
    ["categoryIndex1", "categoryIndex2", "binaryLabel", "continuousLabel"],
)

# binary target
encoder = TargetEncoder(
    inputCols=["categoryIndex1", "categoryIndex2"],
    outputCols=["categoryIndex1Target", "categoryIndex2Target"],
    labelCol="binaryLabel",
    targetType="binary"
)
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()

# continuous target
encoder = TargetEncoder(
    inputCols=["categoryIndex1", "categoryIndex2"],
    outputCols=["categoryIndex1Target", "categoryIndex2Target"],
    labelCol="continuousLabel",
    targetType="continuous"
)

model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
Find full example code at "examples/src/main/python/ml/target_encoder_example.py" in the Spark repo.

Refer to the TargetEncoder Scala docs for more details on the API.

import org.apache.spark.ml.feature.TargetEncoder

val df = spark.createDataFrame(Seq(
  (0.0, 1.0, 0, 10.0),
  (1.0, 0.0, 1, 20.0),
  (2.0, 1.0, 0, 30.0),
  (0.0, 2.0, 1, 40.0),
  (0.0, 1.0, 0, 50.0),
  (2.0, 0.0, 1, 60.0)
)).toDF("categoryIndex1", "categoryIndex2",
        "binaryLabel", "continuousLabel")

// binary target
val bin_encoder = new TargetEncoder()
  .setInputCols(Array("categoryIndex1", "categoryIndex2"))
  .setOutputCols(Array("categoryIndex1Target", "categoryIndex2Target"))
  .setLabelCol("binaryLabel")
  .setTargetType("binary");

val bin_model = bin_encoder.fit(df)
val bin_encoded = bin_model.transform(df)
bin_encoded.show()

// continuous target
val cont_encoder = new TargetEncoder()
  .setInputCols(Array("categoryIndex1", "categoryIndex2"))
  .setOutputCols(Array("categoryIndex1Target", "categoryIndex2Target"))
  .setLabelCol("continuousLabel")
  .setTargetType("continuous");

val cont_model = cont_encoder.fit(df)
val cont_encoded = cont_model.transform(df)
cont_encoded.show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/TargetEncoderExample.scala" in the Spark repo.

Refer to the TargetEncoder Java docs for more details on the API.

import org.apache.spark.ml.feature.TargetEncoder;
import org.apache.spark.ml.feature.TargetEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, 1.0, 0, 10.0),
  RowFactory.create(1.0, 0.0, 1, 20.0),
  RowFactory.create(2.0, 1.0, 0, 30.0),
  RowFactory.create(0.0, 2.0, 1, 40.0),
  RowFactory.create(0.0, 1.0, 0, 50.0),
  RowFactory.create(2.0, 0.0, 1, 60.0)
);

StructType schema = new StructType(new StructField[]{
  new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("binaryLabel", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("continuousLabel", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

// binary target
TargetEncoder bin_encoder = new TargetEncoder()
  .setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
  .setOutputCols(new String[] {"categoryIndex1Target", "categoryIndex2Target"})
  .setLabelCol("binaryLabel")
  .setTargetType("binary");

TargetEncoderModel bin_model = bin_encoder.fit(df);
Dataset<Row> bin_encoded = bin_model.transform(df);
bin_encoded.show();

// continuous target
TargetEncoder cont_encoder = new TargetEncoder()
  .setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
  .setOutputCols(new String[] {"categoryIndex1Target", "categoryIndex2Target"})
  .setLabelCol("continuousLabel")
  .setTargetType("continuous");

TargetEncoderModel cont_model = cont_encoder.fit(df);
Dataset<Row> cont_encoded = cont_model.transform(df);
cont_encoded.show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaTargetEncoderExample.java" in the Spark repo.

VectorIndexer

VectorIndexer helps index categorical features in datasets of Vectors. It can both automatically decide which features are categorical and convert original values to category indices. Specifically, it does the following:

  1. Take an input column of type Vector and a parameter maxCategories.
  2. Decide which features should be categorical based on the number of distinct values, where features with at most maxCategories distinct values are declared categorical.
  3. Compute 0-based category indices for each categorical feature.
  4. Index categorical features and transform original feature values to indices.

Indexing categorical features allows algorithms such as Decision Trees and Tree Ensembles to treat categorical features appropriately, improving performance.

Examples

In the example below, we read in a dataset of labeled points and then use VectorIndexer to decide which features should be treated as categorical. We transform the categorical feature values to their indices. This transformed data could then be passed to algorithms such as DecisionTreeRegressor that handle categorical features.

Refer to the VectorIndexer Python docs for more details on the API.

from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
Find full example code at "examples/src/main/python/ml/vector_indexer_example.py" in the Spark repo.

Refer to the VectorIndexer Scala docs for more details on the API.

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} " +
  s"categorical features: ${categoricalFeatures.mkString(", ")}")

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/VectorIndexerExample.scala" in the Spark repo.

Refer to the VectorIndexer Java docs for more details on the API.

import java.util.Map;

import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

VectorIndexer indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);

Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");

for (Integer feature : categoryMaps.keySet()) {
  System.out.print(" " + feature);
}
System.out.println();

// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorIndexerExample.java"。

交互

Interaction 是一个 Transformer,它接受向量列或双精度浮点列,并生成单个向量列,其中包含从每个输入列各取一个值的所有组合的乘积。

例如,如果您有两个向量类型列,每个列都具有 3 个维度作为输入列,那么您将得到一个 9 维向量作为输出列。

示例

假设我们有以下包含“id1”、“vec1”和“vec2”列的 DataFrame

  id1|vec1          |vec2          
  ---|--------------|--------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] 
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] 
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] 
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] 
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0]     

Interaction 应用于这些输入列,然后输出列 interactedCol 包含

  id1|vec1          |vec2          |interactedCol                                         
  ---|--------------|--------------|------------------------------------------------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]            
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]     
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]        
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] 
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]       
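
作为对上表的一个简单核对,下面用纯 Python(不依赖 Spark)手工计算第二行的输出:输出向量是从各输入列各取一个值的所有组合的乘积,这里即 id1 与 vec1、vec2 中每对分量的乘积。

# 手工核对 Interaction 的第二行输出(仅作示意)
id1 = 2
vec1 = [4.0, 3.0, 8.0]
vec2 = [7.0, 9.0, 8.0]

interacted = [id1 * a * b for a in vec1 for b in vec2]
print(interacted)
# [56.0, 72.0, 64.0, 42.0, 54.0, 48.0, 112.0, 144.0, 128.0]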

有关 API 的更多详细信息,请参阅 Interaction Python 文档

from pyspark.ml.feature import Interaction, VectorAssembler

df = spark.createDataFrame(
    [(1, 1, 2, 3, 8, 4, 5),
     (2, 4, 3, 8, 7, 9, 8),
     (3, 6, 1, 9, 2, 3, 6),
     (4, 10, 8, 6, 9, 4, 5),
     (5, 9, 2, 7, 10, 7, 3),
     (6, 1, 1, 4, 2, 8, 4)],
    ["id1", "id2", "id3", "id4", "id5", "id6", "id7"])

assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")

assembled1 = assembler1.transform(df)

assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")

assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")

interacted = interaction.transform(assembled2)

interacted.show(truncate=False)
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/interaction_example.py"。

有关 API 的更多详细信息,请参阅 Interaction Scala 文档

import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler

val df = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 10, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 10, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")

val assembler1 = new VectorAssembler().
  setInputCols(Array("id2", "id3", "id4")).
  setOutputCol("vec1")

val assembled1 = assembler1.transform(df)

val assembler2 = new VectorAssembler().
  setInputCols(Array("id5", "id6", "id7")).
  setOutputCol("vec2")

val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

val interaction = new Interaction()
  .setInputCols(Array("id1", "vec1", "vec2"))
  .setOutputCol("interactedCol")

val interacted = interaction.transform(assembled2)

interacted.show(truncate = false)
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/InteractionExample.scala"。

有关 API 的更多详细信息,请参阅 Interaction Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Interaction;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(1, 1, 2, 3, 8, 4, 5),
  RowFactory.create(2, 4, 3, 8, 7, 9, 8),
  RowFactory.create(3, 6, 1, 9, 2, 3, 6),
  RowFactory.create(4, 10, 8, 6, 9, 4, 5),
  RowFactory.create(5, 9, 2, 7, 10, 7, 3),
  RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VectorAssembler assembler1 = new VectorAssembler()
        .setInputCols(new String[]{"id2", "id3", "id4"})
        .setOutputCol("vec1");

Dataset<Row> assembled1 = assembler1.transform(df);

VectorAssembler assembler2 = new VectorAssembler()
        .setInputCols(new String[]{"id5", "id6", "id7"})
        .setOutputCol("vec2");

Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");

Interaction interaction = new Interaction()
        .setInputCols(new String[]{"id1","vec1","vec2"})
        .setOutputCol("interactedCol");

Dataset<Row> interacted = interaction.transform(assembled2);

interacted.show(false);
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaInteractionExample.java"。

归一化器

Normalizer 是一个 Transformer,它转换由 Vector 行组成的数据集,将每个 Vector 归一化为单位范数。它接受参数 p,该参数指定用于归一化的p-范数。(默认情况下 $p = 2$。)这种归一化有助于标准化您的输入数据并改善学习算法的行为。
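
作为示意,下面用纯 Python 手工计算向量 [1.0, 0.5, -1.0](即后面示例数据的第一行)在 $L^1$ 范数和 $L^\infty$ 范数下的归一化结果:

v = [1.0, 0.5, -1.0]

l1 = sum(abs(x) for x in v)            # L^1 范数 = 2.5
print([x / l1 for x in v])             # [0.4, 0.2, -0.4]

linf = max(abs(x) for x in v)          # L^inf 范数 = 1.0
print([x / linf for x in v])           # [1.0, 0.5, -1.0]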

示例

以下示例演示了如何将由 Vector 行组成的数据集中的每一行分别归一化为单位 $L^1$ 范数和单位 $L^\infty$ 范数。

有关 API 的更多详细信息,请参阅 Normalizer Python 文档

from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/normalizer_example.py"。

有关 API 的更多详细信息,请参阅 Normalizer Scala 文档

import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.5, -1.0)),
  (1, Vectors.dense(2.0, 1.0, 1.0)),
  (2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/NormalizerExample.scala"。

有关 API 的更多详细信息,请参阅 Normalizer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.5, -1.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, 1.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 2.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0);

Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();

// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaNormalizerExample.java"。

标准化器

StandardScaler 转换由 Vector 行组成的数据集,将每个特征标准化为单位标准差和/或零均值。它接受以下参数:

  1. withStd:默认为 true。将数据缩放到单位标准差。
  2. withMean:默认为 false。在缩放前用均值对数据进行居中。这会产生稠密输出,因此对稀疏输入需谨慎使用。

StandardScaler 是一个 Estimator,可以在数据集上进行 fit 以生成一个 StandardScalerModel;这相当于计算汇总统计量。然后该模型可以将数据集中的 Vector 列转换为具有单位标准差和/或零均值特征。

请注意,如果某个特征的标准差为零,它将在该特征的 Vector 中返回默认值 0.0
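
作为示意,下面用纯 Python 演示 withStd=true、withMean=false(后面示例采用的设置)对单个特征取值的缩放方式:每个值除以该特征的样本标准差(带 n-1 校正),而不减去均值。这只是对缩放逻辑的粗略模拟,并非 Spark 的实现。

import statistics

values = [1.0, 2.0, 3.0, 4.0]          # 假设的某个特征的取值
std = statistics.stdev(values)         # 样本标准差(n-1 校正)
print([v / std for v in values])       # withStd=True, withMean=False 时的缩放结果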

示例

以下示例演示了如何以 libsvm 格式加载数据集,然后将每个特征归一化为单位标准差。

有关 API 的更多详细信息,请参阅 StandardScaler Python 文档

from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/standard_scaler_example.py"。

有关 API 的更多详细信息,请参阅 StandardScaler Scala 文档

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/StandardScalerExample.scala"。

有关 API 的更多详细信息,请参阅 StandardScaler Java 文档

import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaStandardScalerExample.java"。

鲁棒缩放器

RobustScaler 转换由 Vector 行组成的数据集,移除中位数并根据特定的分位数范围(默认为 IQR:四分位距,即第一四分位数和第三四分位数之间的范围)缩放数据。它的行为与 StandardScaler 相当相似,但使用中位数和分位数范围代替均值和标准差,这使其对异常值具有鲁棒性。它接受以下参数:

  1. lower:默认为 0.25。用于计算分位数范围的下分位数,所有特征共享。
  2. upper:默认为 0.75。用于计算分位数范围的上分位数,所有特征共享。
  3. withScaling:默认为 true。将数据缩放到分位数范围。
  4. withCentering:默认为 false。在缩放前用中位数对数据进行居中。这会产生稠密输出,因此对稀疏输入需谨慎使用。

RobustScaler 是一个 Estimator,可以在数据集上进行 fit 以生成一个 RobustScalerModel;这相当于计算分位数统计量。然后该模型可以将数据集中的 Vector 列转换为具有单位分位数范围和/或零中值特征。

请注意,如果某个特征的分位数范围为零,它将在该特征的 Vector 中返回默认值 0.0
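
作为示意,下面用纯 Python 粗略模拟 withCentering=true、withScaling=true 时对单个特征的处理:减去中位数,再除以第 1 与第 3 四分位数之间的范围。Spark 的实际实现与此不同(例如使用近似分位数),这里仅用于说明思路。

import statistics

values = [1.0, 2.0, 3.0, 4.0, 100.0]              # 含一个离群值的假设特征
q1, median, q3 = statistics.quantiles(values, n=4)
iqr = q3 - q1
print([(v - median) / iqr for v in values])       # 离群值对中位数和 IQR 的影响很小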

示例

以下示例演示了如何以 libsvm 格式加载数据集,然后将每个特征归一化为单位分位数范围。

有关 API 的更多详细信息,请参阅 RobustScaler Python 文档

from pyspark.ml.feature import RobustScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
                      withScaling=True, withCentering=False,
                      lower=0.25, upper=0.75)

# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)

# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/robust_scaler_example.py"。

有关 API 的更多详细信息,请参阅 RobustScaler Scala 文档

import org.apache.spark.ml.feature.RobustScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new RobustScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithScaling(true)
  .setWithCentering(false)
  .setLower(0.25)
  .setUpper(0.75)

// Compute summary statistics by fitting the RobustScaler.
val scalerModel = scaler.fit(dataFrame)

// Transform each feature to have unit quantile range.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/RobustScalerExample.scala"。

有关 API 的更多详细信息,请参阅 RobustScaler Java 文档

import org.apache.spark.ml.feature.RobustScaler;
import org.apache.spark.ml.feature.RobustScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

RobustScaler scaler = new RobustScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithScaling(true)
  .setWithCentering(false)
  .setLower(0.25)
  .setUpper(0.75);

// Compute summary statistics by fitting the RobustScaler
RobustScalerModel scalerModel = scaler.fit(dataFrame);

// Transform each feature to have unit quantile range.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaRobustScalerExample.java"。

最小最大值缩放器

MinMaxScaler 转换由 Vector 行组成的数据集,将每个特征重新缩放到特定范围(通常是 [0, 1])。它接受以下参数:

  1. min:默认为 0.0。转换后的下界,所有特征共享。
  2. max:默认为 1.0。转换后的上界,所有特征共享。

MinMaxScaler 计算数据集上的汇总统计量并生成一个 MinMaxScalerModel。然后该模型可以单独转换每个特征,使其在给定范围内。

特征 E 的重新缩放值计算如下:\begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation} 对于 $E_{max} == E_{min}$ 的情况,$Rescaled(e_i) = 0.5 * (max + min)$

请注意,由于零值可能会转换为非零值,因此即使对于稀疏输入,转换器的输出也将是 DenseVector
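
作为示意,下面用纯 Python 按上面的公式,手工把后面示例数据中第一个特征的取值(1.0、2.0、3.0)缩放到默认范围 [0, 1]:

e = [1.0, 2.0, 3.0]
e_min, e_max = min(e), max(e)
lo, hi = 0.0, 1.0                       # 对应默认的 min、max 参数
print([(x - e_min) / (e_max - e_min) * (hi - lo) + lo for x in e])  # [0.0, 0.5, 1.0]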

示例

以下示例演示了如何将数据集中的每个特征重新缩放到 [0, 1] 范围。

有关 API 的更多详细信息,请参阅 MinMaxScaler Python 文档MinMaxScalerModel Python 文档

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/min_max_scaler_example.py"。

有关 API 的更多详细信息,请参阅 MinMaxScaler Scala 文档MinMaxScalerModel Scala 文档

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -1.0)),
  (1, Vectors.dense(2.0, 1.1, 1.0)),
  (2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/MinMaxScalerExample.scala"。

有关 API 的更多详细信息,请参阅 MinMaxScaler Java 文档MinMaxScalerModel Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
    RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MinMaxScaler scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
    + scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaMinMaxScalerExample.java"。

最大绝对值缩放器

MaxAbsScaler 转换由 Vector 行组成的数据集,通过除以每个特征中的最大绝对值,将每个特征重新缩放到 [-1, 1] 范围。它不会平移/居中数据,因此不会破坏任何稀疏性。

MaxAbsScaler 计算数据集上的汇总统计量并生成一个 MaxAbsScalerModel。然后该模型可以单独转换每个特征,使其在 [-1, 1] 范围内。
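
作为示意,下面用纯 Python 手工把后面示例数据中第一个特征的取值(1.0、2.0、4.0)除以其最大绝对值 4.0:

e = [1.0, 2.0, 4.0]
max_abs = max(abs(x) for x in e)        # 4.0
print([x / max_abs for x in e])         # [0.25, 0.5, 1.0]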

示例

以下示例演示了如何将数据集中的每个特征重新缩放到 [-1, 1] 范围。

有关 API 的更多详细信息,请参阅 MaxAbsScaler Python 文档MaxAbsScalerModel Python 文档

from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/max_abs_scaler_example.py"。

有关 API 的更多详细信息,请参阅 MaxAbsScaler Scala 文档MaxAbsScalerModel Scala 文档

import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, Vectors.dense(2.0, 1.0, -4.0)),
  (2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/MaxAbsScalerExample.scala"。

有关 API 的更多详细信息,请参阅 MaxAbsScaler Java 文档MaxAbsScalerModel Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MaxAbsScaler scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaMaxAbsScalerExample.java"。

分桶器

Bucketizer 将连续特征列转换为特征分桶列,其中分桶由用户指定。它接受以下参数:

  1. splits:用于把连续特征映射到分桶的参数。有 n+1 个分割点时,就有 n 个分桶。由分割点 x、y 定义的分桶包含区间 [x, y) 内的值,最后一个分桶还包含上边界 y。例如,Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) 定义了三个分桶。

另请注意,如果您不知道目标列的上限和下限,您应该将 Double.NegativeInfinityDouble.PositiveInfinity 添加为分割点的边界,以防止可能出现 Bucketizer 越界异常。

另请注意,您提供的分割点必须是严格递增的,即 s0 < s1 < s2 < ... < sn

更多详细信息可以在 Bucketizer 的 API 文档中找到。
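
作为示意,下面用纯 Python 的 bisect 粗略模拟分割点到桶索引的映射:桶 i 覆盖区间 [splits[i], splits[i+1])(最后一个桶还包含上边界,这一细节在示意中略去)。其结果与后面示例的输出一致。

import bisect

splits = [float("-inf"), -0.5, 0.0, 0.5, float("inf")]
for x in [-999.9, -0.5, -0.3, 0.0, 0.2, 999.9]:
    print(x, "->", bisect.bisect_right(splits, x) - 1)   # 桶索引依次为 0, 1, 1, 2, 2, 3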

示例

以下示例演示了如何将 Double 列分桶到另一个索引化列中。

有关 API 的更多详细信息,请参阅 Bucketizer Python 文档

from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/bucketizer_example.py"。

有关 API 的更多详细信息,请参阅 Bucketizer Scala 文档

import scala.collection.immutable

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(
  immutable.ArraySeq.unsafeWrapArray(data.map(Tuple1.apply))).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)

println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()

val splitsArray = Array(
  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
  Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))

val data2 = Array(
  (-999.9, -999.9),
  (-0.5, -0.2),
  (-0.3, -0.1),
  (0.0, 0.0),
  (0.2, 0.4),
  (999.9, 999.9))
val dataFrame2 = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data2))
  .toDF("features1", "features2")

val bucketizer2 = new Bucketizer()
  .setInputCols(Array("features1", "features2"))
  .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
  .setSplitsArray(splitsArray)

// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)

println(s"Bucketizer output with [" +
  s"${bucketizer2.getSplitsArray(0).length-1}, " +
  s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala"。

有关 API 的更多详细信息,请参阅 Bucketizer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};

List<Row> data = Arrays.asList(
  RowFactory.create(-999.9),
  RowFactory.create(-0.5),
  RowFactory.create(-0.3),
  RowFactory.create(0.0),
  RowFactory.create(0.2),
  RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Bucketizer bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits);

// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);

System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();

// Bucketize multiple columns at one pass.
double[][] splitsArray = {
  {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY},
  {Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY}
};

List<Row> data2 = Arrays.asList(
  RowFactory.create(-999.9, -999.9),
  RowFactory.create(-0.5, -0.2),
  RowFactory.create(-0.3, -0.1),
  RowFactory.create(0.0, 0.0),
  RowFactory.create(0.2, 0.4),
  RowFactory.create(999.9, 999.9)
);
StructType schema2 = new StructType(new StructField[]{
  new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame2 = spark.createDataFrame(data2, schema2);

Bucketizer bucketizer2 = new Bucketizer()
  .setInputCols(new String[] {"features1", "features2"})
  .setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"})
  .setSplitsArray(splitsArray);
// Transform original data into its bucket index.
Dataset<Row> bucketedData2 = bucketizer2.transform(dataFrame2);

System.out.println("Bucketizer output with [" +
  (bucketizer2.getSplitsArray()[0].length-1) + ", " +
  (bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column");
bucketedData2.show();
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java"。

元素乘积

ElementwiseProduct 通过逐元素乘法,将每个输入向量与给定的“权重”向量相乘。换句话说,它用一个标量乘数缩放数据集的每一列,即对输入向量 v 与变换向量 w 计算哈达玛乘积,得到结果向量。

\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]
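
作为示意,下面用纯 Python 核对后面示例中第一行向量 [1.0, 2.0, 3.0] 与缩放向量 [0.0, 1.0, 2.0] 的哈达玛乘积:

v = [1.0, 2.0, 3.0]
w = [0.0, 1.0, 2.0]
print([a * b for a, b in zip(v, w)])    # [0.0, 2.0, 6.0]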

示例

以下示例演示了如何使用转换向量值来转换向量。

有关 API 的更多详细信息,请参阅 ElementwiseProduct Python 文档

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/elementwise_product_example.py"。

有关 API 的更多详细信息,请参阅 ElementwiseProduct Scala 文档

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/ElementwiseProductExample.scala"。

有关 API 的更多详细信息,请参阅 ElementwiseProduct Java 文档

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
  RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
  RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);

List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));

StructType schema = DataTypes.createStructType(fields);

Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);

ElementwiseProduct transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector");

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaElementwiseProductExample.java"。

SQL转换器

SQLTransformer 实现了由 SQL 语句定义的转换。目前,我们只支持类似 "SELECT ... FROM __THIS__ ..." 的 SQL 语法,其中 "__THIS__" 表示输入数据集的底层表。SELECT 子句指定要在输出中显示的字段、常量和表达式,并且可以是 Spark SQL 支持的任何 SELECT 子句。用户还可以使用 Spark SQL 内置函数和 UDF 来操作这些选定的列。例如,SQLTransformer 支持以下语句:

  1. SELECT a, a + b AS a_b FROM __THIS__
  2. SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
  3. SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b

示例

假设我们有以下包含 idv1v2 列的 DataFrame

 id |  v1 |  v2
----|-----|-----
 0  | 1.0 | 3.0  
 2  | 2.0 | 5.0

这是使用语句 "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"SQLTransformer 的输出:

 id |  v1 |  v2 |  v3 |  v4
----|-----|-----|-----|-----
 0  | 1.0 | 3.0 | 4.0 | 3.0
 2  | 2.0 | 5.0 | 7.0 |10.0

有关 API 的更多详细信息,请参阅 SQLTransformer Python 文档

from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/sql_transformer.py"。

有关 API 的更多详细信息,请参阅 SQLTransformer Scala 文档

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/SQLTransformerExample.scala"。

有关 API 的更多详细信息,请参阅 SQLTransformer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 1.0, 3.0),
  RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

SQLTransformer sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

sqlTrans.transform(df).show();
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java"。

向量组合器

VectorAssembler 是一个转换器,它将给定的列列表组合成单个向量列。它对于将原始特征和不同特征转换器生成的特征组合成一个特征向量非常有用,以便训练逻辑回归和决策树等机器学习模型。VectorAssembler 接受以下输入列类型:所有数值类型、布尔类型和向量类型。在每一行中,输入列的值将按指定顺序连接成一个向量。

示例

假设我们有一个包含 idhourmobileuserFeaturesclicked 列的 DataFrame

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

userFeatures 是一个包含三个用户特征的向量列。我们希望将 hourmobileuserFeatures 组合成一个名为 features 的特征向量,并使用它来预测是否 clicked。如果我们将 VectorAssembler 的输入列设置为 hourmobileuserFeatures,输出列设置为 features,则转换后我们应该得到以下 DataFrame:

 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

有关 API 的更多详细信息,请参阅 VectorAssembler Python 文档

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/vector_assembler_example.py"。

有关 API 的更多详细信息,请参阅 VectorAssembler Scala 文档

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/VectorAssemblerExample.scala"。

有关 API 的更多详细信息,请参阅 VectorAssembler Java 文档

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorAssemblerExample.java"。

向量大小提示

有时,明确指定 VectorType 列的向量大小会很有用。例如,VectorAssembler 使用其输入列的大小信息来为其输出列生成大小信息和元数据。虽然在某些情况下可以通过检查列的内容来获取此信息,但在流式 DataFrame 中,直到流启动后才能获取内容。VectorSizeHint 允许用户明确指定列的向量大小,以便 VectorAssembler 或其他可能需要了解向量大小的转换器可以将该列用作输入。

要使用 VectorSizeHint,用户必须设置 inputColsize 参数。将此转换器应用于 DataFrame 会生成一个新的 DataFrame,其中 inputCol 的元数据已更新,指定了向量大小。对结果 DataFrame 的下游操作可以使用此元数据获取该大小。

VectorSizeHint 还可以接受一个可选的 handleInvalid 参数,用于控制当向量列包含空值或错误大小的向量时的行为。默认情况下,handleInvalid 设置为“error”,表示应抛出异常。此参数也可以设置为“skip”,表示应从结果 DataFrame 中过滤掉包含无效值的行;或设置为“optimistic”,表示不应检查列中的无效值,并应保留所有行。请注意,使用“optimistic”可能会导致结果 DataFrame 处于不一致状态,这意味着 VectorSizeHint 应用的列的元数据与该列的内容不匹配。用户应注意避免这种不一致状态。

有关 API 的更多详细信息,请参阅 VectorSizeHint Python 文档

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
     (0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

sizeHint = VectorSizeHint(
    inputCol="userFeatures",
    handleInvalid="skip",
    size=3)

datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
完整的示例代码位于 Spark 仓库中的 "examples/src/main/python/ml/vector_size_hint_example.py"。

有关 API 的更多详细信息,请参阅 VectorSizeHint Scala 文档

import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq(
    (0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
    (0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
  .setSize(3)

val datasetWithSize = sizeHint.transform(dataset)
println("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(false)

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

// This dataframe can be used by downstream transformers as before
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
完整的示例代码位于 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/VectorSizeHintExample.scala"。

有关 API 的更多详细信息,请参阅 VectorSizeHint Java 文档

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row0 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Row row1 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row0, row1), schema);

VectorSizeHint sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
  .setSize(3);

Dataset<Row> datasetWithSize = sizeHint.transform(dataset);
System.out.println("Rows where 'userFeatures' is not the right size are filtered out");
datasetWithSize.show(false);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

// This dataframe can be used by downstream transformers as before
Dataset<Row> output = assembler.transform(datasetWithSize);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
完整的示例代码位于 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSizeHintExample.java"。

分位数离散化器

QuantileDiscretizer 接受一个包含连续特征的列,并输出一个包含分桶类别特征的列。桶的数量由 numBuckets 参数设置。实际使用的桶数可能小于该值,例如当输入的不同取值太少、无法生成足够多的不同分位数时。

NaN 值:在 QuantileDiscretizer 拟合期间,NaN 值将从列中移除。这将生成一个用于进行预测的 Bucketizer 模型。在转换期间,Bucketizer 在数据集中发现 NaN 值时会引发错误,但用户也可以通过设置 handleInvalid 来选择保留或移除数据集中的 NaN 值。如果用户选择保留 NaN 值,它们将得到特殊处理并放置在自己的桶中,例如,如果使用 4 个桶,那么非 NaN 数据将放入桶 [0-3] 中,而 NaN 值将计入一个特殊的桶 [4] 中。

算法:分箱范围是使用近似算法选择的(详细描述请参见 approxQuantile 的文档)。近似的精度可以通过 relativeError 参数控制。当设置为零时,将计算精确分位数(注意:计算精确分位数是一项耗时的操作)。下限和上限分箱边界将分别为 -Infinity+Infinity,覆盖所有实数值。
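
下面是一个简要示意(沿用后面示例的数据与列名,非官方示例):拟合 QuantileDiscretizer 得到的模型就是一个 Bucketizer,可以通过 getSplits() 查看学到的分割点;也可以用 DataFrame.approxQuantile 直接查看近似分位数,其第三个参数即 relativeError。

from pyspark.ml.feature import QuantileDiscretizer

df = spark.createDataFrame(
    [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)], ["id", "hour"])

model = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result").fit(df)
print(model.getSplits())                                    # 形如 [-inf, s1, s2, +inf]
print(df.approxQuantile("hour", [1 / 3.0, 2 / 3.0], 0.0))   # relativeError=0 时为精确分位数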

示例

假设我们有一个包含 id, hour 列的 DataFrame。

 id | hour
----|------
 0  | 18.0
 1  | 19.0
 2  | 8.0
 3  | 5.0
 4  | 2.2

hour 是一个 Double 类型的连续特征。我们希望将这个连续特征转换为一个类别特征。给定 numBuckets = 3,我们应该得到以下 DataFrame:

 id | hour | result
----|------|------
 0  | 18.0 | 2.0
 1  | 19.0 | 2.0
 2  | 8.0  | 1.0
 3  | 5.0  | 1.0
 4  | 2.2  | 0.0

有关 API 的更多详细信息,请参阅 QuantileDiscretizer Python 文档

from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/ml/quantile_discretizer_example.py"。

有关 API 的更多详细信息,请参阅 QuantileDiscretizer Scala 文档

import scala.collection.immutable

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data)).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show(false)
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/QuantileDiscretizerExample.scala"。

有关 API 的更多详细信息,请参阅 QuantileDiscretizer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 18.0),
  RowFactory.create(1, 19.0),
  RowFactory.create(2, 8.0),
  RowFactory.create(3, 5.0),
  RowFactory.create(4, 2.2)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

QuantileDiscretizer discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3);

Dataset<Row> result = discretizer.fit(df).transform(df);
result.show(false);
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java"。

填补器

Imputer 估算器使用缺失值所在列的均值、中位数或众数来填充数据集中的缺失值。输入列应为数值类型。目前 Imputer 不支持类别特征,并且可能为包含类别特征的列产生不正确的值。除默认的 'NaN' 之外,Imputer 还可以通过 .setMissingValue(custom_value) 指定自定义的缺失值标记。例如,.setMissingValue(0) 会把所有出现的 0 当作缺失值进行填充。

注意:输入列中的所有 null 值都被视为缺失值,因此也会被填充。
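
下面是一个简要示意(使用假设的小数据集,非官方示例),演示如何改用中位数策略,并通过 setMissingValue 把 0 当作缺失值处理:

from pyspark.ml.feature import Imputer

df0 = spark.createDataFrame([(1.0,), (2.0,), (0.0,), (4.0,), (5.0,)], ["a"])

imputer = Imputer(inputCols=["a"], outputCols=["out_a"]) \
    .setStrategy("median") \
    .setMissingValue(0.0)

# 所有 0.0 都被视为缺失值,并用其余取值的中位数填充
imputer.fit(df0).transform(df0).show()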

示例

假设我们有一个包含 ab 列的 DataFrame。

      a     |      b      
------------|-----------
     1.0    | Double.NaN
     2.0    | Double.NaN
 Double.NaN |     3.0   
     4.0    |     4.0   
     5.0    |     5.0   

在此示例中,Imputer 将所有出现的 Double.NaN(缺失值的默认值)替换为从相应列中其他值计算出的均值(默认填充策略)。在此示例中,列 ab 的替代值分别为 3.0 和 4.0。转换后,输出列中的缺失值将替换为相关列的替代值。

      a     |      b     | out_a | out_b   
------------|------------|-------|-------
     1.0    | Double.NaN |  1.0  |  4.0 
     2.0    | Double.NaN |  2.0  |  4.0 
 Double.NaN |     3.0    |  3.0  |  3.0 
     4.0    |     4.0    |  4.0  |  4.0
     5.0    |     5.0    |  5.0  |  5.0 

有关 API 的更多详细信息,请参阅 Imputer Python 文档

from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/ml/imputer_example.py"。

有关 API 的更多详细信息,请参阅 Imputer Scala 文档

import org.apache.spark.ml.feature.Imputer

val df = spark.createDataFrame(Seq(
  (1.0, Double.NaN),
  (2.0, Double.NaN),
  (Double.NaN, 3.0),
  (4.0, 4.0),
  (5.0, 5.0)
)).toDF("a", "b")

val imputer = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("out_a", "out_b"))

val model = imputer.fit(df)
model.transform(df).show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/ImputerExample.scala"。

有关 API 的更多详细信息,请参阅 Imputer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1.0, Double.NaN),
  RowFactory.create(2.0, Double.NaN),
  RowFactory.create(Double.NaN, 3.0),
  RowFactory.create(4.0, 4.0),
  RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
  createStructField("a", DoubleType, false),
  createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Imputer imputer = new Imputer()
  .setInputCols(new String[]{"a", "b"})
  .setOutputCols(new String[]{"out_a", "out_b"});

ImputerModel model = imputer.fit(df);
model.transform(df).show();
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaImputerExample.java"。

特征选择器

向量切片器

VectorSlicer 是一个转换器,它接收一个特征向量并输出一个包含原始特征子数组的新特征向量。它对于从向量列中提取特征非常有用。

VectorSlicer 接受一个带有指定索引的向量列,然后输出一个新向量列,其值通过这些索引选择。有两种类型的索引:

  1. 表示向量索引的整数索引,即 setIndices()。

  2. 表示向量中特征名称的字符串索引,即 setNames()。这要求向量列具有 AttributeGroup,因为实现是根据 Attribute 的名称字段进行匹配的。

整数和字符串指定都可接受。此外,您可以同时使用整数索引和字符串名称。必须至少选择一个特征。不允许重复特征,因此选定的索引和名称之间不能有重叠。请注意,如果选择特征名称,遇到空的输入属性时将抛出异常。

输出向量将首先按选定索引的顺序(按给定顺序)排列特征,然后按选定名称的顺序(按给定顺序)排列。

示例

假设我们有一个包含 userFeatures 列的 DataFrame。

 userFeatures
------------------
 [0.0, 10.0, 0.5]

userFeatures 是一个包含三个用户特征的向量列。假设 userFeatures 的第一列都是零,所以我们想删除它,只选择最后两列。VectorSlicer 使用 setIndices(1, 2) 选择最后两个元素,然后生成一个名为 features 的新向量列。

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]

还假设我们有 userFeatures 的潜在输入属性,即 ["f1", "f2", "f3"],那么我们可以使用 setNames("f2", "f3") 来选择它们。

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]
 ["f1", "f2", "f3"] | ["f2", "f3"]

有关 API 的更多详细信息,请参阅 VectorSlicer Python 文档

from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

df = spark.createDataFrame([
    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/ml/vector_slicer_example.py"。

有关 API 的更多详细信息,请参阅 VectorSlicer Scala 文档

import java.util.Arrays

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

val data = Arrays.asList(
  Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
  Row(Vectors.dense(-2.0, 2.3, 0.0))
)

val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])

val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))

val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")

slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))

val output = slicer.transform(dataset)
output.show(false)
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/VectorSlicerExample.scala"。

有关 API 的更多详细信息,请参阅 VectorSlicer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

Attribute[] attrs = {
  NumericAttribute.defaultAttr().withName("f1"),
  NumericAttribute.defaultAttr().withName("f2"),
  NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
  RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);

Dataset<Row> dataset =
  spark.createDataFrame(data, (new StructType()).add(group.toStructField()));

VectorSlicer vectorSlicer = new VectorSlicer()
  .setInputCol("userFeatures").setOutputCol("features");

vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})

Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java"。

RFormula

RFormula 根据 R 模型公式 选择列。目前我们支持 R 运算符的有限子集,包括 ' ~ ', ' . ', ' : ', ' + ' 和 ' - '。基本运算符是:

  1. ~ 分隔目标和项
  2. + 连接项,“+ 0”表示去掉截距
  3. - 删除一项,“- 1”表示去掉截距
  4. : 交互(数值之间相乘,或二值化后的类别值)
  5. . 除目标之外的所有列

假设 ab 是双精度列,我们使用以下简单示例来说明 RFormula 的效果:

RFormula 产生一个特征向量列和一个双精度或字符串标签列。就像在 R 中用于线性回归的公式一样,数值列将被转换为双精度。对于字符串输入列,它们将首先使用 StringIndexer 进行转换,其排序由 stringOrderType 确定,排序后的最后一个类别将被删除,然后双精度值将被独热编码。

假设一个字符串特征列包含值 {'b', 'a', 'b', 'a', 'c', 'b'},我们设置 stringOrderType 来控制编码:

stringOrderType | Category mapped to 0 by StringIndexer |  Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b')          | least frequent category ('c')
'frequencyAsc'  | least frequent category ('c')         | most frequent category ('b')
'alphabetDesc'  | last alphabetical category ('c')      | first alphabetical category ('a')
'alphabetAsc'   | first alphabetical category ('a')     | last alphabetical category ('c')

如果标签列是字符串类型,它将首先使用 StringIndexer 转换为双精度,并使用 frequencyDesc 排序。如果 DataFrame 中不存在标签列,则输出标签列将从公式中指定的响应变量创建。

注意:排序选项 stringOrderType 不用于标签列。当标签列被索引时,它使用 StringIndexer 中默认的降序频率排序。

示例

假设我们有一个包含 idcountryhourclicked 列的 DataFrame。

id | country | hour | clicked
---|---------|------|---------
 7 | "US"    | 18   | 1.0
 8 | "CA"    | 12   | 0.0
 9 | "NZ"    | 15   | 0.0

如果我们使用 RFormula,公式字符串为 clicked ~ country + hour,这表示我们想基于 countryhour 预测 clicked,转换后我们应该得到以下 DataFrame:

id | country | hour | clicked | features         | label
---|---------|------|---------|------------------|-------
 7 | "US"    | 18   | 1.0     | [0.0, 0.0, 18.0] | 1.0
 8 | "CA"    | 12   | 0.0     | [0.0, 1.0, 12.0] | 0.0
 9 | "NZ"    | 15   | 0.0     | [1.0, 0.0, 15.0] | 0.0

有关 API 的更多详细信息,请参阅 RFormula Python 文档

from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/ml/rformula_example.py"。

有关 API 的更多详细信息,请参阅 RFormula Scala 文档

import org.apache.spark.ml.feature.RFormula

val dataset = spark.createDataFrame(Seq(
  (7, "US", 18, 1.0),
  (8, "CA", 12, 0.0),
  (9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")

val formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label")

val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/RFormulaExample.scala"。

有关 API 的更多详细信息,请参阅 RFormula Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("country", StringType, false),
  createStructField("hour", IntegerType, false),
  createStructField("clicked", DoubleType, false)
});

List<Row> data = Arrays.asList(
  RowFactory.create(7, "US", 18, 1.0),
  RowFactory.create(8, "CA", 12, 0.0),
  RowFactory.create(9, "NZ", 15, 0.0)
);

Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java"。

卡方选择器

ChiSqSelector 代表卡方特征选择。它对带有类别特征的标记数据进行操作。ChiSqSelector 使用 卡方独立性检验 来决定选择哪些特征。它支持五种选择方法:numTopFeaturespercentilefprfdrfwe

示例

假设我们有一个包含 idfeaturesclicked 列的 DataFrame,其中 clicked 是我们要预测的目标。

id | features              | clicked
---|-----------------------|---------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0

如果我们使用 ChiSqSelectornumTopFeatures = 1,那么根据我们的标签 clickedfeatures 中的最后一列被选为最有用的特征。

id | features              | clicked | selectedFeatures
---|-----------------------|---------|------------------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0     | [1.0]
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0     | [0.0]
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0     | [0.1]

有关 API 的更多详细信息,请参阅 ChiSqSelector Python 文档

from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

result = selector.fit(df).transform(df)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/ml/chisq_selector_example.py"。

有关 API 的更多详细信息,请参阅 ChiSqSelector Scala 文档

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

import spark.implicits._

val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)

val df = spark.createDataset(data).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala"。

有关 API 的更多详细信息,请参阅 ChiSqSelector Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

ChiSqSelector selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
    + " features selected");
result.show();
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java"。

单变量特征选择器

UnivariateFeatureSelector 对带有类别/连续特征的类别/连续标签进行操作。用户可以设置 featureTypelabelType,Spark 将根据指定的 featureTypelabelType 选择要使用的评分函数。

featureType | labelType   | score function
------------|-------------|------------------------
categorical | categorical | chi-squared (chi2)
continuous  | categorical | ANOVATest (f_classif)
continuous  | continuous  | F-value (f_regression)

它支持五种选择模式:numTopFeatures, percentile, fpr, fdr, fwe

默认情况下,选择模式为 numTopFeatures,默认选择阈值设置为 50。
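As a hedged sketch of a mode other than numTopFeatures (assuming a DataFrame df with a "features" vector column and a categorical "label" column, as in the example below), percentile mode keeps a fraction of all features rather than a fixed count, and the selection threshold is interpreted as that fraction:

from pyspark.ml.feature import UnivariateFeatureSelector

# Illustrative sketch only: keep the top 50% of features ranked by the ANOVA F-test (f_classif).
selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="label", selectionMode="percentile")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(0.5)
result = selector.fit(df).transform(df)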

示例

假设我们有一个包含 idfeatureslabel 列的 DataFrame,其中 label 是我们要预测的目标。

id | features                       | label
---|--------------------------------|---------
 1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0
 2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0
 3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0
 4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0
 5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0
 6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0

如果我们设置 featureTypecontinuouslabelTypecategorical,并且 numTopFeatures = 1,那么 features 中的最后一列被选为最有用的特征。

id | features                       | label   | selectedFeatures
---|--------------------------------|---------|------------------
 1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0     | [2.3]
 2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0     | [4.1]
 3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0     | [2.5]
 4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0     | [3.8]
 5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0     | [3.0]
 6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0     | [2.1]

有关 API 的更多详细信息,请参阅 UnivariateFeatureSelector Python 文档

from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
    (2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
    (3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
    (4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
    (5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
    (6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])

selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)

result = selector.fit(df).transform(df)

print("UnivariateFeatureSelector output with top %d features selected using f_classif"
      % selector.getSelectionThreshold())
result.show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/ml/univariate_feature_selector_example.py"。

有关 API 的更多详细信息,请参阅 UnivariateFeatureSelector Scala 文档

import org.apache.spark.ml.feature.UnivariateFeatureSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
  (2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
  (3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
  (4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
  (5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
  (6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
)

val df = spark.createDataset(data).toDF("id", "features", "label")

val selector = new UnivariateFeatureSelector()
  .setFeatureType("continuous")
  .setLabelType("categorical")
  .setSelectionMode("numTopFeatures")
  .setSelectionThreshold(1)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"UnivariateFeatureSelector output with top ${selector.getSelectionThreshold}" +
  s" features selected using f_classif")
result.show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/UnivariateFeatureSelectorExample.scala"。

有关 API 的更多详细信息,请参阅 UnivariateFeatureSelector Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.UnivariateFeatureSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
  RowFactory.create(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
  RowFactory.create(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
  RowFactory.create(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
  RowFactory.create(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
  RowFactory.create(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

UnivariateFeatureSelector selector = new UnivariateFeatureSelector()
  .setFeatureType("continuous")
  .setLabelType("categorical")
  .setSelectionMode("numTopFeatures")
  .setSelectionThreshold(1)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("UnivariateFeatureSelector output with top "
    + selector.getSelectionThreshold() + " features selected using f_classif");
result.show();
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaUnivariateFeatureSelectorExample.java"。

方差阈值选择器

VarianceThresholdSelector 是一个选择器,用于移除低方差特征。方差不大于 varianceThreshold 的特征将被移除。如果未设置,varianceThreshold 默认为 0,这意味着只有方差为 0 的特征(即在所有样本中具有相同值的特征)将被移除。

示例

Suppose that we have a DataFrame with the columns id and features:

id | features
---|--------------------------------
 1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0]
 2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0]
 3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0]
 4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0]
 5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0]
 6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0]

这6个特征的样本方差分别为16.67、0.67、8.17、10.17、5.07和11.47。如果我们使用 VarianceThresholdSelectorvarianceThreshold = 8.0,那么方差小于等于 8.0 的特征将被移除。

id | features                       | selectedFeatures
---|--------------------------------|-------------------
 1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0] | [6.0,0.0,7.0,0.0]
 2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0] | [0.0,6.0,0.0,9.0]
 3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0] | [0.0,3.0,0.0,5.0]
 4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0] | [0.0,8.0,5.0,4.0]
 5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0] | [8.0,6.0,5.0,4.0]
 6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0] | [8.0,6.0,0.0,0.0]
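The variances quoted above are the unbiased sample variances of each feature column. As a quick sanity check outside of Spark (illustrative only, not part of the example), they can be reproduced with NumPy:

import numpy as np

# One row per feature vector of the example DataFrame above.
X = np.array([
    [6.0, 7.0, 0.0, 7.0, 6.0, 0.0],
    [0.0, 9.0, 6.0, 0.0, 5.0, 9.0],
    [0.0, 9.0, 3.0, 0.0, 5.0, 5.0],
    [0.0, 9.0, 8.0, 5.0, 6.0, 4.0],
    [8.0, 9.0, 6.0, 5.0, 4.0, 4.0],
    [8.0, 9.0, 6.0, 0.0, 0.0, 0.0],
])
# ddof=1 gives the sample variance: [16.67  0.67  8.17 10.17  5.07 11.47]
print(np.var(X, axis=0, ddof=1).round(2))
# Only columns 1, 3, 4 and 6 have variance greater than 8.0 and are kept.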

有关 API 的更多详细信息,请参阅 VarianceThresholdSelector Python 文档

from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
    (2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
    (3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
    (4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
    (5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
    (6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])

selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")

result = selector.fit(df).transform(df)

print("Output: Features with variance lower than %f are removed." %
      selector.getVarianceThreshold())
result.show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/ml/variance_threshold_selector_example.py"。

有关 API 的更多详细信息,请参阅 VarianceThresholdSelector Scala 文档

import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
  (2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
  (3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
  (4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
  (5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
  (6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
)

val df = spark.createDataset(data).toDF("id", "features")

val selector = new VarianceThresholdSelector()
  .setVarianceThreshold(8.0)
  .setFeaturesCol("features")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"Output: Features with variance lower than" +
  s" ${selector.getVarianceThreshold} are removed.")
result.show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/VarianceThresholdSelectorExample.scala"。

有关 API 的更多详细信息,请参阅 VarianceThresholdSelector Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.VarianceThresholdSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
  RowFactory.create(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
  RowFactory.create(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
  RowFactory.create(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
  RowFactory.create(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
  RowFactory.create(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VarianceThresholdSelector selector = new VarianceThresholdSelector()
  .setVarianceThreshold(8.0)
  .setFeaturesCol("features")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("Output: Features with variance lower than "
    + selector.getVarianceThreshold() + " are removed.");
result.show();
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaVarianceThresholdSelectorExample.java"。

局部敏感哈希

局部敏感哈希 (LSH) 是一种重要的哈希技术,常用于聚类、近似最近邻搜索和大型数据集的异常值检测。

LSH 的一般思想是使用一组函数(“LSH 族”)将数据点哈希到桶中,使得彼此靠近的数据点以高概率位于同一桶中,而彼此远离的数据点则很可能位于不同桶中。LSH 族形式化定义如下:

在度量空间 (M, d) 中,其中 M 是一个集合,dM 上的距离函数,LSH 族是满足以下性质的函数族 h\[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \] 这个 LSH 族被称为 (r1, r2, p1, p2)-敏感。
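As a concrete instance of this definition, consider the MinHash family described later in this section: for an ideal random hash function, a single MinHash value collides with probability equal to the Jaccard similarity of the two sets, \[ Pr(h(\mathbf{A}) = h(\mathbf{B})) = \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} = 1 - d(\mathbf{A}, \mathbf{B}), \] so for any thresholds $r1 < r2$ the family is $(r1, r2, 1 - r1, 1 - r2)$-sensitive.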

在 Spark 中,不同的 LSH 族在单独的类中实现(例如,MinHash),并且在每个类中都提供了用于特征转换、近似相似度连接和近似最近邻的 API。

在 LSH 中,我们将假阳性定义为一对相距较远的输入特征($d(p,q) \geq r2$)被哈希到同一个桶中,将假阴性定义为一对相邻特征($d(p,q) \leq r1$)被哈希到不同桶中。

LSH 操作

我们描述了 LSH 可以用于的主要操作类型。一个已拟合的 LSH 模型为这些操作中的每一个都提供了方法。

特征转换

特征转换是基本功能,用于将哈希值作为新列添加。这对于降维很有用。用户可以通过设置 inputColoutputCol 来指定输入和输出列名。

LSH 也支持多个 LSH 哈希表。用户可以通过设置 numHashTables 来指定哈希表的数量。这也用于近似相似性连接和近似最近邻中的 OR-放大。增加哈希表的数量将提高准确性,但也会增加通信成本和运行时间。

outputCol 的类型是 Seq[Vector],其中数组的维度等于 numHashTables,并且向量的维度当前设置为 1。在未来的版本中,我们将实现 AND-放大,以便用户可以指定这些向量的维度。
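As an illustration of the OR-amplification mentioned above: if a single hash table places a close pair into the same bucket with probability $p1$, then with numHashTables set to $k$ the pair becomes a join or search candidate whenever it collides in at least one of the $k$ tables, so the collision probability grows to \[ 1 - (1 - p1)^k. \] This is why adding hash tables reduces false negatives, at the cost of more candidate pairs to verify and hence higher communication cost and running time.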

近似相似度连接

近似相似性连接接受两个数据集,并近似返回数据集中距离小于用户定义阈值的行对。近似相似性连接支持连接两个不同的数据集和自连接。自连接将产生一些重复的对。

近似相似性连接接受已转换和未转换的数据集作为输入。如果使用未转换的数据集,它将自动进行转换。在这种情况下,哈希签名将创建为 outputCol

在连接后的数据集中,可以在 datasetAdatasetB 中查询原始数据集。输出数据集中将添加一个距离列,以显示返回的每对行之间的真实距离。
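Because a self join reports each qualifying pair in both orders, a common follow-up is to keep only one ordering of the ids; comparing the ids also drops rows matched with themselves. A minimal sketch, reusing the fitted model and the DataFrame dfA from the BucketedRandomProjectionLSH example below (with a threshold of 2.5 so that neighboring corner points at distance 2.0 can qualify):

from pyspark.sql.functions import col

# Approximate self join of dfA with itself, de-duplicated by id ordering.
model.approxSimilarityJoin(dfA, dfA, 2.5, distCol="EuclideanDistance") \
    .where(col("datasetA.id") < col("datasetB.id")) \
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()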

Approximate Nearest Neighbor Search

Approximate nearest neighbor search takes a dataset (of feature vectors) and a key (a single feature vector), and it approximately returns a specified number of rows in the dataset that are closest to the key vector.

近似最近邻搜索接受已转换和未转换的数据集作为输入。如果使用未转换的数据集,它将自动进行转换。在这种情况下,哈希签名将创建为 outputCol

输出数据集中将添加一个距离列,以显示每个输出行与搜索键之间的真实距离。

注意:当哈希桶中没有足够的候选项时,近似最近邻搜索将返回少于 k 行。
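The comments in the examples below point out that the hash signatures can be pre-computed. As a small sketch (reusing the model, dfA and key defined in those examples), the transformed dataset can be cached once and then queried repeatedly without re-hashing:

# Pre-compute and cache the hash signatures, then query against the transformed dataset.
transformedA = model.transform(dfA).cache()
model.approxNearestNeighbors(transformedA, key, 2).show()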

LSH 算法

用于欧几里得距离的分桶随机投影

分桶随机投影是欧几里得距离的 LSH 族。欧几里得距离定义如下: \[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \] 其 LSH 族将特征向量 $\mathbf{x}$ 投影到随机单位向量 $\mathbf{v}$ 上,并将投影结果分到哈希桶中: \[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \] 其中 r 是用户定义的分桶长度。分桶长度可用于控制哈希桶的平均大小(以及桶的数量)。较大的分桶长度(即更少的桶)会增加特征被哈希到同一桶的概率(增加真阳性和假阳性的数量)。
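As a small numeric illustration of this hash (with a made-up projection value, since $\mathbf{v}$ is drawn at random): if $\mathbf{x} \cdot \mathbf{v} = 3.7$ and the bucket length is $r = 2.0$, then \[ h(\mathbf{x}) = \Big\lfloor \frac{3.7}{2.0} \Big\rfloor = 1, \] and any other point whose projection onto $\mathbf{v}$ falls in $[2.0, 4.0)$ lands in the same bucket for this particular hash function.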

分桶随机投影接受任意向量作为输入特征,并支持稀疏向量和密集向量。

有关 API 的更多详细信息,请参阅 BucketedRandomProjectionLSH Python 文档

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/ml/bucketed_random_projection_lsh_example.py"。

有关 API 的更多详细信息,请参阅 BucketedRandomProjectionLSH Scala 文档

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0)),
  (3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (4, Vectors.dense(1.0, 0.0)),
  (5, Vectors.dense(-1.0, 0.0)),
  (6, Vectors.dense(0.0, 1.0)),
  (7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")

val key = Vectors.dense(1.0, 0.0)

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = brp.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala"。

有关 API 的更多详细信息,请参阅 BucketedRandomProjectionLSH Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.dense(1.0, 1.0)),
  RowFactory.create(1, Vectors.dense(1.0, -1.0)),
  RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
  RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);

List<Row> dataB = Arrays.asList(
    RowFactory.create(4, Vectors.dense(1.0, 0.0)),
    RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
    RowFactory.create(6, Vectors.dense(0.0, 1.0)),
    RowFactory.create(7, Vectors.dense(0.0, -1.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

Vector key = Vectors.dense(1.0, 0.0);

BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes");

BucketedRandomProjectionLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java"。

用于 Jaccard 距离的 MinHash

MinHash 是 Jaccard 距离的一种 LSH 族,其中输入特征是自然数集合。两个集合的 Jaccard 距离由它们的交集和并集的基数定义: \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \] MinHash 对集合中的每个元素应用一个随机哈希函数 g,并取所有哈希值的最小值: \[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]

The input sets for MinHash are represented as binary vectors, where the vector indices represent the elements themselves and the non-zero values in the vector represent the presence of those elements in the set. Both dense and sparse vectors are supported, but sparse vectors are typically recommended for efficiency. For example, Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0))) means there are 10 elements in the space; this set contains element 2, element 3 and element 5. All non-zero values are treated as binary "1" values.

注意:MinHash 无法转换空集合,这意味着任何输入向量必须至少有一个非零条目。
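As a worked example using the data below: the row of dfA with id 0 corresponds to the set $\{0, 1, 2\}$ and the row of dfB with id 5 to $\{1, 2, 4\}$, so their Jaccard distance is \[ 1 - \frac{|\{1, 2\}|}{|\{0, 1, 2, 4\}|} = 1 - \frac{2}{4} = 0.5, \] which is below the 0.6 threshold used in the approximate similarity join below, making this pair a candidate to be returned.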

有关 API 的更多详细信息,请参阅 MinHashLSH Python 文档

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/python/ml/min_hash_lsh_example.py"。

有关 API 的更多详细信息,请参阅 MinHashLSH Scala 文档

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))

val mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = mh.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
完整示例代码请参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala"。

有关 API 的更多详细信息,请参阅 MinHashLSH Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

List<Row> dataB = Arrays.asList(
  RowFactory.create(3, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(4, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(5, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);

MinHashLSH mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes");

MinHashLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
完整示例代码请参见 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java"。