提取、转换和选择特征

本节介绍用于处理特征的算法,大致分为以下几组

目录

特征提取器

TF-IDF

词频-逆文档频率 (TF-IDF)是一种特征向量化方法,广泛用于文本挖掘,以反映术语对语料库中文档的重要性。用 $t$ 表示术语,用 $d$ 表示文档,用 $D$ 表示语料库。 词频 $TF(t, d)$ 是术语 $t$ 在文档 $d$ 中出现的次数,而文档频率 $DF(t, D)$ 是包含术语 $t$ 的文档的数量。如果我们仅使用词频来衡量重要性,则很容易过度强调出现非常频繁但对文档几乎没有信息的术语,例如“a”、“the”和“of”。如果一个术语在语料库中出现非常频繁,则意味着它没有携带关于特定文档的特殊信息。逆文档频率是衡量术语提供多少信息的数值度量: \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] 其中 $|D|$ 是语料库中文档的总数。由于使用了对数,如果一个术语出现在所有文档中,则其 IDF 值变为 0。请注意,应用了平滑项以避免为语料库之外的术语除以零。TF-IDF 度量只是 TF 和 IDF 的乘积: \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \] 词频和文档频率的定义有几个变体。在 MLlib 中,我们将 TF 和 IDF 分开以使其灵活。

TFHashingTFCountVectorizer 都可以用于生成词频向量。

HashingTF 是一个 Transformer,它接受术语集并将这些集合转换为固定长度的特征向量。在文本处理中,“术语集”可能是词袋。HashingTF 利用 哈希技巧。通过应用哈希函数,将原始特征映射到索引(术语)中。这里使用的哈希函数是 MurmurHash 3。然后基于映射的索引计算词频。这种方法避免了计算全局术语到索引映射的需要,这对于大型语料库来说可能非常昂贵,但它存在潜在的哈希冲突,其中不同的原始特征在哈希后可能变成相同的术语。为了减少冲突的机会,我们可以增加目标特征维度,即哈希表的桶数。由于使用哈希值的简单模数来确定向量索引,因此建议使用 2 的幂作为特征维度,否则特征将不会均匀地映射到向量索引。默认特征维度是 $2^{18} = 262,144$。一个可选的二进制切换参数控制词频计数。当设置为 true 时,所有非零频率计数都设置为 1。这对于对二进制(而不是整数)计数建模的离散概率模型特别有用。

CountVectorizer 将文本文档转换为术语计数的向量。有关更多详细信息,请参阅 CountVectorizer

IDFIDF 是一个 Estimator,它适合于数据集并生成一个 IDFModelIDFModel 接受特征向量(通常从 HashingTFCountVectorizer 创建)并缩放每个特征。直观地说,它会降低在语料库中频繁出现的特征的权重。

注意: spark.ml 不提供文本分段工具。我们建议用户参考 Stanford NLP Groupscalanlp/chalk

示例

在以下代码段中,我们从一组句子开始。我们使用 Tokenizer 将每个句子拆分为单词。对于每个句子(词袋),我们使用 HashingTF 将句子哈希到特征向量中。我们使用 IDF 来重新缩放特征向量;这通常在使用文本作为特征时可以提高性能。然后,我们的特征向量可以传递给学习算法。

有关 API 的更多详细信息,请参阅 HashingTF Python 文档IDF Python 文档

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
在 Spark 存储库中的“examples/src/main/python/ml/tf_idf_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 HashingTF Scala 文档IDF Scala 文档

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val sentenceData = spark.createDataFrame(Seq(
  (0.0, "Hi I heard about Spark"),
  (0.0, "I wish Java could use case classes"),
  (1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)

val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)

val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)

val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
在 Spark 存储库中的“examples/src/main/scala/org/apache/spark/examples/ml/TfIdfExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 HashingTF Java 文档IDF Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, "Hi I heard about Spark"),
  RowFactory.create(0.0, "I wish Java could use case classes"),
  RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);

int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .setNumFeatures(numFeatures);

Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors

IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);

Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();
在 Spark 存储库中的“examples/src/main/java/org/apache/spark/examples/ml/JavaTfIdfExample.java”中查找完整的示例代码。

Word2Vec

Word2Vec 是一个 Estimator,它接受代表文档的单词序列并训练一个 Word2VecModel。该模型将每个单词映射到一个唯一的固定大小向量。Word2VecModel 使用文档中所有单词的平均值将每个文档转换为向量;然后,此向量可用作预测、文档相似度计算等的特征。有关更多详细信息,请参阅 MLlib 用户指南中的 Word2Vec

示例

在以下代码段中,我们从一组文档开始,每个文档都表示为单词序列。对于每个文档,我们将其转换为特征向量。然后,可以将此特征向量传递给学习算法。

有关 API 的更多详细信息,请参阅 Word2Vec Python 文档

from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
在 Spark 存储库中的“examples/src/main/python/ml/word2vec_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Word2Vec Scala 文档

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
  "Hi I heard about Spark".split(" "),
  "I wish Java could use case classes".split(" "),
  "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)
val model = word2Vec.fit(documentDF)

val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
  println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
在 Spark 存储库中的“examples/src/main/scala/org/apache/spark/examples/ml/Word2VecExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Word2Vec Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
  RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
  RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);

// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0);

Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);

for (Row row : result.collectAsList()) {
  List<String> text = row.getList(0);
  Vector vector = (Vector) row.get(1);
  System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}
在 Spark 存储库中的“examples/src/main/java/org/apache/spark/examples/ml/JavaWord2VecExample.java”中查找完整的示例代码。

CountVectorizer

CountVectorizerCountVectorizerModel 旨在帮助将文本文件集合转换为词频计数向量。当没有先验字典可用时,可以使用 CountVectorizer 作为 Estimator 来提取词汇表,并生成 CountVectorizerModel。该模型为文档生成基于词汇表的稀疏表示,然后可以将其传递给其他算法,例如 LDA。

在拟合过程中,CountVectorizer 将选择按语料库中的词频排序的前 vocabSize 个词。 可选参数 minDF 也会影响拟合过程,它指定一个词必须出现在多少个文档(如果 < 1.0 则是文档的比例)中才能被包含在词汇表中。 另一个可选的二进制切换参数控制输出向量。 如果设置为 true,则所有非零计数都设置为 1。 这对于对二进制(而不是整数)计数建模的离散概率模型特别有用。

示例

假设我们有以下 DataFrame,其中包含列 idtexts

 id | texts
----|----------
 0  | Array("a", "b", "c")
 1  | Array("a", "b", "b", "c", "a")

texts 中的每一行都是 Array[String] 类型的文档。 调用 CountVectorizer 的 fit 方法会生成一个带有词汇表 (a, b, c) 的 CountVectorizerModel。 然后,转换后的输出列 “vector” 包含

 id | texts                           | vector
----|---------------------------------|---------------
 0  | Array("a", "b", "c")            | (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])

每个向量表示文档在词汇表上的词频计数。

有关 API 的更多详细信息,请参阅 CountVectorizer Python 文档CountVectorizerModel Python 文档

from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)
在 Spark 仓库中的 "examples/src/main/python/ml/count_vectorizer_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 CountVectorizer Scala 文档CountVectorizerModel Scala 文档

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df)

// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
  .setInputCol("words")
  .setOutputCol("features")

cvModel.transform(df).show(false)
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 CountVectorizer Java 文档CountVectorizerModel Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("a", "b", "c")),
  RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
  .setInputCol("text")
  .setOutputCol("feature")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df);

// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
  .setInputCol("text")
  .setOutputCol("feature");

cvModel.transform(df).show(false);
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaCountVectorizerExample.java" 中查找完整的示例代码。

FeatureHasher

特征哈希将一组类别特征或数值特征投影到指定维度的特征向量中(通常远小于原始特征空间的维度)。 这是通过使用 哈希技巧 将特征映射到特征向量中的索引来完成的。

FeatureHasher 转换器对多个列进行操作。 每列可以包含数值特征或类别特征。 列数据类型的行为和处理方式如下:

空(缺失)值将被忽略(在生成的特征向量中隐式为零)。

此处使用的哈希函数也是 MurmurHash 3,它在 HashingTF 中使用。 由于使用哈希值的简单模数来确定向量索引,因此建议使用 2 的幂作为 numFeatures 参数; 否则,特征将不会均匀地映射到向量索引。

示例

假设我们有一个 DataFrame,其中包含 4 个输入列 realboolstringNumstring。 这些不同的数据类型作为输入将说明转换的行为,以生成特征向量的列。

real| bool|stringNum|string
----|-----|---------|------
 2.2| true|        1|   foo
 3.3|false|        2|   bar
 4.4|false|        3|   baz
 5.5|false|        4|   foo

然后,此 DataFrame 上的 FeatureHasher.transform 的输出为

real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1        |foo   |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2        |bar   |(262144,[6031,  80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3        |baz   |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4        |foo   |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])

然后,生成的特征向量可以传递给学习算法。

有关 API 的更多详细信息,请参阅 FeatureHasher Python 文档

from pyspark.ml.feature import FeatureHasher

dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       outputCol="features")

featurized = hasher.transform(dataset)
featurized.show(truncate=False)
在 Spark 仓库中的 "examples/src/main/python/ml/feature_hasher_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 FeatureHasher Scala 文档

import org.apache.spark.ml.feature.FeatureHasher

val dataset = spark.createDataFrame(Seq(
  (2.2, true, "1", "foo"),
  (3.3, false, "2", "bar"),
  (4.4, false, "3", "baz"),
  (5.5, false, "4", "foo")
)).toDF("real", "bool", "stringNum", "string")

val hasher = new FeatureHasher()
  .setInputCols("real", "bool", "stringNum", "string")
  .setOutputCol("features")

val featurized = hasher.transform(dataset)
featurized.show(false)
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/FeatureHasherExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 FeatureHasher Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.FeatureHasher;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(2.2, true, "1", "foo"),
  RowFactory.create(3.3, false, "2", "bar"),
  RowFactory.create(4.4, false, "3", "baz"),
  RowFactory.create(5.5, false, "4", "foo")
);
StructType schema = new StructType(new StructField[]{
  new StructField("real", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("bool", DataTypes.BooleanType, false, Metadata.empty()),
  new StructField("stringNum", DataTypes.StringType, false, Metadata.empty()),
  new StructField("string", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);

FeatureHasher hasher = new FeatureHasher()
  .setInputCols(new String[]{"real", "bool", "stringNum", "string"})
  .setOutputCol("features");

Dataset<Row> featurized = hasher.transform(dataset);

featurized.show(false);
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaFeatureHasherExample.java" 中查找完整的示例代码。

特征转换器

Tokenizer

分词是将文本(例如句子)分解为单个术语(通常是单词)的过程。 简单的 Tokenizer 类提供了此功能。 下面的示例展示了如何将句子拆分为单词序列。

RegexTokenizer 允许基于正则表达式 (regex) 匹配进行更高级的分词。 默认情况下,参数“pattern”(regex,默认值:"\\s+")用作分隔符来拆分输入文本。 或者,用户可以将参数“gaps”设置为 false,表明 regex “pattern” 表示“token”而不是拆分间隙,并找到所有匹配项作为分词结果。

示例

有关 API 的更多详细信息,请参阅 Tokenizer Python 文档RegexTokenizer Python 文档

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
在 Spark 仓库中的 "examples/src/main/python/ml/tokenizer_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Tokenizer Scala 文档RegexTokenizer Scala 文档

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val countTokens = udf { (words: Seq[String]) => words.length }

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)

val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Tokenizer Java 文档RegexTokenizer Java 文档

import java.util.Arrays;
import java.util.List;

import scala.collection.mutable.Seq;

import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.call_udf;
import static org.apache.spark.sql.functions.col;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(1, "I wish Java could use case classes"),
  RowFactory.create(2, "Logistic,regression,models,are,neat")
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");

RegexTokenizer regexTokenizer = new RegexTokenizer()
    .setInputCol("sentence")
    .setOutputCol("words")
    .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);

spark.udf().register(
  "countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);

Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
    .withColumn("tokens", call_udf("countTokens", col("words")))
    .show(false);

Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
    .withColumn("tokens", call_udf("countTokens", col("words")))
    .show(false);
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java" 中查找完整的示例代码。

StopWordsRemover

停用词是从输入中排除的单词,通常是因为这些词出现频率很高并且没有携带太多含义。

StopWordsRemover 接收字符串序列(例如,Tokenizer 的输出)作为输入,并删除输入序列中的所有停用词。 停用词列表由 stopWords 参数指定。 通过调用 StopWordsRemover.loadDefaultStopWords(language) 可以访问某些语言的默认停用词,可用的选项包括“danish”、“dutch”、“english”、“finnish”、“french”、“german”、“hungarian”、“italian”、“norwegian”、“portuguese”、“russian”、“spanish”、“swedish”和“turkish”。 布尔参数 caseSensitive 指示匹配是否应区分大小写(默认为 false)。

示例

假设我们有以下 DataFrame,其中包含列 idraw

 id | raw
----|----------
 0  | [I, saw, the, red, balloon]
 1  | [Mary, had, a, little, lamb]

StopWordsRemover 应用于以 raw 作为输入列和 filtered 作为输出列,我们应该得到以下结果

 id | raw                         | filtered
----|-----------------------------|--------------------
 0  | [I, saw, the, red, balloon]  |  [saw, red, balloon]
 1  | [Mary, had, a, little, lamb]|[Mary, little, lamb]

filtered 中,停用词“I”、“the”、“had”和“a”已被过滤掉。

有关 API 的更多详细信息,请参阅 StopWordsRemover Python 文档

from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
在 Spark 仓库中的 "examples/src/main/python/ml/stopwords_remover_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 StopWordsRemover Scala 文档

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show(false)
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/StopWordsRemoverExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 StopWordsRemover Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StopWordsRemover remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered");

List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
  RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);

StructType schema = new StructType(new StructField[]{
  new StructField(
    "raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaStopWordsRemoverExample.java" 中查找完整的示例代码。

$n$-gram

n-gram 是对于某个整数 $n$ 的 $n$ 个 token(通常是单词)的序列。NGram 类可用于将输入特征转换为 $n$-gram。

NGram 接收字符串序列(例如,Tokenizer 的输出)作为输入。 参数 n 用于确定每个 $n$-gram 中的术语数。 输出将由 $n$-gram 序列组成,其中每个 $n$-gram 由 $n$ 个连续单词的空格分隔字符串表示。 如果输入序列包含少于 n 个字符串,则不会生成任何输出。

示例

有关 API 的更多详细信息,请参阅 NGram Python 文档

from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
在 Spark 仓库中的 "examples/src/main/python/ml/n_gram_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 NGram Scala 文档

import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")

val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")

val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/ml/NGramExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 NGram Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
  RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
  RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField(
    "words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);

NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");

Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/ml/JavaNGramExample.java" 中查找完整的示例代码。

Binarizer

二值化是将数值特征阈值化为二进制 (0/1) 特征的过程。

Binarizer 接受常见的参数 inputColoutputCol,以及二值化的 threshold。特征值大于阈值的会被二值化为 1.0;小于或等于阈值的会被二值化为 0.0。 inputCol 支持 Vector 和 Double 类型。

示例

有关 API 的更多详细信息,请参阅 Binarizer Python 文档

from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
在 Spark 仓库的 "examples/src/main/python/ml/binarizer_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Binarizer Scala 文档

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/BinarizerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Binarizer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 0.1),
  RowFactory.create(1, 0.8),
  RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);

Binarizer binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5);

Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);

System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaBinarizerExample.java" 中查找完整的示例代码。

PCA

PCA 是一种统计程序,它使用正交变换将可能相关的变量的一组观测值转换为一组线性不相关的变量值,称为主成分。PCA 类训练一个模型,使用 PCA 将向量投影到低维空间。下面的示例展示了如何将 5 维特征向量投影到 3 维主成分中。

示例

有关 API 的更多详细信息,请参阅 PCA Python 文档

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
在 Spark 仓库的 "examples/src/main/python/ml/pca_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 PCA Scala 文档

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)

val result = pca.transform(df).select("pcaFeatures")
result.show(false)
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/PCAExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 PCA Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
  RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
  RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PCAModel pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df);

Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaPCAExample.java" 中查找完整的示例代码。

PolynomialExpansion

多项式展开是将特征扩展到多项式空间的过程,该空间由原始维度的 n 次组合构成。 PolynomialExpansion 类提供此功能。下面的示例展示了如何将特征扩展到 3 次多项式空间。

示例

有关 API 的更多详细信息,请参阅 PolynomialExpansion Python 文档

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)
在 Spark 仓库的 "examples/src/main/python/ml/polynomial_expansion_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 PolynomialExpansion Scala 文档

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(2.0, 1.0),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)

val polyDF = polyExpansion.transform(df)
polyDF.show(false)
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/PolynomialExpansionExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 PolynomialExpansion Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

PolynomialExpansion polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(2.0, 1.0)),
  RowFactory.create(Vectors.dense(0.0, 0.0)),
  RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java" 中查找完整的示例代码。

离散余弦变换 (DCT)

离散余弦变换将长度为 $N$ 的实值序列在时域中转换为另一个长度为 $N$ 的实值序列在频域中。 DCT 类提供此功能,实现 DCT-II 并将结果缩放 $1/\sqrt{2}$,以便变换的表示矩阵是酉矩阵。 不会对变换后的序列应用移位(例如,变换后的序列的第 $0$ 个元素是第 $0$ 个 DCT 系数,不是第 $N/2$ 个)。

示例

有关 API 的更多详细信息,请参阅 DCT Python 文档

from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)
在 Spark 仓库的 "examples/src/main/python/ml/dct_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 DCT Scala 文档

import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/DCTExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 DCT Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
  RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

DCT dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false);

Dataset<Row> dctDf = dct.transform(df);

dctDf.select("featuresDCT").show(false);
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaDCTExample.java" 中查找完整的示例代码。

StringIndexer

StringIndexer 将标签的字符串列编码为标签索引列。StringIndexer 可以对多个列进行编码。索引在 [0, numLabels) 中,并支持四种排序选项:“frequencyDesc”:按标签频率降序排列(最常见的标签分配为 0),“frequencyAsc”:按标签频率升序排列(最不常见的标签分配为 0),“alphabetDesc”:按字母降序排列,以及“alphabetAsc”:按字母升序排列(默认 = “frequencyDesc”)。请注意,在使用“frequencyDesc”/“frequencyAsc”时,如果频率相等,则字符串将按字母顺序进一步排序。

如果用户选择保留未见标签,则这些标签将位于索引 numLabels 处。 如果输入列是数字,我们会将其转换为字符串并索引字符串值。 当下游管道组件(例如 EstimatorTransformer)使用此字符串索引标签时,您必须将组件的输入列设置为此字符串索引的列名。 在许多情况下,您可以使用 setInputCol 设置输入列。

示例

假设我们有以下带有 idcategory 列的 DataFrame

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category 是一个包含三个标签的字符串列:“a”、“b”和“c”。 将 StringIndexer 应用于以 category 作为输入列,以 categoryIndex 作为输出列时,我们应该得到以下结果

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0

“a”获得索引 0,因为它最频繁,其次是“c”的索引 1 和“b”的索引 2

此外,关于 StringIndexer 将如何处理未见标签,有三种策略,当你已经在一个数据集上拟合了一个 StringIndexer,然后用它来转换另一个数据集

示例

让我们回到之前的例子,但这次在以下数据集上重用我们之前定义的 StringIndexer

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d
 4  | e

如果您没有设置 StringIndexer 如何处理未见标签,或者将其设置为“error”,则会抛出异常。 但是,如果您调用了 setHandleInvalid("skip"),将生成以下数据集

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

请注意,不包含“d”或“e”的行不会显示。

如果您调用 setHandleInvalid("keep"),将生成以下数据集

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | d        | 3.0
 4  | e        | 3.0

请注意,包含“d”或“e”的行映射到索引“3.0”

有关 API 的更多详细信息,请参阅 StringIndexer Python 文档

from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
在 Spark 仓库的 "examples/src/main/python/ml/string_indexer_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 StringIndexer Scala 文档

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 StringIndexer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexer indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex");

Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaStringIndexerExample.java" 中查找完整的示例代码。

IndexToString

StringIndexer 对称,IndexToString 将标签索引列映射回包含原始标签的字符串列。 一个常见的用例是使用 StringIndexer 从标签生成索引,使用这些索引训练模型,并使用 IndexToString 从预测索引列中检索原始标签。 但是,您可以自由提供您自己的标签。

示例

StringIndexer 示例的基础上,假设我们有以下带有 idcategoryIndex 列的 DataFrame

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

IndexToString 应用于以 categoryIndex 作为输入列,以 originalCategory 作为输出列时,我们能够检索到原始标签(它们将从列的元数据中推断出来)

 id | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c

有关 API 的更多详细信息,请参阅 IndexToString Python 文档

from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
在 Spark 仓库的 "examples/src/main/python/ml/index_to_string_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 IndexToString Scala 文档

import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

println(s"Transformed string column '${indexer.getInputCol}' " +
    s"to indexed column '${indexer.getOutputCol}'")
indexed.show()

val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
    s"${Attribute.fromStructField(inputColSchema).toString}\n")

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)

println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
    s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/IndexToStringExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 IndexToString Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
    "to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();

StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
    Attribute.fromStructField(inputColSchema).toString() + "\n");

IndexToString converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);

System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
    "original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaIndexToStringExample.java" 中查找完整的示例代码。

OneHotEncoder

独热编码将分类特征(表示为标签索引)映射到一个二进制向量,该向量最多只有一个值为 1,表示在所有特征值集合中存在特定的特征值。 此编码允许期望连续特征的算法(例如 Logistic 回归)使用分类特征。 对于字符串类型输入数据,通常首先使用 StringIndexer 对分类特征进行编码。

OneHotEncoder 可以转换多个列,为每个输入列返回一个 one-hot 编码的输出向量列。通常使用 VectorAssembler 将这些向量合并成一个单一的特征向量。

OneHotEncoder 支持 handleInvalid 参数,用于选择在转换数据期间如何处理无效输入。可用选项包括 “keep”(任何无效输入都将分配给一个额外的类别索引)和 “error”(抛出错误)。

示例

有关 API 的更多详细信息,请参阅 OneHotEncoder Python 文档

from pyspark.ml.feature import OneHotEncoder

df = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])

encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
在 Spark 仓库的 “examples/src/main/python/ml/onehot_encoder_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 OneHotEncoder Scala 文档

import org.apache.spark.ml.feature.OneHotEncoder

val df = spark.createDataFrame(Seq(
  (0.0, 1.0),
  (1.0, 0.0),
  (2.0, 1.0),
  (0.0, 2.0),
  (0.0, 1.0),
  (2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")

val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex1", "categoryIndex2"))
  .setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)

val encoded = model.transform(df)
encoded.show()
在 Spark 仓库的 “examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 OneHotEncoder Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, 1.0),
  RowFactory.create(1.0, 0.0),
  RowFactory.create(2.0, 1.0),
  RowFactory.create(0.0, 2.0),
  RowFactory.create(0.0, 1.0),
  RowFactory.create(2.0, 0.0)
);

StructType schema = new StructType(new StructField[]{
  new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

OneHotEncoder encoder = new OneHotEncoder()
  .setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
  .setOutputCols(new String[] {"categoryVec1", "categoryVec2"});

OneHotEncoderModel model = encoder.fit(df);
Dataset<Row> encoded = model.transform(df);
encoded.show();
在 Spark 仓库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java” 中查找完整的示例代码。

VectorIndexer

VectorIndexer 帮助索引 Vector 数据集中的类别特征。它可以自动决定哪些特征是类别特征,并将原始值转换为类别索引。具体来说,它执行以下操作:

  1. 接收一个类型为 Vector 的输入列和一个参数 maxCategories
  2. 根据不同值的数量来决定哪些特征应该是类别特征,其中最多具有 maxCategories 的特征被声明为类别特征。
  3. 为每个类别特征计算从 0 开始的类别索引。
  4. 索引类别特征并将原始特征值转换为索引。

索引类别特征允许诸如决策树和树集成之类的算法适当地处理类别特征,从而提高性能。

示例

在下面的示例中,我们读入一个标记点数据集,然后使用 VectorIndexer 来决定哪些特征应被视为类别特征。我们将类别特征值转换为它们的索引。然后,可以将此转换后的数据传递给诸如 DecisionTreeRegressor 之类的可以处理类别特征的算法。

有关 API 的更多详细信息,请参阅 VectorIndexer Python 文档

from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
在 Spark 仓库的 “examples/src/main/python/ml/vector_indexer_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorIndexer Scala 文档

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} " +
  s"categorical features: ${categoricalFeatures.mkString(", ")}")

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
在 Spark 仓库的 “examples/src/main/scala/org/apache/spark/examples/ml/VectorIndexerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorIndexer Java 文档

import java.util.Map;

import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

VectorIndexer indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);

Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");

for (Integer feature : categoryMaps.keySet()) {
  System.out.print(" " + feature);
}
System.out.println();

// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
在 Spark 仓库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaVectorIndexerExample.java” 中查找完整的示例代码。

Interaction

Interaction 是一个 Transformer,它接受向量或双精度值列,并生成一个包含每个输入列中一个值的所有组合的乘积的单一向量列。

例如,如果你有两个向量类型列,每个列都有 3 个维度作为输入列,那么你将得到一个 9 维向量作为输出列。

示例

假设我们有以下具有列 “id1”、“vec1” 和 “vec2” 的 DataFrame

  id1|vec1          |vec2          
  ---|--------------|--------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] 
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] 
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] 
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] 
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0]     

应用具有这些输入列的 Interaction,然后作为输出列的 interactedCol 包含

  id1|vec1          |vec2          |interactedCol                                         
  ---|--------------|--------------|------------------------------------------------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]            
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]     
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]        
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] 
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]       

有关 API 的更多详细信息,请参阅 Interaction Python 文档

from pyspark.ml.feature import Interaction, VectorAssembler

df = spark.createDataFrame(
    [(1, 1, 2, 3, 8, 4, 5),
     (2, 4, 3, 8, 7, 9, 8),
     (3, 6, 1, 9, 2, 3, 6),
     (4, 10, 8, 6, 9, 4, 5),
     (5, 9, 2, 7, 10, 7, 3),
     (6, 1, 1, 4, 2, 8, 4)],
    ["id1", "id2", "id3", "id4", "id5", "id6", "id7"])

assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")

assembled1 = assembler1.transform(df)

assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")

assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")

interacted = interaction.transform(assembled2)

interacted.show(truncate=False)
在 Spark 仓库的 “examples/src/main/python/ml/interaction_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Interaction Scala 文档

import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler

val df = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 10, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 10, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")

val assembler1 = new VectorAssembler().
  setInputCols(Array("id2", "id3", "id4")).
  setOutputCol("vec1")

val assembled1 = assembler1.transform(df)

val assembler2 = new VectorAssembler().
  setInputCols(Array("id5", "id6", "id7")).
  setOutputCol("vec2")

val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

val interaction = new Interaction()
  .setInputCols(Array("id1", "vec1", "vec2"))
  .setOutputCol("interactedCol")

val interacted = interaction.transform(assembled2)

interacted.show(truncate = false)
在 Spark 仓库的 “examples/src/main/scala/org/apache/spark/examples/ml/InteractionExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Interaction Java 文档

List<Row> data = Arrays.asList(
  RowFactory.create(1, 1, 2, 3, 8, 4, 5),
  RowFactory.create(2, 4, 3, 8, 7, 9, 8),
  RowFactory.create(3, 6, 1, 9, 2, 3, 6),
  RowFactory.create(4, 10, 8, 6, 9, 4, 5),
  RowFactory.create(5, 9, 2, 7, 10, 7, 3),
  RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VectorAssembler assembler1 = new VectorAssembler()
        .setInputCols(new String[]{"id2", "id3", "id4"})
        .setOutputCol("vec1");

Dataset<Row> assembled1 = assembler1.transform(df);

VectorAssembler assembler2 = new VectorAssembler()
        .setInputCols(new String[]{"id5", "id6", "id7"})
        .setOutputCol("vec2");

Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");

Interaction interaction = new Interaction()
        .setInputCols(new String[]{"id1","vec1","vec2"})
        .setOutputCol("interactedCol");

Dataset<Row> interacted = interaction.transform(assembled2);

interacted.show(false);
在 Spark 仓库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaInteractionExample.java” 中查找完整的示例代码。

Normalizer

Normalizer 是一个 Transformer,它转换 Vector 行的数据集,将每个 Vector 规范化为具有单位范数。它采用参数 p,该参数指定用于规范化的 p-范数。(默认情况下 $p = 2$。)这种规范化可以帮助标准化你的输入数据并改善学习算法的行为。

示例

以下示例演示了如何以 libsvm 格式加载数据集,然后将每一行规范化为具有单位 $L^1$ 范数和单位 $L^\infty$ 范数。

有关 API 的更多详细信息,请参阅 Normalizer Python 文档

from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
在 Spark 仓库的 “examples/src/main/python/ml/normalizer_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Normalizer Scala 文档

import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.5, -1.0)),
  (1, Vectors.dense(2.0, 1.0, 1.0)),
  (2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
在 Spark 仓库的 “examples/src/main/scala/org/apache/spark/examples/ml/NormalizerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Normalizer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0);

Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();

// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
在 Spark 仓库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaNormalizerExample.java” 中查找完整的示例代码。

StandardScaler

StandardScaler 转换 Vector 行的数据集,将每个特征规范化为具有单位标准差和/或零均值。它采用参数

StandardScaler 是一个 Estimator,可以 fit 在数据集上以生成一个 StandardScalerModel;这相当于计算摘要统计信息。然后,该模型可以将数据集中的 Vector 列转换为具有单位标准差和/或零均值特征。

请注意,如果某个特征的标准差为零,它将在该特征的 Vector 中返回默认的 0.0 值。

示例

以下示例演示了如何以 libsvm 格式加载数据集,然后将每个特征规范化为具有单位标准差。

有关 API 的更多详细信息,请参阅 StandardScaler Python 文档

from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 仓库的 “examples/src/main/python/ml/standard_scaler_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 StandardScaler Scala 文档

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 仓库的 “examples/src/main/scala/org/apache/spark/examples/ml/StandardScalerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 StandardScaler Java 文档

import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
在 Spark 仓库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaStandardScalerExample.java” 中查找完整的示例代码。

RobustScaler

RobustScaler 转换 Vector 行的数据集,删除中位数并根据特定的分位数范围(默认情况下为 IQR:四分位距,第 1 四分位数和第 3 四分位数之间的分位数范围)缩放数据。它的行为与 StandardScaler 非常相似,但是使用中位数和分位数范围而不是均值和标准差,这使其对异常值具有鲁棒性。它采用参数

RobustScaler 是一个 Estimator,可以 fit 在数据集上以生成一个 RobustScalerModel;这相当于计算分位数统计信息。然后,该模型可以将数据集中的 Vector 列转换为具有单位分位数范围和/或零中位数特征。

请注意,如果某个特征的分位数范围为零,它将在该特征的 Vector 中返回默认的 0.0 值。

示例

以下示例演示了如何以 libsvm 格式加载数据集,然后将每个特征规范化为具有单位分位数范围。

有关 API 的更多详细信息,请参阅 RobustScaler Python 文档

from pyspark.ml.feature import RobustScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
                      withScaling=True, withCentering=False,
                      lower=0.25, upper=0.75)

# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)

# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 仓库的 “examples/src/main/python/ml/robust_scaler_example.py” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 RobustScaler Scala 文档

import org.apache.spark.ml.feature.RobustScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new RobustScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithScaling(true)
  .setWithCentering(false)
  .setLower(0.25)
  .setUpper(0.75)

// Compute summary statistics by fitting the RobustScaler.
val scalerModel = scaler.fit(dataFrame)

// Transform each feature to have unit quantile range.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
在 Spark 仓库的 “examples/src/main/scala/org/apache/spark/examples/ml/RobustScalerExample.scala” 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 RobustScaler Java 文档

import org.apache.spark.ml.feature.RobustScaler;
import org.apache.spark.ml.feature.RobustScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

RobustScaler scaler = new RobustScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithScaling(true)
  .setWithCentering(false)
  .setLower(0.25)
  .setUpper(0.75);

// Compute summary statistics by fitting the RobustScaler
RobustScalerModel scalerModel = scaler.fit(dataFrame);

// Transform each feature to have unit quantile range.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
在 Spark 仓库的 “examples/src/main/java/org/apache/spark/examples/ml/JavaRobustScalerExample.java” 中查找完整的示例代码。

MinMaxScaler

MinMaxScaler 转换 Vector 行的数据集,将每个特征重新缩放到特定范围(通常为 [0, 1])。它采用参数

MinMaxScaler 计算数据集的摘要统计信息并生成一个 MinMaxScalerModel。然后,该模型可以单独转换每个特征,使其位于给定的范围内。

特征 E 的重新缩放值计算如下: \begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation} 对于 $E_{max} == E_{min}$ 的情况, $Rescaled(e_i) = 0.5 * (max + min)$

请注意,由于零值可能会转换为非零值,因此即使对于稀疏输入,转换器的输出也将是 DenseVector

示例

以下示例演示了如何以 libsvm 格式加载数据集,然后将每个特征重新缩放到 [0, 1]。

有关 API 的更多详细信息,请参阅 MinMaxScaler Python 文档MinMaxScalerModel Python 文档

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
在 Spark 仓库的 "examples/src/main/python/ml/min_max_scaler_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 MinMaxScaler Scala 文档MinMaxScalerModel Scala 文档

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -1.0)),
  (1, Vectors.dense(2.0, 1.1, 1.0)),
  (2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/MinMaxScalerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 MinMaxScaler Java 文档MinMaxScalerModel Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
    RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MinMaxScaler scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
    + scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaMinMaxScalerExample.java" 中查找完整的示例代码。

MaxAbsScaler

MaxAbsScaler 转换 Vector 行的数据集,通过将每个特征除以每个特征中的最大绝对值,将每个特征重新缩放到范围 [-1, 1]。 它不会移动/居中数据,因此不会破坏任何稀疏性。

MaxAbsScaler 计算数据集上的摘要统计信息,并生成 MaxAbsScalerModel。 然后,该模型可以单独转换每个特征到范围 [-1, 1]。

示例

以下示例演示了如何加载 libsvm 格式的数据集,然后将每个特征重新缩放到 [-1, 1]。

有关 API 的更多详细信息,请参阅 MaxAbsScaler Python 文档MaxAbsScalerModel Python 文档

from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()
在 Spark 仓库的 "examples/src/main/python/ml/max_abs_scaler_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 MaxAbsScaler Scala 文档MaxAbsScalerModel Scala 文档

import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, Vectors.dense(2.0, 1.0, -4.0)),
  (2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/MaxAbsScalerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 MaxAbsScaler Java 文档MaxAbsScalerModel Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MaxAbsScaler scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaMaxAbsScalerExample.java" 中查找完整的示例代码。

Bucketizer

Bucketizer 将连续特征的列转换为特征桶的列,其中桶由用户指定。 它接受一个参数

请注意,如果您不知道目标列的上限和下限,则应添加 Double.NegativeInfinityDouble.PositiveInfinity 作为拆分的边界,以防止潜在的 Bucketizer 范围外异常。

另请注意,您提供的拆分必须严格按升序排列,即 s0 < s1 < s2 < ... < sn

有关更多详细信息,请参阅 Bucketizer 的 API 文档。

示例

以下示例演示了如何将 Doubles 的列分桶到另一个索引式列中。

有关 API 的更多详细信息,请参阅 Bucketizer Python 文档

from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()
在 Spark 仓库的 "examples/src/main/python/ml/bucketizer_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Bucketizer Scala 文档

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)

println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()

val splitsArray = Array(
  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
  Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))

val data2 = Array(
  (-999.9, -999.9),
  (-0.5, -0.2),
  (-0.3, -0.1),
  (0.0, 0.0),
  (0.2, 0.4),
  (999.9, 999.9))
val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")

val bucketizer2 = new Bucketizer()
  .setInputCols(Array("features1", "features2"))
  .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
  .setSplitsArray(splitsArray)

// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)

println(s"Bucketizer output with [" +
  s"${bucketizer2.getSplitsArray(0).length-1}, " +
  s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Bucketizer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};

List<Row> data = Arrays.asList(
  RowFactory.create(-999.9),
  RowFactory.create(-0.5),
  RowFactory.create(-0.3),
  RowFactory.create(0.0),
  RowFactory.create(0.2),
  RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Bucketizer bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits);

// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);

System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();

// Bucketize multiple columns at one pass.
double[][] splitsArray = {
  {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY},
  {Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY}
};

List<Row> data2 = Arrays.asList(
  RowFactory.create(-999.9, -999.9),
  RowFactory.create(-0.5, -0.2),
  RowFactory.create(-0.3, -0.1),
  RowFactory.create(0.0, 0.0),
  RowFactory.create(0.2, 0.4),
  RowFactory.create(999.9, 999.9)
);
StructType schema2 = new StructType(new StructField[]{
  new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame2 = spark.createDataFrame(data2, schema2);

Bucketizer bucketizer2 = new Bucketizer()
  .setInputCols(new String[] {"features1", "features2"})
  .setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"})
  .setSplitsArray(splitsArray);
// Transform original data into its bucket index.
Dataset<Row> bucketedData2 = bucketizer2.transform(dataFrame2);

System.out.println("Bucketizer output with [" +
  (bucketizer2.getSplitsArray()[0].length-1) + ", " +
  (bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column");
bucketedData2.show();
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java" 中查找完整的示例代码。

ElementwiseProduct

ElementwiseProduct 使用元素级乘法将每个输入向量乘以提供的“权重”向量。 换句话说,它通过标量乘数缩放数据集的每一列。 这表示输入向量 v 和变换向量 w 之间的 Hadamard 乘积,以产生结果向量。

\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]

示例

下面的示例演示了如何使用变换向量值来转换向量。

有关 API 的更多详细信息,请参阅 ElementwiseProduct Python 文档

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
在 Spark 仓库的 "examples/src/main/python/ml/elementwise_product_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 ElementwiseProduct Scala 文档

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/ElementwiseProductExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 ElementwiseProduct Java 文档

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
  RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
  RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);

List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));

StructType schema = DataTypes.createStructType(fields);

Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);

ElementwiseProduct transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector");

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaElementwiseProductExample.java" 中查找完整的示例代码。

SQLTransformer

SQLTransformer 实现由 SQL 语句定义的转换。 目前,我们仅支持类似 "SELECT ... FROM __THIS__ ..." 的 SQL 语法,其中 "__THIS__" 表示输入数据集的基础表。 select 子句指定要在输出中显示的字段、常量和表达式,并且可以是 Spark SQL 支持的任何 select 子句。 用户还可以使用 Spark SQL 内置函数和 UDF 来操作这些选定的列。 例如,SQLTransformer 支持如下语句:

示例

假设我们有以下具有列 idv1v2 的 DataFrame

 id |  v1 |  v2
----|-----|-----
 0  | 1.0 | 3.0  
 2  | 2.0 | 5.0

这是 SQLTransformer 的输出,语句为 "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"

 id |  v1 |  v2 |  v3 |  v4
----|-----|-----|-----|-----
 0  | 1.0 | 3.0 | 4.0 | 3.0
 2  | 2.0 | 5.0 | 7.0 |10.0

有关 API 的更多详细信息,请参阅 SQLTransformer Python 文档

from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
在 Spark 仓库的 "examples/src/main/python/ml/sql_transformer.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 SQLTransformer Scala 文档

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/SQLTransformerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 SQLTransformer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 1.0, 3.0),
  RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

SQLTransformer sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

sqlTrans.transform(df).show();
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java" 中查找完整的示例代码。

VectorAssembler

VectorAssembler 是一个转换器,它将给定的列列表组合成一个向量列。 它对于将原始特征和由不同的特征转换器生成的特征组合成单个特征向量非常有用,以便训练 ML 模型,例如逻辑回归和决策树。 VectorAssembler 接受以下输入列类型:所有数值类型、布尔类型和向量类型。 在每一行中,输入列的值将按指定的顺序连接成一个向量。

示例

假设我们有一个 DataFrame,其中包含列 idhourmobileuserFeaturesclicked

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

userFeatures 是一个向量列,包含三个用户特征。 我们想将 hourmobileuserFeatures 组合成一个名为 features 的单个特征向量,并使用它来预测 clicked 与否。 如果我们将 VectorAssembler 的输入列设置为 hourmobileuserFeatures,并将输出列设置为 features,那么在转换后,我们应该得到以下 DataFrame

 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

有关 API 的更多详细信息,请参阅 VectorAssembler Python 文档

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
在 Spark 仓库的 "examples/src/main/python/ml/vector_assembler_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorAssembler Scala 文档

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/VectorAssemblerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorAssembler Java 文档

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorAssemblerExample.java" 中查找完整的示例代码。

VectorSizeHint

有时,显式指定 VectorType 列的向量大小可能很有用。 例如,VectorAssembler 使用其输入列的大小信息来生成其输出列的大小信息和元数据。 虽然在某些情况下,可以通过检查列的内容来获取此信息,但在流式 dataframe 中,在流启动之前无法获得内容。 VectorSizeHint 允许用户显式指定列的向量大小,以便 VectorAssembler 或其他可能需要知道向量大小的转换器可以使用该列作为输入。

要使用 VectorSizeHint,用户必须设置 inputColsize 参数。 将此转换器应用于 dataframe 会生成一个新的 dataframe,其中更新了 inputCol 的元数据,指定了向量大小。 结果 dataframe 上的下游操作可以使用元数据获取此大小。

VectorSizeHint 还可以接受可选的 handleInvalid 参数,该参数控制向量列包含空值或大小错误的向量时的行为。 默认情况下,handleInvalid 设置为“error”,表示应抛出异常。 此参数也可以设置为“skip”,表示应从结果 dataframe 中过滤掉包含无效值的行,或者设置为“optimistic”,表示不应检查该列的无效值,并且应保留所有行。 请注意,使用“optimistic”可能会导致结果 dataframe 处于不一致的状态,这意味着应用于 VectorSizeHint 的列的元数据与该列的内容不匹配。 用户应注意避免这种不一致的状态。

有关 API 的更多详细信息,请参阅 VectorSizeHint Python 文档

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
     (0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

sizeHint = VectorSizeHint(
    inputCol="userFeatures",
    handleInvalid="skip",
    size=3)

datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
请在 Spark 仓库的 "examples/src/main/python/ml/vector_size_hint_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorSizeHint Scala 文档

import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq(
    (0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
    (0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
  .setSize(3)

val datasetWithSize = sizeHint.transform(dataset)
println("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(false)

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

// This dataframe can be used by downstream transformers as before
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
请在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/VectorSizeHintExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorSizeHint Java 文档

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row0 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Row row1 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row0, row1), schema);

VectorSizeHint sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
  .setSize(3);

Dataset<Row> datasetWithSize = sizeHint.transform(dataset);
System.out.println("Rows where 'userFeatures' is not the right size are filtered out");
datasetWithSize.show(false);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

// This dataframe can be used by downstream transformers as before
Dataset<Row> output = assembler.transform(datasetWithSize);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
请在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSizeHintExample.java" 中查找完整的示例代码。

QuantileDiscretizer

QuantileDiscretizer 接受具有连续特征的列,并输出具有分箱分类特征的列。箱的数量由 numBuckets 参数设置。使用的桶的数量可能小于此值,例如,如果输入的非重复值太少,无法创建足够多的非重复分位数。

NaN 值:在 QuantileDiscretizer 拟合期间,NaN 值将从列中删除。 这将生成一个 Bucketizer 模型用于进行预测。 在转换期间,当 Bucketizer 在数据集中找到 NaN 值时会引发错误,但用户也可以通过设置 handleInvalid 来选择保留或删除数据集中的 NaN 值。 如果用户选择保留 NaN 值,它们将被特殊处理并放入自己的桶中,例如,如果使用 4 个桶,则非 NaN 数据将被放入桶 [0-3] 中,但 NaN 将在特殊桶 [4] 中计数。

算法:使用近似算法选择箱的范围(有关详细描述,请参阅 approxQuantile 的文档)。可以使用 relativeError 参数控制近似的精度。当设置为零时,将计算精确分位数(注意:计算精确分位数是一项昂贵的操作)。较低和较高的箱边界将为 -Infinity+Infinity,涵盖所有实数值。

示例

假设我们有一个 DataFrame,其中包含列 idhour

 id | hour
----|------
 0  | 18.0
----|------
 1  | 19.0
----|------
 2  | 8.0
----|------
 3  | 5.0
----|------
 4  | 2.2

hour 是一个具有 Double 类型的连续特征。我们想把连续特征变成一个分类特征。给定 numBuckets = 3,我们应该得到下面的 DataFrame

 id | hour | result
----|------|------
 0  | 18.0 | 2.0
----|------|------
 1  | 19.0 | 2.0
----|------|------
 2  | 8.0  | 1.0
----|------|------
 3  | 5.0  | 1.0
----|------|------
 4  | 2.2  | 0.0

有关 API 的更多详细信息,请参阅 QuantileDiscretizer Python 文档

from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()
请在 Spark 仓库的 "examples/src/main/python/ml/quantile_discretizer_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 QuantileDiscretizer Scala 文档

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show(false)
请在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/QuantileDiscretizerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 QuantileDiscretizer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 18.0),
  RowFactory.create(1, 19.0),
  RowFactory.create(2, 8.0),
  RowFactory.create(3, 5.0),
  RowFactory.create(4, 2.2)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

QuantileDiscretizer discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3);

Dataset<Row> result = discretizer.fit(df).transform(df);
result.show(false);
请在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java" 中查找完整的示例代码。

Imputer

Imputer 评估器使用缺失值所在列的平均值、中值或众数来完成数据集中的缺失值。 输入列应为数值类型。 目前,Imputer 不支持分类特征,并且可能会为包含分类特征的列创建不正确的值。 Imputer 可以通过 .setMissingValue(custom_value) 推算除 “NaN” 之外的自定义值。 例如,.setMissingValue(0) 将推算(0)的所有出现项。

注意,输入列中的所有 null 值都将被视为缺失值,因此也会被推算。

示例

假设我们有一个 DataFrame,其中包含列 ab

      a     |      b      
------------|-----------
     1.0    | Double.NaN
     2.0    | Double.NaN
 Double.NaN |     3.0   
     4.0    |     4.0   
     5.0    |     5.0   

在此示例中,Imputer 将把所有出现的 Double.NaN(缺失值的默认值)替换为从相应列中的其他值计算得出的平均值(默认的推算策略)。 在此示例中,列 ab 的代理值分别为 3.0 和 4.0。 转换后,输出列中的缺失值将替换为相关列的代理值。

      a     |      b     | out_a | out_b   
------------|------------|-------|-------
     1.0    | Double.NaN |  1.0  |  4.0 
     2.0    | Double.NaN |  2.0  |  4.0 
 Double.NaN |     3.0    |  3.0  |  3.0 
     4.0    |     4.0    |  4.0  |  4.0
     5.0    |     5.0    |  5.0  |  5.0 

有关 API 的更多详细信息,请参阅 Imputer Python 文档

from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()
请在 Spark 仓库的 "examples/src/main/python/ml/imputer_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Imputer Scala 文档

import org.apache.spark.ml.feature.Imputer

val df = spark.createDataFrame(Seq(
  (1.0, Double.NaN),
  (2.0, Double.NaN),
  (Double.NaN, 3.0),
  (4.0, 4.0),
  (5.0, 5.0)
)).toDF("a", "b")

val imputer = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("out_a", "out_b"))

val model = imputer.fit(df)
model.transform(df).show()
请在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/ImputerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 Imputer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1.0, Double.NaN),
  RowFactory.create(2.0, Double.NaN),
  RowFactory.create(Double.NaN, 3.0),
  RowFactory.create(4.0, 4.0),
  RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
  createStructField("a", DoubleType, false),
  createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Imputer imputer = new Imputer()
  .setInputCols(new String[]{"a", "b"})
  .setOutputCols(new String[]{"out_a", "out_b"});

ImputerModel model = imputer.fit(df);
model.transform(df).show();
请在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaImputerExample.java" 中查找完整的示例代码。

特征选择器

VectorSlicer

VectorSlicer 是一个转换器,它接受一个特征向量并输出一个包含原始特征子数组的新特征向量。 它对于从向量列中提取特征很有用。

VectorSlicer 接受具有指定索引的向量列,然后输出一个新的向量列,其值通过这些索引选择。 有两种类型的索引,

  1. 表示向量索引的整数索引,setIndices()

  2. 表示向量中特征名称的字符串索引,setNames()这需要向量列具有 AttributeGroup,因为该实现与 Attribute 的名称字段匹配。

整数和字符串的规范都可以接受。 而且,您可以同时使用整数索引和字符串名称。 必须至少选择一个特征。 不允许重复特征,因此所选索引和名称之间不能有重叠。 请注意,如果选择了特征名称,则如果遇到空的输入属性,将引发异常。

输出向量将首先按所选索引(按给定的顺序)对特征进行排序,然后按所选名称(按给定的顺序)进行排序。

示例

假设我们有一个 DataFrame,其中包含列 userFeatures

 userFeatures
------------------
 [0.0, 10.0, 0.5]

userFeatures 是一个向量列,其中包含三个用户特征。 假设 userFeatures 的第一列全部为零,因此我们要删除它,只选择最后两列。 VectorSlicer 使用 setIndices(1, 2) 选择最后两个元素,然后生成一个名为 features 的新向量列

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]

还假设我们有 userFeatures 的潜在输入属性,即 ["f1", "f2", "f3"],那么我们可以使用 setNames("f2", "f3") 来选择它们。

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]
 ["f1", "f2", "f3"] | ["f2", "f3"]

有关 API 的更多详细信息,请参阅 VectorSlicer Python 文档

from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

df = spark.createDataFrame([
    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()
请在 Spark 仓库的 "examples/src/main/python/ml/vector_slicer_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorSlicer Scala 文档

import java.util.Arrays

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

val data = Arrays.asList(
  Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
  Row(Vectors.dense(-2.0, 2.3, 0.0))
)

val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])

val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))

val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")

slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))

val output = slicer.transform(dataset)
output.show(false)
请在 Spark 仓库的 "examples/src/main/scala/org/apache/spark/examples/ml/VectorSlicerExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VectorSlicer Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

Attribute[] attrs = {
  NumericAttribute.defaultAttr().withName("f1"),
  NumericAttribute.defaultAttr().withName("f2"),
  NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
  RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);

Dataset<Row> dataset =
  spark.createDataFrame(data, (new StructType()).add(group.toStructField()));

VectorSlicer vectorSlicer = new VectorSlicer()
  .setInputCol("userFeatures").setOutputCol("features");

vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})

Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);
请在 Spark 仓库的 "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java" 中查找完整的示例代码。

RFormula

RFormula 选择由 R 模型公式 指定的列。 目前,我们支持 R 运算符的一个有限子集,包括 “~”、“.”、“:”、“+” 和 “-”。 基本运算符为

假设 ab 是 double 列,我们使用以下简单示例来说明 RFormula 的效果

RFormula 生成一个特征向量列和一个 double 或 string 类型的标签列。 就像在 R 中将公式用于线性回归一样,数值列将被强制转换为 double 类型。 至于字符串输入列,它们将首先使用由 stringOrderType 确定的排序,通过 StringIndexer 进行转换,然后删除排序后的最后一个类别,然后对 double 类型进行 one-hot 编码。

假设一个字符串特征列包含值 {'b', 'a', 'b', 'a', 'c', 'b'},我们设置 stringOrderType 来控制编码

stringOrderType | Category mapped to 0 by StringIndexer |  Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b')          | least frequent category ('c')
'frequencyAsc'  | least frequent category ('c')         | most frequent category ('b')
'alphabetDesc'  | last alphabetical category ('c')      | first alphabetical category ('a')
'alphabetAsc'   | first alphabetical category ('a')     | last alphabetical category ('c')

如果标签列的类型为字符串,则将首先使用 StringIndexer 使用 frequencyDesc 排序将其转换为 double 类型。 如果 DataFrame 中不存在标签列,则将从公式中指定的响应变量创建输出标签列。

注意:排序选项 stringOrderType 不用于标签列。 当索引标签列时,它使用 StringIndexer 中的默认降序频率排序。

示例

假设我们有一个 DataFrame,其中包含列 idcountryhourclicked

id | country | hour | clicked
---|---------|------|---------
 7 | "US"    | 18   | 1.0
 8 | "CA"    | 12   | 0.0
 9 | "NZ"    | 15   | 0.0

如果我们使用 RFormula,其公式字符串为 clicked ~ country + hour,这表示我们想基于 countryhour 预测 clicked,则转换后我们应该得到以下 DataFrame

id | country | hour | clicked | features         | label
---|---------|------|---------|------------------|-------
 7 | "US"    | 18   | 1.0     | [0.0, 0.0, 18.0] | 1.0
 8 | "CA"    | 12   | 0.0     | [0.0, 1.0, 12.0] | 0.0
 9 | "NZ"    | 15   | 0.0     | [1.0, 0.0, 15.0] | 0.0

有关 API 的更多详细信息,请参阅 RFormula Python 文档

from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
在 Spark repo 的 "examples/src/main/python/ml/rformula_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 RFormula Scala 文档

import org.apache.spark.ml.feature.RFormula

val dataset = spark.createDataFrame(Seq(
  (7, "US", 18, 1.0),
  (8, "CA", 12, 0.0),
  (9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")

val formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label")

val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/ml/RFormulaExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 RFormula Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("country", StringType, false),
  createStructField("hour", IntegerType, false),
  createStructField("clicked", DoubleType, false)
});

List<Row> data = Arrays.asList(
  RowFactory.create(7, "US", 18, 1.0),
  RowFactory.create(8, "CA", 12, 0.0),
  RowFactory.create(9, "NZ", 15, 0.0)
);

Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
在 Spark repo 的 "examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java" 中查找完整的示例代码。

ChiSqSelector

ChiSqSelector 代表卡方特征选择。它对带有分类特征的标记数据进行操作。 ChiSqSelector 使用卡方独立性检验来决定选择哪些特征。它支持五种选择方法:numTopFeaturespercentilefprfdrfwe

示例

假设我们有一个 DataFrame,其中包含列 idfeaturesclicked,它被用作我们要预测的目标

id | features              | clicked
---|-----------------------|---------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0

如果我们使用 ChiSqSelectornumTopFeatures = 1,那么根据我们的标签 clicked,我们的 features 中的最后一列将被选择为最有用的特征

id | features              | clicked | selectedFeatures
---|-----------------------|---------|------------------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0     | [1.0]
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0     | [0.0]
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0     | [0.1]

有关 API 的更多详细信息,请参阅 ChiSqSelector Python 文档

from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

result = selector.fit(df).transform(df)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
在 Spark repo 的 "examples/src/main/python/ml/chisq_selector_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 ChiSqSelector Scala 文档

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)

val df = spark.createDataset(data).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 ChiSqSelector Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

ChiSqSelector selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
    + " features selected");
result.show();
在 Spark repo 的 "examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java" 中查找完整的示例代码。

UnivariateFeatureSelector

UnivariateFeatureSelector 对带有分类/连续特征的分类/连续标签进行操作。 用户可以设置 featureTypelabelType,Spark 将根据指定的 featureTypelabelType 选择要使用的评分函数。

featureType |  labelType |score function
------------|------------|--------------
categorical |categorical | chi-squared (chi2)
continuous  |categorical | ANOVATest (f_classif)
continuous  |continuous  | F-value (f_regression)

它支持五种选择模式:numTopFeaturespercentilefprfdrfwe

默认情况下,选择模式为 numTopFeatures,并且默认的 selectionThreshold 设置为 50。

示例

假设我们有一个 DataFrame,其中包含列 idfeatureslabel,它被用作我们要预测的目标

id | features                       | label
---|--------------------------------|---------
 1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0
 2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0
 3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0
 4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0
 5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0
 6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0

如果我们设置 featureTypecontinuous,设置 labelTypecategorical,并且设置 numTopFeatures = 1,则我们的 features 中的最后一列将被选择为最有用的特征

id | features                       | label   | selectedFeatures
---|--------------------------------|---------|------------------
 1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0     | [2.3]
 2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0     | [4.1]
 3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0     | [2.5]
 4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0     | [3.8]
 5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0     | [3.0]
 6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0     | [2.1]

有关 API 的更多详细信息,请参阅 UnivariateFeatureSelector Python 文档

from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
    (2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
    (3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
    (4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
    (5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
    (6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])

selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)

result = selector.fit(df).transform(df)

print("UnivariateFeatureSelector output with top %d features selected using f_classif"
      % selector.getSelectionThreshold())
result.show()
在 Spark repo 的 "examples/src/main/python/ml/univariate_feature_selector_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 UnivariateFeatureSelector Scala 文档

import org.apache.spark.ml.feature.UnivariateFeatureSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
  (2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
  (3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
  (4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
  (5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
  (6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
)

val df = spark.createDataset(data).toDF("id", "features", "label")

val selector = new UnivariateFeatureSelector()
  .setFeatureType("continuous")
  .setLabelType("categorical")
  .setSelectionMode("numTopFeatures")
  .setSelectionThreshold(1)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"UnivariateFeatureSelector output with top ${selector.getSelectionThreshold}" +
  s" features selected using f_classif")
result.show()
在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/ml/UnivariateFeatureSelectorExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 UnivariateFeatureSelector Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.UnivariateFeatureSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
  RowFactory.create(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
  RowFactory.create(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
  RowFactory.create(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
  RowFactory.create(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
  RowFactory.create(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

UnivariateFeatureSelector selector = new UnivariateFeatureSelector()
  .setFeatureType("continuous")
  .setLabelType("categorical")
  .setSelectionMode("numTopFeatures")
  .setSelectionThreshold(1)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("UnivariateFeatureSelector output with top "
    + selector.getSelectionThreshold() + " features selected using f_classif");
result.show();
在 Spark repo 的 "examples/src/main/java/org/apache/spark/examples/ml/JavaUnivariateFeatureSelectorExample.java" 中查找完整的示例代码。

VarianceThresholdSelector

VarianceThresholdSelector 是一个用于删除低方差特征的选择器。 方差(样本)不大于 varianceThreshold 的特征将被删除。 如果未设置,则 varianceThreshold 默认为 0,这意味着仅删除方差为 0 的特征(即,在所有样本中具有相同值的特征)。

示例

假设我们有一个 DataFrame,其中包含列 idfeatures,它被用作我们要预测的目标

id | features
---|--------------------------------
 1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0]
 2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0]
 3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0]
 4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0]
 5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0]
 6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0]

这 6 个特征的样本方差分别为 16.67、0.67、8.17、10.17、5.07 和 11.47。 如果我们使用 VarianceThresholdSelectorvarianceThreshold = 8.0,则方差 <= 8.0 的特征将被删除

id | features                       | selectedFeatures
---|--------------------------------|-------------------
 1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0] | [6.0,0.0,7.0,0.0]
 2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0] | [0.0,6.0,0.0,9.0]
 3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0] | [0.0,3.0,0.0,5.0]
 4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0] | [0.0,8.0,5.0,4.0]
 5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0] | [8.0,6.0,5.0,4.0]
 6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0] | [8.0,6.0,0.0,0.0]

有关 API 的更多详细信息,请参阅 VarianceThresholdSelector Python 文档

from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
    (2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
    (3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
    (4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
    (5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
    (6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])

selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")

result = selector.fit(df).transform(df)

print("Output: Features with variance lower than %f are removed." %
      selector.getVarianceThreshold())
result.show()
在 Spark repo 的 "examples/src/main/python/ml/variance_threshold_selector_example.py" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VarianceThresholdSelector Scala 文档

import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
  (2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
  (3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
  (4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
  (5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
  (6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
)

val df = spark.createDataset(data).toDF("id", "features")

val selector = new VarianceThresholdSelector()
  .setVarianceThreshold(8.0)
  .setFeaturesCol("features")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"Output: Features with variance lower than" +
  s" ${selector.getVarianceThreshold} are removed.")
result.show()
在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/ml/VarianceThresholdSelectorExample.scala" 中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 VarianceThresholdSelector Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.VarianceThresholdSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
  RowFactory.create(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
  RowFactory.create(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
  RowFactory.create(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
  RowFactory.create(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
  RowFactory.create(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VarianceThresholdSelector selector = new VarianceThresholdSelector()
  .setVarianceThreshold(8.0)
  .setFeaturesCol("features")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("Output: Features with variance lower than "
    + selector.getVarianceThreshold() + " are removed.");
result.show();
在 Spark repo 的 "examples/src/main/java/org/apache/spark/examples/ml/JavaVarianceThresholdSelectorExample.java" 中查找完整的示例代码。

局部敏感哈希

局部敏感哈希 (LSH) 是一种重要的哈希技术,通常用于大数据集的聚类、近似最近邻搜索和异常值检测。

LSH 的总体思路是使用一系列函数(“LSH 族”)将数据点哈希到桶中,以便彼此接近的数据点以高概率位于同一桶中,而彼此远离的数据点很可能位于不同的桶中。 LSH 族正式定义如下。

在度量空间 (M, d) 中,其中 M 是一个集合,dM 上的距离函数,LSH 族是一系列满足以下属性的函数 h\[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \] 此 LSH 族称为 (r1, r2, p1, p2)-敏感。

在 Spark 中,不同的 LSH 族在单独的类中实现(例如,MinHash),并且在每个类中都提供了特征转换、近似相似性连接和近似最近邻的 API。

在 LSH 中,我们将假阳性定义为哈希到同一桶中的一对遥远的输入特征(其中 $d(p,q) \geq r2$),我们将假阴性定义为哈希到不同桶中的一对附近的特征(其中 $d(p,q) \leq r1$)。

LSH 操作

我们描述了 LSH 可以用于的主要操作类型。 拟合的 LSH 模型具有每种操作的方法。

特征转换

特征变换是将哈希值添加为新列的基本功能。 这对于降维非常有用。 用户可以通过设置 inputColoutputCol 来指定输入和输出列名。

LSH 还支持多个 LSH 哈希表。 用户可以通过设置 numHashTables 来指定哈希表的数量。 这也用于近似相似性连接和近似最近邻中的OR-放大。 增加哈希表的数量会提高准确性,但也会增加通信成本和运行时间。

outputCol 的类型是 Seq[Vector],其中数组的维度等于 numHashTables,并且向量的维度当前设置为 1。 在未来的版本中,我们将实现 AND 放大,以便用户可以指定这些向量的维度。

近似相似度连接

近似相似度连接接受两个数据集,并近似返回数据集中距离小于用户定义阈值的行对。 近似相似度连接支持连接两个不同的数据集和自连接。 自连接会产生一些重复的对。

近似相似度连接接受转换和未转换的数据集作为输入。 如果使用未转换的数据集,它将自动转换。 在这种情况下,哈希签名将创建为 outputCol

在连接的数据集中,原始数据集可以在 datasetAdatasetB 中查询。 将向输出数据集添加一个距离列,以显示返回的每对行之间的真实距离。

近似最近邻搜索接受一个数据集(特征向量)和一个键(单个特征向量),并近似返回数据集中与该向量最接近的指定数量的行。

近似最近邻搜索接受转换和未转换的数据集作为输入。 如果使用未转换的数据集,它将自动转换。 在这种情况下,哈希签名将创建为 outputCol

将向输出数据集添加一个距离列,以显示每个输出行与搜索键之间的真实距离。

注意: 当哈希桶中没有足够的候选对象时,近似最近邻搜索将返回少于 k 行。

LSH 算法

用于欧几里得距离的桶式随机投影

分桶随机投影是用于欧几里得距离的 LSH 族。 欧几里得距离定义如下: \[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \] 其 LSH 族将特征向量 $\mathbf{x}$ 投影到随机单位向量 $\mathbf{v}$ 上,并将投影结果分成哈希桶: \[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \] 其中 r 是用户定义的分桶长度。 分桶长度可用于控制哈希桶的平均大小(从而控制桶的数量)。 较大的分桶长度(即,较少的桶)会增加将特征散列到同一桶的概率(从而增加真阳性和假阳性的数量)。

分桶随机投影接受任意向量作为输入特征,并支持稀疏向量和稠密向量。

有关 API 的更多详细信息,请参阅 BucketedRandomProjectionLSH Python 文档

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
在 Spark 存储库中的“examples/src/main/python/ml/bucketed_random_projection_lsh_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 BucketedRandomProjectionLSH Scala 文档

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0)),
  (3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (4, Vectors.dense(1.0, 0.0)),
  (5, Vectors.dense(-1.0, 0.0)),
  (6, Vectors.dense(0.0, 1.0)),
  (7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")

val key = Vectors.dense(1.0, 0.0)

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = brp.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
在 Spark 存储库中的“examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 BucketedRandomProjectionLSH Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.dense(1.0, 1.0)),
  RowFactory.create(1, Vectors.dense(1.0, -1.0)),
  RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
  RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);

List<Row> dataB = Arrays.asList(
    RowFactory.create(4, Vectors.dense(1.0, 0.0)),
    RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
    RowFactory.create(6, Vectors.dense(0.0, 1.0)),
    RowFactory.create(7, Vectors.dense(0.0, -1.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

Vector key = Vectors.dense(1.0, 0.0);

BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes");

BucketedRandomProjectionLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
在 Spark 存储库中的“examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java”中查找完整的示例代码。

用于 Jaccard 距离的 MinHash

MinHash 是 Jaccard 距离的 LSH 族,其中输入特征是自然数集。 两个集合的 Jaccard 距离由它们的交集和并集的基数定义: \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \] MinHash 将随机哈希函数 g 应用于集合中的每个元素,并取所有哈希值的最小值: \[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]

MinHash 的输入集表示为二进制向量,其中向量索引表示元素本身,向量中的非零值表示该元素存在于集合中。 虽然支持稠密向量和稀疏向量,但通常建议使用稀疏向量以提高效率。 例如,Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)]) 表示空间中有 10 个元素。 此集合包含元素 2、元素 3 和元素 5。 所有非零值都视为二进制“1”值。

注意: 空集不能由 MinHash 转换,这意味着任何输入向量必须至少有 1 个非零条目。

有关 API 的更多详细信息,请参阅 MinHashLSH Python 文档

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
在 Spark 存储库中的“examples/src/main/python/ml/min_hash_lsh_example.py”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 MinHashLSH Scala 文档

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))

val mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = mh.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
在 Spark 存储库中的“examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala”中查找完整的示例代码。

有关 API 的更多详细信息,请参阅 MinHashLSH Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

List<Row> dataB = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);

MinHashLSH mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes");

MinHashLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
在 Spark 存储库中的“examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java”中查找完整的示例代码。