与云基础设施集成

简介

所有主要的云提供商都在对象存储中提供持久性数据存储。 这些不是经典的“POSIX”文件系统。 为了存储数百 PB 的数据而没有任何单点故障,对象存储用更简单的 对象名称 => 数据 模型替换了经典的文件系统目录树。 为了实现远程访问,通常以(慢速)HTTP REST 操作的形式提供对对象的操作。

Spark 可以通过 Hadoop 中实现的文件系统连接器或由基础设施供应商自己提供的连接器来读取和写入对象存储中的数据。 这些连接器使对象存储看起来几乎像文件系统,具有目录和文件以及对其的经典操作,例如列表、删除和重命名。

重要提示:云对象存储不是真正的文件系统

虽然存储看起来像文件系统,但在底层它们仍然是对象存储,并且差异很大

除非明确声明,否则它们不能用作 HDFS 等集群文件系统的直接替代品。

主要区别是

这对 Spark 有什么影响?

  1. 读取和写入数据的速度可能比使用普通文件系统慢得多。
  2. 某些目录结构在查询拆分计算期间扫描可能非常低效。
  3. 当保存 RDD、DataFrame 或 Dataset 时,Spark 通常提交工作的基于重命名的算法可能既慢又不可靠。

由于这些原因,将对象存储用作查询的直接目标或查询链中的中间存储并不总是安全的。 请查阅对象存储及其连接器的文档,以确定哪些用法被认为是安全的。

一致性

截至 2021 年,Amazon (S3)、Google Cloud (GCS) 和 Microsoft (Azure Storage、ADLS Gen1、ADLS Gen2) 的对象存储都是一致的

这意味着一旦写入/更新文件,其他进程就可以列出、查看和打开该文件 - 并且将检索最新版本。 这是 AWS S3 的一个已知问题,特别是对象创建之前发出的 HEAD 请求的 404 缓存。

即便如此:没有一个存储连接器保证其客户端如何处理在流读取对象时被覆盖的对象。 不要假定可以安全地读取旧文件,也不要假定更改在多长时间内可见 - 或者实际上,如果正在读取的文件被覆盖,客户端不会简单地失败。

因此:避免覆盖已知/可能其他客户端正在主动读取的文件。

其他对象存储是不一致的

这包括 OpenStack Swift

这种存储作为工作目标并不总是安全的 - 请查阅每个存储的特定文档。

安装

如果类路径上有相关的库,并且 Spark 配置了有效的凭据,则可以通过使用其 URL 作为数据路径来读取或写入对象。 例如 sparkContext.textFile("s3a://landsat-pds/scene_list.gz") 将使用 s3a 连接器创建存储在 S3 中的文件 scene_list.gz 的 RDD。

要将相关库添加到应用程序的类路径,请包含 hadoop-cloud 模块及其依赖项。

在 Maven 中,假设 spark.version 设置为所选的 Spark 版本,请将以下内容添加到 pom.xml 文件中

<dependencyManagement>
  ...
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hadoop-cloud_2.12</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
  </dependency>
  ...
</dependencyManagement>

基于 Apache Spark 的商业产品通常直接设置类路径以与云基础设施通信,在这种情况下可能不需要此模块。

身份验证

Spark 作业必须通过对象存储进行身份验证才能访问其中的数据。

  1. 当 Spark 在云基础设施中运行时,通常会自动设置凭据。
  2. spark-submit 读取 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYAWS_SESSION_TOKEN 环境变量,并为 Amazon S3 的 s3ns3a 连接器设置相关的身份验证选项。
  3. 在 Hadoop 集群中,可以在 core-site.xml 文件中设置设置。
  4. 身份验证详细信息可以手动添加到 spark-defaults.conf 中的 Spark 配置中
  5. 或者,它们可以在用于配置应用程序 SparkContextSparkConf 实例中以编程方式设置。

重要提示:切勿将身份验证密钥检查到源代码存储库中,尤其是公共存储库

有关相关的配置和安全选项,请查阅 Hadoop 文档

配置

每个云连接器都有自己的一组配置参数,同样,请查阅相关的文档。

对于其一致性模型意味着基于重命名的提交是安全的对象存储,请使用 FileOutputCommitter v2 算法以获得性能; v1 用于安全。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

与“版本 1”算法相比,它在作业结束时执行的重命名操作更少。 由于它仍然使用 rename() 来提交文件,因此当对象存储没有一致的元数据/列表时,使用它是不安全的。

提交器还可以设置为忽略清除临时文件时的失败; 这降低了瞬态网络问题升级为作业失败的风险

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

原始的 v1 提交算法将成功任务的输出重命名为作业尝试目录,然后在作业提交阶段将该目录中的所有文件重命名为最终目标

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1

在 Amazon S3 上模拟重命名的速度很慢,这使得该算法非常非常慢。 推荐的解决方案是切换到 S3“零重命名”提交器(见下文)。

作为参考,以下是在重命名目录时不同存储和连接器的性能和安全特性

存储 连接器 目录重命名安全 重命名性能
Amazon S3 s3a 不安全 O(data)
Azure 存储 wasb 安全 O(files)
Azure Datalake Gen 2 abfs 安全 O(1)
Google Cloud Storage gs 混合 O(files)
  1. 由于存储临时文件会产生费用; 定期删除名为 "_temporary" 的目录。
  2. 对于 AWS S3,请限制未完成的分段上传可以保持未完成状态的时间。 这避免了因未完成上传而产生的费用。
  3. 对于 Google Cloud,目录重命名是逐个文件进行的。 考虑使用 v2 提交器,并且只编写生成幂等输出的代码 - 包括文件名,因为它比 v1 提交器不再不安全,并且速度更快。

Parquet I/O 设置

为了在使用 Parquet 数据时获得最佳性能,请使用以下设置

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

这些设置最大限度地减少了查询期间读取的数据量。

ORC I/O 设置

为了在使用 ORC 数据时获得最佳性能,请使用以下设置

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true

同样,这些设置最大限度地减少了查询期间读取的数据量。

Spark Streaming 和对象存储

Spark Streaming 可以通过创建一个 FileInputDStream 来监视添加到对象存储的文件,该 FileInputDStream 通过调用 StreamingContext.textFileStream() 来监视存储中的路径。

  1. 扫描新文件所需的时间与路径下的文件数量成正比,而不是与文件的数量成正比,因此它可能成为一项缓慢的操作。 需要设置窗口大小来处理这种情况。

  2. 只有在文件完全写入后,它们才会出现在对象存储中;不需要先写入再重命名的工作流程来确保文件在仍在写入时不会被选取。应用程序可以直接写入到受监控的目录。

  3. 如果使用默认的检查点文件管理器 FileContextBasedCheckpointFileManager,则应仅将流检查点保存到实现了快速且原子性的 rename() 操作的存储中。否则,检查点保存可能会很慢且潜在地不可靠。在使用 S3A 连接器的 Hadoop 3.3.1 或更高版本的 AWS S3 上,可以使用基于可中止流的检查点文件管理器(通过将 spark.sql.streaming.checkpointFileManagerClass 配置设置为 org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager),从而消除缓慢的重命名。在这种情况下,用户必须格外小心,避免多个并行运行的查询之间重复使用检查点位置,因为这可能会导致检查点数据损坏。

安全快速地将工作提交到云存储中。

正如前面提到的,提交通过重命名在任何表现出最终一致性的对象存储(例如:S3)上都是危险的,并且通常比传统文件系统重命名慢。

一些对象存储连接器提供自定义提交器,用于提交任务和作业,而无需使用重命名。

Hadoop S3A 提交器

在使用 Hadoop 3.1 或更高版本构建的 Spark 版本中,hadoop-aws JAR 包含可安全用于通过 s3a 连接器访问的 S3 存储的提交器。

这些提交器不是将数据写入存储上的临时目录以进行重命名,而是将文件写入最终目标位置,但不发出最终的 POST 命令以使大型“多部分”上传可见。 这些操作会推迟到作业提交本身。 因此,任务和作业提交速度更快,并且任务失败不会影响结果。

要切换到 S3A 提交器,请使用使用 Hadoop 3.1 或更高版本构建的 Spark 版本,并通过以下选项切换提交器。

spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

它已经过 Spark 支持的最常见格式的测试。

mydataframe.write.format("parquet").save("s3a://bucket/destination")

有关这些提交器的更多详细信息,请参见最新的 Hadoop 文档,其中 S3A 提交器的详细信息在使用 S3A 提交器将工作提交到 S3中进行了介绍。

注意:根据所使用的提交器,在 Hadoop 3.3.1 之前的版本中,正在进行的统计信息可能会被低估。

Amazon EMR:EMRFS S3 优化提交器

Amazon EMR 具有自己的 S3 感知提交器,用于 parquet 数据。 有关使用说明,请参见EMRFS S3 优化提交器

有关实现和性能详细信息,请参见[“使用 EMRFS S3 优化提交器提高 Apache Parquet 格式的 Apache Spark 写入性能”](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/

Azure 和 Google 云存储:MapReduce 中间清单提交器。

2022 年 9 月之后发布的 hadoop-mapreduce-core JAR 版本(3.3.5 及更高版本)包含一个提交器,该提交器针对 Azure ADLS Generation 2 和 Google Cloud Storage 上的性能和弹性进行了优化。

这个 “manifest committer”(清单提交器) 使用清单文件将目录列表信息从任务提交器传播到作业提交器。 这些清单可以原子地写入,而无需依赖原子目录重命名,而 GCS 缺少该功能。

作业提交器读取这些清单,并将文件从任务输出目录直接并行重命名到目标目录中,并可以选择速率限制以避免限制 IO。 这在对象存储上提供了性能和可伸缩性。

对于 Azure 存储来说,使用它来保证作业的正确性并不是至关重要的; 经典的 FileOutputCommitter 在这里是安全的 - 但是这个新的提交器对于具有深度和宽目录树的大型作业来说扩展性更好。

因为 Google GCS 不支持原子目录重命名,所以应尽可能使用清单提交器。

此提交器确实支持“动态分区覆盖”(见下文)。

有关此提交器的可用性和使用的详细信息,请查阅所使用的 Hadoop 版本的 Hadoop 文档。

它在 Hadoop 3.3.4 或更早版本上不可用。

IBM Cloud 对象存储:Stocator

IBM 为 IBM Cloud Object Storage 和 OpenStack Swift 提供 Stocator 输出提交器。

源代码、文档和版本可在 https://github.com/CODAIT/stocator 找到。

云提交器和 INSERT OVERWRITE TABLE

Spark 有一个名为“动态分区覆盖”的功能; 可以更新表,并且只有添加新数据的分区才会替换其内容。

这用于 INSERT OVERWRITE TABLE 形式的 SQL 语句,以及数据集以“overwrite”模式写入时。

eventDataset.write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(tablePath)

此功能使用文件重命名,并且对提交器和文件系统都有特定要求

  1. 提交器的工作目录必须位于目标文件系统中。
  2. 目标文件系统必须有效地支持文件重命名。

S3A 提交器和 AWS S3 存储满足这些条件。

其他云存储的提交器可能支持此功能,并向 Spark 声明它们是兼容的。 如果通过 hadoop 提交器写入数据时需要动态分区覆盖,则在使用原始 FileOutputCommitter 时,Spark 将始终允许这样做。 对于其他提交器,在实例化后,Spark 将探测它们是否声明了兼容性,如果它们声明为兼容,则允许该操作。

如果提交器不兼容,则操作将失败,并显示错误消息 PathOutputCommitter does not support dynamicPartitionOverwrite

除非目标文件系统有兼容的提交器,否则唯一的解决方案是使用对云友好的数据存储格式。

进一步阅读

以下是来自 Apache 和云提供商的标准连接器的文档。

云提交器问题和 hive 兼容的解决方案