Extracting, transforming and selecting features
This section covers algorithms for working with features, roughly divided into these groups:
- Extraction: Extracting features from "raw" data
- Transformation: Scaling, converting, or modifying features
- Selection: Selecting a subset from a larger set of features
- Locality Sensitive Hashing (LSH): This class of algorithms combines aspects of feature transformation with other algorithms.
Table of Contents
- Feature Extractors
- Feature Transformers
- Tokenizer
- StopWordsRemover
- n-gram
- Binarizer
- PCA
- PolynomialExpansion
- Discrete Cosine Transform (DCT)
- StringIndexer
- IndexToString
- OneHotEncoder
- VectorIndexer
- Interaction
- Normalizer
- StandardScaler
- RobustScaler
- MinMaxScaler
- MaxAbsScaler
- Bucketizer
- ElementwiseProduct
- SQLTransformer
- VectorAssembler
- VectorSizeHint
- QuantileDiscretizer
- Imputer
- Feature Selectors
- Locality Sensitive Hashing
Feature Extractors
TF-IDF
Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus. Denote a term by $t$, a document by $d$, and the corpus by $D$. Term frequency $TF(t, d)$ is the number of times that term $t$ appears in document $d$, while document frequency $DF(t, D)$ is the number of documents that contain term $t$. If we only use term frequency to measure the importance, it is very easy to over-emphasize terms that appear very often but carry little information about the document, e.g. "a", "the", and "of". If a term appears very often across the corpus, it means it doesn't carry special information about a particular document. Inverse document frequency is a numerical measure of how much information a term provides: \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] where $|D|$ is the total number of documents in the corpus. Since logarithm is used, if a term appears in all documents, its IDF value becomes 0. Note that a smoothing term is applied to avoid dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF: \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \]
There are several variants on the definition of term frequency and document frequency. In MLlib, we separate TF and IDF to make them flexible.
TF: Both HashingTF and CountVectorizer can be used to generate the term frequency vectors.
HashingTF is a Transformer which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a "set of terms" might be a bag of words. HashingTF utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function; the hash function used here is MurmurHash 3. Term frequencies are then calculated based on the mapped indices. This approach avoids the need to compute a global term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing. To reduce the chance of collision, we can increase the target feature dimension, i.e. the number of buckets of the hash table. Since a simple modulo on the hashed value is used to determine the vector index, it is advisable to use a power of two as the feature dimension, otherwise the features will not be mapped evenly to the vector indices. The default feature dimension is $2^{18} = 262,144$. An optional binary toggle parameter controls term frequency counts. When set to true, all nonzero frequency counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.
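The following standalone Python sketch illustrates the idea of the hashing trick described above; it is for intuition only and does not reproduce Spark's behavior (Spark uses MurmurHash 3, whereas this sketch uses Python's built-in hash, and the helper name hashed_term_frequencies is made up for this illustration).
from collections import Counter

def hashed_term_frequencies(terms, num_features=16):
    # Map each term to a bucket index by hashing, then count occurrences per index.
    counts = Counter(hash(term) % num_features for term in terms)
    # Build a dense vector of term frequencies; Spark would emit a sparse vector instead.
    vec = [0.0] * num_features
    for index, count in counts.items():
        vec[index] = float(count)
    return vec

print(hashed_term_frequencies("hi i heard about spark about spark".split()))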
CountVectorizer converts text documents to vectors of term counts. Refer to CountVectorizer for more details.
IDF: IDF is an Estimator which is fit on a dataset and produces an IDFModel. The IDFModel takes feature vectors (generally created from HashingTF or CountVectorizer) and scales each feature. Intuitively, it down-weights features which appear frequently in a corpus.
Note: spark.ml doesn't provide tools for text segmentation. We refer users to the Stanford NLP Group and scalanlp/chalk.
Examples
In the following code segment, we start with a set of sentences. We split each sentence into words using Tokenizer. For each sentence (bag of words), we use HashingTF to hash the sentence into a feature vector. We use IDF to rescale the feature vectors; this generally improves performance when using text as features. Our feature vectors could then be passed to a learning algorithm.
Refer to the HashingTF Python docs and the IDF Python docs for more details on the API.
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
Refer to the HashingTF Scala docs and the IDF Scala docs for more details on the API.
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
val sentenceData = spark.createDataFrame(Seq(
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
Refer to the HashingTF Java docs and the IDF Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, "Hi I heard about Spark"),
RowFactory.create(0.0, "I wish Java could use case classes"),
RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);
int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
.setNumFeatures(numFeatures);
Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors
IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();
Word2Vec
Word2Vec is an Estimator which takes sequences of words representing documents and trains a Word2VecModel. The model maps each word to a unique fixed-size vector. The Word2VecModel transforms each document into a vector using the average of all words in the document; this vector can then be used as features for prediction, document similarity calculations, etc. Please refer to the MLlib user guide on Word2Vec for more details.
Examples
In the following code segment, we start with a set of documents, each of which is represented as a sequence of words. For each document, we transform it into a feature vector. This feature vector could then be passed to a learning algorithm.
Refer to the Word2Vec Python docs for more details on the API.
from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():
text, vector = row
print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
Refer to the Word2Vec Scala docs for more details on the API.
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
"Hi I heard about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
Refer to the Word2Vec Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);
// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0);
Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);
for (Row row : result.collectAsList()) {
List<String> text = row.getList(0);
Vector vector = (Vector) row.get(1);
System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}
CountVectorizer
CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary, and generates a CountVectorizerModel. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.
During the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus. An optional parameter minDF also affects the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary. Another optional binary toggle parameter controls the output vector. If set to true, all nonzero counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.
Examples
Assume that we have the following DataFrame with columns id and texts:
id | texts
----|----------
0 | Array("a", "b", "c")
1 | Array("a", "b", "b", "c", "a")
Each row in texts is a document of type Array[String]. Invoking fit of CountVectorizer produces a CountVectorizerModel with vocabulary (a, b, c). Then the output column "vector" after transformation contains:
id | texts | vector
----|---------------------------------|---------------
0 | Array("a", "b", "c") | (3,[0,1,2],[1.0,1.0,1.0])
1 | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
Each vector represents the token counts of the document over the vocabulary.
Refer to the CountVectorizer Python docs and the CountVectorizerModel Python docs for more details on the API.
from pyspark.ml.feature import CountVectorizer
# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)
Refer to the CountVectorizer Scala docs and the CountVectorizerModel Scala docs for more details on the API.
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = spark.createDataFrame(Seq(
(0, Array("a", "b", "c")),
(1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")
// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
.setInputCol("words")
.setOutputCol("features")
.setVocabSize(3)
.setMinDF(2)
.fit(df)
// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
.setInputCol("words")
.setOutputCol("features")
cvModel.transform(df).show(false)
Refer to the CountVectorizer Java docs and the CountVectorizerModel Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("a", "b", "c")),
RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
.setInputCol("text")
.setOutputCol("feature")
.setVocabSize(3)
.setMinDF(2)
.fit(df);
// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
.setInputCol("text")
.setOutputCol("feature");
cvModel.transform(df).show(false);
FeatureHasher
Feature hashing projects a set of categorical or numerical features into a feature vector of specified dimension (typically substantially smaller than that of the original feature space). This is done using the hashing trick to map features to indices in the feature vector.
The FeatureHasher transformer operates on multiple columns. Each column may contain either numeric or categorical features. Behavior and handling of column data types is as follows:
- Numeric columns: For numeric features, the hash value of the column name is used to map the feature value to its index in the feature vector. By default, numeric features are not treated as categorical (even when they are integers). To treat them as categorical, specify the relevant columns using the categoricalCols parameter.
- String columns: For categorical features, the hash value of the string "column_name=value" is used to map to the vector index, with an indicator value of 1.0. Thus, categorical features are "one-hot" encoded (similarly to using OneHotEncoder with dropLast set to false).
- Boolean columns: Boolean values are treated in the same way as string columns. That is, boolean features are represented as "column_name=true" or "column_name=false", with indicator values of 1.0.
Null (missing) values are ignored (implicitly zero in the resulting feature vector).
The hash function used here is also the MurmurHash 3 used in HashingTF. Since a simple modulo on the hashed value is used to determine the vector index, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the vector indices.
Examples
Suppose we have a DataFrame with 4 input columns real, bool, stringNum, and string. These different data types as input will illustrate the behavior of the transform to produce a column of feature vectors.
real| bool|stringNum|string
----|-----|---------|------
2.2| true| 1| foo
3.3|false| 2| bar
4.4|false| 3| baz
5.5|false| 4| foo
Then the output of FeatureHasher.transform on this DataFrame is:
real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1 |foo |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2 |bar |(262144,[6031, 80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3 |baz |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4 |foo |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])
The resulting feature vectors could then be passed to a learning algorithm.
Refer to the FeatureHasher Python docs for more details on the API.
from pyspark.ml.feature import FeatureHasher
dataset = spark.createDataFrame([
(2.2, True, "1", "foo"),
(3.3, False, "2", "bar"),
(4.4, False, "3", "baz"),
(5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])
hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
outputCol="features")
featurized = hasher.transform(dataset)
featurized.show(truncate=False)
Refer to the FeatureHasher Scala docs for more details on the API.
import org.apache.spark.ml.feature.FeatureHasher
val dataset = spark.createDataFrame(Seq(
(2.2, true, "1", "foo"),
(3.3, false, "2", "bar"),
(4.4, false, "3", "baz"),
(5.5, false, "4", "foo")
)).toDF("real", "bool", "stringNum", "string")
val hasher = new FeatureHasher()
.setInputCols("real", "bool", "stringNum", "string")
.setOutputCol("features")
val featurized = hasher.transform(dataset)
featurized.show(false)
Refer to the FeatureHasher Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.FeatureHasher;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(2.2, true, "1", "foo"),
RowFactory.create(3.3, false, "2", "bar"),
RowFactory.create(4.4, false, "3", "baz"),
RowFactory.create(5.5, false, "4", "foo")
);
StructType schema = new StructType(new StructField[]{
new StructField("real", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("bool", DataTypes.BooleanType, false, Metadata.empty()),
new StructField("stringNum", DataTypes.StringType, false, Metadata.empty()),
new StructField("string", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
FeatureHasher hasher = new FeatureHasher()
.setInputCols(new String[]{"real", "bool", "stringNum", "string"})
.setOutputCol("features");
Dataset<Row> featurized = hasher.transform(dataset);
featurized.show(false);
Feature Transformers
Tokenizer
Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). A simple Tokenizer class provides this functionality. The example below shows how to split sentences into sequences of words.
RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching. By default, the parameter "pattern" (regex, default: "\\s+") is used as delimiters to split the input text. Alternatively, users can set the parameter "gaps" to false, indicating that the regex "pattern" denotes "tokens" rather than splitting gaps, in which case all matching occurrences are returned as the tokenization result.
Examples
Refer to the Tokenizer Python docs and the RegexTokenizer Python docs for more details on the API.
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
Refer to the Tokenizer Scala docs and the RegexTokenizer Scala docs for more details on the API.
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val sentenceDataFrame = spark.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)
val countTokens = udf { (words: Seq[String]) => words.length }
val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
Refer to the Tokenizer Java docs and the RegexTokenizer Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import scala.collection.mutable.Seq;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.call_udf;
import static org.apache.spark.sql.functions.col;
List<Row> data = Arrays.asList(
RowFactory.create(0, "Hi I heard about Spark"),
RowFactory.create(1, "I wish Java could use case classes"),
RowFactory.create(2, "Logistic,regression,models,are,neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
RegexTokenizer regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
spark.udf().register(
"countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);
Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
.withColumn("tokens", call_udf("countTokens", col("words")))
.show(false);
Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
.withColumn("tokens", call_udf("countTokens", col("words")))
.show(false);
StopWordsRemover
Stop words are words which should be excluded from the input, typically because the words appear frequently and don't carry as much meaning.
StopWordsRemover takes as input a sequence of strings (e.g. the output of a Tokenizer) and drops all the stop words from the input sequences. The list of stopwords is specified by the stopWords parameter. Default stop words for some languages are accessible by calling StopWordsRemover.loadDefaultStopWords(language), for which available options are "danish", "dutch", "english", "finnish", "french", "german", "hungarian", "italian", "norwegian", "portuguese", "russian", "spanish", "swedish" and "turkish". A boolean parameter caseSensitive indicates whether the matches should be case sensitive (false by default).
Examples
Assume that we have the following DataFrame with columns id and raw:
id | raw
----|----------
0 | [I, saw, the, red, balloon]
1 | [Mary, had, a, little, lamb]
Applying StopWordsRemover with raw as the input column and filtered as the output column, we should get the following:
id | raw | filtered
----|-----------------------------|--------------------
0 | [I, saw, the, red, balloon] | [saw, red, balloon]
1 | [Mary, had, a, little, lamb]|[Mary, little, lamb]
In filtered, the stop words "I", "the", "had" and "a" have been filtered out.
Refer to the StopWordsRemover Python docs for more details on the API.
from pyspark.ml.feature import StopWordsRemover
sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
Refer to the StopWordsRemover Scala docs for more details on the API.
import org.apache.spark.ml.feature.StopWordsRemover
val remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered")
val dataSet = spark.createDataFrame(Seq(
(0, Seq("I", "saw", "the", "red", "balloon")),
(1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")
remover.transform(dataSet).show(false)
Refer to the StopWordsRemover Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
StopWordsRemover remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered");
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);
StructType schema = new StructType(new StructField[]{
new StructField(
"raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);
n-gram
An n-gram is a sequence of $n$ tokens (typically words) for some integer $n$. The NGram class can be used to transform input features into n-grams.
NGram takes as input a sequence of strings (e.g. the output of a Tokenizer). The parameter n is used to determine the number of terms in each n-gram. The output consists of a sequence of n-grams where each n-gram is represented by a space-delimited string of n consecutive words. If the input sequence contains fewer than n strings, no output is produced.
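For example, with n = 2 the word sequence [Hi, I, heard, about, Spark] from the first row of the example below yields the n-grams [Hi I, I heard, heard about, about Spark].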
Examples
Refer to the NGram Python docs for more details on the API.
from pyspark.ml.feature import NGram
wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
Refer to the NGram Scala docs for more details on the API.
import org.apache.spark.ml.feature.NGram
val wordDataFrame = spark.createDataFrame(Seq(
(0, Array("Hi", "I", "heard", "about", "Spark")),
(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
(2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")
val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)
Refer to the NGram Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);
NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");
Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);
Binarizer
Binarization is the process of thresholding numerical features to binary (0/1) features.
Binarizer takes the common parameters inputCol and outputCol, as well as the threshold for binarization. Feature values greater than the threshold are binarized to 1.0; values equal to or less than the threshold are binarized to 0.0. Both Vector and Double types are supported for inputCol.
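For instance, with the threshold of 0.5 used in the example below, the feature values 0.1, 0.8 and 0.2 are binarized to 0.0, 1.0 and 0.0 respectively.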
Examples
Refer to the Binarizer Python docs for more details on the API.
from pyspark.ml.feature import Binarizer
continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["id", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
Refer to the Binarizer Scala docs for more details on the API.
import org.apache.spark.ml.feature.Binarizer
val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)
val binarizedDataFrame = binarizer.transform(dataFrame)
println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
Refer to the Binarizer Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 0.1),
RowFactory.create(1, 0.8),
RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);
Binarizer binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5);
Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);
System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();
PCA
PCA (Principal Component Analysis) is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. A PCA class trains a model to project vectors to a low-dimensional space using PCA. The example below shows how to project 5-dimensional feature vectors into 3-dimensional principal components.
Examples
Refer to the PCA Python docs for more details on the API.
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
Refer to the PCA Scala docs for more details on the API.
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df)
val result = pca.transform(df).select("pcaFeatures")
result.show(false)
Refer to the PCA Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
PCAModel pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df);
Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);
PolynomialExpansion
Polynomial expansion is the process of expanding your features into a polynomial space, which is formulated by an n-degree combination of the original dimensions. Take a 2-variable feature vector as an example: (x, y); expanding it with degree 2 gives (x, x * x, y, x * y, y * y). A PolynomialExpansion class provides this functionality. The example below shows how to expand your features into a 3-degree polynomial space.
Examples
Refer to the PolynomialExpansion Python docs for more details on the API.
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([2.0, 1.0]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([3.0, -1.0]),)
], ["features"])
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)
polyDF.show(truncate=False)
Refer to the PolynomialExpansion Scala docs for more details on the API.
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.dense(2.0, 1.0),
Vectors.dense(0.0, 0.0),
Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3)
val polyDF = polyExpansion.transform(df)
polyDF.show(false)
Refer to the PolynomialExpansion Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
PolynomialExpansion polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(2.0, 1.0)),
RowFactory.create(Vectors.dense(0.0, 0.0)),
RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);
Discrete Cosine Transform (DCT)
The Discrete Cosine Transform transforms a length $N$ real-valued sequence in the time domain into another length $N$ real-valued sequence in the frequency domain. A DCT class provides this functionality, implementing the DCT-II and scaling the result by $1/\sqrt{2}$ such that the representing matrix for the transform is unitary. No shift is applied to the transformed sequence (e.g. the $0$th element of the transformed sequence is the $0$th DCT coefficient and not the $N/2$th).
Examples
Refer to the DCT Python docs for more details on the API.
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
(Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
(Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])
dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")
dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(truncate=False)
Refer to the DCT Scala docs for more details on the API.
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
Vectors.dense(0.0, 1.0, -2.0, 3.0),
Vectors.dense(-1.0, 2.0, 4.0, -7.0),
Vectors.dense(14.0, -2.0, -5.0, 1.0))
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false)
val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)
Refer to the DCT Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
DCT dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false);
Dataset<Row> dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(false);
StringIndexer
StringIndexer encodes a string column of labels to a column of label indices. StringIndexer can encode multiple columns. The indices are in [0, numLabels), and four ordering options are supported: "frequencyDesc": descending order by label frequency (most frequent label assigned 0), "frequencyAsc": ascending order by label frequency (least frequent label assigned 0), "alphabetDesc": descending alphabetical order, and "alphabetAsc": ascending alphabetical order (default = "frequencyDesc"). Note that in case of equal frequency under "frequencyDesc"/"frequencyAsc", the strings are further sorted alphabetically.
Unseen labels will be put at index numLabels if the user chooses to keep them. If the input column is numeric, we cast it to string and index the string values. When downstream pipeline components such as an Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.
Examples
Assume that we have the following DataFrame with columns id and category:
id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c
category is a string column with three labels: "a", "b", and "c". Applying StringIndexer with category as the input column and categoryIndex as the output column, we should get the following:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
"a" gets index 0 because it is the most frequent, followed by "c" with index 1 and "b" with index 2.
Additionally, there are three strategies regarding how StringIndexer will handle unseen labels when you have fit a StringIndexer on one dataset and then use it to transform another:
- throw an exception (which is the default)
- skip the row containing the unseen label entirely
- put unseen labels in a special additional bucket, at index numLabels
Examples
Let's go back to our previous example, but this time reuse our previously defined StringIndexer on the following dataset:
id | category
----|----------
0 | a
1 | b
2 | c
3 | d
4 | e
If you've not set how StringIndexer handles unseen labels, or set it to "error", an exception will be thrown. However, if you had called setHandleInvalid("skip"), the following dataset will be generated:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
Notice that the rows containing "d" or "e" do not appear.
If you call setHandleInvalid("keep"), the following dataset will be generated:
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | d | 3.0
4 | e | 3.0
Notice that the rows containing "d" or "e" are mapped to index "3.0".
Refer to the StringIndexer Python docs for more details on the API.
from pyspark.ml.feature import StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
Refer to the StringIndexer Scala docs for more details on the API.
import org.apache.spark.ml.feature.StringIndexer
val df = spark.createDataFrame(
Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
val indexed = indexer.fit(df).transform(df)
indexed.show()
Refer to the StringIndexer Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexer indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex");
Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
IndexToString
Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column containing the original labels as strings. A common use case is to produce indices from labels with StringIndexer, train a model with those indices, and retrieve the original labels from the column of predicted indices with IndexToString. However, you are free to supply your own labels.
Examples
Building on the StringIndexer example, let's assume we have the following DataFrame with columns id and categoryIndex:
id | categoryIndex
----|---------------
0 | 0.0
1 | 2.0
2 | 1.0
3 | 0.0
4 | 0.0
5 | 1.0
Applying IndexToString with categoryIndex as the input column and originalCategory as the output column, we are able to retrieve our original labels (they will be inferred from the column's metadata):
id | categoryIndex | originalCategory
----|---------------|-----------------
0 | 0.0 | a
1 | 2.0 | b
2 | 1.0 | c
3 | 0.0 | a
4 | 0.0 | a
5 | 1.0 | c
Refer to the IndexToString Python docs for more details on the API.
from pyspark.ml.feature import IndexToString, StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)
print("Transformed string column '%s' to indexed column '%s'"
% (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()
print("StringIndexer will store labels in output column metadata\n")
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
print("Transformed indexed column '%s' back to original string column '%s' using "
"labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
Refer to the IndexToString Scala docs for more details on the API.
import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
println(s"Transformed string column '${indexer.getInputCol}' " +
s"to indexed column '${indexer.getOutputCol}'")
indexed.show()
val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
s"${Attribute.fromStructField(inputColSchema).toString}\n")
val converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory")
val converted = converter.transform(indexed)
println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()
Refer to the IndexToString Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexerModel indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df);
Dataset<Row> indexed = indexer.transform(df);
System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
"to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();
StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
Attribute.fromStructField(inputColSchema).toString() + "\n");
IndexToString converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);
System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
"original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();
OneHotEncoder
One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using StringIndexer first.
OneHotEncoder can transform multiple columns, returning a one-hot-encoded output vector column for each input column. It is common to merge these vectors into a single feature vector using VectorAssembler.
OneHotEncoder supports the handleInvalid parameter to choose how to handle invalid input during transforming data. Available options include "keep" (any invalid inputs are assigned to an extra categorical index) and "error" (throw an error).
Examples
Refer to the OneHotEncoder Python docs for more details on the API.
from pyspark.ml.feature import OneHotEncoder
df = spark.createDataFrame([
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])
encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
Refer to the OneHotEncoder Scala docs for more details on the API.
import org.apache.spark.ml.feature.OneHotEncoder
val df = spark.createDataFrame(Seq(
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")
val encoder = new OneHotEncoder()
.setInputCols(Array("categoryIndex1", "categoryIndex2"))
.setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)
val encoded = model.transform(df)
encoded.show()
Refer to the OneHotEncoder Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, 1.0),
RowFactory.create(1.0, 0.0),
RowFactory.create(2.0, 1.0),
RowFactory.create(0.0, 2.0),
RowFactory.create(0.0, 1.0),
RowFactory.create(2.0, 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
OneHotEncoder encoder = new OneHotEncoder()
.setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
.setOutputCols(new String[] {"categoryVec1", "categoryVec2"});
OneHotEncoderModel model = encoder.fit(df);
Dataset<Row> encoded = model.transform(df);
encoded.show();
VectorIndexer
VectorIndexer helps index categorical features in datasets of Vectors. It can both automatically decide which features are categorical and convert original values to category indices. Specifically, it does the following:
- Take an input column of type Vector and a parameter maxCategories.
- Decide which features should be categorical based on the number of distinct values, where features with at most maxCategories distinct values are declared categorical.
- Compute 0-based category indices for each categorical feature.
- Index categorical features and transform original feature values to indices.
Indexing categorical features allows algorithms such as Decision Trees and Tree Ensembles to treat categorical features appropriately, improving performance.
Examples
In the example below, we read in a dataset of labeled points and then use VectorIndexer to decide which features should be treated as categorical. We transform the categorical feature values to their indices. This transformed data could then be passed to algorithms such as DecisionTreeRegressor that handle categorical features.
Refer to the VectorIndexer Python docs for more details on the API.
from pyspark.ml.feature import VectorIndexer
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
(len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
Refer to the VectorIndexer Scala docs for more details on the API.
import org.apache.spark.ml.feature.VectorIndexer
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)
val indexerModel = indexer.fit(data)
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} " +
s"categorical features: ${categoricalFeatures.mkString(", ")}")
// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
Refer to the VectorIndexer Java docs for more details on the API.
import java.util.Map;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
VectorIndexer indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);
Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");
for (Integer feature : categoryMaps.keySet()) {
System.out.print(" " + feature);
}
System.out.println();
// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
Interaction
Interaction is a Transformer which takes vector or double-valued columns, and generates a single vector column that contains the product of all combinations of one value from each input column.
For example, if you have 2 vector type columns each of which has 3 dimensions as input columns, then you'll get a 9-dimensional vector as the output column.
Examples
Assume that we have the following DataFrame with the columns "id1", "vec1" and "vec2":
id1|vec1 |vec2
---|--------------|--------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0]
Applying Interaction with those input columns, the output column interactedCol contains:
id1|vec1 |vec2 |interactedCol
---|--------------|--------------|------------------------------------------------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]
Refer to the Interaction Python docs for more details on the API.
from pyspark.ml.feature import Interaction, VectorAssembler
df = spark.createDataFrame(
[(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)],
["id1", "id2", "id3", "id4", "id5", "id6", "id7"])
assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")
assembled1 = assembler1.transform(df)
assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")
assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")
interacted = interaction.transform(assembled2)
interacted.show(truncate=False)
Refer to the Interaction Scala docs for more details on the API.
import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler
val df = spark.createDataFrame(Seq(
(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")
val assembler1 = new VectorAssembler().
setInputCols(Array("id2", "id3", "id4")).
setOutputCol("vec1")
val assembled1 = assembler1.transform(df)
val assembler2 = new VectorAssembler().
setInputCols(Array("id5", "id6", "id7")).
setOutputCol("vec2")
val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
val interaction = new Interaction()
.setInputCols(Array("id1", "vec1", "vec2"))
.setOutputCol("interactedCol")
val interacted = interaction.transform(assembled2)
interacted.show(truncate = false)
Refer to the Interaction Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Interaction;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(1, 1, 2, 3, 8, 4, 5),
RowFactory.create(2, 4, 3, 8, 7, 9, 8),
RowFactory.create(3, 6, 1, 9, 2, 3, 6),
RowFactory.create(4, 10, 8, 6, 9, 4, 5),
RowFactory.create(5, 9, 2, 7, 10, 7, 3),
RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);
StructType schema = new StructType(new StructField[]{
new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
VectorAssembler assembler1 = new VectorAssembler()
.setInputCols(new String[]{"id2", "id3", "id4"})
.setOutputCol("vec1");
Dataset<Row> assembled1 = assembler1.transform(df);
VectorAssembler assembler2 = new VectorAssembler()
.setInputCols(new String[]{"id5", "id6", "id7"})
.setOutputCol("vec2");
Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");
Interaction interaction = new Interaction()
.setInputCols(new String[]{"id1","vec1","vec2"})
.setOutputCol("interactedCol");
Dataset<Row> interacted = interaction.transform(assembled2);
interacted.show(false);
Normalizer
Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm. It takes parameter p, which specifies the p-norm used for normalization. ($p = 2$ by default.) This normalization can help standardize your input data and improve the behavior of learning algorithms.
Examples
The following example demonstrates how to normalize each row of a dataset of Vectors to unit $L^1$ norm and unit $L^\infty$ norm.
Refer to the Normalizer Python docs for more details on the API.
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
Refer to the Normalizer Scala docs for more details on the API.
import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),
(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()
// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
Refer to the Normalizer Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0);
Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();
// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
StandardScaler
StandardScaler transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean. It takes the following parameters:
- withStd: True by default. Scales the data to unit standard deviation.
- withMean: False by default. Centers the data with the mean before scaling. It will build a dense output, so take care when applying to sparse input.
StandardScaler is an Estimator which can be fit on a dataset to produce a StandardScalerModel; this amounts to computing summary statistics. The model can then transform a Vector column in a dataset to have unit standard deviation and/or zero mean features.
Note that if the standard deviation of a feature is zero, it will return the default 0.0 value in the Vector for that feature.
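In other words (a sketch of the standardization formula, assuming both withMean and withStd are enabled and the zero-standard-deviation caveat above does not apply), each feature value $x$ is mapped to \[ x' = \frac{x - \mu}{\sigma}, \] where $\mu$ and $\sigma$ are the mean and standard deviation of that feature computed while fitting; with the default withMean=false, only the division by $\sigma$ is applied.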
Examples
The following example demonstrates how to load a dataset in libsvm format and then normalize each feature to have unit standard deviation.
Refer to the StandardScaler Python docs for more details on the API.
from pyspark.ml.feature import StandardScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Refer to the StandardScaler Scala docs for more details on the API.
import org.apache.spark.ml.feature.StandardScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Refer to the StandardScaler Java docs for more details on the API.
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
StandardScaler scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false);
// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);
// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
RobustScaler
RobustScaler transforms a dataset of Vector rows, removing the median and scaling the data according to a specific quantile range (by default the IQR: Interquartile Range, the quantile range between the 1st quartile and the 3rd quartile). Its behavior is quite similar to StandardScaler, however the median and the quantile range are used instead of the mean and standard deviation, which makes it robust to outliers. It takes the following parameters:
- lower: 0.25 by default. Lower quantile used to calculate the quantile range, shared by all features.
- upper: 0.75 by default. Upper quantile used to calculate the quantile range, shared by all features.
- withScaling: True by default. Scales the data to the quantile range.
- withCentering: False by default. Centers the data with the median before scaling. It will build a dense output, so take care when applying to sparse input.
RobustScaler is an Estimator which can be fit on a dataset to produce a RobustScalerModel; this amounts to computing quantile statistics. The model can then transform a Vector column in a dataset to have unit quantile range and/or zero median features.
Note that if the quantile range of a feature is zero, it will return the default 0.0 value in the Vector for that feature.
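In other words (a sketch, assuming both withCentering and withScaling are enabled and the zero-quantile-range caveat above does not apply), each feature value $x$ is mapped to \[ x' = \frac{x - \mathrm{median}}{Q_{upper} - Q_{lower}}, \] where the median and the quantiles $Q_{lower}$ and $Q_{upper}$ are computed per feature while fitting; with the defaults, only the division by the quantile range is applied.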
Examples
The following example demonstrates how to load a dataset in libsvm format and then normalize each feature to have unit quantile range.
Refer to the RobustScaler Python docs for more details on the API.
from pyspark.ml.feature import RobustScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
withScaling=True, withCentering=False,
lower=0.25, upper=0.75)
# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)
# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Refer to the RobustScaler Scala docs for more details on the API.
import org.apache.spark.ml.feature.RobustScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new RobustScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithScaling(true)
.setWithCentering(false)
.setLower(0.25)
.setUpper(0.75)
// Compute summary statistics by fitting the RobustScaler.
val scalerModel = scaler.fit(dataFrame)
// Transform each feature to have unit quantile range.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
Refer to the RobustScaler Java docs for more details on the API.
import org.apache.spark.ml.feature.RobustScaler;
import org.apache.spark.ml.feature.RobustScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
RobustScaler scaler = new RobustScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithScaling(true)
.setWithCentering(false)
.setLower(0.25)
.setUpper(0.75);
// Compute summary statistics by fitting the RobustScaler
RobustScalerModel scalerModel = scaler.fit(dataFrame);
// Transform each feature to have unit quantile range.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
MinMaxScaler
MinMaxScaler 转换由 Vector 行组成的数据集,将每个特征重新缩放到特定范围(通常为 [0, 1])。它接受以下参数:
- min:默认为 0.0。转换后的下限,所有特征共享。
- max:默认为 1.0。转换后的上限,所有特征共享。

MinMaxScaler 计算数据集的汇总统计信息并生成 MinMaxScalerModel。然后,该模型可以分别转换每个特征,使其位于给定范围内。
特征 E 的重新缩放值计算如下:\begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation}
对于 $E_{max} == E_{min}$ 的情况,$Rescaled(e_i) = 0.5 * (max + min)$。
请注意,由于零值可能会被转换为非零值,因此即使对于稀疏输入,转换器的输出也将是 DenseVector。
示例
以下示例演示如何加载 libsvm 格式的数据集,然后将每个特征重新缩放到 [0, 1]。
有关 API 的更多详细信息,请参阅 MinMaxScaler Python 文档 和 MinMaxScalerModel Python 文档。
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -1.0]),),
(1, Vectors.dense([2.0, 1.1, 1.0]),),
(2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
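上面的示例使用默认范围 [0, 1]。作为补充,下面给出一个简短的示意(变量名仅为示意,非官方示例),通过 min/max 参数将特征缩放到 [-1, 1]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
dfRange = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
# min/max control the lower/upper bound after transformation; here [-1, 1]
scalerNeg = MinMaxScaler(min=-1.0, max=1.0, inputCol="features", outputCol="scaledFeatures")
scalerNegModel = scalerNeg.fit(dfRange)
scalerNegModel.transform(dfRange).select("features", "scaledFeatures").show()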
有关 API 的更多详细信息,请参阅 MinMaxScaler Scala 文档 和 MinMaxScalerModel Scala 文档。
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -1.0)),
(1, Vectors.dense(2.0, 1.1, 1.0)),
(2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
有关 API 的更多详细信息,请参阅MinMaxScaler Java 文档和MinMaxScalerModel Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
MinMaxScaler scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
+ scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();
MaxAbsScaler
MaxAbsScaler 转换由 Vector 行组成的数据集,通过除以每个特征的最大绝对值,将每个特征重新缩放至 [-1, 1] 范围。它不会移动/中心化数据,因此不会破坏任何稀疏性。
MaxAbsScaler 计算数据集的汇总统计信息,并生成 MaxAbsScalerModel。然后,该模型可以将每个特征分别转换到 [-1, 1] 范围。
示例
以下示例演示如何加载 libsvm 格式的数据集,然后将每个特征重新缩放至 [-1, 1]。
有关 API 的更多详细信息,请参阅MaxAbsScaler Python 文档和MaxAbsScalerModel Python 文档。
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -8.0]),),
(1, Vectors.dense([2.0, 1.0, -4.0]),),
(2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
有关 API 的更多详细信息,请参阅MaxAbsScaler Scala 文档和MaxAbsScalerModel Scala 文档。
import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -8.0)),
(1, Vectors.dense(2.0, 1.0, -4.0)),
(2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")
val scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
有关 API 的更多详细信息,请参阅MaxAbsScaler Java 文档和MaxAbsScalerModel Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
MaxAbsScaler scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();
Bucketizer
Bucketizer 将连续特征列转换为特征桶列,其中桶由用户指定。它接受以下参数:
- splits:用于将连续特征映射到桶的参数。n+1 个分割点对应 n 个桶。由分割点 x,y 定义的桶包含范围 [x,y) 中的值,最后一个桶除外,它还包含 y。分割点必须严格递增。必须显式提供 -inf、inf 处的值以覆盖所有 Double 值;否则,超出指定分割点范围的值将被视为错误。splits 的两个示例是 Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) 和 Array(0.0, 1.0, 2.0)。

请注意,如果您不知道目标列的上限和下限,则应添加 Double.NegativeInfinity 和 Double.PositiveInfinity 作为分割点的边界,以防止潜在的 Bucketizer 越界异常。
另请注意,您提供的分割点必须严格按升序排列,即 s0 < s1 < s2 < ... < sn。
更多详细信息,请参阅Bucketizer 的 API 文档。
示例
以下示例演示如何将 Double
列分桶到另一个索引列中。
有关 API 的更多详细信息,请参阅Bucketizer Python 文档。
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()
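下文的 Scala 和 Java 示例还演示了一次对多列进行分桶;下面给出一个大致等价的 Python 示意(假设运行在支持 inputCols/outputCols/splitsArray 参数的 Spark 3.0 及以上版本,变量名仅为示意):
from pyspark.ml.feature import Bucketizer
splitsArray = [
    [-float("inf"), -0.5, 0.0, 0.5, float("inf")],
    [-float("inf"), -0.3, 0.0, 0.3, float("inf")]]
data2 = [(-999.9, -999.9), (-0.5, -0.2), (-0.3, -0.1), (0.0, 0.0), (0.2, 0.4), (999.9, 999.9)]
dataFrame2 = spark.createDataFrame(data2, ["features1", "features2"])
# Bucketize multiple columns at one pass (requires Spark >= 3.0)
bucketizer2 = Bucketizer(splitsArray=splitsArray,
                         inputCols=["features1", "features2"],
                         outputCols=["bucketedFeatures1", "bucketedFeatures2"])
bucketedData2 = bucketizer2.transform(dataFrame2)
bucketedData2.show()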
有关 API 的更多详细信息,请参阅Bucketizer Scala 文档。
import org.apache.spark.ml.feature.Bucketizer
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits)
// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()
val splitsArray = Array(
Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))
val data2 = Array(
(-999.9, -999.9),
(-0.5, -0.2),
(-0.3, -0.1),
(0.0, 0.0),
(0.2, 0.4),
(999.9, 999.9))
val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")
val bucketizer2 = new Bucketizer()
.setInputCols(Array("features1", "features2"))
.setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
.setSplitsArray(splitsArray)
// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)
println(s"Bucketizer output with [" +
s"${bucketizer2.getSplitsArray(0).length-1}, " +
s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()
有关 API 的更多详细信息,请参阅Bucketizer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};
List<Row> data = Arrays.asList(
RowFactory.create(-999.9),
RowFactory.create(-0.5),
RowFactory.create(-0.3),
RowFactory.create(0.0),
RowFactory.create(0.2),
RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Bucketizer bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits);
// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);
System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();
// Bucketize multiple columns at one pass.
double[][] splitsArray = {
{Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY},
{Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY}
};
List<Row> data2 = Arrays.asList(
RowFactory.create(-999.9, -999.9),
RowFactory.create(-0.5, -0.2),
RowFactory.create(-0.3, -0.1),
RowFactory.create(0.0, 0.0),
RowFactory.create(0.2, 0.4),
RowFactory.create(999.9, 999.9)
);
StructType schema2 = new StructType(new StructField[]{
new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame2 = spark.createDataFrame(data2, schema2);
Bucketizer bucketizer2 = new Bucketizer()
.setInputCols(new String[] {"features1", "features2"})
.setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"})
.setSplitsArray(splitsArray);
// Transform original data into its bucket index.
Dataset<Row> bucketedData2 = bucketizer2.transform(dataFrame2);
System.out.println("Bucketizer output with [" +
(bucketizer2.getSplitsArray()[0].length-1) + ", " +
(bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column");
bucketedData2.show();
ElementwiseProduct
ElementwiseProduct 使用元素乘法将每个输入向量乘以提供的“权重”向量。换句话说,它通过标量乘数缩放数据集的每一列。这表示输入向量 v
和变换向量 w
之间的哈达玛积,以产生结果向量。
\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]
示例
以下示例演示如何使用变换向量值变换向量。
有关 API 的更多详细信息,请参阅ElementwiseProduct Python 文档。
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
有关 API 的更多详细信息,请参阅ElementwiseProduct Scala 文档。
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors
// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
("a", Vectors.dense(1.0, 2.0, 3.0)),
("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector")
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
有关 API 的更多详细信息,请参阅ElementwiseProduct Java 文档。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);
List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector");
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
SQLTransformer
SQLTransformer 实现由 SQL 语句定义的转换。目前,我们仅支持形如 "SELECT ... FROM __THIS__ ..." 的 SQL 语法,其中 "__THIS__" 表示输入数据集的基础表。select 子句指定要在输出中显示的字段、常量和表达式,并且可以是 Spark SQL 支持的任何 select 子句。用户还可以使用 Spark SQL 内置函数和 UDF 对这些选定的列进行操作。例如,SQLTransformer 支持以下语句:
- SELECT a, a + b AS a_b FROM __THIS__
- SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
- SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
示例
假设我们有以下 DataFrame,其中包含列 id
、v1
和 v2
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
这是语句为 "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"
的 SQLTransformer
的输出
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0 | 3.0 | 4.0 | 3.0
2 | 2.0 | 5.0 | 7.0 |10.0
有关 API 的更多详细信息,请参阅SQLTransformer Python 文档。
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
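上文列出的第三条语句涉及 GROUP BY 聚合;下面给出一个简短的示意(数据与变量名仅为示意,非官方示例):
from pyspark.ml.feature import SQLTransformer
dfAgg = spark.createDataFrame([
    (0, 1.0, 3.0),
    (0, 2.0, 5.0),
    (1, 2.0, 4.0)
], ["a", "b", "c"])
# Aggregate c per (a, b) group using the GROUP BY form listed above
sqlTransAgg = SQLTransformer(
    statement="SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b")
sqlTransAgg.transform(dfAgg).show()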
有关 API 的更多详细信息,请参阅SQLTransformer Scala 文档。
import org.apache.spark.ml.feature.SQLTransformer
val df = spark.createDataFrame(
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
有关 API 的更多详细信息,请参阅SQLTransformer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, 1.0, 3.0),
RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
SQLTransformer sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");
sqlTrans.transform(df).show();
VectorAssembler
VectorAssembler
是一个转换器,它将给定的列列表组合成单个向量列。它对于将原始特征和由不同特征转换器生成的特征组合成单个特征向量非常有用,以便训练逻辑回归和决策树等 ML 模型。VectorAssembler
接受以下输入列类型:所有数字类型、布尔类型和向量类型。在每一行中,输入列的值将按指定顺序连接成一个向量。
示例
假设我们有一个 DataFrame,其中包含列 id
、hour
、mobile
、userFeatures
和 clicked
id | hour | mobile | userFeatures | clicked
----|------|--------|------------------|---------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures
是一个包含三个用户特征的向量列。我们希望将 hour
、mobile
和 userFeatures
组合成一个名为 features
的特征向量,并使用它来预测是否 clicked
。如果我们将 VectorAssembler
的输入列设置为 hour
、mobile
和 userFeatures
,并将输出列设置为 features
,则在转换后,我们应该获得以下 DataFrame
id | hour | mobile | userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
有关 API 的更多详细信息,请参阅VectorAssembler Python 文档。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
有关 API 的更多详细信息,请参阅VectorAssembler Scala 文档。
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
有关 API 的更多详细信息,请参阅VectorAssembler Java 文档。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
"'features'");
output.select("features", "clicked").show(false);
VectorSizeHint
有时,为 VectorType
列显式指定向量的大小会很有用。例如,VectorAssembler
使用其输入列的大小信息来生成其输出列的大小信息和元数据。虽然在某些情况下,可以通过检查列的内容来获取此信息,但在流式 DataFrame 中,在流启动之前,内容不可用。VectorSizeHint
允许用户为列显式指定向量大小,以便 VectorAssembler
或可能需要知道向量大小的其他转换器可以使用该列作为输入。
要使用 VectorSizeHint
,用户必须设置 inputCol
和 size
参数。将此转换器应用于 DataFrame 会生成一个新的 DataFrame,其中包含更新的 inputCol
元数据,用于指定向量大小。对生成的 DataFrame 的下游操作可以使用元数据获取此大小。
VectorSizeHint
还可以接受一个可选的 handleInvalid
参数,该参数控制当向量列包含空值或大小错误的向量时的行为。默认情况下,handleInvalid
设置为“error”,表示应抛出异常。此参数也可以设置为“skip”,表示应从结果数据框中过滤掉包含无效值的行,或设置为“optimistic”,表示不应检查列中是否存在无效值,并且应保留所有行。请注意,使用“optimistic”可能会导致结果数据框处于不一致状态,这意味着应用于 VectorSizeHint
列的元数据与该列的内容不匹配。用户应注意避免这种不一致状态。
有关 API 的更多详细信息,请参阅 VectorSizeHint Python 文档。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
(0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
sizeHint = VectorSizeHint(
inputCol="userFeatures",
handleInvalid="skip",
size=3)
datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
有关 API 的更多详细信息,请参阅 VectorSizeHint Scala 文档。
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq(
(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val sizeHint = new VectorSizeHint()
.setInputCol("userFeatures")
.setHandleInvalid("skip")
.setSize(3)
val datasetWithSize = sizeHint.transform(dataset)
println("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(false)
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
// This dataframe can be used by downstream transformers as before
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
有关 API 的更多详细信息,请参阅 VectorSizeHint Java 文档。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row0 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Row row1 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row0, row1), schema);
VectorSizeHint sizeHint = new VectorSizeHint()
.setInputCol("userFeatures")
.setHandleInvalid("skip")
.setSize(3);
Dataset<Row> datasetWithSize = sizeHint.transform(dataset);
System.out.println("Rows where 'userFeatures' is not the right size are filtered out");
datasetWithSize.show(false);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
// This dataframe can be used by downstream transformers as before
Dataset<Row> output = assembler.transform(datasetWithSize);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
"'features'");
output.select("features", "clicked").show(false);
QuantileDiscretizer
QuantileDiscretizer
接受一个具有连续特征的列,并输出一个具有分箱分类特征的列。分箱数量由 numBuckets
参数设置。使用的分箱数量可能小于此值,例如,如果输入的不同值太少而无法创建足够多的不同分位数。
NaN 值:在 QuantileDiscretizer
拟合期间,将从列中删除 NaN 值。这将生成一个用于进行预测的 Bucketizer
模型。在转换过程中,Bucketizer
在数据集中找到 NaN 值时会引发错误,但用户也可以通过设置 handleInvalid
来选择保留或删除数据集中的 NaN 值。如果用户选择保留 NaN 值,则将对它们进行特殊处理并将其放入自己的分箱中,例如,如果使用了 4 个分箱,则非 NaN 数据将放入分箱 [0-3] 中,但 NaN 将被计入特殊分箱 [4] 中。
算法:使用近似算法选择分箱范围(有关详细说明,请参阅 approxQuantile 的文档)。可以使用 relativeError
参数控制近似的精度。设置为零时,将计算精确的分位数(**注意:**计算精确的分位数是一项昂贵的操作)。下限和上限将为 -Infinity
和 +Infinity
,涵盖所有实数值。
示例
假设我们有一个包含 id
、hour
列的 DataFrame
id | hour
----|------
0 | 18.0
1 | 19.0
2 | 8.0
3 | 5.0
4 | 2.2
hour
是一个具有 Double
类型的连续特征。我们想将连续特征转换为分类特征。给定 numBuckets = 3
,我们应该得到以下 DataFrame
id | hour | result
----|------|-------
0 | 18.0 | 2.0
1 | 19.0 | 2.0
2 | 8.0 | 1.0
3 | 5.0 | 1.0
4 | 2.2 | 0.0
有关 API 的更多详细信息,请参阅 QuantileDiscretizer Python 文档。
from pyspark.ml.feature import QuantileDiscretizer
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)
result.show()
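如上文所述,handleInvalid 控制 NaN 值的处理方式。下面给出一个简短的示意(数据与变量名仅为示意,非官方示例),将 handleInvalid 设置为 keep,使 NaN 进入额外的特殊分箱:
from pyspark.ml.feature import QuantileDiscretizer
dataNaN = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, float("nan"))]
dfNaN = spark.createDataFrame(dataNaN, ["id", "hour"])
# "keep" puts NaN values into an extra bucket instead of raising an error at transform time
discretizerNaN = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result",
                                     handleInvalid="keep")
discretizerNaN.fit(dfNaN).transform(dfNaN).show()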
有关 API 的更多详细信息,请参阅 QuantileDiscretizer Scala 文档。
import org.apache.spark.ml.feature.QuantileDiscretizer
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show(false)
有关 API 的更多详细信息,请参阅 QuantileDiscretizer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 18.0),
RowFactory.create(1, 19.0),
RowFactory.create(2, 8.0),
RowFactory.create(3, 5.0),
RowFactory.create(4, 2.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
QuantileDiscretizer discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3);
Dataset<Row> result = discretizer.fit(df).transform(df);
result.show(false);
Imputer
Imputer 估计器使用缺失值所在列的均值、中位数或众数来填充数据集中的缺失值。输入列应为数值类型。目前,Imputer 不支持分类特征,并且可能会为包含分类特征的列生成不正确的值。Imputer 可以通过 .setMissingValue(custom_value) 将“NaN”以外的自定义值作为缺失值进行填充。例如,.setMissingValue(0) 会将所有出现的 0 视为缺失值并进行填充。
**注意** 输入列中的所有 null 值都被视为缺失值,因此也会被填充。
示例
假设我们有一个包含 a
和 b
列的 DataFrame
a | b
------------|-----------
1.0 | Double.NaN
2.0 | Double.NaN
Double.NaN | 3.0
4.0 | 4.0
5.0 | 5.0
在本例中,Imputer 会将所有出现的 Double.NaN
(缺失值的默认值)替换为从相应列中的其他值计算出的均值(默认估算策略)。在本例中,列 a
和 b
的替代值分别为 3.0 和 4.0。转换后,输出列中的缺失值将由相关列的替代值替换。
a | b | out_a | out_b
------------|------------|-------|-------
1.0 | Double.NaN | 1.0 | 4.0
2.0 | Double.NaN | 2.0 | 4.0
Double.NaN | 3.0 | 3.0 | 3.0
4.0 | 4.0 | 4.0 | 4.0
5.0 | 5.0 | 5.0 | 5.0
有关 API 的更多详细信息,请参阅 Imputer Python 文档。
from pyspark.ml.feature import Imputer
df = spark.createDataFrame([
(1.0, float("nan")),
(2.0, float("nan")),
(float("nan"), 3.0),
(4.0, 4.0),
(5.0, 5.0)
], ["a", "b"])
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)
model.transform(df).show()
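针对上文提到的填充策略与自定义缺失值,下面给出一个简短的示意(数据与变量名仅为示意,非官方示例),使用中位数策略并将 0.0 视为缺失值:
from pyspark.ml.feature import Imputer
dfZero = spark.createDataFrame([
    (1.0, 0.0),
    (2.0, 0.0),
    (0.0, 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])
# Treat 0.0 as the missing value and fill it with the per-column median
imputerMedian = Imputer(strategy="median", missingValue=0.0,
                        inputCols=["a", "b"], outputCols=["out_a", "out_b"])
imputerMedian.fit(dfZero).transform(dfZero).show()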
有关 API 的更多详细信息,请参阅 Imputer Scala 文档。
import org.apache.spark.ml.feature.Imputer
val df = spark.createDataFrame(Seq(
(1.0, Double.NaN),
(2.0, Double.NaN),
(Double.NaN, 3.0),
(4.0, 4.0),
(5.0, 5.0)
)).toDF("a", "b")
val imputer = new Imputer()
.setInputCols(Array("a", "b"))
.setOutputCols(Array("out_a", "out_b"))
val model = imputer.fit(df)
model.transform(df).show()
有关 API 的更多详细信息,请参阅 Imputer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1.0, Double.NaN),
RowFactory.create(2.0, Double.NaN),
RowFactory.create(Double.NaN, 3.0),
RowFactory.create(4.0, 4.0),
RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
createStructField("a", DoubleType, false),
createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Imputer imputer = new Imputer()
.setInputCols(new String[]{"a", "b"})
.setOutputCols(new String[]{"out_a", "out_b"});
ImputerModel model = imputer.fit(df);
model.transform(df).show();
特征选择器
VectorSlicer
VectorSlicer 是一个转换器,它接受一个特征向量并输出一个由原始特征子数组组成的新特征向量。它对于从向量列中提取特征非常有用。
VectorSlicer 接受一个带有指定索引的向量列,然后输出一个新的向量列,其值通过这些索引选出。索引有两种类型:
- 整数索引,表示向量中的下标,通过 setIndices() 指定。
- 字符串索引,表示向量中特征的名称,通过 setNames() 指定。*这要求向量列具有 AttributeGroup,因为实现根据 Attribute 的名称字段进行匹配。*

整数和字符串两种指定方式都可以接受。此外,您可以同时使用整数索引和字符串名称。必须至少选择一个特征。不允许重复的特征,因此所选索引和名称之间不能重叠。请注意,如果选择了特征名称,则在遇到空输入属性时将引发异常。
输出向量会首先按给定顺序排列所选索引对应的特征,然后按给定顺序排列所选名称对应的特征。
示例
假设我们有一个包含 userFeatures
列的 DataFrame
userFeatures
------------------
[0.0, 10.0, 0.5]
userFeatures
是一个包含三个用户特征的向量列。假设 userFeatures
的第一列全为零,因此我们想删除它并仅选择最后两列。VectorSlicer
使用 setIndices(1, 2)
选择最后两个元素,然后生成一个名为 features
的新向量列
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
还假设我们有 userFeatures
的潜在输入属性,即 ["f1", "f2", "f3"]
,那么我们可以使用 setNames("f2", "f3")
来选择它们。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
["f1", "f2", "f3"] | ["f2", "f3"]
有关 API 的更多详细信息,请参阅 VectorSlicer Python 文档。
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
df = spark.createDataFrame([
Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])
output = slicer.transform(df)
output.select("userFeatures", "features").show()
有关 API 的更多详细信息,请参阅 VectorSlicer Scala 文档。
import java.util.Arrays
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
val data = Arrays.asList(
Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
Row(Vectors.dense(-2.0, 2.3, 0.0))
)
val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))
val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))
val output = slicer.transform(dataset)
output.show(false)
有关 API 的更多详细信息,请参阅 VectorSlicer Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
Attribute[] attrs = {
NumericAttribute.defaultAttr().withName("f1"),
NumericAttribute.defaultAttr().withName("f2"),
NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);
Dataset<Row> dataset =
spark.createDataFrame(data, (new StructType()).add(group.toStructField()));
VectorSlicer vectorSlicer = new VectorSlicer()
.setInputCol("userFeatures").setOutputCol("features");
vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})
Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);
RFormula
RFormula 选择由 R 模型公式指定的列。目前,我们支持 R 运算符的一个有限子集,包括 “~”、“.”、“:”、“+” 和 “-”。基本运算符是:
- ~ 分隔目标和项
- + 连接项,“+ 0”表示删除截距
- - 删除一项,“- 1”表示删除截距
- : 交互(数值相乘,或二值化后的分类值相乘)
- . 除目标以外的所有列

假设 a 和 b 是双精度列,我们使用以下简单示例来说明 RFormula 的效果:
- y ~ a + b 表示模型 y ~ w0 + w1 * a + w2 * b,其中 w0 是截距,w1、w2 是系数。
- y ~ a + b + a:b - 1 表示模型 y ~ w1 * a + w2 * b + w3 * a * b,其中 w1、w2、w3 是系数。

RFormula 生成一个特征向量列和一个双精度或字符串类型的标签列。与在 R 中将公式用于线性回归时一样,数值列将被转换为双精度。对于字符串输入列,会首先按照由 stringOrderType 确定的顺序使用 StringIndexer 进行转换,删除排序后的最后一个类别,然后再独热编码为双精度值。
假设一个字符串特征列包含值 {'b', 'a', 'b', 'a', 'c', 'b'}
,我们设置 stringOrderType
来控制编码
stringOrderType | Category mapped to 0 by StringIndexer | Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b') | least frequent category ('c')
'frequencyAsc' | least frequent category ('c') | most frequent category ('b')
'alphabetDesc' | last alphabetical category ('c') | first alphabetical category ('a')
'alphabetAsc' | first alphabetical category ('a') | last alphabetical category ('c')
如果标签列的类型为字符串,则会首先使用 StringIndexer 按 frequencyDesc 顺序将其转换为双精度。如果 DataFrame 中不存在标签列,则会根据公式中指定的响应变量创建输出标签列。
**注意:**排序选项 stringOrderType 不适用于标签列。对标签列进行索引时,使用 StringIndexer 中默认的按频率降序排序。
示例
假设我们有一个包含 id
、country
、hour
和 clicked
列的 DataFrame
id | country | hour | clicked
---|---------|------|---------
7 | "US" | 18 | 1.0
8 | "CA" | 12 | 0.0
9 | "NZ" | 15 | 0.0
如果我们使用带有公式字符串 clicked ~ country + hour
的 RFormula
,这表示我们想要根据 country
和 hour
预测 clicked
,那么在转换之后,我们应该得到以下 DataFrame
id | country | hour | clicked | features | label
---|---------|------|---------|------------------|-------
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0
有关 API 的更多详细信息,请参阅RFormula Python 文档。
from pyspark.ml.feature import RFormula
dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])
formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
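如上表所述,stringOrderType 控制字符串特征的编码顺序。下面给出一个简短的示意(变量名仅为示意,非官方示例),使用 alphabetAsc 顺序:
from pyspark.ml.feature import RFormula
dataset2 = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])
# Encode string features in alphabetical order; the last category is dropped
formulaAlpha = RFormula(formula="clicked ~ country + hour",
                        featuresCol="features", labelCol="label",
                        stringOrderType="alphabetAsc")
formulaAlpha.fit(dataset2).transform(dataset2).select("features", "label").show()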
有关 API 的更多详细信息,请参阅RFormula Scala 文档。
import org.apache.spark.ml.feature.RFormula
val dataset = spark.createDataFrame(Seq(
(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label")
val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
有关 API 的更多详细信息,请参阅RFormula Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("country", StringType, false),
createStructField("hour", IntegerType, false),
createStructField("clicked", DoubleType, false)
});
List<Row> data = Arrays.asList(
RowFactory.create(7, "US", 18, 1.0),
RowFactory.create(8, "CA", 12, 0.0),
RowFactory.create(9, "NZ", 15, 0.0)
);
Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
ChiSqSelector
ChiSqSelector 代表卡方特征选择。它对具有分类特征的标记数据进行操作。ChiSqSelector 使用卡方独立性检验来决定选择哪些特征。它支持五种选择方法:numTopFeatures、percentile、fpr、fdr、fwe:
- numTopFeatures 根据卡方检验选择固定数量的顶部特征。这类似于产生具有最大预测能力的特征。
- percentile 类似于 numTopFeatures,但选择的是所有特征的一部分,而不是固定数量。
- fpr 选择所有 p 值低于阈值的特征,从而控制选择的误报率。
- fdr 使用 Benjamini-Hochberg 程序选择所有错误发现率低于阈值的特征。
- fwe 选择所有 p 值低于阈值的特征。阈值按 1/numFeatures 进行缩放,从而控制选择的家族错误率。

默认情况下,选择方法为 numTopFeatures,默认的顶部特征数量设置为 50。用户可以使用 setSelectorType 选择选择方法。
示例
假设我们有一个包含 id
、features
和 clicked
列的 DataFrame,它被用作我们要预测的目标
id | features | clicked
---|-----------------------|---------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0
8 | [0.0, 1.0, 12.0, 0.0] | 0.0
9 | [1.0, 0.0, 15.0, 0.1] | 0.0
如果我们使用 ChiSqSelector
且 numTopFeatures = 1
,那么根据我们的标签 clicked
,features
中的最后一列将被选为最有用的特征
id | features | clicked | selectedFeatures
---|-----------------------|---------|------------------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
有关 API 的更多详细信息,请参阅ChiSqSelector Python 文档。
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
(8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
(9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
outputCol="selectedFeatures", labelCol="clicked")
result = selector.fit(df).transform(df)
print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
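除 numTopFeatures 外,也可以通过 selectorType 切换到其他选择方法。下面给出一个简短的示意(阈值与变量名仅为示意,非官方示例),使用 fpr 方法按 p 值阈值选择特征:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
dfFpr = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
# Select all features whose chi-squared p-value is below the fpr threshold
selectorFpr = ChiSqSelector(selectorType="fpr", fpr=0.05, featuresCol="features",
                            outputCol="selectedFeatures", labelCol="clicked")
selectorFpr.fit(dfFpr).transform(dfFpr).show()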
有关 API 的更多详细信息,请参阅ChiSqSelector Scala 文档。
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val df = spark.createDataset(data).toDF("id", "features", "clicked")
val selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
有关 API 的更多详细信息,请参阅ChiSqSelector Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
ChiSqSelector selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
+ " features selected");
result.show();
UnivariateFeatureSelector
UnivariateFeatureSelector 对具有分类/连续特征的分类/连续标签进行操作。用户可以设置 featureType 和 labelType,Spark 将根据指定的 featureType 和 labelType 选择要使用的评分函数。

featureType | labelType | score function
------------|-----------|---------------
categorical | categorical | chi-squared (chi2)
continuous | categorical | ANOVATest (f_classif)
continuous | continuous | F-value (f_regression)

它支持五种选择模式:numTopFeatures、percentile、fpr、fdr、fwe:
- numTopFeatures 选择固定数量的顶部特征。
- percentile 类似于 numTopFeatures,但选择的是所有特征的一部分,而不是固定数量。
- fpr 选择所有 p 值低于阈值的特征,从而控制选择的误报率。
- fdr 使用 Benjamini-Hochberg 程序选择所有错误发现率低于阈值的特征。
- fwe 选择所有 p 值低于阈值的特征。阈值按 1/numFeatures 进行缩放,从而控制选择的家族错误率。

默认情况下,选择模式为 numTopFeatures,默认的 selectionThreshold 设置为 50。
示例
假设我们有一个包含 id
、features
和 label
列的 DataFrame,它被用作我们要预测的目标
id | features | label
---|--------------------------------|---------
1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0
2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0
3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0
4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0
5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0
6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0
如果我们将 featureType
设置为 continuous
,将 labelType
设置为 categorical
,并将 numTopFeatures
设置为 1,则 features
中的最后一列将被选为最有用的特征
id | features | label | selectedFeatures
---|--------------------------------|---------|------------------
1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0 | [2.3]
2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0 | [4.1]
3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0 | [2.5]
4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0 | [3.8]
5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0 | [3.0]
6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0 | [2.1]
有关 API 的更多详细信息,请参阅UnivariateFeatureSelector Python 文档。
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
(2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
(3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
(4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
(5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
(6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])
selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)
result = selector.fit(df).transform(df)
print("UnivariateFeatureSelector output with top %d features selected using f_classif"
% selector.getSelectionThreshold())
result.show()
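下面给出一个简短的示意(阈值与变量名仅为示意,非官方示例),将 selectionMode 改为 percentile,按比例选择得分最高的特征:
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors
dfPct = spark.createDataFrame([
    (1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
    (2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
    (3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
    (4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
    (5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
    (6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])
selectorPct = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                        labelCol="label", selectionMode="percentile")
# In percentile mode the selection threshold is interpreted as a fraction of features to keep
selectorPct.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(0.5)
selectorPct.fit(dfPct).transform(dfPct).show()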
有关 API 的更多详细信息,请参阅UnivariateFeatureSelector Scala 文档。
import org.apache.spark.ml.feature.UnivariateFeatureSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
)
val df = spark.createDataset(data).toDF("id", "features", "label")
val selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(1)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"UnivariateFeatureSelector output with top ${selector.getSelectionThreshold}" +
s" features selected using f_classif")
result.show()
有关 API 的更多详细信息,请参阅UnivariateFeatureSelector Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.UnivariateFeatureSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
RowFactory.create(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
RowFactory.create(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
RowFactory.create(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
RowFactory.create(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
RowFactory.create(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
UnivariateFeatureSelector selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(1)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("UnivariateFeatureSelector output with top "
+ selector.getSelectionThreshold() + " features selected using f_classif");
result.show();
VarianceThresholdSelector
VarianceThresholdSelector
是一种选择器,用于删除低方差特征。(样本)方差不大于 varianceThreshold
的特征将被删除。如果未设置,则 varianceThreshold
默认为 0,这意味着只有方差为 0 的特征(即在所有样本中具有相同值的特征)才会被删除。
示例
假设我们有一个包含 id 和 features 列的 DataFrame
id | features
---|--------------------------------
1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0]
2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0]
3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0]
4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0]
5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0]
6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0]
6 个特征的样本方差分别为 16.67、0.67、8.17、10.17、5.07 和 11.47。如果我们使用 VarianceThresholdSelector
且 varianceThreshold = 8.0
,则方差 <= 8.0 的特征将被删除
id | features | selectedFeatures
---|--------------------------------|-------------------
1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0] | [6.0,0.0,7.0,0.0]
2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0] | [0.0,6.0,0.0,9.0]
3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0] | [0.0,3.0,0.0,5.0]
4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0] | [0.0,8.0,5.0,4.0]
5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0] | [8.0,6.0,5.0,4.0]
6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0] | [8.0,6.0,0.0,0.0]
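作为核对,下面这段与 Spark 无关的简短 Python 片段(仅为说明)用标准库计算第一个特征列的样本方差,结果与上文给出的 16.67 一致:
from statistics import variance
# Values of the first feature across the 6 rows shown above
first_feature = [6.0, 0.0, 0.0, 0.0, 8.0, 8.0]
# statistics.variance computes the sample variance (denominator n - 1)
print(round(variance(first_feature), 2))  # 16.67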
有关 API 的更多详细信息,请参阅VarianceThresholdSelector Python 文档。
from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
(2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
(3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
(4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
(5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
(6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])
selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")
result = selector.fit(df).transform(df)
print("Output: Features with variance lower than %f are removed." %
selector.getVarianceThreshold())
result.show()
有关 API 的更多详细信息,请参阅VarianceThresholdSelector Scala 文档。
import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
)
val df = spark.createDataset(data).toDF("id", "features")
val selector = new VarianceThresholdSelector()
.setVarianceThreshold(8.0)
.setFeaturesCol("features")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"Output: Features with variance lower than" +
s" ${selector.getVarianceThreshold} are removed.")
result.show()
有关 API 的更多详细信息,请参阅VarianceThresholdSelector Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.VarianceThresholdSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
RowFactory.create(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
RowFactory.create(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
RowFactory.create(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
RowFactory.create(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
RowFactory.create(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
VarianceThresholdSelector selector = new VarianceThresholdSelector()
.setVarianceThreshold(8.0)
.setFeaturesCol("features")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("Output: Features with variance lower than "
+ selector.getVarianceThreshold() + " are removed.");
result.show();
局部敏感哈希
局部敏感哈希 (LSH) 是一类重要的哈希技术,通常用于处理大型数据集的聚类、近似最近邻搜索和异常值检测。
LSH 的总体思路是使用一系列函数(“LSH 族”)将数据点哈希到桶中,以便彼此接近的数据点以高概率位于相同的桶中,而彼此远离的数据点很可能位于不同的桶中。LSH 族在形式上定义如下。
在度量空间 (M, d)
中,其中 M
是一个集合,d
是 M
上的距离函数,LSH 族是满足以下属性的函数族 h
:\[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \]
这个 LSH 族被称为 (r1, r2, p1, p2)
敏感的。
在 Spark 中,不同的 LSH 族在不同的类中实现(例如,MinHash
),并且每个类都提供了用于特征转换、近似相似性连接和近似最近邻的 API。
在 LSH 中,我们将误报定义为一对哈希到同一个桶中的远距离输入特征($d(p,q) \geq r2$
),并将漏报定义为一对哈希到不同桶中的近距离特征($d(p,q) \leq r1$
)。
LSH 操作
我们描述了 LSH 可用于的主要操作类型。拟合的 LSH 模型对每个操作都有对应的方法。
特征转换
特征转换是添加哈希值作为新列的基本功能。这对于降维很有用。用户可以通过设置 inputCol
和 outputCol
来指定输入和输出列名称。
LSH 还支持多个 LSH 哈希表。用户可以通过设置 numHashTables
来指定哈希表的数量。这还用于近似相似性连接和近似最近邻中的OR-amplification。增加哈希表的数量会提高准确性,但也会增加通信成本和运行时间。
outputCol
的类型是 Seq[Vector]
,其中数组的维度等于 numHashTables
,向量的维度当前设置为 1。在未来的版本中,我们将实现 AND 放大,以便用户可以指定这些向量的维度。
近似相似度连接
近似相似性连接接受两个数据集,并近似返回数据集中距离小于用户定义阈值的行的对。近似相似性连接支持连接两个不同的数据集和自连接。自连接将产生一些重复的对。
近似相似性连接接受已转换和未转换的数据集作为输入。如果使用未转换的数据集,它将被自动转换。在这种情况下,哈希签名将创建为 outputCol
。
在连接的数据集中,可以在 datasetA
和 datasetB
中查询原始数据集。距离列将添加到输出数据集中,以显示返回的每对行之间的真实距离。
近似最近邻搜索
近似最近邻搜索接受一个数据集(特征向量)和一个键(单个特征向量),并近似返回数据集中最接近该向量的指定行数。
近似最近邻搜索接受已转换和未转换的数据集作为输入。如果使用未转换的数据集,它将被自动转换。在这种情况下,哈希签名将创建为 outputCol
。
距离列将添加到输出数据集中,以显示每个输出行与搜索键之间的真实距离。
**注意:**当哈希桶中没有足够的候选项时,近似最近邻搜索返回的行数将少于 k
行。
LSH 算法
用于欧几里得距离的分桶随机投影
分桶随机投影 是欧几里得距离的 LSH 族。欧几里得距离定义如下:\[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \]
其 LSH 族将特征向量 $\mathbf{x}$
投影到随机单位向量 $\mathbf{v}$
上,并将投影结果划分到哈希桶中:\[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \]
其中 r
是用户定义的桶长度。桶长度可用于控制哈希桶的平均大小(以及桶的数量)。较大的桶长度(即,较少的桶)会增加将特征哈希到同一个桶的概率(增加真假阳性的数量)。
分桶随机投影接受任意向量作为输入特征,并支持稀疏向量和密集向量。
有关 API 的更多详细信息,请参阅BucketedRandomProjectionLSH Python 文档。
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.dense([1.0, 1.0]),),
(1, Vectors.dense([1.0, -1.0]),),
(2, Vectors.dense([-1.0, -1.0]),),
(3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(4, Vectors.dense([1.0, 0.0]),),
(5, Vectors.dense([-1.0, 0.0]),),
(6, Vectors.dense([0.0, 1.0]),),
(7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.dense([1.0, 0.0])
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
numHashTables=3)
model = brp.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
有关 API 的更多详细信息,请参阅BucketedRandomProjectionLSH Scala 文档。
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
val dfA = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 1.0)),
(1, Vectors.dense(1.0, -1.0)),
(2, Vectors.dense(-1.0, -1.0)),
(3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(4, Vectors.dense(1.0, 0.0)),
(5, Vectors.dense(-1.0, 0.0)),
(6, Vectors.dense(0.0, 1.0)),
(7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")
val key = Vectors.dense(1.0, 0.0)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes")
val model = brp.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
有关 API 的更多详细信息,请参阅BucketedRandomProjectionLSH Java 文档。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 1.0)),
RowFactory.create(1, Vectors.dense(1.0, -1.0)),
RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);
List<Row> dataB = Arrays.asList(
RowFactory.create(4, Vectors.dense(1.0, 0.0)),
RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
RowFactory.create(6, Vectors.dense(0.0, 1.0)),
RowFactory.create(7, Vectors.dense(0.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
Vector key = Vectors.dense(1.0, 0.0);
BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes");
BucketedRandomProjectionLSHModel model = mh.fit(dfA);
// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
MinHash for Jaccard Distance
MinHash is an LSH family for Jaccard distance, where the input features are sets of natural numbers. The Jaccard distance of two sets is defined by the cardinality of their intersection and union: \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \]
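For instance, as a purely illustrative calculation (not tied to the code below): with $\mathbf{A} = \{1, 2, 3\}$ and $\mathbf{B} = \{2, 3, 4, 5\}$, the intersection has 2 elements and the union has 5, so \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{2}{5} = 0.6. \]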
MinHash applies a random hash function $g$ to each element in the set and takes the minimum of all hashed values: \[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]
The input sets for MinHash are represented as binary vectors, where the vector indices represent the elements themselves and the non-zero values in the vector represent the presence of that element in the set. While both dense and sparse vectors are supported, sparse vectors are generally recommended for efficiency. For example, Vectors.sparse(10, Array((2, 1.0), (3, 1.0), (5, 1.0))) represents a space of 10 elements; this set contains element 2, element 3, and element 5. All non-zero values are treated as binary "1" values.
**Note:** Empty sets cannot be transformed by MinHash, which means any input vector must have at least one non-zero entry.
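A minimal Scala sketch of the two points above (the helper toBinaryVector and the DataFrame df with a features column are illustrative assumptions, not part of the spark.ml API): encode a set of element indices as a sparse binary vector, and drop any rows whose vectors have no non-zero entries before fitting or transforming with MinHashLSH.
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.{col, udf}

// Encode a set of element indices as a sparse binary vector over a space of `size` elements.
def toBinaryVector(size: Int, elems: Set[Int]): Vector =
  Vectors.sparse(size, elems.toSeq.sorted.map(i => (i, 1.0)))

val v = toBinaryVector(10, Set(2, 3, 5))  // the set {2, 3, 5} from the text above

// Guard against empty sets: keep only rows with at least one non-zero entry.
val hasNonZero = udf((vec: Vector) => vec.numNonzeros > 0)
val cleaned = df.filter(hasNonZero(col("features")))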
Refer to the MinHashLSH Python docs for more details on the API.
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
(1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
(2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
(4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
(5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Refer to the MinHashLSH Scala docs for more details on the API.
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
val dfA = spark.createDataFrame(Seq(
(0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
(1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
(2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
(4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
(5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))
val mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashes")
val model = mh.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
Refer to the MinHashLSH Java docs for more details on the API.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
List<Row> dataB = Arrays.asList(
RowFactory.create(3, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(4, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(5, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);
MinHashLSH mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashes");
MinHashLSHModel model = mh.fit(dfA);
// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
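As the comments above note, approxNearestNeighbors may return fewer rows than requested when too few candidates land in the probed hash buckets. One common mitigation is to raise the number of hash tables, which lowers the chance of missing candidates at the cost of extra hashing work. A hedged Scala sketch reusing dfA and key from the Scala example above (the value 20 is illustrative only, not a recommended default):
// More hash tables reduce missed near-neighbor candidates but increase compute cost.
val mhMoreTables = new MinHashLSH()
  .setNumHashTables(20)  // illustrative value; tune for your data
  .setInputCol("features")
  .setOutputCol("hashes")
val modelMoreTables = mhMoreTables.fit(dfA)
modelMoreTables.approxNearestNeighbors(dfA, key, 2).show()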