ML Pipelines

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

在本节中,我们将介绍 ML Pipelines 的概念。ML Pipelines 提供了一组统一的、构建在 DataFrames 之上的高级 API,可以帮助用户创建和调整实用的机器学习 pipelines。

目录

Pipelines 中的主要概念

MLlib 标准化了机器学习算法的 API,从而更容易将多个算法组合成单个 pipeline 或工作流程。本节介绍 Pipelines API 引入的关键概念,其中 pipeline 概念主要受到 scikit-learn 项目的启发。

DataFrame

机器学习可以应用于各种数据类型,例如向量、文本、图像和结构化数据。 此 API 采用 Spark SQL 中的 DataFrame,以支持各种数据类型。

DataFrame 支持许多基本和结构化类型;有关支持类型的列表,请参阅 Spark SQL 数据类型参考。 除了 Spark SQL 指南中列出的类型之外,DataFrame 还可以使用 ML Vector 类型。

可以从常规 RDD 隐式或显式地创建 DataFrame。 有关示例,请参见下面的代码示例和 Spark SQL 编程指南

DataFrame 中的列已命名。 下面的代码示例使用诸如“text”,“features”和“label”之类的名称。

Pipeline 组件

Transformers

Transformer 是一个抽象,包括特征转换器和学习模型。 从技术上讲,Transformer 实现了一种方法 transform(),该方法将一个 DataFrame 转换为另一个 DataFrame,通常是通过附加一列或多列。 例如

Estimators

Estimator 抽象了学习算法或任何在数据上拟合或训练的算法的概念。 从技术上讲,Estimator 实现了一种方法 fit(),该方法接受 DataFrame 并生成 Model,后者是 Transformer。 例如,诸如 LogisticRegression 之类的学习算法是一个 Estimator,并且调用 fit() 会训练一个 LogisticRegressionModel,后者是 Model,因此也是 Transformer

pipeline 组件的属性

Transformer.transform()s 和 Estimator.fit()s 都是无状态的。将来,可以通过其他概念来支持有状态算法。

TransformerEstimator 的每个实例都有一个唯一的 ID,这在指定参数时很有用(如下所述)。

Pipeline

在机器学习中,通常运行一系列算法来处理数据并从中学习。 例如,一个简单的文本文档处理工作流程可能包括几个阶段

MLlib 将这样的工作流程表示为 Pipeline,它由一系列 PipelineStages(Transformers 和 Estimators)组成,这些阶段将按特定顺序运行。 在本节中,我们将使用这个简单的工作流程作为一个运行示例。

工作原理

Pipeline 指定为一系列阶段,每个阶段都是 TransformerEstimator。 这些阶段按顺序运行,并且输入 DataFrame 在通过每个阶段时会进行转换。 对于 Transformer 阶段,将在 DataFrame 上调用 transform() 方法。 对于 Estimator 阶段,将调用 fit() 方法以生成 Transformer(它成为 PipelineModel 或已拟合的 Pipeline 的一部分),并且将在 DataFrame 上调用该 Transformertransform() 方法。

我们为简单的文本文档工作流程说明了这一点。 下图是 Pipeline训练时间用法。

ML Pipeline Example

上面,顶行表示具有三个阶段的 Pipeline。 前两个(TokenizerHashingTF)是 Transformers(蓝色),第三个(LogisticRegression)是 Estimator(红色)。 底行表示通过 pipeline 的数据流,其中圆柱体表示 DataFrames。 在具有原始文本文档和标签的原始 DataFrame 上调用 Pipeline.fit() 方法。 Tokenizer.transform() 方法将原始文本文档拆分为单词,并将带有单词的新列添加到 DataFrameHashingTF.transform() 方法将单词列转换为特征向量,并将带有这些向量的新列添加到 DataFrame。 现在,由于 LogisticRegression 是一个 Estimator,因此 Pipeline 首先调用 LogisticRegression.fit() 以生成 LogisticRegressionModel。 如果 Pipeline 具有更多的 Estimators,则它会在将 DataFrame 传递到下一个阶段之前,在 DataFrame 上调用 LogisticRegressionModeltransform() 方法。

一个 Pipeline 是一个 Estimator。因此,在 Pipelinefit() 方法运行后,它会生成一个 PipelineModel,它是一个 Transformer。 这个 PipelineModel 在 *测试时 * 使用;下图说明了这种用法。

ML PipelineModel Example

在上图中,PipelineModel 具有与原始 Pipeline 相同数量的阶段,但原始 Pipeline 中的所有 Estimator 都已变为 Transformer。 当在测试数据集上调用 PipelineModeltransform() 方法时,数据会按顺序传递到已拟合的管道中。 每个阶段的 transform() 方法都会更新数据集并将其传递到下一阶段。

PipelinePipelineModel 有助于确保训练和测试数据通过相同的特征处理步骤。

详细信息

DAG PipelinesPipeline 的阶段被指定为一个有序数组。 这里给出的例子都是线性 Pipelines,即每个阶段都使用前一阶段产生的数据的 Pipelines。 只要数据流图形成有向无环图 (DAG),就可以创建非线性 Pipelines。 此图当前根据每个阶段的输入和输出列名(通常指定为参数)隐式指定。 如果 Pipeline 形成 DAG,则必须按拓扑顺序指定这些阶段。

运行时检查:由于 Pipelines 可以对具有不同类型的 DataFrames 进行操作,因此它们无法使用编译时类型检查。 相反,Pipelines 和 PipelineModels 在实际运行 Pipeline 之前会进行运行时检查。 此类型检查是使用 DataFrame *schema* 完成的,它描述了 DataFrame 中列的数据类型。

唯一的 Pipeline 阶段Pipeline 的阶段应该是唯一的实例。 例如,不应将相同的实例 myHashingTF 两次插入到 Pipeline 中,因为 Pipeline 阶段必须具有唯一的 ID。 但是,可以将不同的实例 myHashingTF1myHashingTF2(均为 HashingTF 类型)放入同一个 Pipeline 中,因为将使用不同的 ID 创建不同的实例。

参数

MLlib EstimatorTransformer 使用统一的 API 来指定参数。

Param 是一个具有自包含文档的命名参数。ParamMap 是一组(参数,值)对。

有两种主要方法可以将参数传递给算法

  1. 为实例设置参数。例如,如果 lrLogisticRegression 的一个实例,则可以调用 lr.setMaxIter(10) 使 lr.fit() 最多使用 10 次迭代。此 API 类似于 spark.mllib 包中使用的 API。
  2. ParamMap 传递给 fit()transform()ParamMap 中的任何参数都将覆盖先前通过 setter 方法指定的参数。

参数属于 EstimatorTransformer 的特定实例。 例如,如果我们有两个 LogisticRegression 实例 lr1lr2,那么我们可以构建一个指定了两个 maxIter 参数的 ParamMapParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)。 如果 Pipeline 中有两个具有 maxIter 参数的算法,这将很有用。

ML 持久化:保存和加载 Pipelines

通常,将模型或管道保存到磁盘以供以后使用是值得的。 在 Spark 1.6 中,模型导入/导出功能已添加到 Pipeline API。 从 Spark 2.3 开始,基于 DataFrame 的 API 在 spark.mlpyspark.ml 中具有完整的覆盖范围。

ML 持久性适用于 Scala、Java 和 Python。 但是,R 目前使用修改后的格式,因此在 R 中保存的模型只能在 R 中加载回来; 这应该在将来修复,并在 SPARK-15572 中进行跟踪。

ML 持久化的向后兼容性

通常,MLlib 维护 ML 持久性的向后兼容性。 也就是说,如果在 Spark 的一个版本中保存 ML 模型或 Pipeline,那么应该能够将其加载回来并在 Spark 的未来版本中使用它。 但是,存在一些罕见的例外情况,如下所述。

模型持久性:是否可以使用 Spark 版本 X 中的 Apache Spark ML 持久性保存的模型或 Pipeline 可以由 Spark 版本 Y 加载?

模型行为:Spark 版本 X 中的模型或 Pipeline 在 Spark 版本 Y 中的行为是否相同?

对于模型持久性和模型行为,跨次要版本或补丁版本的任何重大更改都会在 Spark 版本发行说明中报告。 如果发行说明中未报告中断,则应将其视为要修复的错误。

代码示例

本节提供了代码示例,说明了上面讨论的功能。 有关更多信息,请参阅 API 文档(ScalaJavaPython)。

示例:Estimator、Transformer 和 Param

此示例涵盖了 EstimatorTransformerParam 的概念。

有关 API 的更多详细信息,请参阅 Estimator Python 文档Transformer Python 文档Params Python 文档

from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # type: ignore

# You can combine paramMaps, which are python dictionaries.
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)  # type: ignore

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))
在 Spark 存储库的“examples/src/main/python/ml/estimator_transformer_param_example.py”中查找完整的示例代码。

有关 API 的详细信息,请参阅 Estimator Scala 文档Transformer Scala 文档Params Scala 文档

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/EstimatorTransformerParamExample.scala”中查找完整的示例代码。

有关 API 的详细信息,请参阅 Estimator Java 文档Transformer Java 文档Params Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Prepare training data.
List<Row> dataTraining = Arrays.asList(
    RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
    RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
    RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
    RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
    new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");

// We may set parameters using setter methods.
lr.setMaxIter(10).setRegParam(0.01);

// Learn a LogisticRegression model. This uses the parameters stored in lr.
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());

// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
  .put(lr.maxIter().w(20))  // Specify 1 Param.
  .put(lr.maxIter(), 30)  // This overwrites the original maxIter.
  .put(lr.regParam().w(0.1), lr.threshold().w(0.55));  // Specify multiple Params.

// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
  .put(lr.probabilityCol().w("myProbability"));  // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());

// Prepare test documents.
List<Row> dataTest = Arrays.asList(
    RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
    RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
    RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaEstimatorTransformerParamExample.java”中查找完整的示例代码。

示例:Pipeline

此示例遵循上图中所示的简单文本文档 Pipeline

有关 API 的更多详细信息,请参阅 Pipeline Python 文档

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), prediction   # type: ignore
        )
    )
在 Spark 存储库的“examples/src/main/python/ml/pipeline_example.py”中查找完整的示例代码。

有关 API 的详细信息,请参阅 Pipeline Scala 文档

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
在 Spark 存储库的“examples/src/main/scala/org/apache/spark/examples/ml/PipelineExample.scala”中查找完整的示例代码。

有关 API 的详细信息,请参阅 Pipeline Java 文档

import java.util.Arrays;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Prepare training documents, which are labeled.
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
  new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
  new JavaLabeledDocument(1L, "b d", 0.0),
  new JavaLabeledDocument(2L, "spark f g h", 1.0),
  new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words");
HashingTF hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol())
  .setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001);
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});

// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);

// Prepare test documents, which are unlabeled.
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
  new JavaDocument(4L, "spark i j k"),
  new JavaDocument(5L, "l m n"),
  new JavaDocument(6L, "spark hadoop spark"),
  new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);

// Make predictions on test documents.
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
在 Spark 存储库的“examples/src/main/java/org/apache/spark/examples/ml/JavaPipelineExample.java”中查找完整的示例代码。

模型选择(超参数调优)

使用 ML Pipelines 的一个很大的好处是超参数优化。 有关自动模型选择的更多信息,请参见 ML Tuning Guide