特征提取、转换和选择
本节涵盖了处理特征的算法,大致分为以下几组:
- 提取:从“原始”数据中提取特征
- 转换:缩放、转换或修改特征
- 选择:从较大的特征集中选择一个子集
- 局部敏感哈希 (LSH):此类算法将特征转换的各个方面与其他算法结合起来。
特征提取器
TF-IDF
词频-逆文档频率(TF-IDF)是一种广泛应用于文本挖掘的特征向量化方法,用于反映语料库中一个词对于文档的重要性。将词表示为 $t$
,文档表示为 $d$
,语料库表示为 $D$
。词频 $TF(t, d)$
是词 $t$
在文档 $d$
中出现的次数,而文档频率 $DF(t, D)$
是包含词 $t$
的文档数量。如果仅使用词频来衡量重要性,则很容易过分强调那些出现频率很高但对文档信息量很少的词,例如“a”、“the”和“of”。如果一个词在整个语料库中出现频率很高,这意味着它对特定文档没有携带特殊信息。逆文档频率是一种衡量词提供信息量的数值:\[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \]
其中 $|D|$
是语料库中文档的总数。由于使用了对数,如果一个词出现在所有文档中,其 IDF 值将变为 0。请注意,为了避免语料库外词项的除以零错误,这里应用了一个平滑项。TF-IDF 衡量方法就是 TF 和 IDF 的乘积:\[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \]
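举一个简单的数字例子(仅作说明):假设语料库共有 $|D| = 3$ 篇文档,词 $t$ 在文档 $d$ 中出现 2 次,且只出现在这 1 篇文档中,则 \[ IDF(t, D) = \log \frac{3 + 1}{1 + 1} = \log 2 \approx 0.693, \qquad TFIDF(t, d, D) = 2 \times 0.693 \approx 1.386. \]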
词频和文档频率的定义有多种变体。在 MLlib 中,我们将 TF 和 IDF 分开,以使其更加灵活。
TF:HashingTF
和 CountVectorizer
都可以用来生成词频向量。
HashingTF
是一个 Transformer
,它接受一组词项并将其转换为固定长度的特征向量。在文本处理中,“一组词项”可能是一个词袋。HashingTF
利用了哈希技巧。原始特征通过应用哈希函数映射到一个索引(词项)。这里使用的哈希函数是 MurmurHash 3。然后根据映射的索引计算词频。这种方法避免了计算全局词项到索引映射的需求,这对于大型语料库来说可能开销很大,但它会受到潜在哈希冲突的影响,即不同的原始特征在哈希后可能变成相同的词项。为了减少冲突的可能性,我们可以增加目标特征维度,即哈希表的桶数量。由于使用哈希值的简单模运算来确定向量索引,建议将特征维度设置为2的幂,否则特征将不会均匀映射到向量索引。默认特征维度是 $2^{18} = 262,144$
。一个可选的二进制开关参数控制词频计数。当设置为 true 时,所有非零频率计数都设置为 1。这对于建模二进制而非整数计数的离散概率模型特别有用。
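下面用纯 Python 给出哈希技巧的一个最小示意(这里用内置的 hash 函数代替 MurmurHash 3,并非 Spark 的实际实现,仅用于说明"哈希取模得到索引、再累计词频"的思路):
# 示意:把词项哈希后对特征维度取模得到向量索引,再累计词频(用 Python 内置 hash 代替 MurmurHash 3)
num_features = 16  # 建议取 2 的幂,否则特征不会均匀映射到向量索引

def hashing_term_frequencies(terms, num_features=num_features):
    vec = [0.0] * num_features
    for term in terms:
        index = hash(term) % num_features  # 简单取模确定向量索引
        vec[index] += 1.0                  # 不同词项可能发生哈希冲突,落到同一索引
    return vec

print(hashing_term_frequencies(["spark", "i", "heard", "about", "spark"]))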
CountVectorizer
将文本文档转换为词计数向量。有关更多详细信息,请参阅CountVectorizer。
IDF:IDF
是一个 Estimator
,它在一个数据集上进行拟合并生成一个 IDFModel
。IDFModel
接收特征向量(通常由 HashingTF
或 CountVectorizer
创建)并缩放每个特征。直观地说,它会降低在语料库中频繁出现的特征的权重。
注意:spark.ml
不提供文本分段工具。我们建议用户参考斯坦福 NLP 小组和 scalanlp/chalk。
示例
在下面的代码片段中,我们从一组句子开始。我们使用 Tokenizer
将每个句子分割成单词。对于每个句子(词袋),我们使用 HashingTF
将句子哈希为一个特征向量。我们使用 IDF
重新缩放特征向量;在将文本用作特征时,这通常能提高性能。然后,我们的特征向量就可以传递给学习算法。
有关 API 的更多详细信息,请参阅 HashingTF Python 文档和 IDF Python 文档。
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
有关 API 的更多详细信息,请参阅 HashingTF Scala 文档和 IDF Scala 文档。
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
val sentenceData = spark.createDataFrame(Seq(
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
有关 API 的更多详细信息,请参阅 HashingTF Java 文档和 IDF Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, "Hi I heard about Spark"),
RowFactory.create(0.0, "I wish Java could use case classes"),
RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);
int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
.setNumFeatures(numFeatures);
Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors
IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();
Word2Vec
Word2Vec
是一个 Estimator
,它接收表示文档的词序列,并训练一个 Word2VecModel
。该模型将每个词映射到一个唯一的固定大小向量。Word2VecModel
使用文档中所有词的平均值将每个文档转换为一个向量;这个向量可以作为预测、文档相似度计算等任务的特征。请参阅MLlib Word2Vec 用户指南了解更多详细信息。
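下面用纯 Python 粗略示意"文档向量取词向量平均"的含义(词向量数值是虚构的,真实向量由 Word2VecModel 学得,此处仅作说明):
# 示意:文档向量 = 文档中所有词向量的逐元素平均(数值为虚构)
word_vectors = {
    "hi":    [0.1, 0.3, -0.2],
    "spark": [0.4, -0.1, 0.2],
}
doc = ["hi", "spark"]
dim = 3
doc_vector = [sum(word_vectors[w][i] for w in doc) / len(doc) for i in range(dim)]
print(doc_vector)  # 约为 [0.25, 0.1, 0.0]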
示例
在下面的代码片段中,我们从一组文档开始,每个文档都表示为一个词序列。对于每个文档,我们将其转换为一个特征向量。然后,这个特征向量可以传递给学习算法。
有关 API 的更多详细信息,请参阅 Word2Vec Python 文档。
from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():
text, vector = row
print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
有关 API 的更多详细信息,请参阅 Word2Vec Scala 文档。
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
"Hi I heard about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
有关 API 的更多详细信息,请参阅 Word2Vec Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);
// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0);
Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);
for (Row row : result.collectAsList()) {
List<String> text = row.getList(0);
Vector vector = (Vector) row.get(1);
System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}
CountVectorizer
CountVectorizer
和 CountVectorizerModel
旨在帮助将文本文档集合转换为词元计数向量。当没有先验词典时,CountVectorizer
可以用作一个 Estimator
来提取词汇表,并生成一个 CountVectorizerModel
。该模型为文档生成基于词汇表的稀疏表示,然后可以将其传递给其他算法,如 LDA。
在拟合过程中,CountVectorizer
将选择按语料库词频排序的前 vocabSize
个词。可选参数 minDF
也会影响拟合过程:它指定词汇表中的词必须至少出现在多少个文档中(如果该值小于 1.0,则表示文档数的比例)。另一个可选的二进制开关参数控制输出向量:如果设置为 true,所有非零计数都设置为 1。这对于建模二进制计数而非整数计数的离散概率模型特别有用。
示例
假设我们有以下包含 id
和 texts
列的 DataFrame
id | texts
----|----------
0 | Array("a", "b", "c")
1 | Array("a", "b", "b", "c", "a")
texts
中的每一行都是一个 Array[String] 类型的文档。调用 CountVectorizer
的 fit 方法会生成一个包含词汇表 (a, b, c) 的 CountVectorizerModel
。然后转换后的输出列“vector”包含
id | texts | vector
----|---------------------------------|---------------
0 | Array("a", "b", "c") | (3,[0,1,2],[1.0,1.0,1.0])
1 | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
每个向量表示文档中词元在词汇表上的计数。
有关 API 的更多详细信息,请参阅 CountVectorizer Python 文档和 CountVectorizerModel Python 文档。
from pyspark.ml.feature import CountVectorizer
# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)
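与下面 Scala/Java 示例中使用先验词汇表构造模型的写法对应,PySpark 也可以不经 fit、直接从已知词汇表构造 CountVectorizerModel(示意,沿用上面示例中的 df):
from pyspark.ml.feature import CountVectorizerModel

# 使用先验(a-priori)词汇表直接构造模型,无需在语料上拟合
cvm = CountVectorizerModel.from_vocabulary(["a", "b", "c"], inputCol="words", outputCol="features")
cvm.transform(df).show(truncate=False)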
有关 API 的更多详细信息,请参阅 CountVectorizer Scala 文档和 CountVectorizerModel Scala 文档。
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = spark.createDataFrame(Seq(
(0, Array("a", "b", "c")),
(1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")
// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
.setInputCol("words")
.setOutputCol("features")
.setVocabSize(3)
.setMinDF(2)
.fit(df)
// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
.setInputCol("words")
.setOutputCol("features")
cvModel.transform(df).show(false)
有关 API 的更多详细信息,请参阅 CountVectorizer Java 文档和 CountVectorizerModel Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("a", "b", "c")),
RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
.setInputCol("text")
.setOutputCol("feature")
.setVocabSize(3)
.setMinDF(2)
.fit(df);
// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
.setInputCol("text")
.setOutputCol("feature");
cvModel.transform(df).show(false);
FeatureHasher
特征哈希将一组类别或数值特征投影到指定维度的特征向量中(通常远小于原始特征空间的维度)。这是通过使用哈希技巧将特征映射到特征向量中的索引来完成的。
FeatureHasher
转换器对多个列进行操作。每个列可以包含数值或类别特征。列数据类型的行为和处理方式如下:
- 数值列:对于数值特征,使用列名的哈希值将特征值映射到特征向量中的索引。默认情况下,数值特征不被视为类别特征(即使它们是整数)。要将它们视为类别特征,请使用 categoricalCols 参数指定相关列(见下文的示意代码)。
- 字符串列:对于类别特征,使用字符串"column_name=value"的哈希值映射到向量索引,指示值为 1.0。因此,类别特征是"独热"编码的(类似于使用 OneHotEncoder 并设置 dropLast=false)。
- 布尔列:布尔值与字符串列的处理方式相同。也就是说,布尔特征表示为"column_name=true"或"column_name=false",指示值为 1.0。
空值(缺失值)被忽略(在结果特征向量中隐式为零)。
这里使用的哈希函数也是 HashingTF 中使用的 MurmurHash 3。由于使用哈希值的简单模运算来确定向量索引,建议将 numFeatures 参数设置为2的幂;否则特征将不会均匀映射到向量索引。
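例如,若希望把数值列也当作类别特征来哈希(对应上面"数值列"一条中的 categoricalCols 参数),可以这样设置(示意,假设列名与下文示例相同):
from pyspark.ml.feature import FeatureHasher

# 将数值列 real 声明为类别特征,使其按"real=值"的形式参与哈希
hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       categoricalCols=["real"],
                       outputCol="features")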
示例
假设我们有一个包含 4 个输入列 real
、bool
、stringNum
和 string
的 DataFrame。这些不同的数据类型作为输入将说明转换生成特征向量列的行为。
real| bool|stringNum|string
----|-----|---------|------
2.2| true| 1| foo
3.3|false| 2| bar
4.4|false| 3| baz
5.5|false| 4| foo
那么 FeatureHasher.transform
在此 DataFrame 上的输出是
real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1 |foo |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2 |bar |(262144,[6031, 80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3 |baz |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4 |foo |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])
生成的特征向量随后可以传递给学习算法。
有关 API 的更多详细信息,请参阅 FeatureHasher Python 文档。
from pyspark.ml.feature import FeatureHasher
dataset = spark.createDataFrame([
(2.2, True, "1", "foo"),
(3.3, False, "2", "bar"),
(4.4, False, "3", "baz"),
(5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])
hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
outputCol="features")
featurized = hasher.transform(dataset)
featurized.show(truncate=False)
有关 API 的更多详细信息,请参阅 FeatureHasher Scala 文档。
import org.apache.spark.ml.feature.FeatureHasher
val dataset = spark.createDataFrame(Seq(
(2.2, true, "1", "foo"),
(3.3, false, "2", "bar"),
(4.4, false, "3", "baz"),
(5.5, false, "4", "foo")
)).toDF("real", "bool", "stringNum", "string")
val hasher = new FeatureHasher()
.setInputCols("real", "bool", "stringNum", "string")
.setOutputCol("features")
val featurized = hasher.transform(dataset)
featurized.show(false)
有关 API 的更多详细信息,请参阅 FeatureHasher Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.FeatureHasher;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(2.2, true, "1", "foo"),
RowFactory.create(3.3, false, "2", "bar"),
RowFactory.create(4.4, false, "3", "baz"),
RowFactory.create(5.5, false, "4", "foo")
);
StructType schema = new StructType(new StructField[]{
new StructField("real", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("bool", DataTypes.BooleanType, false, Metadata.empty()),
new StructField("stringNum", DataTypes.StringType, false, Metadata.empty()),
new StructField("string", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
FeatureHasher hasher = new FeatureHasher()
.setInputCols(new String[]{"real", "bool", "stringNum", "string"})
.setOutputCol("features");
Dataset<Row> featurized = hasher.transform(dataset);
featurized.show(false);
特征转换器
分词器
分词是将文本(例如句子)分解为单个词项(通常是单词)的过程。一个简单的 Tokenizer 类提供了此功能。下面的示例演示了如何将句子分割成单词序列。
RegexTokenizer 允许基于正则表达式 (regex) 匹配进行更高级的分词。默认情况下,参数“pattern”(正则表达式,默认值:"\\s+"
)用作分隔符来分割输入文本。另外,用户可以将参数“gaps”设置为 false,表示正则表达式“pattern”表示“词元”而不是分隔符,并将所有匹配的出现作为分词结果。
示例
有关 API 的更多详细信息,请参阅 Tokenizer Python 文档和 RegexTokenizer Python 文档。
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
有关 API 的更多详细信息,请参阅 Tokenizer Scala 文档和 RegexTokenizer Scala 文档。
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val sentenceDataFrame = spark.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)
val countTokens = udf { (words: Seq[String]) => words.length }
val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
有关 API 的更多详细信息,请参阅 Tokenizer Java 文档和 RegexTokenizer Java 文档。
import java.util.Arrays;
import java.util.List;
import scala.collection.mutable.Seq;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.call_udf;
import static org.apache.spark.sql.functions.col;
List<Row> data = Arrays.asList(
RowFactory.create(0, "Hi I heard about Spark"),
RowFactory.create(1, "I wish Java could use case classes"),
RowFactory.create(2, "Logistic,regression,models,are,neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
RegexTokenizer regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
spark.udf().register(
"countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);
Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
.withColumn("tokens", call_udf("countTokens", col("words")))
.show(false);
Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
.withColumn("tokens", call_udf("countTokens", col("words")))
.show(false);
停用词移除器
停用词是应该从输入中排除的词,通常因为这些词出现频繁且意义不大。
StopWordsRemover
接受字符串序列作为输入(例如 Tokenizer 的输出),并从输入序列中删除所有停用词。停用词列表由 stopWords
参数指定。通过调用 StopWordsRemover.loadDefaultStopWords(language)
可以获取某些语言的默认停用词,可用选项包括“danish”、“dutch”、“english”、“finnish”、“french”、“german”、“hungarian”、“italian”、“norwegian”、“portuguese”、“russian”、“spanish”、“swedish”和“turkish”。布尔参数 caseSensitive
指示匹配是否应区分大小写(默认为 false)。
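例如,可以先加载某种语言的默认停用词,再在其基础上追加自定义停用词(示意):
from pyspark.ml.feature import StopWordsRemover

# 加载英文默认停用词,并追加自定义停用词;caseSensitive 默认为 False
english_stop_words = StopWordsRemover.loadDefaultStopWords("english")
remover = StopWordsRemover(inputCol="raw", outputCol="filtered",
                           stopWords=english_stop_words + ["balloon"],
                           caseSensitive=False)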
示例
假设我们有以下包含 id
和 raw
列的 DataFrame
id | raw
----|----------
0 | [I, saw, the, red, balloon]
1 | [Mary, had, a, little, lamb]
将 StopWordsRemover
应用于以 raw
作为输入列和 filtered
作为输出列的 DataFrame,我们应该得到以下结果:
id | raw | filtered
----|-----------------------------|--------------------
0 | [I, saw, the, red, balloon] | [saw, red, balloon]
1 | [Mary, had, a, little, lamb]|[Mary, little, lamb]
在 filtered
中,停用词“I”、“the”、“had”和“a”已被过滤掉。
有关 API 的更多详细信息,请参阅 StopWordsRemover Python 文档。
from pyspark.ml.feature import StopWordsRemover
sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
有关 API 的更多详细信息,请参阅 StopWordsRemover Scala 文档。
import org.apache.spark.ml.feature.StopWordsRemover
val remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered")
val dataSet = spark.createDataFrame(Seq(
(0, Seq("I", "saw", "the", "red", "balloon")),
(1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")
remover.transform(dataSet).show(false)
有关 API 的更多详细信息,请参阅 StopWordsRemover Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
StopWordsRemover remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered");
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);
StructType schema = new StructType(new StructField[]{
new StructField(
"raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);
$n$-gram
一个 n-gram 是由 $n$ 个词元(通常是单词)组成的序列,其中 $n$ 为某个整数。NGram
类可用于将输入特征转换为 $n$-gram。
NGram
接受字符串序列作为输入(例如 Tokenizer 的输出)。参数 n
用于确定每个 $n$-gram 中的词项数量。输出将包含一个 $n$-gram 序列,其中每个 $n$-gram 由 $n$ 个连续单词组成的空格分隔字符串表示。如果输入序列包含的字符串少于 n
个,则不产生任何输出。
示例
有关 API 的更多详细信息,请参阅 NGram Python 文档。
from pyspark.ml.feature import NGram
wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
有关 API 的更多详细信息,请参阅 NGram Scala 文档。
import org.apache.spark.ml.feature.NGram
val wordDataFrame = spark.createDataFrame(Seq(
(0, Array("Hi", "I", "heard", "about", "Spark")),
(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
(2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")
val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)
有关 API 的更多详细信息,请参阅 NGram Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);
NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");
Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);
二值化器
二值化是将数值特征阈值化为二进制(0/1)特征的过程。
Binarizer
接受常用参数 inputCol
和 outputCol
,以及用于二值化的 threshold
。大于阈值的特征值被二值化为 1.0;等于或小于阈值的特征值被二值化为 0.0。inputCol
支持 Vector 和 Double 类型。
示例
有关 API 的更多详细信息,请参阅 Binarizer Python 文档。
from pyspark.ml.feature import Binarizer
continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["id", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
有关 API 的更多详细信息,请参阅 Binarizer Scala 文档。
import scala.collection.immutable
import org.apache.spark.ml.feature.Binarizer
val data = immutable.ArraySeq.unsafeWrapArray(Array((0, 0.1), (1, 0.8), (2, 0.2)))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)
val binarizedDataFrame = binarizer.transform(dataFrame)
println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
有关 API 的更多详细信息,请参阅 Binarizer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 0.1),
RowFactory.create(1, 0.8),
RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);
Binarizer binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5);
Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);
System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();
PCA
PCA 是一种统计过程,它使用正交变换将一组可能相关的变量的观测值转换为一组线性不相关的变量值,这些变量值称为主成分。PCA 类训练一个模型,使用 PCA 将向量投影到低维空间。下面的示例演示了如何将 5 维特征向量投影到 3 维主成分中。
示例
有关 API 的更多详细信息,请参阅 PCA Python 文档。
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
有关 API 的更多详细信息,请参阅 PCA Scala 文档。
import scala.collection.immutable
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data.map(Tuple1.apply)))
.toDF("features")
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df)
val result = pca.transform(df).select("pcaFeatures")
result.show(false)
有关 API 的更多详细信息,请参阅 PCA Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
PCAModel pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df);
Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);
多项式展开
多项式展开是将特征扩展到多项式空间的过程,该空间由原始维度的 n 次组合形成。PolynomialExpansion 类提供了此功能。下面的示例演示了如何将特征扩展到 3 次多项式空间。
示例
有关 API 的更多详细信息,请参阅 PolynomialExpansion Python 文档。
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([2.0, 1.0]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([3.0, -1.0]),)
], ["features"])
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)
polyDF.show(truncate=False)
有关 API 的更多详细信息,请参阅 PolynomialExpansion Scala 文档。
import scala.collection.immutable
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.dense(2.0, 1.0),
Vectors.dense(0.0, 0.0),
Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data.map(Tuple1.apply)))
.toDF("features")
val polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3)
val polyDF = polyExpansion.transform(df)
polyDF.show(false)
有关 API 的更多详细信息,请参阅 PolynomialExpansion Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
PolynomialExpansion polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(2.0, 1.0)),
RowFactory.create(Vectors.dense(0.0, 0.0)),
RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);
离散余弦变换 (DCT)
离散余弦变换 (DCT) 将时域中长度为 $N$ 的实值序列转换为频域中另一个长度为 $N$ 的实值序列。DCT 类提供了此功能,它实现了 DCT-II,并将结果按 $1/\sqrt{2}$ 缩放,使得变换的表示矩阵是酉矩阵。变换后的序列没有应用移位(例如,变换序列的第 $0$ 个元素是第 $0$ 个 DCT 系数,而不是第 $N/2$ 个)。
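作为参考,酉(unitary)DCT-II 的一种常见写法如下(仅作说明,具体数值行为请以 DCT 类的实现为准):\[ X_k = \sqrt{\tfrac{2}{N}}\, c_k \sum_{n=0}^{N-1} x_n \cos\!\left[ \frac{\pi}{N} \left( n + \tfrac{1}{2} \right) k \right], \qquad c_0 = \tfrac{1}{\sqrt{2}},\; c_k = 1\ (k > 0). \]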
示例
有关 API 的更多详细信息,请参阅 DCT Python 文档。
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
(Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
(Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])
dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")
dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(truncate=False)
有关 API 的更多详细信息,请参阅 DCT Scala 文档。
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
Vectors.dense(0.0, 1.0, -2.0, 3.0),
Vectors.dense(-1.0, 2.0, 4.0, -7.0),
Vectors.dense(14.0, -2.0, -5.0, 1.0))
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false)
val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)
有关 API 的更多详细信息,请参阅 DCT Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
DCT dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false);
Dataset<Row> dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(false);
字符串索引器
StringIndexer
将标签的字符串列编码为标签索引列。StringIndexer
可以编码多个列。索引范围是 [0, numLabels)
,并支持四种排序选项:“frequencyDesc”:按标签频率降序排列(最频繁的标签分配索引 0),“frequencyAsc”:按标签频率升序排列(最不频繁的标签分配索引 0),“alphabetDesc”:按字母降序排列,以及“alphabetAsc”:按字母升序排列(默认值 = “frequencyDesc”)。请注意,在“frequencyDesc”/“frequencyAsc”下,如果频率相等,则字符串会进一步按字母顺序排序。
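排序方式可以通过 stringOrderType 参数指定,例如(示意):
from pyspark.ml.feature import StringIndexer

# 按字母升序分配索引,而不是默认的按频率降序
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex",
                        stringOrderType="alphabetAsc")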
如果用户选择保留未见过的标签,它们将被放置在索引 numLabels 处。如果输入列是数值类型,我们将其转换为字符串并对字符串值进行索引。当下游管道组件(如 Estimator
或 Transformer
)使用此字符串索引标签时,您必须将组件的输入列设置为此字符串索引列名。在许多情况下,您可以使用 setInputCol
设置输入列。
示例
假设我们有以下包含 id
和 category
列的 DataFrame
id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c
category
是一个包含三个标签:“a”、“b”和“c”的字符串列。将 StringIndexer
应用于以 category
作为输入列和 categoryIndex
作为输出列的 DataFrame,我们应该得到以下结果:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
“a”获得索引 0
,因为它是最常见的,其次是“c”获得索引 1
,以及“b”获得索引 2
。
此外,当您在一个数据集上拟合了 StringIndexer
然后将其用于转换另一个数据集时,StringIndexer
处理未见过的标签有三种策略:
- 抛出异常(默认行为)
- 完全跳过包含未见过标签的行
- 将未见过的标签放入一个特殊的额外桶中,索引为 numLabels
示例
让我们回到之前的示例,但这次在以下数据集上重用我们之前定义的 StringIndexer
id | category
----|----------
0 | a
1 | b
2 | c
3 | d
4 | e
如果您尚未设置 StringIndexer
如何处理未见过的标签,或者将其设置为“error”,则会抛出异常。但是,如果您调用了 setHandleInvalid("skip")
,则会生成以下数据集:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
请注意,包含“d”或“e”的行没有出现。
如果您调用 setHandleInvalid("keep")
,则会生成以下数据集:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | d | 3.0
4 | e | 3.0
请注意,包含"d"或"e"的行被映射到了索引"3.0"。
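对应到代码层面,大致写法如下(示意,假设 df1、df2 分别对应上面两个数据集):
from pyspark.ml.feature import StringIndexer

# handleInvalid 可取 "error"(默认)、"skip" 或 "keep"
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex",
                        handleInvalid="keep")
model = indexer.fit(df1)      # 在只包含 a/b/c 的 df1 上拟合
model.transform(df2).show()   # 转换包含未见过标签 d/e 的 df2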
有关 API 的更多详细信息,请参阅 StringIndexer Python 文档。
from pyspark.ml.feature import StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
有关 API 的更多详细信息,请参阅 StringIndexer Scala 文档。
import org.apache.spark.ml.feature.StringIndexer
val df = spark.createDataFrame(
Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
val indexed = indexer.fit(df).transform(df)
indexed.show()
有关 API 的更多详细信息,请参阅 StringIndexer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexer indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex");
Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
索引到字符串
与 StringIndexer
对称,IndexToString
将标签索引列映射回包含原始标签的字符串列。一个常见的用例是使用 StringIndexer
从标签生成索引,然后使用这些索引训练模型,再使用 IndexToString
从预测索引列中检索原始标签。但是,您可以自由提供自己的标签。
示例
基于 StringIndexer
示例,假设我们有以下包含 id
和 categoryIndex
列的 DataFrame
id | categoryIndex
----|---------------
0 | 0.0
1 | 2.0
2 | 1.0
3 | 0.0
4 | 0.0
5 | 1.0
将 IndexToString
应用于以 categoryIndex
作为输入列和 originalCategory
作为输出列的 DataFrame,我们能够检索原始标签(它们将从列的元数据中推断出来)
id | categoryIndex | originalCategory
----|---------------|-----------------
0 | 0.0 | a
1 | 2.0 | b
2 | 1.0 | c
3 | 0.0 | a
4 | 0.0 | a
5 | 1.0 | c
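标签默认从输入列的元数据中推断;如果需要,也可以显式提供标签数组(示意,标签顺序须与索引一致,这里沿用上例中 a=0、c=1、b=2 的顺序):
from pyspark.ml.feature import IndexToString

# 显式指定 labels,不依赖输出列的元数据
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory",
                          labels=["a", "c", "b"])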
有关 API 的更多详细信息,请参阅 IndexToString Python 文档。
from pyspark.ml.feature import IndexToString, StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)
print("Transformed string column '%s' to indexed column '%s'"
% (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()
print("StringIndexer will store labels in output column metadata\n")
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
print("Transformed indexed column '%s' back to original string column '%s' using "
"labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
有关 API 的更多详细信息,请参阅 IndexToString Scala 文档。
import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
println(s"Transformed string column '${indexer.getInputCol}' " +
s"to indexed column '${indexer.getOutputCol}'")
indexed.show()
val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
s"${Attribute.fromStructField(inputColSchema).toString}\n")
val converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory")
val converted = converter.transform(indexed)
println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()
有关 API 的更多详细信息,请参阅 IndexToString Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexerModel indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df);
Dataset<Row> indexed = indexer.transform(df);
System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
"to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();
StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
Attribute.fromStructField(inputColSchema).toString() + "\n");
IndexToString converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);
System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
"original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();
独热编码器
独热编码将表示为标签索引的类别特征映射到一个二进制向量,该向量中至多有一个 1 值,表示所有特征值集合中某个特定特征值的存在。这种编码允许期望连续特征的算法(如逻辑回归)使用类别特征。对于字符串类型的输入数据,通常首先使用 StringIndexer 对类别特征进行编码。
OneHotEncoder
可以转换多个列,为每个输入列返回一个独热编码的输出向量列。通常会使用 VectorAssembler 将这些向量合并为单个特征向量。
OneHotEncoder
支持 handleInvalid
参数,用于选择在转换数据时如何处理无效输入。可用选项包括“keep”(任何无效输入都分配给一个额外的类别索引)和“error”(抛出错误)。
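OneHotEncoder 还提供 dropLast 参数(默认为 True,即默认不为最后一个类别单独编码一位)。下面是同时设置 dropLast 与 handleInvalid 的一个示意:
from pyspark.ml.feature import OneHotEncoder

# 保留最后一个类别(dropLast=False),并把未见过的类别放入额外的类别索引("keep")
encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"],
                        dropLast=False,
                        handleInvalid="keep")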
示例
有关 API 的更多详细信息,请参阅 OneHotEncoder Python 文档。
from pyspark.ml.feature import OneHotEncoder
df = spark.createDataFrame([
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])
encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
有关 API 的更多详细信息,请参阅 OneHotEncoder Scala 文档。
import org.apache.spark.ml.feature.OneHotEncoder
val df = spark.createDataFrame(Seq(
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")
val encoder = new OneHotEncoder()
.setInputCols(Array("categoryIndex1", "categoryIndex2"))
.setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)
val encoded = model.transform(df)
encoded.show()
有关 API 的更多详细信息,请参阅 OneHotEncoder Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, 1.0),
RowFactory.create(1.0, 0.0),
RowFactory.create(2.0, 1.0),
RowFactory.create(0.0, 2.0),
RowFactory.create(0.0, 1.0),
RowFactory.create(2.0, 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
OneHotEncoder encoder = new OneHotEncoder()
.setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
.setOutputCols(new String[] {"categoryVec1", "categoryVec2"});
OneHotEncoderModel model = encoder.fit(df);
Dataset<Row> encoded = model.transform(df);
encoded.show();
目标编码器
目标编码是一种数据预处理技术,它将高基数类别特征转换为准连续标量属性,适用于回归类型模型。这种范式将独立特征的单个值映射到一个标量,该标量表示因变量的某个估计值(意味着与目标变量具有相似统计特征的类别值将具有相似的表示)。
通过利用类别特征和目标变量之间的关系,目标编码通常比独热编码表现更好,并且不需要最终的二进制向量编码,从而降低了数据集的整体维度。
用户可以通过设置 inputCol
和 outputCol
来指定单列用例的输入和输出列名,或者通过设置 inputCols
和 outputCols
来指定多列用例的输入和输出列名(两个数组必须具有相同的大小)。这些列预计包含类别索引(正整数),缺失值(null)被视为一个单独的类别。数据类型必须是‘NumericType’的任何子类。对于字符串类型的输入数据,通常首先使用 StringIndexer 对类别特征进行编码。
用户可以通过设置 label
来指定目标列名。该列预计包含将从中导出编码的真实标签。在计算估计值时,不考虑缺少标签(null)的观测值。数据类型必须是‘NumericType’的任何子类。
TargetEncoder
支持 handleInvalid
参数,用于选择在编码新数据时如何处理无效输入,即训练时未见过的类别。可用选项包括“keep”(任何无效输入都分配给一个额外的类别索引)和“error”(抛出异常)。
TargetEncoder
支持 targetType
参数,用于选择拟合数据时的标签类型,这会影响估计值的计算方式。可用选项包括“binary”(二进制)和“continuous”(连续)。
当设置为"binary"时,目标属性 $Y$ 预期为二进制,$Y\in\{0,1\}$。转换将单个值 $X_{i}$ 映射到给定 $X=X_{i}$ 时 $Y$ 的条件概率:$\;\; S_{i}=P(Y\mid X=X_{i})$。这种方法也称为 bin-counting。
当设置为“continuous”时,目标属性 $Y$ 预期为连续的,$Y\in\mathbb{Q}$。转换将单个值 $X_{i}$ 映射到给定 $X=X_{i}$ 时 $Y$ 的平均值:$\;\; S_{i}=E[Y\mid X=X_{i}]$。这种方法也称为均值编码。
TargetEncoder
支持 smoothing
参数,用于调整类别内统计量和总体统计量的融合方式。高基数类别特征通常在 $X$ 的所有可能值中分布不均匀。因此,仅根据类别内统计量计算编码 $S_{i}$ 会使这些估计非常不可靠,并且很少见的类别很可能在学习中导致过拟合。
平滑通过根据特定类别在整个数据集上的相对大小,将类别内估计值与总体估计值加权,从而防止了这种行为。
对于二进制情况:$\;\;\; S_{i}=\lambda(n_{i})\, P(Y\mid X=X_{i})+(1-\lambda(n_{i}))\, P(Y)$
对于连续情况:$\;\;\; S_{i}=\lambda(n_{i})\, E[Y\mid X=X_{i}]+(1-\lambda(n_{i}))\, E[Y]$
其中 $\lambda(n_{i})$ 是关于 $n_{i}$ 的单调递增函数,值域在 0 到 1 之间。
通常 $\lambda(n_{i})$ 作为参数函数 $\lambda(n_{i})=\frac{n_{i}}{n_{i}+m}$ 实现,其中 $m$ 是平滑因子,由 TargetEncoder
中的 smoothing
参数表示。
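举一个简单的数字例子:若某个类别在训练集中出现 $n_{i}=3$ 次,且平滑因子 $m=1$,则 \[ \lambda(n_{i}) = \frac{3}{3+1} = 0.75, \qquad S_{i} = 0.75\, E[Y\mid X=X_{i}] + 0.25\, E[Y], \] 即类别内估计占 75% 的权重,总体估计占 25%。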
示例
基于 TargetEncoder
示例,假设我们有以下包含 feature
和 target
(二进制和连续)列的 DataFrame
feature | target | target
| (bin) | (cont)
--------|--------|--------
1 | 0 | 1.3
1 | 1 | 2.5
1 | 0 | 1.6
2 | 1 | 1.8
2 | 0 | 2.4
3 | 1 | 3.2
将 TargetEncoder
应用于目标类型为“binary”、输入列为 feature
、标签列为 target (bin)
、输出列为 encoded
的数据,我们能够对数据进行模型拟合以学习编码,并根据这些映射转换数据
feature | target | encoded
| (bin) |
--------|--------|--------
1 | 0 | 0.333
1 | 1 | 0.333
1 | 0 | 0.333
2 | 1 | 0.5
2 | 0 | 0.5
3 | 1 | 1.0
将 TargetEncoder
应用于目标类型为“continuous”、输入列为 feature
、标签列为 target (cont)
、输出列为 encoded
的数据,我们能够对数据进行模型拟合以学习编码,并根据这些映射转换数据
feature | target | encoded
| (cont) |
--------|--------|--------
1 | 1.3 | 1.8
1 | 2.5 | 1.8
1 | 1.6 | 1.8
2 | 1.8 | 2.1
2 | 2.4 | 2.1
3 | 3.2 | 3.2
有关 API 的更多详细信息,请参阅 TargetEncoder Python 文档。
from pyspark.ml.feature import TargetEncoder
df = spark.createDataFrame(
[
(0.0, 1.0, 0, 10.0),
(1.0, 0.0, 1, 20.0),
(2.0, 1.0, 0, 30.0),
(0.0, 2.0, 1, 40.0),
(0.0, 1.0, 0, 50.0),
(2.0, 0.0, 1, 60.0),
],
["categoryIndex1", "categoryIndex2", "binaryLabel", "continuousLabel"],
)
# binary target
encoder = TargetEncoder(
inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryIndex1Target", "categoryIndex2Target"],
labelCol="binaryLabel",
targetType="binary"
)
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
# continuous target
encoder = TargetEncoder(
inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryIndex1Target", "categoryIndex2Target"],
labelCol="continuousLabel",
targetType="continuous"
)
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
有关 API 的更多详细信息,请参阅 TargetEncoder Scala 文档。
import org.apache.spark.ml.feature.TargetEncoder
val df = spark.createDataFrame(Seq(
(0.0, 1.0, 0, 10.0),
(1.0, 0.0, 1, 20.0),
(2.0, 1.0, 0, 30.0),
(0.0, 2.0, 1, 40.0),
(0.0, 1.0, 0, 50.0),
(2.0, 0.0, 1, 60.0)
)).toDF("categoryIndex1", "categoryIndex2",
"binaryLabel", "continuousLabel")
// binary target
val bin_encoder = new TargetEncoder()
.setInputCols(Array("categoryIndex1", "categoryIndex2"))
.setOutputCols(Array("categoryIndex1Target", "categoryIndex2Target"))
.setLabelCol("binaryLabel")
.setTargetType("binary");
val bin_model = bin_encoder.fit(df)
val bin_encoded = bin_model.transform(df)
bin_encoded.show()
// continuous target
val cont_encoder = new TargetEncoder()
.setInputCols(Array("categoryIndex1", "categoryIndex2"))
.setOutputCols(Array("categoryIndex1Target", "categoryIndex2Target"))
.setLabelCol("continuousLabel")
.setTargetType("continuous");
val cont_model = cont_encoder.fit(df)
val cont_encoded = cont_model.transform(df)
cont_encoded.show()
有关 API 的更多详细信息,请参阅 TargetEncoder Java 文档。
import org.apache.spark.ml.feature.TargetEncoder;
import org.apache.spark.ml.feature.TargetEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, 1.0, 0.0, 10.0),
RowFactory.create(1.0, 0.0, 1.0, 20.0),
RowFactory.create(2.0, 1.0, 0.0, 30.0),
RowFactory.create(0.0, 2.0, 1.0, 40.0),
RowFactory.create(0.0, 1.0, 0.0, 50.0),
RowFactory.create(2.0, 0.0, 1.0, 60.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("binaryLabel", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("continuousLabel", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
// binary target
TargetEncoder bin_encoder = new TargetEncoder()
.setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
.setOutputCols(new String[] {"categoryIndex1Target", "categoryIndex2Target"})
.setLabelCol("binaryLabel")
.setTargetType("binary");
TargetEncoderModel bin_model = bin_encoder.fit(df);
Dataset<Row> bin_encoded = bin_model.transform(df);
bin_encoded.show();
// continuous target
TargetEncoder cont_encoder = new TargetEncoder()
.setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
.setOutputCols(new String[] {"categoryIndex1Target", "categoryIndex2Target"})
.setLabelCol("continuousLabel")
.setTargetType("continuous");
TargetEncoderModel cont_model = cont_encoder.fit(df);
Dataset<Row> cont_encoded = cont_model.transform(df);
cont_encoded.show();
向量索引器
VectorIndexer
帮助对 Vector
数据集中的类别特征进行索引。它可以自动决定哪些特征是类别特征,并将原始值转换为类别索引。具体来说,它执行以下操作:
- 接受一个 Vector 类型的输入列和一个参数 maxCategories。
- 根据不同值的数量决定哪些特征应该是类别特征:不同值数量不超过 maxCategories 的特征被声明为类别特征。
- 为每个类别特征计算基于 0 的类别索引。
- 对类别特征进行索引,并将原始特征值转换为索引。
对类别特征进行索引允许决策树和树集成等算法适当地处理类别特征,从而提高性能。
示例
在下面的示例中,我们读取一个带标签点(labeled points)的数据集,然后使用 VectorIndexer
来决定哪些特征应被视为类别特征。我们将类别特征值转换为它们的索引。然后,这些转换后的数据可以传递给处理类别特征的算法,例如 DecisionTreeRegressor
。
有关 API 的更多详细信息,请参阅 VectorIndexer Python 文档。
from pyspark.ml.feature import VectorIndexer
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
(len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
有关 API 的更多详细信息,请参阅 VectorIndexer Scala 文档。
import org.apache.spark.ml.feature.VectorIndexer
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)
val indexerModel = indexer.fit(data)
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} " +
s"categorical features: ${categoricalFeatures.mkString(", ")}")
// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
有关 API 的更多详细信息,请参阅 VectorIndexer Java 文档。
import java.util.Map;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
VectorIndexer indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);
Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");
for (Integer feature : categoryMaps.keySet()) {
System.out.print(" " + feature);
}
System.out.println();
// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
交互
Interaction
是一个 Transformer
,它接受向量或双精度浮点列,并生成一个包含每个输入列中一个值的所有组合乘积的单个向量列。
例如,如果您有两个向量类型列,每个列都具有 3 个维度作为输入列,那么您将得到一个 9 维向量作为输出列。
示例
假设我们有以下包含“id1”、“vec1”和“vec2”列的 DataFrame
id1|vec1 |vec2
---|--------------|--------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0]
将 Interaction
应用于这些输入列,然后输出列 interactedCol
包含
id1|vec1 |vec2 |interactedCol
---|--------------|--------------|------------------------------------------------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]
有关 API 的更多详细信息,请参阅 Interaction Python 文档。
from pyspark.ml.feature import Interaction, VectorAssembler
df = spark.createDataFrame(
[(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)],
["id1", "id2", "id3", "id4", "id5", "id6", "id7"])
assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")
assembled1 = assembler1.transform(df)
assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")
assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")
interacted = interaction.transform(assembled2)
interacted.show(truncate=False)
有关 API 的更多详细信息,请参阅 Interaction Scala 文档。
import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler
val df = spark.createDataFrame(Seq(
(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")
val assembler1 = new VectorAssembler().
setInputCols(Array("id2", "id3", "id4")).
setOutputCol("vec1")
val assembled1 = assembler1.transform(df)
val assembler2 = new VectorAssembler().
setInputCols(Array("id5", "id6", "id7")).
setOutputCol("vec2")
val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
val interaction = new Interaction()
.setInputCols(Array("id1", "vec1", "vec2"))
.setOutputCol("interactedCol")
val interacted = interaction.transform(assembled2)
interacted.show(truncate = false)
有关 API 的更多详细信息,请参阅 Interaction Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Interaction;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(1, 1, 2, 3, 8, 4, 5),
RowFactory.create(2, 4, 3, 8, 7, 9, 8),
RowFactory.create(3, 6, 1, 9, 2, 3, 6),
RowFactory.create(4, 10, 8, 6, 9, 4, 5),
RowFactory.create(5, 9, 2, 7, 10, 7, 3),
RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);
StructType schema = new StructType(new StructField[]{
new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
VectorAssembler assembler1 = new VectorAssembler()
.setInputCols(new String[]{"id2", "id3", "id4"})
.setOutputCol("vec1");
Dataset<Row> assembled1 = assembler1.transform(df);
VectorAssembler assembler2 = new VectorAssembler()
.setInputCols(new String[]{"id5", "id6", "id7"})
.setOutputCol("vec2");
Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");
Interaction interaction = new Interaction()
.setInputCols(new String[]{"id1","vec1","vec2"})
.setOutputCol("interactedCol");
Dataset<Row> interacted = interaction.transform(assembled2);
interacted.show(false);
归一化器
Normalizer
是一个 Transformer
,它转换由 Vector
行组成的数据集,将每个 Vector
归一化为单位范数。它接受参数 p
,该参数指定用于归一化的p-范数。(默认情况下 $p = 2$。)这种归一化有助于标准化您的输入数据并改善学习算法的行为。
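例如,对向量 $[1.0, 0.5, -1.0]$ 做 $L^1$ 归一化:其 $L^1$ 范数为 $1.0 + 0.5 + 1.0 = 2.5$,归一化结果为 \[ [1.0,\ 0.5,\ -1.0] / 2.5 = [0.4,\ 0.2,\ -0.4], \] 而做 $L^\infty$ 归一化时则除以最大绝对值 $1.0$,向量保持不变。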
示例
以下示例演示了如何以 libsvm 格式加载数据集,然后将每行归一化为单位 $L^1$ 范数和单位 $L^\infty$ 范数。
有关 API 的更多详细信息,请参阅 Normalizer Python 文档。
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
有关 API 的更多详细信息,请参阅 Normalizer Scala 文档。
import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),
(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()
// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
有关 API 的更多详细信息,请参阅 Normalizer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0);
Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();
// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
标准化器
StandardScaler
转换由 Vector
行组成的数据集,将每个特征标准化为单位标准差和/或零均值。它接受参数:
- withStd:默认为 True。将数据缩放到单位标准差。
- withMean:默认为 False。在缩放之前,使用均值使数据居中。它会生成密集输出,因此应用于稀疏输入时请注意。
StandardScaler
是一个 Estimator
,可以在数据集上进行 fit
以生成一个 StandardScalerModel
;这相当于计算汇总统计量。然后该模型可以将数据集中的 Vector
列转换为具有单位标准差和/或零均值特征。
请注意,如果某个特征的标准差为零,它将在该特征的 Vector
中返回默认值 0.0
。
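下面是一个补充性的示意片段(非本节原有示例),假设 spark 为已创建的 SparkSession,演示在稠密向量上同时启用 withStd 和 withMean;如上所述,对稀疏输入启用 withMean 会得到稠密输出,需注意内存开销。
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
# 示意数据:三行稠密向量
df = spark.createDataFrame([
    (0, Vectors.dense([1.0, 10.0, 100.0])),
    (1, Vectors.dense([2.0, 20.0, 200.0])),
    (2, Vectors.dense([3.0, 30.0, 300.0]))
], ["id", "features"])
# withMean=True 先减去各特征均值,再按标准差缩放
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True)
scaler.fit(df).transform(df).show(truncate=False)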
示例
以下示例演示了如何以 libsvm 格式加载数据集,然后将每个特征归一化为单位标准差。
有关 API 的更多详细信息,请参阅 StandardScaler Python 文档。
from pyspark.ml.feature import StandardScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
有关 API 的更多详细信息,请参阅 StandardScaler Scala 文档。
import org.apache.spark.ml.feature.StandardScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
有关 API 的更多详细信息,请参阅 StandardScaler Java 文档。
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
StandardScaler scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false);
// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);
// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
鲁棒缩放器
RobustScaler
转换由 Vector
行组成的数据集,移除中位数并根据特定的分位数范围(默认为 IQR:四分位距,即第一四分位数和第三四分位数之间的范围)缩放数据。它的行为与 StandardScaler
相当相似,但使用中位数和分位数范围代替均值和标准差,这使其对异常值具有鲁棒性。它接受参数:
- lower:默认为 0.25。用于计算分位数范围的下分位数,所有特征共享。
- upper:默认为 0.75。用于计算分位数范围的上分位数,所有特征共享。
- withScaling:默认为 True。将数据缩放到分位数范围。
- withCentering:默认为 False。在缩放之前,使用中位数使数据居中。它会生成密集输出,因此应用于稀疏输入时请注意。
RobustScaler
是一个 Estimator
,可以在数据集上进行 fit
以生成一个 RobustScalerModel
;这相当于计算分位数统计量。然后该模型可以将数据集中的 Vector
列转换为具有单位分位数范围和/或零中值特征。
请注意,如果某个特征的分位数范围为零,它将在该特征的 Vector
中返回默认值 0.0
。
示例
以下示例演示了如何以 libsvm 格式加载数据集,然后将每个特征归一化为单位分位数范围。
有关 API 的更多详细信息,请参阅 RobustScaler Python 文档。
from pyspark.ml.feature import RobustScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
withScaling=True, withCentering=False,
lower=0.25, upper=0.75)
# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)
# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
有关 API 的更多详细信息,请参阅 RobustScaler Scala 文档。
import org.apache.spark.ml.feature.RobustScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new RobustScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithScaling(true)
.setWithCentering(false)
.setLower(0.25)
.setUpper(0.75)
// Compute summary statistics by fitting the RobustScaler.
val scalerModel = scaler.fit(dataFrame)
// Transform each feature to have unit quantile range.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
有关 API 的更多详细信息,请参阅 RobustScaler Java 文档。
import org.apache.spark.ml.feature.RobustScaler;
import org.apache.spark.ml.feature.RobustScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
RobustScaler scaler = new RobustScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithScaling(true)
.setWithCentering(false)
.setLower(0.25)
.setUpper(0.75);
// Compute summary statistics by fitting the RobustScaler
RobustScalerModel scalerModel = scaler.fit(dataFrame);
// Transform each feature to have unit quantile range.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
最小最大值缩放器
MinMaxScaler
转换由 Vector
行组成的数据集,将每个特征重新缩放到特定范围(通常是 [0, 1])。它接受参数:
- min:默认为 0.0。转换后的下界,所有特征共享。
- max:默认为 1.0。转换后的上界,所有特征共享。
MinMaxScaler
计算数据集上的汇总统计量并生成一个 MinMaxScalerModel
。然后该模型可以单独转换每个特征,使其在给定范围内。
特征 E 的重新缩放值计算如下:\begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation}
对于 $E_{max} == E_{min}$
的情况,$Rescaled(e_i) = 0.5 * (max + min)$
请注意,由于零值可能会转换为非零值,因此即使对于稀疏输入,转换器的输出也将是 DenseVector
。
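为了直观说明上述公式,下面用纯 Python 对单个特征列手工计算缩放结果(仅作示意,假设使用默认范围 [0, 1],并非 Spark API 示例)。
# 单个特征列 E = [1.0, 2.0, 3.0],目标范围 [min, max] = [0.0, 1.0]
E = [1.0, 2.0, 3.0]
e_min, e_max = min(E), max(E)
rescaled = [(e - e_min) / (e_max - e_min) * (1.0 - 0.0) + 0.0 for e in E]
print(rescaled)  # [0.0, 0.5, 1.0],与上面的公式一致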
示例
以下示例演示了如何构建一个由向量组成的数据集,然后将每个特征重新缩放到 [0, 1] 范围。
有关 API 的更多详细信息,请参阅 MinMaxScaler Python 文档和 MinMaxScalerModel Python 文档。
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -1.0]),),
(1, Vectors.dense([2.0, 1.1, 1.0]),),
(2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
有关 API 的更多详细信息,请参阅 MinMaxScaler Scala 文档和 MinMaxScalerModel Scala 文档。
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -1.0)),
(1, Vectors.dense(2.0, 1.1, 1.0)),
(2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
有关 API 的更多详细信息,请参阅 MinMaxScaler Java 文档和 MinMaxScalerModel Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
MinMaxScaler scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
+ scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();
最大绝对值缩放器
MaxAbsScaler
转换由 Vector
行组成的数据集,通过除以每个特征中的最大绝对值,将每个特征重新缩放到 [-1, 1] 范围。它不会平移/居中数据,因此不会破坏任何稀疏性。
MaxAbsScaler
计算数据集上的汇总统计量并生成一个 MaxAbsScalerModel
。然后该模型可以单独转换每个特征,使其在 [-1, 1] 范围内。
示例
以下示例演示了如何构建一个由向量组成的数据集,然后将每个特征重新缩放到 [-1, 1] 范围。
有关 API 的更多详细信息,请参阅 MaxAbsScaler Python 文档和 MaxAbsScalerModel Python 文档。
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -8.0]),),
(1, Vectors.dense([2.0, 1.0, -4.0]),),
(2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
有关 API 的更多详细信息,请参阅 MaxAbsScaler Scala 文档和 MaxAbsScalerModel Scala 文档。
import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -8.0)),
(1, Vectors.dense(2.0, 1.0, -4.0)),
(2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")
val scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
有关 API 的更多详细信息,请参阅 MaxAbsScaler Java 文档和 MaxAbsScalerModel Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
MaxAbsScaler scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();
分桶器
Bucketizer
将连续特征列转换为特征分桶列,其中分桶由用户指定。它接受参数:
- splits:用于将连续特征映射到分桶的参数。有 n+1 个分割点时,就有 n 个分桶。由分割点 x,y 定义的分桶包含范围 [x,y) 内的值,最后一个分桶除外,它也包含 y。分割点应严格递增。必须明确提供 -inf 和 inf 处的值以覆盖所有 Double 值;否则,超出指定分割点的值将被视为错误。splits 的两个示例是 Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) 和 Array(0.0, 1.0, 2.0)。
另请注意,如果您不知道目标列的上限和下限,您应该将 Double.NegativeInfinity
和 Double.PositiveInfinity
添加为分割点的边界,以防止可能出现 Bucketizer 越界异常。
另请注意,您提供的分割点必须是严格递增的,即 s0 < s1 < s2 < ... < sn
。
更多详细信息可以在 Bucketizer 的 API 文档中找到。
示例
以下示例演示了如何将 Double
列分桶到另一个索引化列中。
有关 API 的更多详细信息,请参阅 Bucketizer Python 文档。
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()
有关 API 的更多详细信息,请参阅 Bucketizer Scala 文档。
import scala.collection.immutable
import org.apache.spark.ml.feature.Bucketizer
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(
immutable.ArraySeq.unsafeWrapArray(data.map(Tuple1.apply))).toDF("features")
val bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits)
// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()
val splitsArray = Array(
Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))
val data2 = Array(
(-999.9, -999.9),
(-0.5, -0.2),
(-0.3, -0.1),
(0.0, 0.0),
(0.2, 0.4),
(999.9, 999.9))
val dataFrame2 = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data2))
.toDF("features1", "features2")
val bucketizer2 = new Bucketizer()
.setInputCols(Array("features1", "features2"))
.setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
.setSplitsArray(splitsArray)
// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)
println(s"Bucketizer output with [" +
s"${bucketizer2.getSplitsArray(0).length-1}, " +
s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()
有关 API 的更多详细信息,请参阅 Bucketizer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};
List<Row> data = Arrays.asList(
RowFactory.create(-999.9),
RowFactory.create(-0.5),
RowFactory.create(-0.3),
RowFactory.create(0.0),
RowFactory.create(0.2),
RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Bucketizer bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits);
// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);
System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();
// Bucketize multiple columns at one pass.
double[][] splitsArray = {
{Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY},
{Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY}
};
List<Row> data2 = Arrays.asList(
RowFactory.create(-999.9, -999.9),
RowFactory.create(-0.5, -0.2),
RowFactory.create(-0.3, -0.1),
RowFactory.create(0.0, 0.0),
RowFactory.create(0.2, 0.4),
RowFactory.create(999.9, 999.9)
);
StructType schema2 = new StructType(new StructField[]{
new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame2 = spark.createDataFrame(data2, schema2);
Bucketizer bucketizer2 = new Bucketizer()
.setInputCols(new String[] {"features1", "features2"})
.setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"})
.setSplitsArray(splitsArray);
// Transform original data into its bucket index.
Dataset<Row> bucketedData2 = bucketizer2.transform(dataFrame2);
System.out.println("Bucketizer output with [" +
(bucketizer2.getSplitsArray()[0].length-1) + ", " +
(bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column");
bucketedData2.show();
元素乘积
元素乘积通过逐元素乘法将每个输入向量乘以提供的“权重”向量。换句话说,它通过标量乘数缩放数据集的每一列。这表示输入向量 v
和转换向量 w
之间的哈达玛乘积,以产生结果向量。
\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]
示例
以下示例演示了如何使用转换向量值来转换向量。
有关 API 的更多详细信息,请参阅 ElementwiseProduct Python 文档。
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
有关 API 的更多详细信息,请参阅 ElementwiseProduct Scala 文档。
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors
// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
("a", Vectors.dense(1.0, 2.0, 3.0)),
("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector")
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
有关 API 的更多详细信息,请参阅 ElementwiseProduct Java 文档。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);
List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector");
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
SQL转换器
SQLTransformer
实现了由 SQL 语句定义的转换。目前,我们只支持类似 "SELECT ... FROM __THIS__ ..."
的 SQL 语法,其中 "__THIS__"
表示输入数据集的底层表。SELECT 子句指定要在输出中显示的字段、常量和表达式,并且可以是 Spark SQL 支持的任何 SELECT 子句。用户还可以使用 Spark SQL 内置函数和 UDF 来操作这些选定的列。例如,SQLTransformer
支持以下语句:
SELECT a, a + b AS a_b FROM __THIS__
SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
示例
假设我们有以下包含 id
、v1
和 v2
列的 DataFrame
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
这是使用语句 "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"
的 SQLTransformer
的输出:
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0 | 3.0 | 4.0 | 3.0
2 | 2.0 | 5.0 | 7.0 |10.0
有关 API 的更多详细信息,请参阅 SQLTransformer Python 文档。
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
有关 API 的更多详细信息,请参阅 SQLTransformer Scala 文档。
import org.apache.spark.ml.feature.SQLTransformer
val df = spark.createDataFrame(
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
有关 API 的更多详细信息,请参阅 SQLTransformer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, 1.0, 3.0),
RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
SQLTransformer sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");
sqlTrans.transform(df).show();
向量组合器
VectorAssembler
是一个转换器,它将给定的列列表组合成单个向量列。它对于将原始特征和不同特征转换器生成的特征组合成一个特征向量非常有用,以便训练逻辑回归和决策树等机器学习模型。VectorAssembler
接受以下输入列类型:所有数值类型、布尔类型和向量类型。在每一行中,输入列的值将按指定顺序连接成一个向量。
示例
假设我们有一个包含 id
、hour
、mobile
、userFeatures
和 clicked
列的 DataFrame
id | hour | mobile | userFeatures | clicked
----|------|--------|------------------|---------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures
是一个包含三个用户特征的向量列。我们希望将 hour
、mobile
和 userFeatures
组合成一个名为 features
的特征向量,并使用它来预测是否 clicked
。如果我们将 VectorAssembler
的输入列设置为 hour
、mobile
和 userFeatures
,输出列设置为 features
,则转换后我们应该得到以下 DataFrame:
id | hour | mobile | userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
有关 API 的更多详细信息,请参阅 VectorAssembler Python 文档。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
有关 API 的更多详细信息,请参阅 VectorAssembler Scala 文档。
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
有关 API 的更多详细信息,请参阅 VectorAssembler Java 文档。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
"'features'");
output.select("features", "clicked").show(false);
向量大小提示
有时,明确指定 VectorType
列的向量大小会很有用。例如,VectorAssembler
使用其输入列的大小信息来为其输出列生成大小信息和元数据。虽然在某些情况下可以通过检查列的内容来获取此信息,但在流式 DataFrame 中,直到流启动后才能获取内容。VectorSizeHint
允许用户明确指定列的向量大小,以便 VectorAssembler
或其他可能需要了解向量大小的转换器可以将该列用作输入。
要使用 VectorSizeHint
,用户必须设置 inputCol
和 size
参数。将此转换器应用于 DataFrame 会生成一个新的 DataFrame,其中 inputCol
的元数据已更新,指定了向量大小。对结果 DataFrame 的下游操作可以使用此元数据获取该大小。
VectorSizeHint
还可以接受一个可选的 handleInvalid
参数,用于控制当向量列包含空值或错误大小的向量时的行为。默认情况下,handleInvalid
设置为“error”,表示应抛出异常。此参数也可以设置为“skip”,表示应从结果 DataFrame 中过滤掉包含无效值的行;或设置为“optimistic”,表示不应检查列中的无效值,并应保留所有行。请注意,使用“optimistic”可能会导致结果 DataFrame 处于不一致状态,这意味着 VectorSizeHint
应用的列的元数据与该列的内容不匹配。用户应注意避免这种不一致状态。
有关 API 的更多详细信息,请参阅 VectorSizeHint Python 文档。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
(0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
sizeHint = VectorSizeHint(
inputCol="userFeatures",
handleInvalid="skip",
size=3)
datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
有关 API 的更多详细信息,请参阅 VectorSizeHint Scala 文档。
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq(
(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val sizeHint = new VectorSizeHint()
.setInputCol("userFeatures")
.setHandleInvalid("skip")
.setSize(3)
val datasetWithSize = sizeHint.transform(dataset)
println("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(false)
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
// This dataframe can be used by downstream transformers as before
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
有关 API 的更多详细信息,请参阅 VectorSizeHint Java 文档。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row0 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Row row1 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row0, row1), schema);
VectorSizeHint sizeHint = new VectorSizeHint()
.setInputCol("userFeatures")
.setHandleInvalid("skip")
.setSize(3);
Dataset<Row> datasetWithSize = sizeHint.transform(dataset);
System.out.println("Rows where 'userFeatures' is not the right size are filtered out");
datasetWithSize.show(false);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
// This dataframe can be used by downstream transformers as before
Dataset<Row> output = assembler.transform(datasetWithSize);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
"'features'");
output.select("features", "clicked").show(false);
分位数离散化器
QuantileDiscretizer
接受一个包含连续特征的列,并输出一个包含分桶类别特征的列。桶的数量由 numBuckets
参数设置。使用的桶的数量可能小于此值,例如,如果输入的唯一值太少而无法创建足够多的唯一分位数。
NaN 值:在 QuantileDiscretizer
拟合期间,NaN 值将从列中移除。这将生成一个用于进行预测的 Bucketizer
模型。在转换期间,Bucketizer
在数据集中发现 NaN 值时会引发错误,但用户也可以通过设置 handleInvalid
来选择保留或移除数据集中的 NaN 值。如果用户选择保留 NaN 值,它们将得到特殊处理并放置在自己的桶中,例如,如果使用 4 个桶,那么非 NaN 数据将放入桶 [0-3] 中,而 NaN 值将计入一个特殊的桶 [4] 中。
算法:分箱范围是使用近似算法选择的(详细描述请参见 approxQuantile 的文档)。近似的精度可以通过 relativeError
参数控制。当设置为零时,将计算精确分位数(注意:计算精确分位数是一项耗时的操作)。下限和上限分箱边界将分别为 -Infinity
和 +Infinity
,覆盖所有实数值。
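下面是一个补充性的示意片段(非本节原有示例),假设 spark 为已创建的 SparkSession,演示如何通过 handleInvalid 保留 NaN 值,并通过 relativeError 控制分位数近似精度。
from pyspark.ml.feature import QuantileDiscretizer
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, float("nan"))]
df = spark.createDataFrame(data, ["id", "hour"])
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result",
                                  relativeError=0.01,    # 分位数近似的相对误差
                                  handleInvalid="keep")  # NaN 被放入额外的特殊分桶
discretizer.fit(df).transform(df).show()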
示例
假设我们有一个包含 id
, hour
列的 DataFrame。
id | hour
----|------
0 | 18.0
1 | 19.0
2 | 8.0
3 | 5.0
4 | 2.2
hour
是一个 Double
类型的连续特征。我们希望将这个连续特征转换为一个类别特征。给定 numBuckets = 3
,我们应该得到以下 DataFrame:
id | hour | result
----|------|------
0 | 18.0 | 2.0
1 | 19.0 | 2.0
2 | 8.0 | 1.0
3 | 5.0 | 1.0
4 | 2.2 | 0.0
有关 API 的更多详细信息,请参阅 QuantileDiscretizer Python 文档。
from pyspark.ml.feature import QuantileDiscretizer
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)
result.show()
有关 API 的更多详细信息,请参阅 QuantileDiscretizer Scala 文档。
import scala.collection.immutable
import org.apache.spark.ml.feature.QuantileDiscretizer
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data)).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show(false)
有关 API 的更多详细信息,请参阅 QuantileDiscretizer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 18.0),
RowFactory.create(1, 19.0),
RowFactory.create(2, 8.0),
RowFactory.create(3, 5.0),
RowFactory.create(4, 2.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
QuantileDiscretizer discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3);
Dataset<Row> result = discretizer.fit(df).transform(df);
result.show(false);
填补器
Imputer
估算器使用缺失值所在列的均值、中位数或众数来填充数据集中的缺失值。输入列应为数值类型。目前 Imputer
不支持类别特征,并且可能为包含类别特征的列创建不正确的值。Imputer
可以通过 .setMissingValue(custom_value)
填充除 'NaN' 之外的自定义值。例如,.setMissingValue(0)
会将所有出现的 0 视为缺失值并进行填充。
注意:输入列中的所有 null
值都被视为缺失值,因此也会被填充。
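下面是一个补充性的示意片段(非本节原有示例),假设 spark 为已创建的 SparkSession,演示如何用 setMissingValue 将 0.0 视为缺失值,并用中位数策略进行填充。
from pyspark.ml.feature import Imputer
df = spark.createDataFrame([
    (1.0, 0.0),
    (2.0, 0.0),
    (0.0, 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"],
                  strategy="median")   # 用中位数代替默认的均值
imputer.setMissingValue(0.0)           # 将 0.0 而不是 NaN 视为缺失值
imputer.fit(df).transform(df).show()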
示例
假设我们有一个包含 a
和 b
列的 DataFrame。
a | b
------------|-----------
1.0 | Double.NaN
2.0 | Double.NaN
Double.NaN | 3.0
4.0 | 4.0
5.0 | 5.0
在此示例中,Imputer 将所有出现的 Double.NaN
(缺失值的默认值)替换为从相应列中其他值计算出的均值(默认填充策略)。在此示例中,列 a
和 b
的替代值分别为 3.0 和 4.0。转换后,输出列中的缺失值将替换为相关列的替代值。
a | b | out_a | out_b
------------|------------|-------|-------
1.0 | Double.NaN | 1.0 | 4.0
2.0 | Double.NaN | 2.0 | 4.0
Double.NaN | 3.0 | 3.0 | 3.0
4.0 | 4.0 | 4.0 | 4.0
5.0 | 5.0 | 5.0 | 5.0
有关 API 的更多详细信息,请参阅 Imputer Python 文档。
from pyspark.ml.feature import Imputer
df = spark.createDataFrame([
(1.0, float("nan")),
(2.0, float("nan")),
(float("nan"), 3.0),
(4.0, 4.0),
(5.0, 5.0)
], ["a", "b"])
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)
model.transform(df).show()
有关 API 的更多详细信息,请参阅 Imputer Scala 文档。
import org.apache.spark.ml.feature.Imputer
val df = spark.createDataFrame(Seq(
(1.0, Double.NaN),
(2.0, Double.NaN),
(Double.NaN, 3.0),
(4.0, 4.0),
(5.0, 5.0)
)).toDF("a", "b")
val imputer = new Imputer()
.setInputCols(Array("a", "b"))
.setOutputCols(Array("out_a", "out_b"))
val model = imputer.fit(df)
model.transform(df).show()
有关 API 的更多详细信息,请参阅 Imputer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;
List<Row> data = Arrays.asList(
RowFactory.create(1.0, Double.NaN),
RowFactory.create(2.0, Double.NaN),
RowFactory.create(Double.NaN, 3.0),
RowFactory.create(4.0, 4.0),
RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
createStructField("a", DoubleType, false),
createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Imputer imputer = new Imputer()
.setInputCols(new String[]{"a", "b"})
.setOutputCols(new String[]{"out_a", "out_b"});
ImputerModel model = imputer.fit(df);
model.transform(df).show();
特征选择器
向量切片器
VectorSlicer
是一个转换器,它接收一个特征向量并输出一个包含原始特征子数组的新特征向量。它对于从向量列中提取特征非常有用。
VectorSlicer
接受一个带有指定索引的向量列,然后输出一个新向量列,其值通过这些索引选择。有两种类型的索引:
- 整数索引,表示向量中的位置下标,通过 setIndices() 指定。
- 字符串索引,表示向量中特征的名称,通过 setNames() 指定。这要求向量列具有 AttributeGroup,因为实现是根据 Attribute 的名称字段进行匹配的。
整数和字符串指定都可接受。此外,您可以同时使用整数索引和字符串名称。必须至少选择一个特征。不允许重复特征,因此选定的索引和名称之间不能有重叠。请注意,如果选择特征名称,遇到空的输入属性时将抛出异常。
输出向量将首先按选定索引的顺序(按给定顺序)排列特征,然后按选定名称的顺序(按给定顺序)排列。
示例
假设我们有一个包含 userFeatures
列的 DataFrame。
userFeatures
------------------
[0.0, 10.0, 0.5]
userFeatures
是一个包含三个用户特征的向量列。假设 userFeatures
的第一列都是零,所以我们想删除它,只选择最后两列。VectorSlicer
使用 setIndices(1, 2)
选择最后两个元素,然后生成一个名为 features
的新向量列。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
还假设我们有 userFeatures
的潜在输入属性,即 ["f1", "f2", "f3"]
,那么我们可以使用 setNames("f2", "f3")
来选择它们。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
["f1", "f2", "f3"] | ["f2", "f3"]
有关 API 的更多详细信息,请参阅 VectorSlicer Python 文档。
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
df = spark.createDataFrame([
Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])
output = slicer.transform(df)
output.select("userFeatures", "features").show()
有关 API 的更多详细信息,请参阅 VectorSlicer Scala 文档。
import java.util.Arrays
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
val data = Arrays.asList(
Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
Row(Vectors.dense(-2.0, 2.3, 0.0))
)
val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))
val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))
val output = slicer.transform(dataset)
output.show(false)
有关 API 的更多详细信息,请参阅 VectorSlicer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
Attribute[] attrs = {
NumericAttribute.defaultAttr().withName("f1"),
NumericAttribute.defaultAttr().withName("f2"),
NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);
Dataset<Row> dataset =
spark.createDataFrame(data, (new StructType()).add(group.toStructField()));
VectorSlicer vectorSlicer = new VectorSlicer()
.setInputCol("userFeatures").setOutputCol("features");
vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})
Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);
RFormula
RFormula
根据 R 模型公式 选择列。目前我们支持 R 运算符的有限子集,包括 ' ~ ', ' . ', ' : ', ' + ' 和 ' - '。基本运算符是:
- ~ 分隔目标和项
- + 连接项,“+ 0” 表示移除截距
- - 移除项,“- 1” 表示移除截距
- : 交互(数值相乘,或二值化类别值)
- . 除目标外的所有列
假设 a
和 b
是双精度列,我们使用以下简单示例来说明 RFormula
的效果:
- y ~ a + b 表示模型 y ~ w0 + w1 * a + w2 * b,其中 w0 是截距,w1, w2 是系数。
- y ~ a + b + a:b - 1 表示模型 y ~ w1 * a + w2 * b + w3 * a * b,其中 w1, w2, w3 是系数。
RFormula
产生一个特征向量列和一个双精度或字符串标签列。就像在 R 中用于线性回归的公式一样,数值列将被转换为双精度。对于字符串输入列,它们将首先使用 StringIndexer 进行转换,其排序由 stringOrderType
确定,排序后的最后一个类别将被删除,然后双精度值将被独热编码。
假设一个字符串特征列包含值 {'b', 'a', 'b', 'a', 'c', 'b'}
,我们设置 stringOrderType
来控制编码:
stringOrderType | Category mapped to 0 by StringIndexer | Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b') | least frequent category ('c')
'frequencyAsc' | least frequent category ('c') | most frequent category ('b')
'alphabetDesc' | last alphabetical category ('c') | first alphabetical category ('a')
'alphabetAsc' | first alphabetical category ('a') | last alphabetical category ('c')
如果标签列是字符串类型,它将首先使用 StringIndexer 转换为双精度,并使用 frequencyDesc
排序。如果 DataFrame 中不存在标签列,则输出标签列将从公式中指定的响应变量创建。
注意:排序选项 stringOrderType
不用于标签列。当标签列被索引时,它使用 StringIndexer
中默认的降序频率排序。
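下面是一个补充性的示意片段(非本节原有示例),假设 spark 为已创建的 SparkSession,演示带交互项 ':' 的公式,以及用 stringOrderType 控制字符串特征的编码顺序。
from pyspark.ml.feature import RFormula
df = spark.createDataFrame([
    (7, "US", 18, 1.0),
    (8, "CA", 12, 0.0),
    (9, "NZ", 15, 0.0)
], ["id", "country", "hour", "clicked"])
# country:hour 为类别特征与数值特征的交互项
formula = RFormula(formula="clicked ~ country + hour + country:hour",
                   stringOrderType="alphabetAsc",
                   featuresCol="features", labelCol="label")
formula.fit(df).transform(df).select("features", "label").show(truncate=False)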
示例
假设我们有一个包含 id
、country
、hour
和 clicked
列的 DataFrame。
id | country | hour | clicked
---|---------|------|---------
7 | "US" | 18 | 1.0
8 | "CA" | 12 | 0.0
9 | "NZ" | 15 | 0.0
如果我们使用 RFormula
,公式字符串为 clicked ~ country + hour
,这表示我们想基于 country
和 hour
预测 clicked
,转换后我们应该得到以下 DataFrame:
id | country | hour | clicked | features | label
---|---------|------|---------|------------------|-------
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0
有关 API 的更多详细信息,请参阅 RFormula Python 文档。
from pyspark.ml.feature import RFormula
dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])
formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
有关 API 的更多详细信息,请参阅 RFormula Scala 文档。
import org.apache.spark.ml.feature.RFormula
val dataset = spark.createDataFrame(Seq(
(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label")
val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
有关 API 的更多详细信息,请参阅 RFormula Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("country", StringType, false),
createStructField("hour", IntegerType, false),
createStructField("clicked", DoubleType, false)
});
List<Row> data = Arrays.asList(
RowFactory.create(7, "US", 18, 1.0),
RowFactory.create(8, "CA", 12, 0.0),
RowFactory.create(9, "NZ", 15, 0.0)
);
Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
卡方选择器
ChiSqSelector
代表卡方特征选择。它对带有类别特征的标记数据进行操作。ChiSqSelector
使用 卡方独立性检验 来决定选择哪些特征。它支持五种选择方法:numTopFeatures、percentile、fpr、fdr、fwe。
- numTopFeatures:根据卡方检验选择固定数量的顶部特征。这类似于选择预测能力最强的特征。
- percentile:与 numTopFeatures 类似,但选择所有特征的某个百分比而不是固定数量。
- fpr:选择 p 值低于阈值的所有特征,从而控制选择的假阳性率。
- fdr:使用 Benjamini-Hochberg 过程 选择所有假发现率低于阈值的特征。
- fwe:选择所有 p 值低于阈值的特征。阈值按 1/numFeatures 缩放,从而控制选择的家族误差率。
默认情况下,选择方法是 numTopFeatures,默认的顶部特征数量设置为 50。用户可以使用 setSelectorType 选择一种选择方法。
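下面是一个补充性的示意片段(非本节原有示例),假设 spark 为已创建的 SparkSession,演示如何通过 selectorType 切换到 fpr 选择方法。
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)
], ["id", "features", "clicked"])
# fpr 方法保留 p 值低于阈值的所有特征
selector = ChiSqSelector(selectorType="fpr", fpr=0.05,
                         featuresCol="features", labelCol="clicked",
                         outputCol="selectedFeatures")
selector.fit(df).transform(df).show()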
示例
假设我们有一个包含 id
、features
和 clicked
列的 DataFrame,其中 clicked
是我们要预测的目标。
id | features | clicked
---|-----------------------|---------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0
8 | [0.0, 1.0, 12.0, 0.0] | 0.0
9 | [1.0, 0.0, 15.0, 0.1] | 0.0
如果我们使用 ChiSqSelector
且 numTopFeatures = 1
,那么根据我们的标签 clicked
,features
中的最后一列被选为最有用的特征。
id | features | clicked | selectedFeatures
---|-----------------------|---------|------------------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
有关 API 的更多详细信息,请参阅 ChiSqSelector Python 文档。
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
(8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
(9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
outputCol="selectedFeatures", labelCol="clicked")
result = selector.fit(df).transform(df)
print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
有关 API 的更多详细信息,请参阅 ChiSqSelector Scala 文档。
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
import spark.implicits._
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val df = spark.createDataset(data).toDF("id", "features", "clicked")
val selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
有关 API 的更多详细信息,请参阅 ChiSqSelector Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
ChiSqSelector selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
+ " features selected");
result.show();
单变量特征选择器
UnivariateFeatureSelector
对带有类别/连续特征的类别/连续标签进行操作。用户可以设置 featureType
和 labelType
,Spark 将根据指定的 featureType
和 labelType
选择要使用的评分函数。
featureType | labelType |score function
------------|------------|--------------
categorical |categorical | chi-squared (chi2)
continuous |categorical | ANOVATest (f_classif)
continuous |continuous | F-value (f_regression)
它支持五种选择模式:numTopFeatures、percentile、fpr、fdr、fwe。
- numTopFeatures:选择固定数量的顶部特征。
- percentile:与 numTopFeatures 类似,但选择所有特征的某个百分比而不是固定数量。
- fpr:选择 p 值低于阈值的所有特征,从而控制选择的假阳性率。
- fdr:使用 Benjamini-Hochberg 过程 选择所有假发现率低于阈值的特征。
- fwe:选择所有 p 值低于阈值的特征。阈值按 1/numFeatures 缩放,从而控制选择的家族误差率。
默认情况下,选择模式为 numTopFeatures,默认选择阈值设置为 50。
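下面是一个补充性的示意片段(非本节原有示例),假设 spark 为已创建的 SparkSession,演示 percentile 选择模式:按比例保留得分最高的特征(这里保留前 50%)。
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
    (1, Vectors.dense([1.7, 4.4, 7.6, 5.8]), 3.0),
    (2, Vectors.dense([8.8, 7.3, 5.7, 7.3]), 2.0),
    (3, Vectors.dense([1.2, 9.5, 2.5, 3.1]), 3.0),
    (4, Vectors.dense([3.7, 9.2, 6.1, 4.1]), 2.0)
], ["id", "features", "label"])
selector = UnivariateFeatureSelector(featuresCol="features", labelCol="label",
                                     outputCol="selectedFeatures",
                                     selectionMode="percentile")
selector.setFeatureType("continuous").setLabelType("categorical")
selector.setSelectionThreshold(0.5)   # 保留前 50% 的特征
selector.fit(df).transform(df).show(truncate=False)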
示例
假设我们有一个包含 id
、features
和 label
列的 DataFrame,其中 label
是我们要预测的目标。
id | features | label
---|--------------------------------|---------
1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0
2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0
3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0
4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0
5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0
6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0
如果我们设置 featureType
为 continuous
,labelType
为 categorical
,并且 numTopFeatures = 1
,那么 features
中的最后一列被选为最有用的特征。
id | features | label | selectedFeatures
---|--------------------------------|---------|------------------
1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0 | [2.3]
2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0 | [4.1]
3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0 | [2.5]
4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0 | [3.8]
5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0 | [3.0]
6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0 | [2.1]
有关 API 的更多详细信息,请参阅 UnivariateFeatureSelector Python 文档。
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
(2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
(3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
(4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
(5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
(6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])
selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)
result = selector.fit(df).transform(df)
print("UnivariateFeatureSelector output with top %d features selected using f_classif"
% selector.getSelectionThreshold())
result.show()
有关 API 的更多详细信息,请参阅 UnivariateFeatureSelector Scala 文档。
import org.apache.spark.ml.feature.UnivariateFeatureSelector
import org.apache.spark.ml.linalg.Vectors
import spark.implicits._
val data = Seq(
(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
)
val df = spark.createDataset(data).toDF("id", "features", "label")
val selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(1)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"UnivariateFeatureSelector output with top ${selector.getSelectionThreshold}" +
s" features selected using f_classif")
result.show()
有关 API 的更多详细信息,请参阅 UnivariateFeatureSelector Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.UnivariateFeatureSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
RowFactory.create(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
RowFactory.create(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
RowFactory.create(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
RowFactory.create(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
RowFactory.create(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
UnivariateFeatureSelector selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(1)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("UnivariateFeatureSelector output with top "
+ selector.getSelectionThreshold() + " features selected using f_classif");
result.show();
方差阈值选择器
VarianceThresholdSelector
是一个选择器,用于移除低方差特征。方差不大于 varianceThreshold
的特征将被移除。如果未设置,varianceThreshold
默认为 0,这意味着只有方差为 0 的特征(即在所有样本中具有相同值的特征)将被移除。
示例
假设我们有一个包含 id
和 features
列的 DataFrame,其中 features 是选择器要处理的特征列。
id | features
---|--------------------------------
1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0]
2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0]
3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0]
4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0]
5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0]
6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0]
这6个特征的样本方差分别为16.67、0.67、8.17、10.17、5.07和11.47。如果我们使用 VarianceThresholdSelector
且 varianceThreshold = 8.0
,那么方差小于等于 8.0 的特征将被移除。
id | features | selectedFeatures
---|--------------------------------|-------------------
1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0] | [6.0,0.0,7.0,0.0]
2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0] | [0.0,6.0,0.0,9.0]
3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0] | [0.0,3.0,0.0,5.0]
4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0] | [0.0,8.0,5.0,4.0]
5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0] | [8.0,6.0,5.0,4.0]
6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0] | [8.0,6.0,0.0,0.0]
有关 API 的更多详细信息,请参阅 VarianceThresholdSelector Python 文档。
from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
(2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
(3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
(4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
(5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
(6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])
selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")
result = selector.fit(df).transform(df)
print("Output: Features with variance lower than %f are removed." %
selector.getVarianceThreshold())
result.show()
有关 API 的更多详细信息,请参阅 VarianceThresholdSelector Scala 文档。
import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors
import spark.implicits._
val data = Seq(
(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
)
val df = spark.createDataset(data).toDF("id", "features")
val selector = new VarianceThresholdSelector()
.setVarianceThreshold(8.0)
.setFeaturesCol("features")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"Output: Features with variance lower than" +
s" ${selector.getVarianceThreshold} are removed.")
result.show()
有关 API 的更多详细信息,请参阅 VarianceThresholdSelector Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.VarianceThresholdSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
RowFactory.create(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
RowFactory.create(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
RowFactory.create(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
RowFactory.create(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
RowFactory.create(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
VarianceThresholdSelector selector = new VarianceThresholdSelector()
.setVarianceThreshold(8.0)
.setFeaturesCol("features")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("Output: Features with variance lower than "
+ selector.getVarianceThreshold() + " are removed.");
result.show();
局部敏感哈希
局部敏感哈希 (LSH) 是一种重要的哈希技术,常用于聚类、近似最近邻搜索和大型数据集的异常值检测。
LSH 的一般思想是使用一组函数(“LSH 族”)将数据点哈希到桶中,使得彼此靠近的数据点以高概率位于同一桶中,而彼此远离的数据点则很可能位于不同桶中。LSH 族形式化定义如下:
在度量空间 (M, d)
中,其中 M
是一个集合,d
是 M
上的距离函数,LSH 族是满足以下性质的函数族 h
: \[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \]
这个 LSH 族被称为 (r1, r2, p1, p2)
-敏感。
在 Spark 中,不同的 LSH 族在单独的类中实现(例如,MinHash
),并且在每个类中都提供了用于特征转换、近似相似度连接和近似最近邻的 API。
在 LSH 中,我们将假阳性定义为一对相距较远的输入特征($d(p,q) \geq r2$
)被哈希到同一个桶中,将假阴性定义为一对相邻特征($d(p,q) \leq r1$
)被哈希到不同桶中。
LSH 操作
我们描述了 LSH 可以用于的主要操作类型。一个已拟合的 LSH 模型为这些操作中的每一个都提供了方法。
特征转换
特征转换是基本功能,用于将哈希值作为新列添加。这对于降维很有用。用户可以通过设置 inputCol
和 outputCol
来指定输入和输出列名。
LSH 也支持多个 LSH 哈希表。用户可以通过设置 numHashTables
来指定哈希表的数量。这也用于近似相似性连接和近似最近邻中的 OR-放大。增加哈希表的数量将提高准确性,但也会增加通信成本和运行时间。
outputCol
的类型是 Seq[Vector]
,其中数组的维度等于 numHashTables
,并且向量的维度当前设置为 1。在未来的版本中,我们将实现 AND-放大,以便用户可以指定这些向量的维度。
近似相似度连接
近似相似性连接接受两个数据集,并近似返回数据集中距离小于用户定义阈值的行对。近似相似性连接支持连接两个不同的数据集和自连接。自连接将产生一些重复的对。
近似相似性连接接受已转换和未转换的数据集作为输入。如果使用未转换的数据集,它将自动进行转换。在这种情况下,哈希签名将创建为 outputCol
。
在连接后的数据集中,可以在 datasetA
和 datasetB
中查询原始数据集。输出数据集中将添加一个距离列,以显示返回的每对行之间的真实距离。
近似最近邻搜索
近似最近邻搜索接受一个数据集(特征向量)和一个键(单个特征向量),它近似返回数据集中与该向量最接近的指定数量的行。
近似最近邻搜索接受已转换和未转换的数据集作为输入。如果使用未转换的数据集,它将自动进行转换。在这种情况下,哈希签名将创建为 outputCol
。
输出数据集中将添加一个距离列,以显示每个输出行与搜索键之间的真实距离。
注意:当哈希桶中没有足够的候选项时,近似最近邻搜索将返回少于 k
行。
LSH Algorithms
Bucketed Random Projection for Euclidean Distance
Bucketed Random Projection is an LSH family for Euclidean distance. The Euclidean distance is defined as follows: \[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \]
Its LSH family projects feature vectors $\mathbf{x}$ onto a random unit vector $\mathbf{v}$ and portions the projected results into hash buckets: \[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \]
where r is a user-defined bucket length. The bucket length can be used to control the average size of hash buckets (and thus the number of buckets). A larger bucket length (i.e., fewer buckets) increases the probability of features being hashed into the same bucket (increasing both true and false positives).
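A small worked instance of this formula (illustrative numbers, not part of the Spark docs): with $\mathbf{x} = (1.0, 1.0)$, a random unit vector $\mathbf{v} = (0.6, 0.8)$ and bucket length $r = 2.0$, the projection is $\mathbf{x} \cdot \mathbf{v} = 1.4$, so \[ h(\mathbf{x}) = \Big\lfloor \frac{1.4}{2.0} \Big\rfloor = 0. \] A nearby point such as $(0.9, 1.1)$ projects to $1.42$ and lands in the same bucket 0, while a distant point such as $(5.0, 5.0)$ projects to $7.0$ and lands in bucket 3.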
Bucketed Random Projection accepts arbitrary vectors as input features, and supports both sparse and dense vectors.
Refer to the BucketedRandomProjectionLSH Python docs for more details on the API.
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.dense([1.0, 1.0]),),
(1, Vectors.dense([1.0, -1.0]),),
(2, Vectors.dense([-1.0, -1.0]),),
(3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(4, Vectors.dense([1.0, 0.0]),),
(5, Vectors.dense([-1.0, 0.0]),),
(6, Vectors.dense([0.0, 1.0]),),
(7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.dense([1.0, 0.0])
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
numHashTables=3)
model = brp.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Refer to the BucketedRandomProjectionLSH Scala docs for more details on the API.
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
val dfA = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 1.0)),
(1, Vectors.dense(1.0, -1.0)),
(2, Vectors.dense(-1.0, -1.0)),
(3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(4, Vectors.dense(1.0, 0.0)),
(5, Vectors.dense(-1.0, 0.0)),
(6, Vectors.dense(0.0, 1.0)),
(7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")
val key = Vectors.dense(1.0, 0.0)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes")
val model = brp.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Refer to the BucketedRandomProjectionLSH Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 1.0)),
RowFactory.create(1, Vectors.dense(1.0, -1.0)),
RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);
List<Row> dataB = Arrays.asList(
RowFactory.create(4, Vectors.dense(1.0, 0.0)),
RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
RowFactory.create(6, Vectors.dense(0.0, 1.0)),
RowFactory.create(7, Vectors.dense(0.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
Vector key = Vectors.dense(1.0, 0.0);
BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes");
BucketedRandomProjectionLSHModel model = mh.fit(dfA);
// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
MinHash for Jaccard Distance
MinHash is an LSH family for Jaccard distance where input features are sets of natural numbers. The Jaccard distance of two sets is defined by the cardinality of their intersection and union: \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \]
MinHash applies a random hash function g to each element in the set and takes the minimum of all hashed values: \[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]
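A small worked instance (illustrative, not from the Spark docs, assuming g behaves like a random permutation with no ties): for $\mathbf{A} = \{2, 3, 5\}$ and $\mathbf{B} = \{1, 3, 5\}$, the intersection is $\{3, 5\}$ and the union is $\{1, 2, 3, 5\}$, so \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{2}{4} = 0.5. \] The element achieving the minimum hash value over $\mathbf{A} \cup \mathbf{B}$ is equally likely to be any of the four elements, and $h(\mathbf{A}) = h(\mathbf{B})$ exactly when that element lies in the intersection, so $Pr(h(\mathbf{A})=h(\mathbf{B})) = 2/4 = 1 - d(\mathbf{A}, \mathbf{B})$; this is what makes MinHash locality sensitive for Jaccard distance.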
The input sets for MinHash are represented as binary vectors, where the vector indices represent the elements themselves and the non-zero values in the vector represent the presence of that element in the set. While both dense and sparse vectors are supported, typically sparse vectors are recommended for efficiency. For example, Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)]) means there are 10 elements in the space. This set contains elem 2, elem 3 and elem 5. All non-zero values are treated as binary "1" values.
Note: Empty sets cannot be transformed by MinHash, which means any input vector must have at least 1 non-zero entry.
Refer to the MinHashLSH Python docs for more details on the API.
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
(1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
(2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
(4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
(5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Refer to the MinHashLSH Scala docs for more details on the API.
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
val dfA = spark.createDataFrame(Seq(
(0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
(1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
(2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
(4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
(5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))
val mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashes")
val model = mh.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Refer to the MinHashLSH Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
List<Row> dataB = Arrays.asList(
RowFactory.create(3, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(4, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(5, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);
MinHashLSH mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashes");
MinHashLSHModel model = mh.fit(dfA);
// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();