Chapter 7: Load and Behold - Data Loading, Storage, File Formats#

[1]:
!pip install pyspark==4.0.0.dev2
Requirement already satisfied: pyspark==4.0.0.dev2 in /Users/amanda.liu/anaconda3/envs/llm-spark/lib/python3.11/site-packages (4.0.0.dev2)
Requirement already satisfied: py4j==0.10.9.7 in /Users/amanda.liu/anaconda3/envs/llm-spark/lib/python3.11/site-packages (from pyspark==4.0.0.dev2) (0.10.9.7)
[2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Data Loading and Storage Example") \
    .getOrCreate()

This section covers how to read and write data in various formats using PySpark. You will learn how to load data from common file types (e.g., CSV, JSON, Parquet, ORC) and how to store that data efficiently.

Reading Data#

1.1 Reading CSV Files#

CSV is one of the most common formats for data exchange. Here is how to load a CSV file into a DataFrame:

[3]:
csv_df = spark.read.csv("../data/employees.csv", header=True, inferSchema=True)
csv_df.show()
+-----------+-----------------+-----------------+
|Employee ID|             Role|         Location|
+-----------+-----------------+-----------------+
|      19238|     Data Analyst|      Seattle, WA|
|      19239|Software Engineer|      Seattle, WA|
|      19240|    IT Specialist|      Seattle, WA|
|      19241|     Data Analyst|     New York, NY|
|      19242|        Recruiter|San Francisco, CA|
|      19243|  Product Manager|     New York, NY|
+-----------+-----------------+-----------------+

Explanation:
- header=True: treats the first row as column names.
- inferSchema=True: automatically infers the data type of each column.
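Schema inference requires an extra pass over the data. For larger files you may prefer to declare the schema up front instead. This is only a minimal sketch of that alternative, reusing the same employees.csv file; the csv_df_explicit name is ours, not part of the original example.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Declare the expected columns instead of inferring them from the data
schema = StructType([
    StructField("Employee ID", IntegerType(), True),
    StructField("Role", StringType(), True),
    StructField("Location", StringType(), True),
])

csv_df_explicit = spark.read.csv("../data/employees.csv", header=True, schema=schema)
csv_df_explicit.printSchema()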

1.2 Reading JSON Files#

Loading JSON files is straightforward, and you can handle both single-line and multi-line JSON structures:

[4]:
json_df = spark.read.option("multiline", "true").json("../data/employees.json")
json_df.show()
+-----------+-----------------+-----------------+
|Employee ID|         Location|             Role|
+-----------+-----------------+-----------------+
|      19238|      Seattle, WA|     Data Analyst|
|      19239|      Seattle, WA|Software Engineer|
|      19240|      Seattle, WA|    IT Specialist|
|      19241|     New York, NY|     Data Analyst|
|      19242|San Francisco, CA|        Recruiter|
|      19243|     New York, NY|  Product Manager|
+-----------+-----------------+-----------------+

Explanation:
- multiline="true": allows reading JSON structures that span multiple lines.
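By default the JSON reader expects one JSON object per line (JSON Lines); setting multiline to "true" switches it to parsing a document that spans several lines. To check which types Spark inferred for the JSON columns, you can print the schema:

# Inspect the schema Spark inferred from the JSON file
json_df.printSchema()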

1.3 Reading Parquet Files#

Parquet is a columnar format that supports efficient data compression and encoding:

[5]:
parquet_df = spark.read.parquet("../data/employees.parquet")
parquet_df.show()
+-----------+-----------------+-----------------+
|Employee ID|         Location|             Role|
+-----------+-----------------+-----------------+
|      19239|      Seattle, WA|Software Engineer|
|      19243|     New York, NY|  Product Manager|
|      19242|San Francisco, CA|        Recruiter|
|      19241|     New York, NY|     Data Analyst|
|      19240|      Seattle, WA|    IT Specialist|
|      19238|      Seattle, WA|     Data Analyst|
+-----------+-----------------+-----------------+

Tip: Parquet files store data very efficiently thanks to their columnar layout and compression.
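Because Parquet stores data column by column, selecting only the columns you need lets Spark skip the rest when it executes the query. A minimal sketch (the roles_df name is ours):

# Select only the columns you need; Spark prunes the remaining Parquet columns at read time
roles_df = spark.read.parquet("../data/employees.parquet").select("Employee ID", "Role")
roles_df.show()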

1.4 Reading ORC Files#

ORC is another columnar file format, commonly used in Hadoop environments:

[6]:
orc_df = spark.read.orc("../data/employees.orc")
orc_df.show()
+-----------+-----------------+-----------------+
|Employee ID|         Location|             Role|
+-----------+-----------------+-----------------+
|      19242|San Francisco, CA|        Recruiter|
|      19239|      Seattle, WA|Software Engineer|
|      19240|      Seattle, WA|    IT Specialist|
|      19243|     New York, NY|  Product Manager|
|      19238|      Seattle, WA|     Data Analyst|
|      19241|     New York, NY|     Data Analyst|
+-----------+-----------------+-----------------+

Writing Data#

2.1 Writing Data to CSV#

[7]:
csv_df.write.csv("../data/employees_out.csv", mode="overwrite", header=True)

Explanation:
- mode="overwrite": replaces the output directory if it already exists.
- header=True: writes the column names as the first row.
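Note that Spark writes one file per partition, so the output path is a directory of part files. If a small dataset should end up in a single CSV file, you can reduce it to one partition before writing. A minimal sketch, with a hypothetical employees_single.csv output path:

# Coalesce to one partition so the output directory contains a single part file
# (only sensible for data small enough to fit comfortably on one executor)
csv_df.coalesce(1).write.csv("../data/employees_single.csv", mode="overwrite", header=True)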

2.2 Writing Data to Parquet#

The Parquet format is recommended for large datasets:

[8]:
parquet_df.write.parquet("../data/employees_out.parquet", mode="overwrite")
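For larger datasets you can also partition the output by a column, so that later reads can skip files that do not match a filter. A minimal sketch, using a hypothetical employees_by_location.parquet output path:

# Write one sub-directory per distinct Location value; filters on Location
# can then prune whole partitions at read time
parquet_df.write.partitionBy("Location").parquet(
    "../data/employees_by_location.parquet", mode="overwrite"
)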

2.3 Writing Data to ORC#

[9]:
json_df.write.orc("../data/employees_out.orc", mode="overwrite")

Tip: The Parquet and ORC formats are best suited for efficient storage and fast reads.

Additional Options and Configurations#

You can customize how data is read and written using additional options. Here are a few examples:

Custom Delimiter in CSV:#

[10]:
spark.read.option("delimiter", ";").csv("../data/employees.csv").show(truncate=False)
+-------------------------------------+
|_c0                                  |
+-------------------------------------+
|Employee ID,Role,Location            |
|19238,Data Analyst,"Seattle, WA"     |
|19239,Software Engineer,"Seattle, WA"|
|19240,IT Specialist,"Seattle, WA"    |
|19241,Data Analyst,"New York, NY"    |
|19242,Recruiter,"San Francisco, CA"  |
|19243,Product Manager,"New York, NY" |
+-------------------------------------+
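Because employees.csv is actually comma-separated, reading it with a semicolon delimiter collapses each line into a single column, as shown above. The option becomes useful when the file really uses that delimiter. A small round-trip sketch, with a hypothetical employees_semicolon.csv path and a semicolon_df name of our own:

# Write a semicolon-delimited copy, then read it back with the matching delimiter
csv_df.write.csv("../data/employees_semicolon.csv", mode="overwrite", header=True, sep=";")
semicolon_df = spark.read.option("delimiter", ";").csv(
    "../data/employees_semicolon.csv", header=True, inferSchema=True
)
semicolon_df.show()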

Handling Null Values:#

[11]:
spark.read.option("nullValue", "NULL").csv("../data/employees.csv").show(truncate=False)
+-----------+-----------------+-----------------+
|_c0        |_c1              |_c2              |
+-----------+-----------------+-----------------+
|Employee ID|Role             |Location         |
|19238      |Data Analyst     |Seattle, WA      |
|19239      |Software Engineer|Seattle, WA      |
|19240      |IT Specialist    |Seattle, WA      |
|19241      |Data Analyst     |New York, NY     |
|19242      |Recruiter        |San Francisco, CA|
|19243      |Product Manager  |New York, NY     |
+-----------+-----------------+-----------------+
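The nullValue option tells the reader which string should be interpreted as null. The employees.csv file contains no such markers, so the output above is unchanged. A tiny round-trip sketch to show the effect, using a DataFrame and output path (null_demo.csv) that we made up for illustration:

# None is written out as the literal string "NULL", then mapped back to null on read
sample_df = spark.createDataFrame([(1, "Data Analyst"), (2, None)], ["id", "role"])
sample_df.write.csv("../data/null_demo.csv", mode="overwrite", header=True, nullValue="NULL")
spark.read.csv("../data/null_demo.csv", header=True, nullValue="NULL").show()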

Compression Options:#

[12]:
parquet_df.write.option("compression", "gzip").parquet("../data/employees_out.parquet", mode="overwrite")
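gzip produces smaller files at the cost of slower writes; snappy, Spark's default codec for Parquet, is usually a good balance between size and speed. A sketch of the same write with snappy, reusing the output path from above:

# snappy compresses less aggressively than gzip but is faster to write and read
parquet_df.write.option("compression", "snappy").parquet(
    "../data/employees_out.parquet", mode="overwrite"
)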

See the Input/Output section of the PySpark API reference for all supported functions and options.