与云基础设施集成

简介

所有主要的云提供商都通过对象存储提供持久性数据存储。它们不是经典的“POSIX”文件系统。为了在没有任何单点故障的情况下存储数百 PB 的数据,对象存储用更简单的 对象名 => 数据 模型取代了经典的文件系统目录树。为了实现远程访问,对象上的操作通常作为(慢速)HTTP REST 操作提供。

Spark 可以通过 Hadoop 中实现或由基础设施供应商自己提供的文件系统连接器来读写对象存储中的数据。这些连接器使对象存储看起来几乎像文件系统,具有目录、文件以及列表、删除和重命名等经典操作。

重要提示:云对象存储不是真正的文件系统

尽管这些存储看起来像是文件系统,但它们本质上仍然是对象存储,并且其差异是显著的

它们不能直接替代 HDFS 等集群文件系统,除非另有明确说明

主要差异包括

这如何影响 Spark?

  1. 读写数据可能比使用普通文件系统慢得多。
  2. 某些目录结构在查询拆分计算期间扫描效率可能非常低。
  3. Spark 在保存 RDD、DataFrame 或 Dataset 时通常采用的基于重命名的提交工作算法可能既慢又不可靠。

因此,将对象存储直接用作查询目标或查询链中的中间存储并不总是安全的。请查阅对象存储及其连接器的文档,以确定哪些用法被认为是安全的。

一致性

截至 2021 年,Amazon (S3)、Google Cloud (GCS) 和 Microsoft (Azure Storage, ADLS Gen1, ADLS Gen2) 的对象存储都具有一致性

这意味着文件一旦写入/更新,就可以被其他进程列出、查看和打开,并且会检索到最新版本。这是 AWS S3 的一个已知问题,特别是对象创建之前 HEAD 请求的 404 缓存问题。

即便如此:没有任何存储连接器能保证其客户端如何处理在流读取对象时被覆盖的对象。不要假定旧文件可以安全读取,也不要假定更改可见有任何有界时间段——或者,事实上,客户端不会在正在读取的文件被覆盖时直接失败。

因此:避免覆盖已知/可能被其他客户端积极读取的文件。

其他对象存储不一致

这包括OpenStack Swift

此类存储不总是能安全地用作工作目标——请查阅每个存储的特定文档。

安装

当相关库在 classpath 中且 Spark 配置了有效凭据时,可以通过使用其 URL 作为数据路径来读取或写入对象。例如,sparkContext.textFile("s3a://landsat-pds/scene_list.gz") 将使用 s3a 连接器创建存储在 S3 中的文件 scene_list.gz 的 RDD。

要将相关库添加到应用程序的 classpath 中,请包含 hadoop-cloud 模块及其依赖项。

在 Maven 中,将以下内容添加到 pom.xml 文件中,假设 spark.version 已设置为所选的 Spark 版本

<dependencyManagement>
  ...
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hadoop-cloud_2.13</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  ...
</dependencyManagement>

基于 Apache Spark 的商业产品通常直接设置与云基础设施通信的 classpath,在这种情况下可能不需要此模块。

认证

Spark 作业必须向对象存储进行认证才能访问其中的数据。

  1. 当 Spark 在云基础设施中运行时,凭据通常会自动设置。
  2. spark-submit 能够读取 AWS_ENDPOINT_URLAWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYAWS_SESSION_TOKEN 环境变量,并为 Amazon S3 的 s3ns3a 连接器设置相关的认证选项。
  3. 在 Hadoop 集群中,设置可以在 core-site.xml 文件中完成。
  4. 认证详细信息可以手动添加到 spark-defaults.conf 中的 Spark 配置中
  5. 或者,它们可以在用于配置应用程序 SparkContextSparkConf 实例中以编程方式设置。

重要提示:切勿将认证密钥检入源代码仓库,尤其是公共仓库

请查阅Hadoop 文档以获取相关的配置和安全选项。

配置

每个云连接器都有自己的一套配置参数,同样,请查阅相关文档。

对于其一致性模型意味着基于重命名的提交是安全的那些对象存储,请使用 FileOutputCommitter v2 算法以提高性能;使用 v1 算法以确保安全性。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

与“版本 1”算法相比,这在作业结束时进行的重命名操作更少。由于它仍然使用 rename() 来提交文件,因此当对象存储没有一致的元数据/列表时,使用它是不安全的。

提交器还可以设置为在清理临时文件时忽略失败;这降低了瞬时网络问题升级为作业失败的风险。

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

原始的 v1 提交算法将成功任务的输出重命名到作业尝试目录,然后在作业提交阶段将该目录中的所有文件重命名到最终目的地。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1

Amazon S3 上模拟重命名操作的缓慢性能使得该算法非常非常慢。推荐的解决方案是切换到 S3 “零重命名”提交器(参见下文)。

供参考,以下是不同存储和连接器在重命名目录时的性能和安全性特征

存储 连接器 目录重命名安全性 重命名性能
Amazon S3 s3a 不安全 O(data)
Azure Storage wasb 安全 O(files)
Azure Datalake Gen 2 abfs 安全 O(1)
Google Cloud Storage gs 混合 O(files)
  1. 由于存储临时文件可能会产生费用;请定期删除名为 "_temporary" 的目录。
  2. 对于 AWS S3,请设置分段上传未完成的最长保留时间限制。这可以避免因未完成的上传而产生费用。
  3. 对于 Google Cloud,目录重命名是逐文件进行的。考虑使用 v2 提交器,并且只编写生成幂等输出(包括文件名)的代码,因为它并不比 v1 提交器更不安全,而且速度更快。

Parquet I/O 设置

使用 Parquet 数据时,为获得最佳性能,请使用以下设置

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

这些设置最大限度地减少了查询期间读取的数据量。

ORC I/O 设置

使用 ORC 数据时,为获得最佳性能,请使用以下设置

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true

同样,这些设置最大限度地减少了查询期间读取的数据量。

Spark Streaming 与对象存储

Spark Streaming 可以监控添加到对象存储中的文件,方法是创建一个 FileInputDStream,通过调用 StreamingContext.textFileStream() 来监控存储中的路径。

  1. 扫描新文件所需的时间与路径下的文件总数成正比,而不是与文件数成正比,因此这可能成为一个缓慢的操作。需要设置窗口大小来处理此问题。

  2. 文件只有在完全写入后才会出现在对象存储中;不需要写入-然后-重命名的工作流程来确保文件在写入过程中不会被拾取。应用程序可以直接写入被监控的目录。

  3. 对于名为 FileContextBasedCheckpointFileManager 的默认检查点文件管理器,流应该只检查点到实现快速且原子 rename() 操作的存储。否则,检查点可能很慢且可能不可靠。在 AWS S3 上,使用 Hadoop 3.3.1 或更高版本,并使用 S3A 连接器时,可以使用基于可中止流的检查点文件管理器(通过将 spark.sql.streaming.checkpointFileManagerClass 配置设置为 org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager),这消除了缓慢的重命名操作。在这种情况下,用户必须特别小心,避免在并行运行的多个查询之间重复使用检查点位置,因为这可能导致检查点数据损坏。

安全快速地将工作提交到云存储。

如前所述,在任何表现出最终一致性(例如:S3)的对象存储上,通过重命名提交是危险的,而且通常比经典文件系统重命名更慢。

一些对象存储连接器提供自定义提交器,用于在不使用重命名的情况下提交任务和作业。

Hadoop S3A 提交器

在用 Hadoop 3.1 或更高版本构建的 Spark 版本中,hadoop-aws JAR 包含可安全用于通过 s3a 连接器访问 S3 存储的提交器。

这些提交器不是将数据写入存储上的临时目录以进行重命名,而是将文件写入最终目的地,但不会发出最终的 POST 命令以使大型“多部分”上传可见。这些操作会推迟到作业提交本身。因此,任务和作业提交速度快得多,并且任务失败不会影响结果。

要切换到 S3A 提交器,请使用用 Hadoop 3.1 或更高版本构建的 Spark 版本,并通过以下选项切换提交器。

spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

它已通过 Spark 支持的最常见格式进行了测试。

mydataframe.write.format("parquet").save("s3a://bucket/destination")

有关这些提交器的更多详细信息,请参见最新的 Hadoop 文档,其中 S3A 提交器的详细信息在使用 S3A 提交器将工作提交到 S3中有所介绍。

注意:根据使用的提交器,在 Hadoop 3.3.1 之前的版本中,进行中的统计信息可能会报告不足。

Amazon EMR:EMRFS S3 优化提交器

Amazon EMR 拥有自己的 S3 感知提交器,用于处理 Parquet 数据。有关使用说明,请参见EMRFS S3 优化提交器

有关实现和性能的详细信息,请参见“[使用 EMRFS S3 优化提交器提高 Apache Spark 在 Apache Parquet 格式上的写入性能](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)

Azure 和 Google 云存储:MapReduce 中间清单提交器。

2022 年 9 月之后发布(3.3.5 及更高版本)的 hadoop-mapreduce-core JAR 版本包含一个提交器,该提交器已针对 Azure ADLS Generation 2 和 Google Cloud Storage 上的性能和弹性进行了优化。

这个提交器,即“清单提交器”,使用清单文件将目录列表信息从任务提交器传播到作业提交器。这些清单可以原子地写入,而不依赖于原子目录重命名,这是 GCS 所缺乏的。

作业提交器读取这些清单,并将文件从任务输出目录直接并行重命名到目标目录,并可选地进行速率限制以避免 IO 限制。这在对象存储上提供了性能和可伸缩性。

在 Azure 存储中使用此功能对于作业正确性而言并非关键;经典的 FileOutputCommitter 在那里是安全的——然而,对于具有深层和宽广目录树的大型作业,这个新的提交器具有更好的扩展性。

由于 Google GCS 不支持原子目录重命名,因此应在可用时使用清单提交器。

此提交器确实支持“动态分区覆盖”(见下文)。

有关此提交器的可用性和使用详细信息,请查阅所用 Hadoop 版本的 Hadoop 文档。

它在 Hadoop 3.3.4 或更早版本中不可用。

IBM 云对象存储:Stocator

IBM 为 IBM Cloud Object Storage 和 OpenStack Swift 提供了 Stocator 输出提交器。

源代码、文档和发布版本可在Stocator - Apache Spark 存储连接器中找到。

云提交器和 INSERT OVERWRITE TABLE

Spark 有一个名为“动态分区覆盖”的功能;可以更新表,并且只有那些添加了新数据的分区才会替换其内容。

这用于 INSERT OVERWRITE TABLE 形式的 SQL 语句,以及当 Datasets 以“覆盖”模式写入时。

eventDataset.write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(tablePath)

此功能使用文件重命名,并且对提交器和文件系统都有特定要求

  1. 提交器的工作目录必须在目标文件系统中。
  2. 目标文件系统必须高效支持文件重命名。

S3A 提交器和 AWS S3 存储不满足这些条件。

其他云存储的提交器可能支持此功能,并向 Spark 声明它们兼容。如果通过 Hadoop 提交器写入数据时需要动态分区覆盖,则当使用原始 FileOutputCommitter 时,Spark 总是允许这样做。对于其他提交器,在实例化之后,Spark 将探测它们对兼容性的声明,如果它们声明兼容,则允许操作。

如果提交器不兼容,操作将失败,并显示错误消息 PathOutputCommitter does not support dynamicPartitionOverwrite

除非目标文件系统有兼容的提交器,否则唯一的解决方案是使用云友好的数据存储格式。

拓展阅读

以下是 Apache 和云提供商提供的标准连接器文档。

云提交器问题和与 Hive 兼容的解决方案