与云基础设施集成

介绍

所有主要的云提供商都在对象存储中提供持久性数据存储。这些不是传统的“POSIX”文件系统。为了存储数百 PB 的数据而没有任何单点故障,对象存储用更简单的 object-name => data 模型替换了传统的目录树文件系统。为了启用远程访问,对象上的操作通常作为(缓慢的)HTTP REST 操作提供。

Spark 可以通过 Hadoop 中实现或由基础设施供应商本身提供的文件系统连接器读取和写入对象存储中的数据。这些连接器使对象存储看起来几乎像文件系统,具有目录和文件以及对它们的经典操作,例如列出、删除和重命名。

重要提示:云对象存储不是真正的文件系统

虽然存储看起来像文件系统,但它们在底层仍然是对象存储,并且区别很大

它们不能直接替换集群文件系统,例如 HDFS,除非明确说明

主要区别在于

这对 Spark 有什么影响?

  1. 读取和写入数据的速度可能比使用普通文件系统慢得多。
  2. 某些目录结构在查询拆分计算期间扫描效率非常低。
  3. Spark 通常在保存 RDD、DataFrame 或 Dataset 时使用的基于重命名的算法可能既慢又不稳定。

出于这些原因,将对象存储用作查询的直接目标或查询链中的中间存储并不总是安全的。请咨询对象存储及其连接器的文档,以确定哪些用途被认为是安全的。

一致性

截至 2021 年,Amazon (S3)、Google Cloud (GCS) 和 Microsoft (Azure Storage、ADLS Gen1、ADLS Gen2) 的对象存储都是一致的

这意味着,只要文件被写入/更新,其他进程就可以列出、查看和打开它,并且将检索到最新版本。这是 AWS S3 的一个已知问题,尤其是在对象创建之前进行的 HEAD 请求的 404 缓存方面。

即使如此:所有存储连接器都没有提供任何保证,说明其客户端如何处理在流读取它们时被覆盖的对象。不要假设可以安全地读取旧文件,也不要假设更改在可见之前存在任何有限的时间段,或者客户端在读取的文件被覆盖时不会简单地失败。

因此:避免覆盖已知/可能被其他客户端积极读取的文件。

其他对象存储是不一致的

这包括 OpenStack Swift

此类存储并不总是安全用作工作目标,请咨询每个存储的特定文档。

安装

在类路径上有相关库,并且 Spark 配置了有效的凭据后,可以通过使用它们的 URL 作为数据路径来读取或写入对象。例如,sparkContext.textFile("s3a://landsat-pds/scene_list.gz") 将使用 s3a 连接器创建存储在 S3 中的文件 scene_list.gz 的 RDD。

要将相关库添加到应用程序的类路径,请包含 hadoop-cloud 模块及其依赖项。

在 Maven 中,将以下内容添加到 pom.xml 文件中,假设 spark.version 设置为所选的 Spark 版本

<dependencyManagement>
  ...
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hadoop-cloud_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  ...
</dependencyManagement>

基于 Apache Spark 的商业产品通常直接设置与云基础设施通信的类路径,在这种情况下,可能不需要此模块。

身份验证

Spark 作业必须对对象存储进行身份验证才能访问其中的数据。

  1. 当 Spark 在云基础设施中运行时,凭据通常会自动设置。
  2. spark-submit 读取 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYAWS_SESSION_TOKEN 环境变量,并为 Amazon S3 的 s3ns3a 连接器设置相关的身份验证选项。
  3. 在 Hadoop 集群中,可以在 core-site.xml 文件中设置设置。
  4. 身份验证详细信息可以手动添加到 spark-defaults.conf 中的 Spark 配置中
  5. 或者,可以在用于配置应用程序 SparkContextSparkConf 实例中以编程方式设置它们。

重要提示:切勿将身份验证密钥检查到源代码存储库中,尤其是公共存储库。

请咨询 Hadoop 文档,了解相关的配置和安全选项。

配置

每个云连接器都有自己的一组配置参数,同样,请咨询相关文档。

对于其一致性模型意味着基于重命名的提交是安全的那些对象存储,请使用 FileOutputCommitter v2 算法以提高性能;v1 用于安全。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

这比“版本 1”算法在作业结束时进行的重命名操作更少。由于它仍然使用 rename() 来提交文件,因此在对象存储没有一致的元数据/列表时使用它是不安全的。

提交器还可以设置为在清理临时文件时忽略失败;这降低了瞬态网络问题升级为作业失败的风险

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

原始的 v1 提交算法将成功任务的输出重命名为作业尝试目录,然后在作业提交阶段将该目录中的所有文件重命名到最终目标

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1

模拟 Amazon S3 上的重命名操作的缓慢性能使得该算法非常、非常慢。推荐的解决方案是切换到 S3“零重命名”提交器(见下文)。

作为参考,以下是不同存储和连接器在重命名目录时的性能和安全特性

存储 连接器 目录重命名安全性 重命名性能
Amazon S3 s3a 不安全 O(data)
Azure Storage wasb 安全 O(files)
Azure Datalake Gen 2 abfs 安全 O(1)
Google Cloud Storage gs 混合 O(files)
  1. 由于存储临时文件会产生费用;请定期删除名为 "_temporary" 的目录。
  2. 对于 AWS S3,请设置多部分上传可以保持挂起的最大时间。这避免了因未完成的上传而产生费用。
  3. 对于 Google 云,目录重命名是逐文件进行的。考虑使用 v2 提交器,并且只编写生成幂等输出的代码,包括文件名,因为它与 v1 提交器相比并不更不安全,而且速度更快。

Parquet I/O 设置

为了在使用 Parquet 数据时获得最佳性能,请使用以下设置

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

这些设置最大限度地减少了查询期间读取的数据量。

ORC I/O 设置

为了在使用 ORC 数据时获得最佳性能,请使用以下设置

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true

同样,这些设置最大限度地减少了查询期间读取的数据量。

Spark Streaming 和对象存储

Spark Streaming 可以通过创建 FileInputDStream 来监控添加到对象存储的文件,通过调用 StreamingContext.textFileStream() 来监控存储中的路径。

  1. 扫描新文件的时间与路径下文件的数量成正比,而不是文件的数量,因此它可能成为一项缓慢的操作。窗口的大小需要设置为处理这种情况。

  2. 文件只有在完全写入后才会出现在对象存储中;不需要写入然后重命名的工作流程来确保在文件仍在写入时不会拾取它们。应用程序可以直接写入监控的目录。

  3. 在名为 FileContextBasedCheckpointFileManager 的默认检查点文件管理器的情况下,流应该只被检查点到实现快速且原子的 rename() 操作的存储中。否则,检查点可能很慢,并且可能不可靠。在使用 S3A 连接器的 AWS S3 上,使用 Hadoop 3.3.1 或更高版本,可以使用基于可中止流的检查点文件管理器(通过将 spark.sql.streaming.checkpointFileManagerClass 配置设置为 org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager),它消除了缓慢的重命名。在这种情况下,用户必须格外小心,避免在多个并行运行的查询之间重复使用检查点位置,因为这会导致检查点数据的损坏。

安全快速地将工作提交到云存储。

如前所述,在任何表现出最终一致性的对象存储(例如:S3)上,基于重命名的提交都是危险的,并且通常比经典的文件系统重命名慢。

一些对象存储连接器提供自定义提交器,用于在不使用重命名的情况下提交任务和作业。

Hadoop S3A 提交器

在使用 Hadoop 3.1 或更高版本构建的 Spark 版本中,hadoop-aws JAR 包含可安全用于通过 s3a 连接器访问的 S3 存储的提交者。

这些提交者不会将数据写入存储上的临时目录以进行重命名,而是将文件写入最终目标,但不会发出最终的 POST 命令以使大型“多部分”上传可见。这些操作将推迟到作业提交本身。因此,任务和作业提交速度更快,任务失败不会影响结果。

要切换到 S3A 提交者,请使用使用 Hadoop 3.1 或更高版本构建的 Spark 版本,并通过以下选项切换提交者。

spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

它已通过 Spark 支持的最常见格式进行测试。

mydataframe.write.format("parquet").save("s3a://bucket/destination")

有关这些提交者的更多详细信息,请参阅 最新的 Hadoop 文档,其中包含在 使用 S3A 提交者将工作提交到 S3 中介绍的 S3A 提交者详细信息。

注意:根据使用的提交者,在 Hadoop 3.3.1 之前的版本中,进行中的统计信息可能会被低估。

Amazon EMR:EMRFS S3 优化提交器。

Amazon EMR 为 parquet 数据提供了自己的 S3 感知提交者。有关使用说明,请参阅 EMRFS S3 优化提交者

有关实现和性能详细信息,请参阅 [“使用 EMRFS S3 优化提交者提高 Apache Spark 在 Apache Parquet 格式上的写入性能”](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/

Azure 和 Google 云存储:MapReduce 中间清单提交器。

2022 年 9 月之后发布的 hadoop-mapreduce-core JAR 版本(3.3.5 及更高版本)包含一个针对 Azure ADLS 第 2 代和 Google Cloud Storage 的性能和弹性进行了优化的提交者。

此提交者,“清单提交者”使用清单文件将目录列表信息从任务提交者传播到作业提交者。这些清单可以原子地写入,而无需依赖原子目录重命名,这是 GCS 缺少的功能。

作业提交者读取这些清单,并将文件从任务输出目录直接重命名到目标目录,并行进行,并可选地进行速率限制以避免限制 IO。这在对象存储上提供了性能和可扩展性。

对 Azure 存储使用它对于作业正确性并不重要;经典的 FileOutputCommitter 在那里是安全的 - 但是这个新的提交者在具有深层和广泛目录树的大型作业中扩展得更好。

由于 Google GCS 不支持原子目录重命名,因此应在可用时使用清单提交者。

此提交者确实支持“动态分区覆盖”(见下文)。

有关此提交者的可用性和使用情况的详细信息,请参阅所用 Hadoop 版本的 hadoop 文档。

它在 Hadoop 3.3.4 或更早版本上不可用。

IBM Cloud Object Storage:Stocator

IBM 为 IBM Cloud Object Storage 和 OpenStack Swift 提供 Stocator 输出提交者。

源代码、文档和版本可以在 https://github.com/CODAIT/stocator 中找到。

云提交器和 INSERT OVERWRITE TABLE

Spark 具有一个名为“动态分区覆盖”的功能;可以更新表,并且只有添加新数据的那些分区的内容会被替换。

这用于 INSERT OVERWRITE TABLE 形式的 SQL 语句,以及在“覆盖”模式下写入数据集时。

eventDataset.write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(tablePath)

此功能使用文件重命名,并且对提交者和文件系统都有特定要求。

  1. 提交者的工作目录必须位于目标文件系统中。
  2. 目标文件系统必须支持高效的文件重命名。

S3A 提交者和 AWS S3 存储满足这些条件。

其他云存储的提交者可能支持此功能,并向 Spark 声明它们兼容。如果在通过 hadoop 提交者写入数据时需要动态分区覆盖,Spark 将始终在使用原始 FileOutputCommitter 时允许这样做。对于其他提交者,在实例化后,Spark 将探测它们是否声明兼容,如果它们声明兼容,则允许操作。

如果提交者不兼容,操作将失败,并显示错误消息 PathOutputCommitter does not support dynamicPartitionOverwrite

除非目标文件系统有兼容的提交者,否则唯一的解决方案是使用对云友好的格式存储数据。

进一步阅读

以下是 Apache 和云提供商提供的标准连接器的文档。

云提交者问题和 hive 兼容解决方案