Chapter 7: Load and Behold - Data Loading, Storage, and File Formats#
[1]:
!pip install pyspark==4.0.0.dev2
Requirement already satisfied: pyspark==4.0.0.dev2 in /Users/amanda.liu/anaconda3/envs/llm-spark/lib/python3.11/site-packages (4.0.0.dev2)
Requirement already satisfied: py4j==0.10.9.7 in /Users/amanda.liu/anaconda3/envs/llm-spark/lib/python3.11/site-packages (from pyspark==4.0.0.dev2) (0.10.9.7)
[2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Data Loading and Storage Example") \
    .getOrCreate()
This section covers how to read and write data in various formats with PySpark. You will learn how to load data from common file types (e.g., CSV, JSON, Parquet, ORC) and store it efficiently.
Reading Data#
1.1 Reading CSV Files#
CSV is one of the most common formats for data exchange. Here is how to load a CSV file into a DataFrame:
[3]:
csv_df = spark.read.csv("../data/employees.csv", header=True, inferSchema=True)
csv_df.show()
+-----------+-----------------+-----------------+
|Employee ID| Role| Location|
+-----------+-----------------+-----------------+
| 19238| Data Analyst| Seattle, WA|
| 19239|Software Engineer| Seattle, WA|
| 19240| IT Specialist| Seattle, WA|
| 19241| Data Analyst| New York, NY|
| 19242| Recruiter|San Francisco, CA|
| 19243| Product Manager| New York, NY|
+-----------+-----------------+-----------------+
Explanation:
- header=True: treats the first row as column names.
- inferSchema=True: automatically infers the data types of the columns.
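If you already know the file's layout, you can pass an explicit schema instead of inferSchema, which spares Spark an extra pass over the data to infer types. A minimal sketch, assuming the column names and types shown in the output above:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

employee_schema = StructType([
    StructField("Employee ID", IntegerType(), True),   # assumed integer IDs
    StructField("Role", StringType(), True),
    StructField("Location", StringType(), True),
])
# With an explicit schema, Spark skips the inference pass over the file.
csv_df_explicit = spark.read.csv("../data/employees.csv", header=True, schema=employee_schema)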
1.2 Reading JSON Files#
Loading JSON files is straightforward and lets you handle both single-line and multi-line JSON structures:
[4]:
json_df = spark.read.option("multiline", "true").json("../data/employees.json")
json_df.show()
+-----------+-----------------+-----------------+
|Employee ID| Location| Role|
+-----------+-----------------+-----------------+
| 19238| Seattle, WA| Data Analyst|
| 19239| Seattle, WA|Software Engineer|
| 19240| Seattle, WA| IT Specialist|
| 19241| New York, NY| Data Analyst|
| 19242|San Francisco, CA| Recruiter|
| 19243| New York, NY| Product Manager|
+-----------+-----------------+-----------------+
Explanation:
- multiline="true": allows reading multi-line JSON structures.
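Without the multiline option, Spark's JSON reader expects JSON Lines input, i.e. one complete JSON object per line. A minimal sketch, assuming a hypothetical employees.jsonl file in that layout:
# Default behavior: each line must be a self-contained JSON object.
jsonl_df = spark.read.json("../data/employees.jsonl")  # hypothetical JSON Lines file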
1.3 Reading Parquet Files#
Parquet is a columnar format that supports efficient data compression and encoding:
[5]:
parquet_df = spark.read.parquet("../data/employees.parquet")
parquet_df.show()
+-----------+-----------------+-----------------+
|Employee ID| Location| Role|
+-----------+-----------------+-----------------+
| 19239| Seattle, WA|Software Engineer|
| 19243| New York, NY| Product Manager|
| 19242|San Francisco, CA| Recruiter|
| 19241| New York, NY| Data Analyst|
| 19240| Seattle, WA| IT Specialist|
| 19238| Seattle, WA| Data Analyst|
+-----------+-----------------+-----------------+
Tip: Parquet files store data very efficiently thanks to their columnar layout and compression.
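Because the format is columnar, selecting only the columns you need lets Spark skip the remaining columns on disk (column pruning). A small sketch reusing the file above:
# Only the "Employee ID" and "Role" columns need to be read from the file.
roles_df = spark.read.parquet("../data/employees.parquet").select("Employee ID", "Role")
roles_df.show()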
1.4 Reading ORC Files#
ORC is another columnar file format, commonly used in Hadoop environments:
[6]:
orc_df = spark.read.orc("../data/employees.orc")
orc_df.show()
+-----------+-----------------+-----------------+
|Employee ID| Location| Role|
+-----------+-----------------+-----------------+
| 19242|San Francisco, CA| Recruiter|
| 19239| Seattle, WA|Software Engineer|
| 19240| Seattle, WA| IT Specialist|
| 19243| New York, NY| Product Manager|
| 19238| Seattle, WA| Data Analyst|
| 19241| New York, NY| Data Analyst|
+-----------+-----------------+-----------------+
Writing Data#
2.1 Writing Data to CSV#
[7]:
csv_df.write.csv("../data/employees_out.csv", mode="overwrite", header=True)
Explanation:
- mode="overwrite": replaces the output directory if it already exists.
- header=True: writes the column names as the first row.
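Note that Spark writes a directory of part files rather than a single CSV file. For a small result you can coalesce to one partition before writing; a sketch, with a hypothetical output path:
# coalesce(1) yields a single part file inside the output directory.
csv_df.coalesce(1).write.csv("../data/employees_single.csv", mode="overwrite", header=True)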
2.2 Writing Data to Parquet#
The Parquet format is recommended for large datasets:
[8]:
parquet_df.write.parquet("../data/employees_out.parquet", mode="overwrite")
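For larger datasets it can help to partition the output by a column; Spark then writes one sub-directory per distinct value, which speeds up reads that filter on that column. A sketch with a hypothetical output path:
# Writes one sub-directory per distinct Location value.
parquet_df.write.partitionBy("Location").parquet("../data/employees_by_location.parquet", mode="overwrite")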
2.3 Writing Data to ORC#
[9]:
json_df.write.orc("../data/employees_out.orc", mode="overwrite")
Tip: The Parquet and ORC formats are best suited for efficient storage and fast reads.
Additional Options and Configurations#
You can customize how data is read and written using additional options. Here are a few examples.
Custom delimiter in CSV:#
[10]:
spark.read.option("delimiter", ";").csv("../data/employees.csv").show(truncate=False)
+-------------------------------------+
|_c0 |
+-------------------------------------+
|Employee ID,Role,Location |
|19238,Data Analyst,"Seattle, WA" |
|19239,Software Engineer,"Seattle, WA"|
|19240,IT Specialist,"Seattle, WA" |
|19241,Data Analyst,"New York, NY" |
|19242,Recruiter,"San Francisco, CA" |
|19243,Product Manager,"New York, NY" |
+-------------------------------------+
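Since employees.csv is actually comma-delimited, setting the delimiter to ";" makes Spark treat each whole line as a single column (_c0 above). With the matching delimiter, the columns are parsed as expected:
# "," matches the file, so the three columns split correctly.
spark.read.option("delimiter", ",").csv("../data/employees.csv", header=True).show()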
Handling null values:#
[11]:
spark.read.option("nullValue", "NULL").csv("../data/employees.csv").show(truncate=False)
+-----------+-----------------+-----------------+
|_c0 |_c1 |_c2 |
+-----------+-----------------+-----------------+
|Employee ID|Role |Location |
|19238 |Data Analyst |Seattle, WA |
|19239 |Software Engineer|Seattle, WA |
|19240 |IT Specialist |Seattle, WA |
|19241 |Data Analyst |New York, NY |
|19242 |Recruiter |San Francisco, CA|
|19243 |Product Manager |New York, NY |
+-----------+-----------------+-----------------+
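The sample file contains no literal "NULL" tokens, so the output is unchanged; the option matters when the source encodes missing values as a string. A sketch, assuming a hypothetical file that uses "NULL" for missing fields:
# Any field equal to the string "NULL" is loaded as a true null value.
nulls_df = spark.read.option("nullValue", "NULL").csv("../data/employees_with_nulls.csv", header=True)  # hypothetical file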
Compression options:#
[12]:
parquet_df.write.option("compression", "gzip").parquet("../data/employees_out.parquet", mode="overwrite")
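Other codecs, such as snappy (the usual default for Parquet) and zstd, can be selected the same way:
# zstd typically trades a bit of CPU for better compression than snappy.
parquet_df.write.option("compression", "zstd").parquet("../data/employees_out.parquet", mode="overwrite")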
Check the Input/Output section of the PySpark API reference for all supported functions and options.