Extracting, transforming and selecting features

This section covers algorithms for working with features, roughly divided into the following groups:

Table of Contents

Feature Extractors

TF-IDF

Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus. Denote a term by $t$, a document by $d$, and the corpus by $D$. Term frequency $TF(t, d)$ is the number of times that term $t$ appears in document $d$, while document frequency $DF(t, D)$ is the number of documents that contain term $t$. If we only use term frequency to measure importance, it is very easy to over-emphasize terms that appear very often but carry little information about the document, e.g. "a", "the", and "of". If a term appears very often across the corpus, it means it doesn't carry special information about a particular document. Inverse document frequency is a numerical measure of how much information a term provides: \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] where $|D|$ is the total number of documents in the corpus. Since the logarithm is used, if a term appears in all documents, its IDF value becomes 0. Note that a smoothing term is applied to avoid dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF: \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \] There are several variants of the definitions of term frequency and document frequency. In MLlib, we separate TF and IDF to make them flexible.
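
For example, in a corpus of $|D| = 3$ documents, a term $t$ that appears in all three documents and a term $t'$ that appears in only one get \[ IDF(t, D) = \log \frac{3 + 1}{3 + 1} = 0, \qquad IDF(t', D) = \log \frac{3 + 1}{1 + 1} = \log 2, \] so the ubiquitous term contributes nothing to TF-IDF no matter how large its term frequency is.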

TF: Both HashingTF and CountVectorizer can be used to generate the term frequency vectors.

HashingTF is a Transformer which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a "set of terms" might be a bag of words. HashingTF utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function. The hash function used here is MurmurHash 3. Term frequencies are then calculated based on the mapped indices. This approach avoids the need to compute a global term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing. To reduce the chance of collision, we can increase the target feature dimension, i.e. the number of buckets of the hash table. Since a simple modulo on the hashed value is used to determine the vector index, it is advisable to use a power of two as the feature dimension, otherwise the features will not be mapped evenly to the vector indices. The default feature dimension is $2^{18} = 262,144$. An optional binary toggle parameter controls term frequency counts. When set to true, all nonzero frequency counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.
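
To make the hashing trick concrete, here is a minimal, non-Spark sketch. It substitutes CRC32 for MurmurHash 3 (an assumption made purely for illustration) and shows how each term is mapped to a vector index by a simple modulo, with collisions folding different terms into the same bucket:

import zlib

def toy_hashing_tf(words, num_features=16):
    # Accumulate term counts at index = hash(term) % num_features.
    vec = [0.0] * num_features
    for w in words:
        idx = zlib.crc32(w.encode("utf-8")) % num_features  # collisions are possible
        vec[idx] += 1.0
    return vec

print(toy_hashing_tf("hi i heard about spark".split()))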

CountVectorizer converts text documents to vectors of term counts. Refer to CountVectorizer for more details.

IDF: IDF is an Estimator which is fit on a dataset and produces an IDFModel. The IDFModel takes feature vectors (generally created from HashingTF or CountVectorizer) and scales each feature. Intuitively, it down-weights features which appear frequently in a corpus.

Note: spark.ml doesn't provide tools for text segmentation. We refer users to the Stanford NLP Group and scalanlp/chalk.

Examples

In the following code segment, we start with a set of sentences. We split each sentence into words using Tokenizer. For each sentence (bag of words), we use HashingTF to hash the sentence into a feature vector. We use IDF to rescale the feature vectors; this generally improves performance when using text as features. Our feature vectors could then be passed to a learning algorithm.

Refer to the HashingTF Python docs and the IDF Python docs for more details on the API.

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
Find full example code at "examples/src/main/python/ml/tf_idf_example.py" in the Spark repo.

Refer to the HashingTF Scala docs and the IDF Scala docs for more details on the API.

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val sentenceData = spark.createDataFrame(Seq(
  (0.0, "Hi I heard about Spark"),
  (0.0, "I wish Java could use case classes"),
  (1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)

val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)

val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)

val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/TfIdfExample.scala" in the Spark repo.

Refer to the HashingTF Java docs and the IDF Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, "Hi I heard about Spark"),
  RowFactory.create(0.0, "I wish Java could use case classes"),
  RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);

int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .setNumFeatures(numFeatures);

Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors

IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);

Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaTfIdfExample.java" in the Spark repo.

Word2Vec

Word2Vec is an Estimator which takes sequences of words representing documents and trains a Word2VecModel. The model maps each word to a unique fixed-size vector. The Word2VecModel transforms each document into a vector using the average of all words in the document; this vector can then be used as features for prediction, document similarity calculations, etc. Please refer to the MLlib user guide on Word2Vec for more details.
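
In other words, for a document consisting of words $w_1, \dots, w_n$ with learned word vectors $\mathbf{v}_{w_i}$, the document vector is simply the mean \[ \mathbf{v}_{doc} = \frac{1}{n} \sum_{i=1}^{n} \mathbf{v}_{w_i}. \]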

Examples

In the following code segment, we start with a set of documents, each of which is represented as a sequence of words. For each document, we transform it into a feature vector. This feature vector could then be passed to a learning algorithm.

Refer to the Word2Vec Python docs for more details on the API.

from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
Find full example code at "examples/src/main/python/ml/word2vec_example.py" in the Spark repo.

Refer to the Word2Vec Scala docs for more details on the API.

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
  "Hi I heard about Spark".split(" "),
  "I wish Java could use case classes".split(" "),
  "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)
val model = word2Vec.fit(documentDF)

val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
  println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/Word2VecExample.scala" in the Spark repo.

Refer to the Word2Vec Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
  RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
  RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);

// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0);

Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);

for (Row row : result.collectAsList()) {
  List<String> text = row.getList(0);
  Vector vector = (Vector) row.get(1);
  System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaWord2VecExample.java" in the Spark repo.

CountVectorizer

CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary, and generates a CountVectorizerModel. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.

During the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus. An optional parameter minDF also affects the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary. Another optional binary toggle parameter controls the output vector. If set to true, all nonzero counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.

Examples

Assume that we have the following DataFrame with columns id and texts:

 id | texts
----|----------
 0  | Array("a", "b", "c")
 1  | Array("a", "b", "b", "c", "a")

Each row in texts is a document of type Array[String]. Invoking fit of CountVectorizer produces a CountVectorizerModel with vocabulary (a, b, c). Then the output column "vector" after transformation contains:

 id | texts                           | vector
----|---------------------------------|---------------
 0  | Array("a", "b", "c")            | (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])

Each vector represents the token counts of the document over the vocabulary.

Refer to the CountVectorizer Python docs and the CountVectorizerModel Python docs for more details on the API.

from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with an ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)
Find full example code at "examples/src/main/python/ml/count_vectorizer_example.py" in the Spark repo.
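
The Scala and Java examples below also build a CountVectorizerModel directly from an a-priori vocabulary. In PySpark the analogous call is CountVectorizerModel.from_vocabulary (available in recent Spark releases); a brief sketch, reusing the df defined above:

from pyspark.ml.feature import CountVectorizerModel

# Define the model from an a-priori vocabulary instead of fitting it from data.
cvm = CountVectorizerModel.from_vocabulary(["a", "b", "c"],
                                           inputCol="words",
                                           outputCol="features")
cvm.transform(df).show(truncate=False)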

Refer to the CountVectorizer Scala docs and the CountVectorizerModel Scala docs for more details on the API.

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df)

// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
  .setInputCol("words")
  .setOutputCol("features")

cvModel.transform(df).show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala" in the Spark repo.

Refer to the CountVectorizer Java docs and the CountVectorizerModel Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("a", "b", "c")),
  RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
  .setInputCol("text")
  .setOutputCol("feature")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df);

// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
  .setInputCol("text")
  .setOutputCol("feature");

cvModel.transform(df).show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaCountVectorizerExample.java" in the Spark repo.

FeatureHasher

Feature hashing projects a set of categorical or numerical features into a feature vector of specified dimension (typically substantially smaller than that of the original feature space). This is done using the hashing trick to map features to indices in the feature vector.

The FeatureHasher transformer operates on multiple columns. Each column may contain either numeric or categorical features. Behavior and handling of column data types is as follows:

Null (missing) values are ignored (implicitly zero in the resulting feature vector).

The hash function used here is also the MurmurHash 3 used in HashingTF. Since a simple modulo on the hashed value is used to determine the vector index, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the vector indices. The default feature dimension is $2^{18} = 262,144$.

Examples

Assume that we have a DataFrame with 4 input columns real, bool, stringNum, and string. These different data types as input will illustrate the behavior of the transform to produce a column of feature vectors.

real| bool|stringNum|string
----|-----|---------|------
 2.2| true|        1|   foo
 3.3|false|        2|   bar
 4.4|false|        3|   baz
 5.5|false|        4|   foo

Then the output of FeatureHasher.transform on this DataFrame is:

real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1        |foo   |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2        |bar   |(262144,[6031,  80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3        |baz   |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4        |foo   |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])

The resulting feature vectors could then be passed to a learning algorithm.

Refer to the FeatureHasher Python docs for more details on the API.

from pyspark.ml.feature import FeatureHasher

dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       outputCol="features")

featurized = hasher.transform(dataset)
featurized.show(truncate=False)
Find full example code at "examples/src/main/python/ml/feature_hasher_example.py" in the Spark repo.

Refer to the FeatureHasher Scala docs for more details on the API.

import org.apache.spark.ml.feature.FeatureHasher

val dataset = spark.createDataFrame(Seq(
  (2.2, true, "1", "foo"),
  (3.3, false, "2", "bar"),
  (4.4, false, "3", "baz"),
  (5.5, false, "4", "foo")
)).toDF("real", "bool", "stringNum", "string")

val hasher = new FeatureHasher()
  .setInputCols("real", "bool", "stringNum", "string")
  .setOutputCol("features")

val featurized = hasher.transform(dataset)
featurized.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/FeatureHasherExample.scala" in the Spark repo.

Refer to the FeatureHasher Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.FeatureHasher;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(2.2, true, "1", "foo"),
  RowFactory.create(3.3, false, "2", "bar"),
  RowFactory.create(4.4, false, "3", "baz"),
  RowFactory.create(5.5, false, "4", "foo")
);
StructType schema = new StructType(new StructField[]{
  new StructField("real", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("bool", DataTypes.BooleanType, false, Metadata.empty()),
  new StructField("stringNum", DataTypes.StringType, false, Metadata.empty()),
  new StructField("string", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);

FeatureHasher hasher = new FeatureHasher()
  .setInputCols(new String[]{"real", "bool", "stringNum", "string"})
  .setOutputCol("features");

Dataset<Row> featurized = hasher.transform(dataset);

featurized.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaFeatureHasherExample.java" in the Spark repo.

Feature Transformers

Tokenizer

Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). A simple Tokenizer class provides this functionality. The example below shows how to split sentences into sequences of words.

RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching. By default, the parameter "pattern" (regex, default: "\\s+") is used as delimiters to split the input text. Alternatively, users can set the parameter "gaps" to false, indicating that the regex "pattern" denotes "tokens" rather than splitting gaps, and find all matching occurrences as the tokenization result.

Examples

Refer to the Tokenizer Python docs and the RegexTokenizer Python docs for more details on the API.

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
Find full example code at "examples/src/main/python/ml/tokenizer_example.py" in the Spark repo.

Refer to the Tokenizer Scala docs and the RegexTokenizer Scala docs for more details on the API.

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val countTokens = udf { (words: Seq[String]) => words.length }

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)

val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala" in the Spark repo.

Refer to the Tokenizer Java docs and the RegexTokenizer Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import scala.collection.mutable.Seq;

import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.call_udf;
import static org.apache.spark.sql.functions.col;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(1, "I wish Java could use case classes"),
  RowFactory.create(2, "Logistic,regression,models,are,neat")
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");

RegexTokenizer regexTokenizer = new RegexTokenizer()
    .setInputCol("sentence")
    .setOutputCol("words")
    .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);

spark.udf().register(
  "countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);

Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
    .withColumn("tokens", call_udf("countTokens", col("words")))
    .show(false);

Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
    .withColumn("tokens", call_udf("countTokens", col("words")))
    .show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java" in the Spark repo.

StopWordsRemover

Stop words are words which should be excluded from the input, typically because the words appear frequently and don't carry much meaning.

StopWordsRemover takes as input a sequence of strings (e.g. the output of a Tokenizer) and drops all the stop words from the input sequences. The list of stop words is specified by the stopWords parameter. Default stop words for some languages are accessible by calling StopWordsRemover.loadDefaultStopWords(language), for which available options are "danish", "dutch", "english", "finnish", "french", "german", "hungarian", "italian", "norwegian", "portuguese", "russian", "spanish", "swedish" and "turkish". A boolean parameter caseSensitive indicates whether the matches should be case sensitive (false by default).
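
As a brief sketch of these two parameters (using the built-in English list, which the remover in the examples below relies on implicitly):

from pyspark.ml.feature import StopWordsRemover

# Load the default English stop-word list and pass it in explicitly.
english_stop_words = StopWordsRemover.loadDefaultStopWords("english")
remover = StopWordsRemover(inputCol="raw", outputCol="filtered",
                           stopWords=english_stop_words, caseSensitive=False)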

Examples

Assume that we have the following DataFrame with columns id and raw:

 id | raw
----|----------
 0  | [I, saw, the, red, balloon]
 1  | [Mary, had, a, little, lamb]

StopWordsRemover 应用于 raw 作为输入列,filtered 作为输出列,我们应该得到以下结果

 id | raw                          | filtered
----|------------------------------|---------------------
 0  | [I, saw, the, red, balloon]  | [saw, red, balloon]
 1  | [Mary, had, a, little, lamb] | [Mary, little, lamb]

filtered 中,停用词“I”、“the”、“had”和“a”已被过滤掉。

有关 API 的更多详细信息,请参阅 StopWordsRemover Python 文档

from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
Find full example code at "examples/src/main/python/ml/stopwords_remover_example.py" in the Spark repo.

Refer to the StopWordsRemover Scala docs for more details on the API.

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/StopWordsRemoverExample.scala" in the Spark repo.

Refer to the StopWordsRemover Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StopWordsRemover remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered");

List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
  RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);

StructType schema = new StructType(new StructField[]{
  new StructField(
    "raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaStopWordsRemoverExample.java" in the Spark repo.

$n$-gram

An n-gram is a sequence of $n$ tokens (typically words) for some integer $n$. The NGram class can be used to transform input features into $n$-grams.

NGram takes as input a sequence of strings (e.g. the output of a Tokenizer). The parameter n is used to determine the number of terms in each $n$-gram. The output will consist of a sequence of $n$-grams where each $n$-gram is represented by a space-delimited string of $n$ consecutive words. If the input sequence contains fewer than n strings, no output is produced.

Examples

Refer to the NGram Python docs for more details on the API.

from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
Find full example code at "examples/src/main/python/ml/n_gram_example.py" in the Spark repo.

Refer to the NGram Scala docs for more details on the API.

import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")

val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")

val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/NGramExample.scala" in the Spark repo.

Refer to the NGram Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
  RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
  RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField(
    "words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);

NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");

Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaNGramExample.java" in the Spark repo.

Binarizer

Binarization is the process of thresholding numerical features to binary (0/1) features.

Binarizer takes the common parameters inputCol and outputCol, as well as the threshold for binarization. Feature values greater than the threshold are binarized to 1.0; values equal to or less than the threshold are binarized to 0.0. Both Vector and Double types are supported for inputCol.

Examples

Refer to the Binarizer Python docs for more details on the API.

from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
Find full example code at "examples/src/main/python/ml/binarizer_example.py" in the Spark repo.

Refer to the Binarizer Scala docs for more details on the API.

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/BinarizerExample.scala" in the Spark repo.

Refer to the Binarizer Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 0.1),
  RowFactory.create(1, 0.8),
  RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);

Binarizer binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5);

Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);

System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaBinarizerExample.java" in the Spark repo.

PCA

PCA (Principal Component Analysis) is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. A PCA class trains a model to project vectors to a low-dimensional space using PCA. The example below shows how to project 5-dimensional feature vectors into 3-dimensional principal components.

Examples

Refer to the PCA Python docs for more details on the API.

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
Find full example code at "examples/src/main/python/ml/pca_example.py" in the Spark repo.

Refer to the PCA Scala docs for more details on the API.

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)

val result = pca.transform(df).select("pcaFeatures")
result.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/PCAExample.scala" in the Spark repo.

Refer to the PCA Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
  RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
  RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PCAModel pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df);

Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaPCAExample.java" in the Spark repo.

PolynomialExpansion

Polynomial expansion is the process of expanding features into a polynomial space, which is formulated by an n-degree combination of the original dimensions. A PolynomialExpansion class provides this functionality. The example below shows how to expand features into a 3-degree polynomial space.
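
For instance, expanding a 2-dimensional input $(x, y)$ to degree 2 yields the monomials of degree 1 and 2 (the constant term is not included): \[ (x, y) \;\longrightarrow\; (x,\; x^2,\; y,\; x y,\; y^2). \]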

Examples

Refer to the PolynomialExpansion Python docs for more details on the API.

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)
Find full example code at "examples/src/main/python/ml/polynomial_expansion_example.py" in the Spark repo.

Refer to the PolynomialExpansion Scala docs for more details on the API.

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(2.0, 1.0),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)

val polyDF = polyExpansion.transform(df)
polyDF.show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/PolynomialExpansionExample.scala" in the Spark repo.

Refer to the PolynomialExpansion Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

PolynomialExpansion polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(2.0, 1.0)),
  RowFactory.create(Vectors.dense(0.0, 0.0)),
  RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java" in the Spark repo.

Discrete Cosine Transform (DCT)

The Discrete Cosine Transform transforms a length-$N$ real-valued sequence in the time domain into another length-$N$ real-valued sequence in the frequency domain. A DCT class provides this functionality, implementing the DCT-II and scaling the result by $1/\sqrt{2}$ such that the representing matrix for the transform is unitary. No shift is applied to the transformed sequence (e.g. the $0$th element of the transformed sequence is the $0$th DCT coefficient and not the $N/2$th).
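
For reference, the unnormalized DCT-II of a sequence $x_0, \ldots, x_{N-1}$ is given by the standard formula below; the DCT class additionally applies the scaling described above so that the transform matrix is unitary: \[ X_k = \sum_{n=0}^{N-1} x_n \cos\!\left[ \frac{\pi}{N} \left( n + \tfrac{1}{2} \right) k \right], \qquad k = 0, 1, \ldots, N-1. \]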

Examples

Refer to the DCT Python docs for more details on the API.

from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)
Find full example code at "examples/src/main/python/ml/dct_example.py" in the Spark repo.

Refer to the DCT Scala docs for more details on the API.

import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/DCTExample.scala" in the Spark repo.

Refer to the DCT Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
  RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

DCT dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false);

Dataset<Row> dctDf = dct.transform(df);

dctDf.select("featuresDCT").show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaDCTExample.java" in the Spark repo.

StringIndexer

StringIndexer encodes a string column of labels to a column of label indices. StringIndexer can encode multiple columns. The indices are in [0, numLabels), and four ordering options are supported: "frequencyDesc": descending order by label frequency (most frequent label assigned 0), "frequencyAsc": ascending order by label frequency (least frequent label assigned 0), "alphabetDesc": descending alphabetical order, and "alphabetAsc": ascending alphabetical order (default = "frequencyDesc"). Note that in case of equal frequency under "frequencyDesc"/"frequencyAsc", the strings are further sorted alphabetically.

If the user chooses to keep unseen labels, they will be put at index numLabels. If the input column is numeric, we cast it to string and index the string values. When downstream pipeline components such as Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.

Examples

Assume that we have the following DataFrame with columns id and category:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category is a string column with three labels: "a", "b", and "c". Applying StringIndexer with category as the input column and categoryIndex as the output column, we should get the following:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0

"a" gets index 0 because it is the most frequent, followed by "c" with index 1 and "b" with index 2.

Additionally, there are three strategies regarding how StringIndexer handles unseen labels when you have fit a StringIndexer on one dataset and then use it to transform another:

Examples

Let's go back to our previous example, but this time reuse our previously defined StringIndexer on the following dataset:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d
 4  | e

If you have not set how StringIndexer handles unseen labels, or set it to "error", an exception will be thrown. However, if you had called setHandleInvalid("skip"), the following dataset will be generated:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

Notice that the rows containing "d" or "e" do not appear.

If you call setHandleInvalid("keep"), the following dataset will be generated:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | d        | 3.0
 4  | e        | 3.0

Notice that the rows containing "d" or "e" are mapped to index "3.0".
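
The Python example below uses the default ("error") behavior; as a brief sketch, the strategy can be chosen either at construction time or via the setter (reusing the column names from the example):

from pyspark.ml.feature import StringIndexer

# Keep unseen labels instead of raising an error when transforming new data.
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex",
                        handleInvalid="keep")
# Alternatively, indexer.setHandleInvalid("skip") drops rows with unseen labels.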

Refer to the StringIndexer Python docs for more details on the API.

from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
Find full example code at "examples/src/main/python/ml/string_indexer_example.py" in the Spark repo.

Refer to the StringIndexer Scala docs for more details on the API.

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala" in the Spark repo.

Refer to the StringIndexer Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexer indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex");

Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaStringIndexerExample.java" in the Spark repo.

IndexToString

Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column containing the original labels as strings. A common use case is to produce indices from labels with StringIndexer, train a model with those indices, and retrieve the original labels from the column of predicted indices with IndexToString. However, you are free to supply your own labels.

Examples

Building on the StringIndexer example, let's assume we have the following DataFrame with columns id and categoryIndex:

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

IndexToString 应用于 categoryIndex 作为输入列,将 originalCategory 作为输出列,我们能够检索原始标签(它们将从列的元数据中推断出来)

 id | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c

Refer to the IndexToString Python docs for more details on the API.

from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
Find full example code at "examples/src/main/python/ml/index_to_string_example.py" in the Spark repo.

Refer to the IndexToString Scala docs for more details on the API.

import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

println(s"Transformed string column '${indexer.getInputCol}' " +
    s"to indexed column '${indexer.getOutputCol}'")
indexed.show()

val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
    s"${Attribute.fromStructField(inputColSchema).toString}\n")

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)

println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
    s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/IndexToStringExample.scala" in the Spark repo.

Refer to the IndexToString Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
    "to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();

StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
    Attribute.fromStructField(inputColSchema).toString() + "\n");

IndexToString converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);

System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
    "original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaIndexToStringExample.java" in the Spark repo.

OneHotEncoder

One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms that expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using StringIndexer first.

OneHotEncoder can transform multiple columns, returning a one-hot-encoded output vector column for each input column. It is common to merge these vectors into a single feature vector using VectorAssembler.

OneHotEncoder supports the handleInvalid parameter to choose how to handle invalid input during transforming data. Available options include "keep" (any invalid inputs are assigned to an extra categorical index) and "error" (throw an error).

Examples

Refer to the OneHotEncoder Python docs for more details on the API.

from pyspark.ml.feature import OneHotEncoder

df = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])

encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
Find full example code at "examples/src/main/python/ml/onehot_encoder_example.py" in the Spark repo.

Refer to the OneHotEncoder Scala docs for more details on the API.

import org.apache.spark.ml.feature.OneHotEncoder

val df = spark.createDataFrame(Seq(
  (0.0, 1.0),
  (1.0, 0.0),
  (2.0, 1.0),
  (0.0, 2.0),
  (0.0, 1.0),
  (2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")

val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex1", "categoryIndex2"))
  .setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)

val encoded = model.transform(df)
encoded.show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala" in the Spark repo.

Refer to the OneHotEncoder Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, 1.0),
  RowFactory.create(1.0, 0.0),
  RowFactory.create(2.0, 1.0),
  RowFactory.create(0.0, 2.0),
  RowFactory.create(0.0, 1.0),
  RowFactory.create(2.0, 0.0)
);

StructType schema = new StructType(new StructField[]{
  new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

OneHotEncoder encoder = new OneHotEncoder()
  .setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
  .setOutputCols(new String[] {"categoryVec1", "categoryVec2"});

OneHotEncoderModel model = encoder.fit(df);
Dataset<Row> encoded = model.transform(df);
encoded.show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java" in the Spark repo.

VectorIndexer

VectorIndexer helps index categorical features in datasets of Vectors. It can both automatically decide which features are categorical and convert original values to category indices. Specifically, it does the following:

  1. Take an input column of type Vector and a parameter maxCategories.
  2. Decide which features should be categorical based on the number of distinct values, where features with at most maxCategories distinct values are declared categorical.
  3. Compute 0-based category indices for each categorical feature.
  4. Index categorical features and transform original feature values to indices.

Indexing categorical features allows algorithms such as Decision Trees and Tree Ensembles to treat categorical features appropriately, improving performance.

Examples

In the example below, we read in a dataset of labeled points and then use VectorIndexer to decide which features should be treated as categorical. We transform the categorical feature values to their indices. This transformed data could then be passed to algorithms that handle categorical features, such as DecisionTreeRegressor.

Refer to the VectorIndexer Python docs for more details on the API.

from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
Find full example code at "examples/src/main/python/ml/vector_indexer_example.py" in the Spark repo.

Refer to the VectorIndexer Scala docs for more details on the API.

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} " +
  s"categorical features: ${categoricalFeatures.mkString(", ")}")

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/VectorIndexerExample.scala" in the Spark repo.

Refer to the VectorIndexer Java docs for more details on the API.

import java.util.Map;

import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

VectorIndexer indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);

Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");

for (Integer feature : categoryMaps.keySet()) {
  System.out.print(" " + feature);
}
System.out.println();

// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorIndexerExample.java" in the Spark repo.

Interaction

Interaction is a Transformer which takes vector or double-valued columns and generates a single vector column that contains the product of all combinations of one value from each input column.

For example, if you have 2 vector type columns, each of which has 3 dimensions as input columns, then you'll get a 9-dimensional vector as the output column.

Examples

Assume that we have the following DataFrame with the columns "id1", "vec1" and "vec2":

  id1|vec1          |vec2          
  ---|--------------|--------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] 
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] 
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] 
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] 
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0]     

Applying Interaction with those input columns, then interactedCol as the output column contains:

  id1|vec1          |vec2          |interactedCol                                         
  ---|--------------|--------------|------------------------------------------------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]            
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]     
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]        
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] 
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]       

Refer to the Interaction Python docs for more details on the API.

from pyspark.ml.feature import Interaction, VectorAssembler

df = spark.createDataFrame(
    [(1, 1, 2, 3, 8, 4, 5),
     (2, 4, 3, 8, 7, 9, 8),
     (3, 6, 1, 9, 2, 3, 6),
     (4, 10, 8, 6, 9, 4, 5),
     (5, 9, 2, 7, 10, 7, 3),
     (6, 1, 1, 4, 2, 8, 4)],
    ["id1", "id2", "id3", "id4", "id5", "id6", "id7"])

assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")

assembled1 = assembler1.transform(df)

assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")

assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")

interacted = interaction.transform(assembled2)

interacted.show(truncate=False)
Find full example code at "examples/src/main/python/ml/interaction_example.py" in the Spark repo.

Refer to the Interaction Scala docs for more details on the API.

import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler

val df = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 10, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 10, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")

val assembler1 = new VectorAssembler().
  setInputCols(Array("id2", "id3", "id4")).
  setOutputCol("vec1")

val assembled1 = assembler1.transform(df)

val assembler2 = new VectorAssembler().
  setInputCols(Array("id5", "id6", "id7")).
  setOutputCol("vec2")

val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

val interaction = new Interaction()
  .setInputCols(Array("id1", "vec1", "vec2"))
  .setOutputCol("interactedCol")

val interacted = interaction.transform(assembled2)

interacted.show(truncate = false)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/InteractionExample.scala" in the Spark repo.

Refer to the Interaction Java docs for more details on the API.

List<Row> data = Arrays.asList(
  RowFactory.create(1, 1, 2, 3, 8, 4, 5),
  RowFactory.create(2, 4, 3, 8, 7, 9, 8),
  RowFactory.create(3, 6, 1, 9, 2, 3, 6),
  RowFactory.create(4, 10, 8, 6, 9, 4, 5),
  RowFactory.create(5, 9, 2, 7, 10, 7, 3),
  RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VectorAssembler assembler1 = new VectorAssembler()
        .setInputCols(new String[]{"id2", "id3", "id4"})
        .setOutputCol("vec1");

Dataset<Row> assembled1 = assembler1.transform(df);

VectorAssembler assembler2 = new VectorAssembler()
        .setInputCols(new String[]{"id5", "id6", "id7"})
        .setOutputCol("vec2");

Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");

Interaction interaction = new Interaction()
        .setInputCols(new String[]{"id1","vec1","vec2"})
        .setOutputCol("interactedCol");

Dataset<Row> interacted = interaction.transform(assembled2);

interacted.show(false);
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaInteractionExample.java" in the Spark repo.

Normalizer

Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm. It takes a parameter p, which specifies the p-norm used for normalization ($p = 2$ by default). This normalization can help standardize your input data and improve the behavior of learning algorithms.
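
Concretely, each row $x$ is replaced by $x / \|x\|_p$, where \[ \|x\|_p = \Big( \sum_i |x_i|^p \Big)^{1/p}, \qquad \|x\|_\infty = \max_i |x_i|. \]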

Examples

The following example demonstrates how to load a dataset in libsvm format and then normalize each row to unit $L^1$ norm and unit $L^\infty$ norm.

Refer to the Normalizer Python docs for more details on the API.

from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
Find full example code at "examples/src/main/python/ml/normalizer_example.py" in the Spark repo.

Refer to the Normalizer Scala docs for more details on the API.

import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.5, -1.0)),
  (1, Vectors.dense(2.0, 1.0, 1.0)),
  (2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/NormalizerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Normalizer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0);

Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();

// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaNormalizerExample.java” 中查找完整的示例代码。

StandardScaler

StandardScaler 转换 Vector 行的数据集,将每个特征归一化为单位标准偏差和/或零均值。它接受以下参数:

- withStd:默认为 true。将数据缩放到单位标准偏差。
- withMean:默认为 false。在缩放之前用均值对数据进行中心化。这会生成密集输出,因此应用于稀疏输入时需要小心。

StandardScaler 是一个 Estimator,可以在数据集上进行 fit 以生成 StandardScalerModel;这相当于计算汇总统计信息。然后,该模型可以转换数据集中的 Vector 列,使其具有单位标准偏差和/或零均值特征。

请注意,如果某个特征的标准偏差为零,则它将在该特征的 Vector 中返回默认值 0.0

示例

以下示例演示如何加载 libsvm 格式的数据集,然后将每个特征归一化为单位标准偏差。

有关 API 的更多详细信息,请参阅 StandardScaler Python 文档

from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 代码库的 “examples/src/main/python/ml/standard_scaler_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 StandardScaler Scala 文档

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/StandardScalerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 StandardScaler Java 文档

import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaStandardScalerExample.java” 中查找完整的示例代码。

RobustScaler

RobustScaler 转换 Vector 行的数据集,移除中位数并根据特定的分位数范围(默认情况下为 IQR:四分位距,即第 1 个四分位数和第 3 个四分位数之间的分位数范围)对数据进行缩放。它的行为与 StandardScaler 非常相似,但是使用中位数和分位数范围来代替均值和标准偏差,这使得它对异常值具有鲁棒性。它接受以下参数:

- lower:默认为 0.25。用于计算分位数范围的下分位数。
- upper:默认为 0.75。用于计算分位数范围的上分位数。
- withScaling:默认为 true。将数据缩放到分位数范围。
- withCentering:默认为 false。在缩放之前用中位数对数据进行中心化。这会生成密集输出,因此应用于稀疏输入时需要小心。

RobustScaler 是一个 Estimator,可以在数据集上进行 fit 以生成 RobustScalerModel;这相当于计算分位数统计信息。然后,该模型可以转换数据集中的 Vector 列,使其具有单位分位数范围和/或零中位数特征。

请注意,如果某个特征的分位数范围为零,则它将在该特征的 Vector 中返回默认值 0.0
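
作为补充说明,下面用一组假设的特征值粗略演示该缩放方式:设某特征的取值为 $\{1.0, 2.0, 3.0, 4.0, 100.0\}$,其中位数为 3.0,第 1 和第 3 四分位数约为 2.0 和 4.0,因此分位数范围约为 $4.0 - 2.0 = 2.0$。在默认设置(仅缩放、不中心化)下,异常值 100.0 被缩放为 $100.0 / 2.0 = 50.0$;若同时启用中心化,则为 $(100.0 - 3.0) / 2.0 = 48.5$。由于中位数和分位数几乎不受 100.0 这一异常值影响,缩放结果比使用均值和标准偏差时更稳健。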

示例

以下示例演示如何加载 libsvm 格式的数据集,然后将每个特征归一化为单位分位数范围。

有关 API 的更多详细信息,请参阅 RobustScaler Python 文档

from pyspark.ml.feature import RobustScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
                      withScaling=True, withCentering=False,
                      lower=0.25, upper=0.75)

# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)

# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 代码库的 “examples/src/main/python/ml/robust_scaler_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 RobustScaler Scala 文档

import org.apache.spark.ml.feature.RobustScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new RobustScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithScaling(true)
  .setWithCentering(false)
  .setLower(0.25)
  .setUpper(0.75)

// Compute summary statistics by fitting the RobustScaler.
val scalerModel = scaler.fit(dataFrame)

// Transform each feature to have unit quantile range.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/RobustScalerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 RobustScaler Java 文档

import org.apache.spark.ml.feature.RobustScaler;
import org.apache.spark.ml.feature.RobustScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

RobustScaler scaler = new RobustScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithScaling(true)
  .setWithCentering(false)
  .setLower(0.25)
  .setUpper(0.75);

// Compute summary statistics by fitting the RobustScaler
RobustScalerModel scalerModel = scaler.fit(dataFrame);

// Transform each feature to have unit quantile range.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaRobustScalerExample.java” 中查找完整的示例代码。

MinMaxScaler

MinMaxScaler 转换 Vector 行的数据集,将每个特征重新缩放到特定范围(通常为 [0, 1])。它接受以下参数:

- min:默认为 0.0。转换后的下界,所有特征共享。
- max:默认为 1.0。转换后的上界,所有特征共享。

MinMaxScaler 计算数据集的汇总统计信息并生成 MinMaxScalerModel。然后,该模型可以分别转换每个特征,使其位于给定范围内。

特征 E 的重新缩放值计算如下,\begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation} 对于 $E_{max} == E_{min}$ 的情况,$Rescaled(e_i) = 0.5 * (max + min)$

请注意,由于零值可能会被转换为非零值,因此即使对于稀疏输入,转换器的输出也将是 DenseVector
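
例如(补充说明),若某特征在数据集中的取值为 $\{1.0, 2.0, 3.0\}$,且目标范围为默认的 $[0, 1]$,则按上式有:

\[ Rescaled(2.0) = \frac{2.0 - 1.0}{3.0 - 1.0} * (1 - 0) + 0 = 0.5 \]

即最小值映射为 0,最大值映射为 1,中间值按比例落在两者之间。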

示例

以下示例演示如何将一个包含 Vector 列的数据集中的每个特征重新缩放到 [0, 1]。

有关 API 的更多详细信息,请参阅 MinMaxScaler Python 文档MinMaxScalerModel Python 文档

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
在 Spark 代码库的 “examples/src/main/python/ml/min_max_scaler_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 MinMaxScaler Scala 文档MinMaxScalerModel Scala 文档

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -1.0)),
  (1, Vectors.dense(2.0, 1.1, 1.0)),
  (2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/MinMaxScalerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅MinMaxScaler Java 文档MinMaxScalerModel Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
    RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MinMaxScaler scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
    + scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaMinMaxScalerExample.java” 中查找完整的示例代码。

MaxAbsScaler

MaxAbsScaler 转换 Vector 行的数据集,通过除以每个特征中的最大绝对值,将每个特征重新缩放至 [-1, 1] 范围。它不会移动/中心化数据,因此不会破坏任何稀疏性。

MaxAbsScaler 计算数据集的汇总统计信息,并生成 MaxAbsScalerModel。然后,模型可以将每个特征分别转换为 [-1, 1] 范围。

示例

以下示例演示如何将一个包含 Vector 列的数据集中的每个特征重新缩放至 [-1, 1]。

有关 API 的更多详细信息,请参阅MaxAbsScaler Python 文档MaxAbsScalerModel Python 文档

from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()
在 Spark 代码库的 “examples/src/main/python/ml/max_abs_scaler_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅MaxAbsScaler Scala 文档MaxAbsScalerModel Scala 文档

import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, Vectors.dense(2.0, 1.0, -4.0)),
  (2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/MaxAbsScalerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅MaxAbsScaler Java 文档MaxAbsScalerModel Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MaxAbsScaler scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaMaxAbsScalerExample.java” 中查找完整的示例代码。

Bucketizer

Bucketizer 将连续特征列转换为特征桶列,其中桶由用户指定。它接受参数 splits:用于将连续特征映射到分桶的分割点。n+1 个分割点对应 n 个分桶。由分割点 x、y 定义的分桶包含 [x, y) 范围内的值,最后一个分桶除外,它还包含 y。分割点必须严格递增,并且必须显式提供 -inf 和 inf 处的分割点以覆盖所有 Double 值;否则,超出指定分割点范围的值将被视为错误。

请注意,如果您不知道目标列的上限和下限,则应添加 Double.NegativeInfinityDouble.PositiveInfinity 作为分割点的边界,以防止潜在的 Bucketizer 边界异常。

另请注意,您提供的分割点必须严格按升序排列,即 s0 < s1 < s2 < ... < sn

更多详细信息,请参阅Bucketizer 的 API 文档。

示例

以下示例演示如何将 Double 列分桶到另一个索引列中。

有关 API 的更多详细信息,请参阅Bucketizer Python 文档

from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()
在 Spark 代码库的 “examples/src/main/python/ml/bucketizer_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅Bucketizer Scala 文档

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)

println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()

val splitsArray = Array(
  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
  Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))

val data2 = Array(
  (-999.9, -999.9),
  (-0.5, -0.2),
  (-0.3, -0.1),
  (0.0, 0.0),
  (0.2, 0.4),
  (999.9, 999.9))
val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")

val bucketizer2 = new Bucketizer()
  .setInputCols(Array("features1", "features2"))
  .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
  .setSplitsArray(splitsArray)

// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)

println(s"Bucketizer output with [" +
  s"${bucketizer2.getSplitsArray(0).length-1}, " +
  s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅Bucketizer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};

List<Row> data = Arrays.asList(
  RowFactory.create(-999.9),
  RowFactory.create(-0.5),
  RowFactory.create(-0.3),
  RowFactory.create(0.0),
  RowFactory.create(0.2),
  RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Bucketizer bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits);

// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);

System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();

// Bucketize multiple columns at one pass.
double[][] splitsArray = {
  {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY},
  {Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY}
};

List<Row> data2 = Arrays.asList(
  RowFactory.create(-999.9, -999.9),
  RowFactory.create(-0.5, -0.2),
  RowFactory.create(-0.3, -0.1),
  RowFactory.create(0.0, 0.0),
  RowFactory.create(0.2, 0.4),
  RowFactory.create(999.9, 999.9)
);
StructType schema2 = new StructType(new StructField[]{
  new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame2 = spark.createDataFrame(data2, schema2);

Bucketizer bucketizer2 = new Bucketizer()
  .setInputCols(new String[] {"features1", "features2"})
  .setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"})
  .setSplitsArray(splitsArray);
// Transform original data into its bucket index.
Dataset<Row> bucketedData2 = bucketizer2.transform(dataFrame2);

System.out.println("Bucketizer output with [" +
  (bucketizer2.getSplitsArray()[0].length-1) + ", " +
  (bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column");
bucketedData2.show();
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java” 中查找完整的示例代码。

ElementwiseProduct

ElementwiseProduct 使用元素乘法将每个输入向量乘以提供的“权重”向量。换句话说,它通过标量乘数缩放数据集的每一列。这表示输入向量 v 和变换向量 w 之间的哈达玛积,以产生结果向量。

\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]
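
例如(补充说明),取输入向量 $(1.0, 2.0, 3.0)$ 和变换向量 $(0.0, 1.0, 2.0)$(与下文示例中的数据相同),则有:

\[ (1.0, 2.0, 3.0) \circ (0.0, 1.0, 2.0) = (0.0, 2.0, 6.0) \]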

示例

以下示例演示如何使用一个变换向量对输入向量进行变换。

有关 API 的更多详细信息,请参阅ElementwiseProduct Python 文档

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
在 Spark 代码库的 “examples/src/main/python/ml/elementwise_product_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅ElementwiseProduct Scala 文档

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/ElementwiseProductExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅ElementwiseProduct Java 文档

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
  RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
  RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);

List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));

StructType schema = DataTypes.createStructType(fields);

Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);

ElementwiseProduct transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector");

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaElementwiseProductExample.java” 中查找完整的示例代码。

SQLTransformer

SQLTransformer 实现由 SQL 语句定义的转换。目前,我们仅支持 "SELECT ... FROM __THIS__ ..." 之类的 SQL 语法,其中 "__THIS__" 表示输入数据集的基础表。select 子句指定要在输出中显示的字段、常量和表达式,并且可以是 Spark SQL 支持的任何 select 子句。用户还可以使用 Spark SQL 内置函数和 UDF 对这些选定的列进行操作。例如,SQLTransformer 支持以下语句:

- SELECT a, a + b AS a_b FROM __THIS__
- SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
- SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
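
作为补充,下面给出一个简短的 Python 草图(非官方示例,数据为假设值),演示上面第三类带聚合和 GROUP BY 的语句;正式示例见下文"示例"部分。

from pyspark.ml.feature import SQLTransformer

# 假设的数据:按 a 分组并对 c 求和
df = spark.createDataFrame(
    [("x", 1, 3.0), ("x", 2, 5.0), ("y", 3, 7.0)], ["a", "b", "c"])

sqlTrans = SQLTransformer(
    statement="SELECT a, SUM(c) AS c_sum FROM __THIS__ GROUP BY a")
sqlTrans.transform(df).show()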

示例

假设我们有以下 DataFrame,其中包含列 idv1v2

 id |  v1 |  v2
----|-----|-----
 0  | 1.0 | 3.0  
 2  | 2.0 | 5.0

这是语句为 "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"SQLTransformer 的输出

 id |  v1 |  v2 |  v3 |  v4
----|-----|-----|-----|-----
 0  | 1.0 | 3.0 | 4.0 | 3.0
 2  | 2.0 | 5.0 | 7.0 |10.0

有关 API 的更多详细信息,请参阅SQLTransformer Python 文档

from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
在 Spark 代码库的 “examples/src/main/python/ml/sql_transformer.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅SQLTransformer Scala 文档

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/SQLTransformerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅SQLTransformer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 1.0, 3.0),
  RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

SQLTransformer sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

sqlTrans.transform(df).show();
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java” 中查找完整的示例代码。

VectorAssembler

VectorAssembler 是一个转换器,它将给定的列列表组合成单个向量列。它对于将原始特征和由不同特征转换器生成的特征组合成单个特征向量非常有用,以便训练逻辑回归和决策树等 ML 模型。VectorAssembler 接受以下输入列类型:所有数字类型、布尔类型和向量类型。在每一行中,输入列的值将按指定顺序连接成一个向量。

示例

假设我们有一个 DataFrame,其中包含列 idhourmobileuserFeaturesclicked

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

userFeatures 是一个包含三个用户特征的向量列。我们希望将 hourmobileuserFeatures 组合成一个名为 features 的特征向量,并使用它来预测是否 clicked。如果我们将 VectorAssembler 的输入列设置为 hourmobileuserFeatures,并将输出列设置为 features,则在转换后,我们应该获得以下 DataFrame

 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

有关 API 的更多详细信息,请参阅VectorAssembler Python 文档

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
在 Spark 代码库的 “examples/src/main/python/ml/vector_assembler_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅VectorAssembler Scala 文档

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
在 Spark 代码库的 “examples/src/main/scala/org/apache/spark/examples/ml/VectorAssemblerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅VectorAssembler Java 文档

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
在 Spark 代码库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaVectorAssemblerExample.java” 中查找完整的示例代码。

VectorSizeHint

有时,为 VectorType 列显式指定向量的大小会很有用。例如,VectorAssembler 使用其输入列的大小信息来生成其输出列的大小信息和元数据。虽然在某些情况下,可以通过检查列的内容来获取此信息,但在流式 DataFrame 中,在流启动之前,内容不可用。VectorSizeHint 允许用户为列显式指定向量大小,以便 VectorAssembler 或可能需要知道向量大小的其他转换器可以使用该列作为输入。

要使用 VectorSizeHint,用户必须设置 inputColsize 参数。将此转换器应用于 DataFrame 会生成一个新的 DataFrame,其中包含更新的 inputCol 元数据,用于指定向量大小。对生成的 DataFrame 的下游操作可以使用元数据获取此大小。

VectorSizeHint 还可以接受一个可选的 handleInvalid 参数,该参数控制当向量列包含空值或大小错误的向量时的行为。默认情况下,handleInvalid 设置为“error”,表示应抛出异常。此参数也可以设置为“skip”,表示应从结果数据框中过滤掉包含无效值的行,或设置为“optimistic”,表示不应检查列中是否存在无效值,并且应保留所有行。请注意,使用“optimistic”可能会导致结果数据框处于不一致状态,这意味着应用于 VectorSizeHint 列的元数据与该列的内容不匹配。用户应注意避免这种不一致状态。

有关 API 的更多详细信息,请参阅 VectorSizeHint Python 文档

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
     (0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

sizeHint = VectorSizeHint(
    inputCol="userFeatures",
    handleInvalid="skip",
    size=3)

datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
在 Spark 存储库的“examples/src/main/python/ml/vector_size_hint_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorSizeHint Scala 文档

import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq(
    (0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
    (0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
  .setSize(3)

val datasetWithSize = sizeHint.transform(dataset)
println("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(false)

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

// This dataframe can be used by downstream transformers as before
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/VectorSizeHintExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorSizeHint Java 文档

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row0 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Row row1 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row0, row1), schema);

VectorSizeHint sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
  .setSize(3);

Dataset<Row> datasetWithSize = sizeHint.transform(dataset);
System.out.println("Rows where 'userFeatures' is not the right size are filtered out");
datasetWithSize.show(false);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

// This dataframe can be used by downstream transformers as before
Dataset<Row> output = assembler.transform(datasetWithSize);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSizeHintExample.java”中查找完整的示例代码。

QuantileDiscretizer

QuantileDiscretizer 接受一个具有连续特征的列,并输出一个具有分箱分类特征的列。分箱数量由 numBuckets 参数设置。使用的分箱数量可能小于此值,例如,如果输入的不同值太少而无法创建足够多的不同分位数。

NaN 值:在 QuantileDiscretizer 拟合期间,将从列中删除 NaN 值。这将生成一个用于进行预测的 Bucketizer 模型。在转换过程中,Bucketizer 在数据集中找到 NaN 值时会引发错误,但用户也可以通过设置 handleInvalid 来选择保留或删除数据集中的 NaN 值。如果用户选择保留 NaN 值,则将对它们进行特殊处理并将其放入自己的分箱中,例如,如果使用了 4 个分箱,则非 NaN 数据将放入分箱 [0-3] 中,但 NaN 将被计入特殊分箱 [4] 中。

算法:使用近似算法选择分箱范围(有关详细说明,请参阅 approxQuantile 的文档)。可以使用 relativeError 参数控制近似的精度。设置为零时,将计算精确的分位数(**注意:**计算精确的分位数是一项昂贵的操作)。下限和上限将为 -Infinity+Infinity,涵盖所有实数值。
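
作为补充,下面是一个简短的 Python 草图(非官方示例,数据为假设值),演示如何通过 handleInvalid="keep" 保留 NaN 值,并通过 relativeError 控制分位数近似的精度:

from pyspark.ml.feature import QuantileDiscretizer

# 假设的数据,其中包含一个 NaN 值
df = spark.createDataFrame(
    [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, float("nan"))], ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result",
                                  relativeError=0.0,      # 0 表示计算精确分位数(代价较高)
                                  handleInvalid="keep")   # NaN 被放入额外的特殊分箱

discretizer.fit(df).transform(df).show()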

示例

假设我们有一个包含 idhour 列的 DataFrame

 id | hour
----|------
 0  | 18.0
 1  | 19.0
 2  | 8.0
 3  | 5.0
 4  | 2.2

hour 是一个具有 Double 类型的连续特征。我们想将连续特征转换为分类特征。给定 numBuckets = 3,我们应该得到以下 DataFrame

 id | hour | result
----|------|------
 0  | 18.0 | 2.0
 1  | 19.0 | 2.0
 2  | 8.0  | 1.0
 3  | 5.0  | 1.0
 4  | 2.2  | 0.0

有关 API 的更多详细信息,请参阅 QuantileDiscretizer Python 文档

from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()
在 Spark 存储库的“examples/src/main/python/ml/quantile_discretizer_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 QuantileDiscretizer Scala 文档

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show(false)
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/QuantileDiscretizerExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 QuantileDiscretizer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 18.0),
  RowFactory.create(1, 19.0),
  RowFactory.create(2, 8.0),
  RowFactory.create(3, 5.0),
  RowFactory.create(4, 2.2)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

QuantileDiscretizer discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3);

Dataset<Row> result = discretizer.fit(df).transform(df);
result.show(false);
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java”中查找完整的示例代码。

Imputer

Imputer 估计器使用缺失值所在列的均值、中位数或众数来填充数据集中的缺失值。输入列应为数字类型。目前,Imputer 不支持分类特征,并且可能会为包含分类特征的列产生不正确的值。Imputer 可以通过 .setMissingValue(custom_value) 将除 NaN 以外的自定义值视为缺失值进行填充,例如 .setMissingValue(0) 会填充所有出现的 0。

**注意** 输入列中的所有 null 值都被视为缺失值,因此也会被估算。
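
作为补充,下面是一个简短的 Python 草图(非官方示例,数据为假设值),演示如何使用中位数策略,并通过 setMissingValue 将 0 视为缺失值:

from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
    (1.0, 0.0),
    (2.0, 0.0),
    (0.0, 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

# 使用中位数填充,并将 0.0 而不是 NaN 视为缺失值
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
imputer.setStrategy("median").setMissingValue(0.0)

imputer.fit(df).transform(df).show()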

示例

假设我们有一个包含 ab 列的 DataFrame

      a     |      b      
------------|-----------
     1.0    | Double.NaN
     2.0    | Double.NaN
 Double.NaN |     3.0   
     4.0    |     4.0   
     5.0    |     5.0   

在本例中,Imputer 会将所有出现的 Double.NaN(缺失值的默认值)替换为从相应列中的其他值计算出的均值(默认估算策略)。在本例中,列 ab 的替代值分别为 3.0 和 4.0。转换后,输出列中的缺失值将由相关列的替代值替换。

      a     |      b     | out_a | out_b   
------------|------------|-------|-------
     1.0    | Double.NaN |  1.0  |  4.0 
     2.0    | Double.NaN |  2.0  |  4.0 
 Double.NaN |     3.0    |  3.0  |  3.0 
     4.0    |     4.0    |  4.0  |  4.0
     5.0    |     5.0    |  5.0  |  5.0 

有关 API 的更多详细信息,请参阅 Imputer Python 文档

from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()
在 Spark 存储库的“examples/src/main/python/ml/imputer_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Imputer Scala 文档

import org.apache.spark.ml.feature.Imputer

val df = spark.createDataFrame(Seq(
  (1.0, Double.NaN),
  (2.0, Double.NaN),
  (Double.NaN, 3.0),
  (4.0, 4.0),
  (5.0, 5.0)
)).toDF("a", "b")

val imputer = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("out_a", "out_b"))

val model = imputer.fit(df)
model.transform(df).show()
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/ImputerExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Imputer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1.0, Double.NaN),
  RowFactory.create(2.0, Double.NaN),
  RowFactory.create(Double.NaN, 3.0),
  RowFactory.create(4.0, 4.0),
  RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
  createStructField("a", DoubleType, false),
  createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Imputer imputer = new Imputer()
  .setInputCols(new String[]{"a", "b"})
  .setOutputCols(new String[]{"out_a", "out_b"});

ImputerModel model = imputer.fit(df);
model.transform(df).show();
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaImputerExample.java”中查找完整的示例代码。

特征选择器

VectorSlicer

VectorSlicer 是一个转换器,它接受一个特征向量并输出一个具有原始特征子数组的新特征向量。它对于从向量列中提取特征非常有用。

VectorSlicer 接受一个具有指定索引的向量列,然后输出一个新的向量列,其值是通过这些索引选择的。索引有两种类型:

  1. 表示向量索引的整数索引,setIndices()

  2. 表示向量中特征名称的字符串索引,setNames()。*这要求向量列具有 AttributeGroup,因为实现匹配 Attribute 的名称字段。*

可以通过整数和字符串进行指定。此外,您可以同时使用整数索引和字符串名称。必须至少选择一个特征。不允许重复的特征,因此所选索引和名称之间不能重叠。请注意,如果选择了特征名称,则在遇到空输入属性时将引发异常。

输出向量将首先按照给定的顺序对具有所选索引的特征进行排序,然后按照给定的顺序对所选名称进行排序。

示例

假设我们有一个包含 userFeatures 列的 DataFrame

 userFeatures
------------------
 [0.0, 10.0, 0.5]

userFeatures 是一个包含三个用户特征的向量列。假设 userFeatures 的第一列全为零,因此我们想删除它并仅选择最后两列。VectorSlicer 使用 setIndices(1, 2) 选择最后两个元素,然后生成一个名为 features 的新向量列

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]

还假设我们有 userFeatures 的潜在输入属性,即 ["f1", "f2", "f3"],那么我们可以使用 setNames("f2", "f3") 来选择它们。

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]
 ["f1", "f2", "f3"] | ["f2", "f3"]

有关 API 的更多详细信息,请参阅 VectorSlicer Python 文档

from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

df = spark.createDataFrame([
    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()
在 Spark 存储库的“examples/src/main/python/ml/vector_slicer_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorSlicer Scala 文档

import java.util.Arrays

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

val data = Arrays.asList(
  Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
  Row(Vectors.dense(-2.0, 2.3, 0.0))
)

val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])

val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))

val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")

slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))

val output = slicer.transform(dataset)
output.show(false)
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/VectorSlicerExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorSlicer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

Attribute[] attrs = {
  NumericAttribute.defaultAttr().withName("f1"),
  NumericAttribute.defaultAttr().withName("f2"),
  NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
  RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);

Dataset<Row> dataset =
  spark.createDataFrame(data, (new StructType()).add(group.toStructField()));

VectorSlicer vectorSlicer = new VectorSlicer()
  .setInputCol("userFeatures").setOutputCol("features");

vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})

Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java”中查找完整的示例代码。

RFormula

RFormula 选择由 R 模型公式 指定的列。目前,我们支持 R 运算符的一个有限子集,包括“~”、“.”、“:”、“+”和“-”。基本运算符是:

- “~”:分隔目标和项
- “+”:连接项,“+ 0”表示删除截距
- “-”:删除项,“- 1”表示删除截距
- “:”:交互(数值相乘,或二值化的分类值)
- “.”:除目标外的所有列

假设 ab 是双精度列,我们使用以下简单示例来说明 RFormula 的效果

RFormula 生成一个特征向量列和一个双精度或字符串类型的标签列。就像在 R 中将公式用于线性回归时一样,数值列将被转换为双精度。至于字符串输入列,它们将首先使用 StringIndexer 按 stringOrderType 确定的顺序进行转换,并删除排序后的最后一个类别,然后对得到的双精度值进行独热编码。

假设一个字符串特征列包含值 {'b', 'a', 'b', 'a', 'c', 'b'},我们设置 stringOrderType 来控制编码

stringOrderType | Category mapped to 0 by StringIndexer |  Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b')          | least frequent category ('c')
'frequencyAsc'  | least frequent category ('c')         | most frequent category ('b')
'alphabetDesc'  | last alphabetical category ('c')      | first alphabetical category ('a')
'alphabetAsc'   | first alphabetical category ('a')     | last alphabetical category ('c')

如果标签列的类型为字符串,则将首先使用 frequencyDesc 排序使用 StringIndexer 将其转换为双精度。如果 DataFrame 中不存在标签列,则将根据公式中指定的响应变量创建输出标签列。

**注意:**排序选项 stringOrderType 不适用于标签列。当对标签列进行索引时,它使用 StringIndexer 中的默认降序频率排序。
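
作为补充(非官方示例),下面的 Python 草图演示了上面列出的“.”与“-”运算符的用法,数据与下文示例相同:公式 clicked ~ . - id 表示使用除 clicked 和 id 之外的所有列作为特征。

from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0), (8, "CA", 12, 0.0), (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

# "." 表示除响应变量外的所有列,"- id" 再从中去掉 id 列
formula = RFormula(formula="clicked ~ . - id")
formula.fit(dataset).transform(dataset).select("features", "label").show()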

示例

假设我们有一个包含 idcountryhourclicked 列的 DataFrame

id | country | hour | clicked
---|---------|------|---------
 7 | "US"    | 18   | 1.0
 8 | "CA"    | 12   | 0.0
 9 | "NZ"    | 15   | 0.0

如果我们使用带有公式字符串 clicked ~ country + hourRFormula,这表示我们想要根据 countryhour 预测 clicked,那么在转换之后,我们应该得到以下 DataFrame

id | country | hour | clicked | features         | label
---|---------|------|---------|------------------|-------
 7 | "US"    | 18   | 1.0     | [0.0, 0.0, 18.0] | 1.0
 8 | "CA"    | 12   | 0.0     | [0.0, 1.0, 12.0] | 0.0
 9 | "NZ"    | 15   | 0.0     | [1.0, 0.0, 15.0] | 0.0

有关 API 的更多详细信息,请参阅RFormula Python 文档

from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
在 Spark 存储库的“examples/src/main/python/ml/rformula_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅RFormula Scala 文档

import org.apache.spark.ml.feature.RFormula

val dataset = spark.createDataFrame(Seq(
  (7, "US", 18, 1.0),
  (8, "CA", 12, 0.0),
  (9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")

val formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label")

val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/RFormulaExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅RFormula Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("country", StringType, false),
  createStructField("hour", IntegerType, false),
  createStructField("clicked", DoubleType, false)
});

List<Row> data = Arrays.asList(
  RowFactory.create(7, "US", 18, 1.0),
  RowFactory.create(8, "CA", 12, 0.0),
  RowFactory.create(9, "NZ", 15, 0.0)
);

Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java”中查找完整的示例代码。

ChiSqSelector

ChiSqSelector 代表卡方特征选择。它对具有分类特征的标记数据进行操作。ChiSqSelector 使用卡方独立性检验来决定选择哪些特征。它支持五种选择方法:numTopFeatures、percentile、fpr、fdr、fwe。numTopFeatures 根据卡方检验选择固定数量的最优特征;percentile 与之类似,但选择的是所有特征的一个比例而不是固定数量;fpr 选择 p 值低于阈值的所有特征,从而控制选择的假阳性率;fdr 使用 Benjamini-Hochberg 过程选择假发现率低于阈值的所有特征;fwe 选择 p 值低于阈值(阈值按 1/numFeatures 缩放)的所有特征,从而控制选择的族错误率。默认情况下,选择方法为 numTopFeatures,最优特征的默认数量设置为 50。
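
作为补充,下面是一个简短的 Python 草图(非官方示例,数据与下文示例相同),演示如何通过 selectorType 使用 percentile 方法按比例选择特征:

from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)], ["id", "features", "clicked"])

# 按比例选择卡方检验得分最高的前 50% 特征,而不是固定数量
selector = ChiSqSelector(selectorType="percentile", percentile=0.5,
                         featuresCol="features", outputCol="selectedFeatures",
                         labelCol="clicked")
selector.fit(df).transform(df).show()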

示例

假设我们有一个包含 idfeaturesclicked 列的 DataFrame,它被用作我们要预测的目标

id | features              | clicked
---|-----------------------|---------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0

如果我们使用 ChiSqSelectornumTopFeatures = 1,那么根据我们的标签 clickedfeatures 中的最后一列将被选为最有用的特征

id | features              | clicked | selectedFeatures
---|-----------------------|---------|------------------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0     | [1.0]
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0     | [0.0]
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0     | [0.1]

有关 API 的更多详细信息,请参阅ChiSqSelector Python 文档

from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

result = selector.fit(df).transform(df)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
在 Spark 存储库的“examples/src/main/python/ml/chisq_selector_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅ChiSqSelector Scala 文档

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)

val df = spark.createDataset(data).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅ChiSqSelector Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

ChiSqSelector selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
    + " features selected");
result.show();
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java”中查找完整的示例代码。

UnivariateFeatureSelector

UnivariateFeatureSelector 在具有分类/连续特征和分类/连续标签的数据上进行特征选择。用户可以设置 featureType 和 labelType,Spark 将根据指定的 featureType 和 labelType 选择要使用的评分函数。

featureType |  labelType |score function
------------|------------|--------------
categorical |categorical | chi-squared (chi2)
continuous  |categorical | ANOVATest (f_classif)
continuous  |continuous  | F-value (f_regression)

它支持五种选择模式:numTopFeaturespercentilefprfdrfwe

默认情况下,选择模式为 numTopFeatures,默认的 selectionThreshold 设置为 50。
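
作为补充,下面是一个简短的 Python 草图(非官方示例,数据为假设值),演示连续特征搭配连续标签的情形,此时会使用 f_regression 评分函数,并以 percentile 模式按比例选择特征:

from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([1.7, 4.4, 7.6]), 4.6),
    (Vectors.dense([8.8, 7.3, 5.7]), 8.1),
    (Vectors.dense([1.2, 9.5, 2.5]), 1.9),
    (Vectors.dense([3.7, 9.2, 6.1]), 3.3)], ["features", "label"])

selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="label", selectionMode="percentile")
# 连续特征 + 连续标签 => f_regression;保留约前 50% 的特征
selector.setFeatureType("continuous").setLabelType("continuous").setSelectionThreshold(0.5)

selector.fit(df).transform(df).show()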

示例

假设我们有一个包含 idfeatureslabel 列的 DataFrame,它被用作我们要预测的目标

id | features                       | label
---|--------------------------------|---------
 1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0
 2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0
 3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0
 4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0
 5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0
 6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0

如果我们将 featureType 设置为 continuous,将 labelType 设置为 categorical,并将 numTopFeatures 设置为 1,则 features 中的最后一列将被选为最有用的特征

id | features                       | label   | selectedFeatures
---|--------------------------------|---------|------------------
 1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0     | [2.3]
 2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0     | [4.1]
 3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0     | [2.5]
 4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0     | [3.8]
 5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0     | [3.0]
 6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0     | [2.1]

有关 API 的更多详细信息,请参阅UnivariateFeatureSelector Python 文档

from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
    (2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
    (3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
    (4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
    (5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
    (6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])

selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)

result = selector.fit(df).transform(df)

print("UnivariateFeatureSelector output with top %d features selected using f_classif"
      % selector.getSelectionThreshold())
result.show()
在 Spark 存储库的“examples/src/main/python/ml/univariate_feature_selector_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅UnivariateFeatureSelector Scala 文档

import org.apache.spark.ml.feature.UnivariateFeatureSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
  (2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
  (3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
  (4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
  (5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
  (6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
)

val df = spark.createDataset(data).toDF("id", "features", "label")

val selector = new UnivariateFeatureSelector()
  .setFeatureType("continuous")
  .setLabelType("categorical")
  .setSelectionMode("numTopFeatures")
  .setSelectionThreshold(1)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"UnivariateFeatureSelector output with top ${selector.getSelectionThreshold}" +
  s" features selected using f_classif")
result.show()
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/UnivariateFeatureSelectorExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅UnivariateFeatureSelector Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.UnivariateFeatureSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
  RowFactory.create(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
  RowFactory.create(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
  RowFactory.create(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
  RowFactory.create(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
  RowFactory.create(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

UnivariateFeatureSelector selector = new UnivariateFeatureSelector()
  .setFeatureType("continuous")
  .setLabelType("categorical")
  .setSelectionMode("numTopFeatures")
  .setSelectionThreshold(1)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("UnivariateFeatureSelector output with top "
    + selector.getSelectionThreshold() + " features selected using f_classif");
result.show();
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaUnivariateFeatureSelectorExample.java”中查找完整的示例代码。

VarianceThresholdSelector

VarianceThresholdSelector 是一种选择器,用于删除低方差特征。(样本)方差不大于 varianceThreshold 的特征将被删除。如果未设置,则 varianceThreshold 默认为 0,这意味着只有方差为 0 的特征(即在所有样本中具有相同值的特征)才会被删除。

示例

假设我们有一个包含 idfeatures 列的 DataFrame,它被用作我们要预测的目标

id | features
---|--------------------------------
 1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0]
 2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0]
 3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0]
 4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0]
 5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0]
 6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0]

6 个特征的样本方差分别为 16.67、0.67、8.17、10.17、5.07 和 11.47。如果我们使用 VarianceThresholdSelectorvarianceThreshold = 8.0,则方差 <= 8.0 的特征将被删除

id | features                       | selectedFeatures
---|--------------------------------|-------------------
 1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0] | [6.0,0.0,7.0,0.0]
 2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0] | [0.0,6.0,0.0,9.0]
 3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0] | [0.0,3.0,0.0,5.0]
 4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0] | [0.0,8.0,5.0,4.0]
 5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0] | [8.0,6.0,5.0,4.0]
 6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0] | [8.0,6.0,0.0,0.0]
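As a quick check on the variances quoted above, the plain-Python sketch below (not part of the Spark example) recomputes the per-feature sample variances (denominator n − 1) directly from the six rows:

from statistics import variance

rows = [
    [6.0, 7.0, 0.0, 7.0, 6.0, 0.0],
    [0.0, 9.0, 6.0, 0.0, 5.0, 9.0],
    [0.0, 9.0, 3.0, 0.0, 5.0, 5.0],
    [0.0, 9.0, 8.0, 5.0, 6.0, 4.0],
    [8.0, 9.0, 6.0, 5.0, 4.0, 4.0],
    [8.0, 9.0, 6.0, 0.0, 0.0, 0.0],
]

# statistics.variance uses the sample (n - 1) denominator, matching the text.
print([round(variance(col), 2) for col in zip(*rows)])
# [16.67, 0.67, 8.17, 10.17, 5.07, 11.47]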

Refer to the VarianceThresholdSelector Python docs for more details on the API.

from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
    (2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
    (3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
    (4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
    (5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
    (6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])

selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")

result = selector.fit(df).transform(df)

print("Output: Features with variance lower than %f are removed." %
      selector.getVarianceThreshold())
result.show()
Find full example code at "examples/src/main/python/ml/variance_threshold_selector_example.py" in the Spark repo.

Refer to the VarianceThresholdSelector Scala docs for more details on the API.

import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
  (2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
  (3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
  (4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
  (5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
  (6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
)

val df = spark.createDataset(data).toDF("id", "features")

val selector = new VarianceThresholdSelector()
  .setVarianceThreshold(8.0)
  .setFeaturesCol("features")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"Output: Features with variance lower than" +
  s" ${selector.getVarianceThreshold} are removed.")
result.show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/VarianceThresholdSelectorExample.scala" in the Spark repo.

Refer to the VarianceThresholdSelector Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.VarianceThresholdSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
  RowFactory.create(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
  RowFactory.create(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
  RowFactory.create(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
  RowFactory.create(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
  RowFactory.create(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VarianceThresholdSelector selector = new VarianceThresholdSelector()
  .setVarianceThreshold(8.0)
  .setFeaturesCol("features")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("Output: Features with variance lower than "
    + selector.getVarianceThreshold() + " are removed.");
result.show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaVarianceThresholdSelectorExample.java" in the Spark repo.

Locality Sensitive Hashing

Locality Sensitive Hashing (LSH) is an important class of hashing techniques, commonly used in clustering, approximate nearest neighbor search and outlier detection with large datasets.

The general idea of LSH is to use a family of functions ("LSH families") to hash data points into buckets, so that data points which are close to each other are in the same buckets with high probability, while data points that are far away from each other are very likely in different buckets. An LSH family is formally defined as follows.

In a metric space (M, d), where M is a set and d is a distance function on M, an LSH family is a family of functions h that satisfy the following properties: \[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \] This LSH family is called (r1, r2, p1, p2)-sensitive.

In Spark, different LSH families are implemented in separate classes (e.g., MinHash), and APIs for feature transformation, approximate similarity join and approximate nearest neighbor are provided in each class.

In LSH, we define a false positive as a pair of distant input features (with $d(p,q) \geq r2$) that are hashed into the same bucket, and we define a false negative as a pair of nearby features (with $d(p,q) \leq r1$) that are hashed into different buckets.

LSH Operations

We describe the major types of operations which LSH can be used for. A fitted LSH model has methods for each of these operations.

Feature Transformation

Feature transformation is the basic functionality to add hashed values as a new column. This can be useful for dimensionality reduction. Users can specify input and output column names by setting inputCol and outputCol.

LSH also supports multiple LSH hash tables. Users can specify the number of hash tables by setting numHashTables. This is also used for OR-amplification in approximate similarity join and approximate nearest neighbor. Increasing the number of hash tables improves the accuracy but also increases communication cost and running time.

The type of outputCol is Seq[Vector], where the dimension of the array equals numHashTables and the dimensions of the vectors are currently set to 1. In future releases, we will implement AND-amplification so that users can specify the dimensions of these vectors.
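To illustrate the output shape described above, here is a minimal sketch (assuming an existing SparkSession named spark, as in the full examples further below) that fits a MinHashLSH model with three hash tables and prints the hashes column, which holds one length-1 vector per hash table:

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors

# Two toy rows; the exact values are only for illustration.
df = spark.createDataFrame([
    (0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0])),
    (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]))], ["id", "features"])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)
model = mh.fit(df)

# Each row of "hashes" is an array of numHashTables single-element vectors.
model.transform(df).select("hashes").show(truncate=False)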

Approximate Similarity Join

Approximate similarity join takes two datasets and approximately returns pairs of rows in the datasets whose distance is smaller than a user-defined threshold. Approximate similarity join supports both joining two different datasets and self-joining. Self-joining will produce some duplicate pairs.

Approximate similarity join accepts both transformed and untransformed datasets as input. If an untransformed dataset is used, it will be transformed automatically. In that case, the hash signature will be created as outputCol.

In the joined dataset, the origin datasets can be queried in datasetA and datasetB. A distance column will be added to the output dataset to show the true distance between each pair of rows returned.
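For example, assuming a fitted LSH model named model and two DataFrames dfA and dfB (as in the examples further below), the hash signatures can be computed once and the already-transformed datasets passed to the join — a minimal sketch:

# Pre-compute the hash signatures once, so they are not re-computed at join time.
transformedA = model.transform(dfA)
transformedB = model.transform(dfB)

# Joining the already-transformed datasets returns the same pairs as joining
# the raw ones; distCol names the output column holding the true distance.
model.approxSimilarityJoin(transformedA, transformedB, 1.5, distCol="distance").show()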

Approximate Nearest Neighbor Search

Approximate nearest neighbor search takes a dataset (of feature vectors) and a key (a single feature vector), and it approximately returns a specified number of rows in the dataset that are closest to the vector.

Approximate nearest neighbor search accepts both transformed and untransformed datasets as input. If an untransformed dataset is used, it will be transformed automatically. In that case, the hash signature will be created as outputCol.

A distance column will be added to the output dataset to show the true distance between each output row and the searched key.

**Note:** Approximate nearest neighbor search will return fewer than k rows when there are not enough candidates in the hash bucket.
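Analogously to the join above, the nearest-neighbor query can also be run against a pre-transformed dataset — a minimal sketch, assuming the same model, dfA and key as in the examples below:

# Hashes are computed once up front; approxNearestNeighbors reuses them.
transformedA = model.transform(dfA)

# Ask for the 2 rows closest to `key`; fewer rows may come back if the hash
# buckets do not contain enough candidates. distCol names the distance column.
model.approxNearestNeighbors(transformedA, key, 2, distCol="distance").show()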

LSH Algorithms

Bucketed Random Projection for Euclidean Distance

Bucketed Random Projection is an LSH family for Euclidean distance. The Euclidean distance is defined as follows: \[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \] Its LSH family projects feature vectors $\mathbf{x}$ onto a random unit vector $\mathbf{v}$ and portions the projected results into hash buckets: \[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \] where r is a user-defined bucket length. The bucket length can be used to control the average size of hash buckets (and thus the number of buckets). A larger bucket length (i.e., fewer buckets) increases the probability of features being hashed into the same bucket (increasing the numbers of true and false positives).

Bucketed Random Projection accepts arbitrary vectors as input features, and supports both sparse and dense vectors.
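To make the role of the bucket length concrete, here is a small plain-Python sketch (not Spark's implementation) that applies the hash function above to a 2-dimensional point, using a hypothetical random unit vector v and two different bucket lengths r:

import math

def brp_hash(x, v, r):
    """Bucketed random projection hash: floor of (x . v) / r."""
    projection = sum(xi * vi for xi, vi in zip(x, v))
    return math.floor(projection / r)

x = [1.0, 1.0]
v = [0.6, 0.8]  # hypothetical random unit vector (0.6**2 + 0.8**2 = 1)

print(brp_hash(x, v, r=0.5))  # smaller bucket length -> more buckets -> bucket 2
print(brp_hash(x, v, r=2.0))  # larger bucket length -> fewer buckets -> bucket 0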

Refer to the BucketedRandomProjectionLSH Python docs for more details on the API.

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Find full example code at "examples/src/main/python/ml/bucketed_random_projection_lsh_example.py" in the Spark repo.

Refer to the BucketedRandomProjectionLSH Scala docs for more details on the API.

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0)),
  (3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (4, Vectors.dense(1.0, 0.0)),
  (5, Vectors.dense(-1.0, 0.0)),
  (6, Vectors.dense(0.0, 1.0)),
  (7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")

val key = Vectors.dense(1.0, 0.0)

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = brp.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala" in the Spark repo.

Refer to the BucketedRandomProjectionLSH Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.dense(1.0, 1.0)),
  RowFactory.create(1, Vectors.dense(1.0, -1.0)),
  RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
  RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);

List<Row> dataB = Arrays.asList(
    RowFactory.create(4, Vectors.dense(1.0, 0.0)),
    RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
    RowFactory.create(6, Vectors.dense(0.0, 1.0)),
    RowFactory.create(7, Vectors.dense(0.0, -1.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

Vector key = Vectors.dense(1.0, 0.0);

BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes");

BucketedRandomProjectionLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java" in the Spark repo.

MinHash for Jaccard Distance

MinHash is an LSH family for Jaccard distance where input features are sets of natural numbers. The Jaccard distance of two sets is defined by the cardinality of their intersection and union: \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \] MinHash applies a random hash function g to each element in the set and takes the minimum of all hashed values: \[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]

The input sets for MinHash are represented as binary vectors, where the vector indices represent the elements themselves and the non-zero values in the vector represent the presence of those elements in the set. While both dense and sparse vectors are supported, sparse vectors are usually recommended for efficiency. For example, Vectors.sparse(10, Array((2, 1.0), (3, 1.0), (5, 1.0))) means there are 10 elements in the space. This set contains element 2, element 3 and element 5. All non-zero values are treated as binary "1" values.

**Note:** Empty sets cannot be transformed by MinHash, which means any input vector must have at least one non-zero entry.
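As a small sanity check on the formula, the plain-Python sketch below computes the Jaccard distance between the index sets of the first rows of dfA and dfB from the examples that follow ({0, 1, 2} and {1, 3, 5}):

def jaccard_distance(a, b):
    """Jaccard distance of two sets: 1 - |A intersect B| / |A union B|."""
    a, b = set(a), set(b)
    return 1.0 - len(a & b) / len(a | b)

# Index sets of the sparse vectors used as the first rows of dfA and dfB below.
print(jaccard_distance({0, 1, 2}, {1, 3, 5}))  # 0.8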

Refer to the MinHashLSH Python docs for more details on the API.

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Find full example code at "examples/src/main/python/ml/min_hash_lsh_example.py" in the Spark repo.

Refer to the MinHashLSH Scala docs for more details on the API.

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))

val mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = mh.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala" in the Spark repo.

Refer to the MinHashLSH Java docs for more details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

List<Row> dataB = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);

MinHashLSH mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes");

MinHashLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java" in the Spark repo.