第 2 章:PySpark 数据类型概览#
PySpark 中的基本数据类型#
理解 PySpark 中的基本数据类型对于定义 DataFrame 模式和执行高效数据处理至关重要。以下是每种类型的详细概述,包括描述、Python 等价类型和示例
数值类型#
ByteType
用于存储范围从 -128
到 127
的字节长度整数。非常适合高效地存储小数据。- Python 等价类型:int
(-128
到 127
) Python 示例
[2]:
byte_example = 127 # Maximum value for a signed byte
ShortType
表示短整数,存储介于 -32768
和 32767
之间的值。对于数值范围较小的数据,比使用 IntegerType
更高效。- Python 等价类型:int
(-32768
到 32767
) Python 示例
[3]:
short_example = 32767 # Maximum value for a signed short
IntegerType
用于存储整数值。非常适合计数、索引和任何离散数量。- Python 等价类型:int
(-2147483648
到 2147483647
) Python 示例
[4]:
integer_example = 123
LongType
适用于存储大整数值,常用于标识符或大计数。- Python 等价类型:int
(-9223372036854775808
到 9223372036854775807
) Python 示例
[5]:
long_integer_example = 1234567890123456789
DoubleType
提供双精度浮点数,用于准确和精确的计算。- Python 等价类型:float
(双精度) Python 示例
[6]:
double_example = 12345.6789
FloatType
用于浮点数,在可接受较低精度以换取性能的情况下使用。- Python 等价类型:float
(单精度) Python 示例
[7]:
float_example = 123.456
DecimalType
允许固定精度和范围,用于需要精确十进制表示的场景,例如金融计算。- Python 等价类型:decimal.Decimal
Python 示例
[8]:
from decimal import Decimal
decimal_example = Decimal('12345.6789')
字符串类型#
用于文本数据;支持 Unicode 并能够存储任何字符串数据。- Python 等价类型:str
Python 示例
[9]:
string_example = "Hello, World!"
二进制类型#
用于原始字节数据,例如文件内容或图像,以二进制流形式存储。- Python 等价类型:bytes
Python 示例
[10]:
binary_example = b'Hello, binary world!'
布尔类型#
表示布尔值,广泛用于条件操作和过滤器。- Python 等价类型:bool
Python 示例
[11]:
boolean_example = True
日期时间类型#
DateType
用于不带时间的日期,适用于存储生日或特定日期等日历日期。- Python 等价类型:datetime.date
Python 示例
[12]:
from datetime import date
date_example = date(2020, 1, 1)
TimestampType
存储日期和时间,对于记录精确的时间点(如日志时间戳)至关重要。- Python 等价类型:datetime.datetime
Python 示例
[13]:
from datetime import datetime
timestamp_example = datetime(2020, 1, 1, 12, 0)
在 PySpark 中从 Python 对象创建 DataFrame#
以下是如何使用与每个基本数据类型对应的 Python 对象在 PySpark 中定义模式并创建 DataFrame
[14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType, FloatType
from pyspark.sql.types import DecimalType, StringType, BinaryType, BooleanType, DateType, TimestampType
from decimal import Decimal
from datetime import date, datetime
# Define the schema of the DataFrame
schema = StructType([
StructField("integer_field", IntegerType(), nullable=False),
StructField("long_field", LongType(), nullable=False),
StructField("double_field", DoubleType(), nullable=False),
StructField("float_field", FloatType(), nullable=False),
StructField("decimal_field", DecimalType(10, 2), nullable=False),
StructField("string_field", StringType(), nullable=False),
StructField("binary_field", BinaryType(), nullable=False),
StructField("boolean_field", BooleanType(), nullable=False),
StructField("date_field", DateType(), nullable=False),
StructField("timestamp_field", TimestampType(), nullable=False)
])
# Sample data using the Python objects corresponding to each PySpark type
data = [
(123, 1234567890123456789, 12345.6789, 123.456, Decimal('12345.67'), "Hello, World!",
b'Hello, binary world!', True, date(2020, 1, 1), datetime(2020, 1, 1, 12, 0)),
(456, 9223372036854775807, 98765.4321, 987.654, Decimal('98765.43'), "Goodbye, World!",
b'Goodbye, binary world!', False, date(2025, 12, 31), datetime(2025, 12, 31, 23, 59)),
(-1, -1234567890123456789, -12345.6789, -123.456, Decimal('-12345.67'), "Negative Values",
b'Negative binary!', False, date(1990, 1, 1), datetime(1990, 1, 1, 0, 0)),
(0, 0, 0.0, 0.0, Decimal('0.00'), "", b'', True, date(2000, 1, 1), datetime(2000, 1, 1, 0, 0))
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Show the DataFrame
df.show()
+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+
|integer_field| long_field|double_field|float_field|decimal_field| string_field| binary_field|boolean_field|date_field| timestamp_field|
+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+
| 123| 1234567890123456789| 12345.6789| 123.456| 12345.67| Hello, World!|[48 65 6C 6C 6F 2...| true|2020-01-01|2020-01-01 12:00:00|
| 456| 9223372036854775807| 98765.4321| 987.654| 98765.43|Goodbye, World!|[47 6F 6F 64 62 7...| false|2025-12-31|2025-12-31 23:59:00|
| -1|-1234567890123456789| -12345.6789| -123.456| -12345.67|Negative Values|[4E 65 67 61 74 6...| false|1990-01-01|1990-01-01 00:00:00|
| 0| 0| 0.0| 0.0| 0.00| | []| true|2000-01-01|2000-01-01 00:00:00|
+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+
双精度浮点数、单精度浮点数和十进制数的精度#
理解数值数据类型中的精度对于数据完整性至关重要,尤其是在需要高精度的领域,如金融分析、科学计算和工程。PySpark 提供不同的数据类型来满足这些需求。
FloatType
PySpark 中的 FloatType
表示单精度 32 位 IEEE 754 浮点数。它的精度较低,但占用存储空间较少,并且处理速度比 DoubleType
快。这使得它适用于需要快速处理大量数值数据且不要求极高精度的应用。使用场景:在处理大型数据集时,可用于机器学习算法以加快计算速度。
DoubleType
DoubleType
对应双精度 64 位 IEEE 754 浮点数。它在精度和性能之间提供了良好的平衡,适用于大多数需要精度的数值计算。使用场景:金融计算的理想选择,其中精度比计算速度更重要。
DecimalType
DecimalType
用于处理高精度固定比例的十进制数。用户可以定义精度和范围,这使其对于财务报告等应用非常宝贵,因为精确的十进制表示有助于避免舍入错误。使用场景:在需要精确到分位的会计应用中至关重要。
示例:计算财务统计数据#
此示例演示如何在 PySpark 中使用不同的数值数据类型进行财务计算,例如聚合收入和以适当的精度计算平均值。
[15]:
from decimal import Decimal
from pyspark.sql.types import StructType, StructField, FloatType, DoubleType, DecimalType
from pyspark.sql.functions import sum, avg, col, format_number
# Define the schema of the DataFrame
schema = StructType([
StructField("revenue_float", FloatType(), nullable=False),
StructField("revenue_double", DoubleType(), nullable=False),
StructField("revenue_decimal", DecimalType(10, 2), nullable=False)
])
# Sample data
data = [
(12345.67, 12345.6789, Decimal('12345.68')),
(98765.43, 98765.4321, Decimal('98765.43')),
(54321.10, 54321.0987, Decimal('54321.10'))
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Calculations
result = df.select(
format_number(sum(col("revenue_float")), 2).alias("Total_Revenue_Float"),
format_number(avg(col("revenue_float")), 2).alias("Average_Revenue_Float"),
format_number(sum(col("revenue_double")), 2).alias("Total_Revenue_Double"),
format_number(avg(col("revenue_double")), 2).alias("Average_Revenue_Double"),
format_number(sum(col("revenue_decimal")), 2).alias("Total_Revenue_Decimal"),
format_number(avg(col("revenue_decimal")), 2).alias("Average_Revenue_Decimal")
)
result.show()
+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+
|Total_Revenue_Float|Average_Revenue_Float|Total_Revenue_Double|Average_Revenue_Double|Total_Revenue_Decimal|Average_Revenue_Decimal|
+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+
| 165,432.20| 55,144.07| 165,432.21| 55,144.07| 165,432.21| 55,144.07|
+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+
PySpark 中的复杂数据类型#
PySpark 中的复杂数据类型有助于处理嵌套和结构化数据,这对于处理大数据生态系统中常见的现代数据格式(如 JSON、XML 等)至关重要。本节探讨 PySpark 中主要的复杂数据类型:ArrayType
、StructType
、MapType
及其用例。
ArrayType
允许在单个列中存储相同类型的多个值。非常适合自然形成列表的数据,例如标签、类别或历史数据点。- Python 等价类型:list
Python 示例
[16]:
array_example = ['apple', 'banana', 'cherry']
使用场景:管理与每条记录相关的项目列表,例如单个联系人的多个电话号码或电子邮件地址。
StructType
启用 DataFrame 列的嵌套,允许在单个 DataFrame 单元格中创建复杂的分层数据结构。StructType
中的每个字段本身都可以是复杂类型。它类似于 DataFrame 中的一行,通常用于封装具有结构化模式的记录。- Python 等价类型:pyspark.sql.Row
Python 示例
[17]:
from pyspark.sql import Row
struct_example = Row(name="John Doe", age=30, address=Row(street="123 Elm St", city="Somewhere"))
使用场景:常用于表示 JSON 对象,可以将每个 JSON 字段像 DataFrame 中的列一样进行操作。
MapType
在 DataFrame 列中表示键值对,其中每个键和值都可以是任何数据类型。适用于动态结构化数据。- Python 等价类型:dict
Python 示例
[18]:
map_example = {'food': 'pizza', 'color': 'blue', 'car': 'Tesla'}
使用场景:在单个 DataFrame 列中存储和处理键值对集合,例如产品属性,其中键是属性名称,值是属性值。
示例:处理复杂的嵌套数据#
为了说明这些复杂数据类型的使用,让我们考虑一个涉及嵌套数据结构的实际示例,例如包含多个地址和各种类别偏好的客户记录。
[19]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
from pyspark.sql import Row
# Define the schema of the DataFrame
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("addresses", ArrayType(
StructType([
StructField("street", StringType(), nullable=False),
StructField("city", StringType(), nullable=False),
StructField("zip", StringType(), nullable=False)
])
), nullable=True),
StructField("preferences", MapType(StringType(), StringType()), nullable=True)
])
# Sample data using Row objects for StructType
data = [
Row(name="John Doe",
addresses=[Row(street="123 Elm St", city="Somewhere", zip="12345"),
Row(street="456 Oak St", city="Anywhere", zip="67890")],
preferences={"food": "pizza", "color": "blue", "car": "Tesla"}),
Row(name="Jane Smith",
addresses=[Row(street="789 Pine St", city="Everywhere", zip="10112")],
preferences={"food": "sushi", "color": "green", "car": "Honda"})
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Show the DataFrame
df.show(truncate=False)
+----------+---------------------------------------------------------------+---------------------------------------------+
|name |addresses |preferences |
+----------+---------------------------------------------------------------+---------------------------------------------+
|John Doe |[{123 Elm St, Somewhere, 12345}, {456 Oak St, Anywhere, 67890}]|{color -> blue, car -> Tesla, food -> pizza} |
|Jane Smith|[{789 Pine St, Everywhere, 10112}] |{color -> green, car -> Honda, food -> sushi}|
+----------+---------------------------------------------------------------+---------------------------------------------+
在此示例中:- ArrayType
用于存储每个客户的多个地址。- StructType
嵌套在 ArrayType
中,将每个地址表示为结构化记录。- MapType
存储偏好作为键值对,允许动态数据存储。
在 PySpark 中进行列转换 (Casting)#
列转换是数据处理中的基本操作,它将 DataFrame 中列的数据类型从一种类型转换为另一种类型。PySpark 提供了简单的方法,使您能够将输入数据类型与数据处理操作或应用程序的要求对齐。
如何进行列转换#
要在 PySpark 中进行列转换,可以使用列上的 cast()
或 astype()
方法。这是一个完整的示例,演示如何执行基本的转换操作
[20]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, FloatType
# Define the schema of the DataFrame
schema = StructType([
StructField("float_column", FloatType(), nullable=True),
StructField("string_column", StringType(), nullable=True)
])
# Sample data
data = [
(123.456, "123"),
(789.012, "456"),
(None, "789")
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Display original DataFrame
print("Original DataFrame:")
df.show()
# Example of casting a float column to string
df = df.withColumn('string_from_float', col('float_column').cast('string'))
# Example of casting a string column to integer
df = df.withColumn('integer_from_string', col('string_column').cast('integer'))
# Display DataFrame after casting
print("DataFrame after Casting:")
df.show()
Original DataFrame:
+------------+-------------+
|float_column|string_column|
+------------+-------------+
| 123.456| 123|
| 789.012| 456|
| NULL| 789|
+------------+-------------+
DataFrame after Casting:
+------------+-------------+-----------------+-------------------+
|float_column|string_column|string_from_float|integer_from_string|
+------------+-------------+-----------------+-------------------+
| 123.456| 123| 123.456| 123|
| 789.012| 456| 789.012| 456|
| NULL| 789| NULL| 789|
+------------+-------------+-----------------+-------------------+
谨慎转换:潜在的数据丢失#
在进行列转换时,了解 PySpark 如何处理不兼容或无效的转换操作非常重要
静默转换为 Null - 如果 ANSI 模式被禁用,当值在转换过程中无法转换为所需类型时,PySpark 不会抛出错误。相反,它会溢出或将值转换为 null
。这种行为可能导致数据集中的数据丢失,而这可能不会立即显现。- 如果 ANSI 模式被启用,PySpark 在这种情况下会抛出错误。如果可以接受,请改用 try_cast
。
示例:检查数据丢失 - 检查由于转换操作导致的意外 null 值是一种好习惯,尤其是在从字符串转换为数字类型时,因为格式问题可能导致失败。
[21]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
# Disable ANSI mode
spark.conf.set("spark.sql.ansi.enabled", False)
# Define the schema of the DataFrame
schema = StructType([
StructField("original_column", StringType(), nullable=True)
])
# Sample data
data = [
("123",), # Valid integer in string form
("abc",), # Invalid, will result in null when cast to integer
(None,) # Original null, remains null
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Display original DataFrame
print("Original DataFrame:")
df.show()
# Add a new column with casted values
df = df.withColumn('casted_column', col('original_column').cast('integer'))
# Show rows where casting resulted in nulls but the original column had data
print("DataFrame Showing Potential Data Loss:")
df.filter(col('original_column').isNotNull() & col('casted_column').isNull()).show()
spark.conf.unset("spark.sql.ansi.enabled")
Original DataFrame:
+---------------+
|original_column|
+---------------+
| 123|
| abc|
| NULL|
+---------------+
DataFrame Showing Potential Data Loss:
+---------------+-------------+
|original_column|casted_column|
+---------------+-------------+
| abc| NULL|
+---------------+-------------+
转换的最佳实践#
首先验证数据 - 在转换列之前,特别是在将字符串转换为数字类型时,请验证和清理数据以确保其符合预期格式。
示例:在将数字字符串转换为整数之前检查其格式是否正确
[22]:
from pyspark.sql.functions import col, regexp_extract
# Sample DataFrame with a string column
df = spark.createDataFrame([("100",), ("20x",), ("300",)], ["data"])
# Checking and filtering rows where data can be safely cast to an integer
valid_df = df.filter(regexp_extract(col("data"), '^[0-9]+$', 0) != "")
valid_df.show()
+----+
|data|
+----+
| 100|
| 300|
+----+
使用显式模式 - 读取数据时,使用显式模式以避免不正确的数据类型推断,这可以最大限度地减少转换的需要。
示例:读取数据时指定模式以确保从一开始就应用正确的数据类型
[23]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Define a schema
schema = StructType([
StructField("Employee ID", IntegerType(), True),
StructField("Role", StringType(), True),
StructField("Location", StringType(), True)
])
# Read data with an explicit schema
df = spark.read.csv("../data/employees.csv", schema=schema)
df.printSchema()
root
|-- Employee ID: integer (nullable = true)
|-- Role: string (nullable = true)
|-- Location: string (nullable = true)
PySpark 中的半结构化数据处理#
本节探讨 PySpark 处理半结构化数据格式的能力,特别侧重于 JSON 和 XML,并探讨了管理类似 VARIANT 数据的方法,这种数据类型常用于某些 SQL 数据库。
JSON 处理#
JSON 是 Web 服务和数据交换中广泛使用的格式。PySpark 简化了将 JSON 数据解析为结构化 DataFrame 的过程,使其易于操作和分析。
主要函数 - from_json()
: 将 JSON 字符串转换为具有结构化数据类型的 DataFrame 列。- to_json()
: 将 DataFrame 的列转换为 JSON 字符串。
示例:解析 JSON 字符串
[24]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
json_schema = StructType([
StructField("name", StringType()),
StructField("age", IntegerType())
])
df = spark.createDataFrame([("{\"name\":\"John\", \"age\":30}",), ("{\"name\":\"Jane\", \"age\":25}",)], ["json_str"])
df.select(from_json(col("json_str"), json_schema).alias("parsed_json")).show()
+-----------+
|parsed_json|
+-----------+
| {John, 30}|
| {Jane, 25}|
+-----------+
示例:读取和处理 JSON 数据
[25]:
df = spark.read.json('../data/books.json')
df.select("author", "title", "genre").show()
+-------------+--------------------+---------+
| author| title| genre|
+-------------+--------------------+---------+
|George Orwell| 1984|Dystopian|
| Jane Austen| Pride and Prejudice| Romance|
| Mark Twain|Adventures of Huc...| Fiction|
+-------------+--------------------+---------+
XML 处理#
注意:本节适用于 Spark 4.0
XML 是另一种常见的半结构化数据格式,广泛用于各种企业应用程序。示例:读取和处理 XML 数据
[26]:
df = spark.read \
.format('xml') \
.option('rowTag', 'book') \
.load('../data/books.xml')
df.select("author", "title", "genre").show()
+-------------+--------------------+---------+
| author| title| genre|
+-------------+--------------------+---------+
|George Orwell| 1984|Dystopian|
| Jane Austen| Pride and Prejudice| Romance|
| Mark Twain|Adventures of Huc...| Fiction|
+-------------+--------------------+---------+
在 PySpark 中处理 VARIANT 数据类型#
注意:本节适用于 Spark 4.0
随着 VARIANT 数据类型的引入,半结构化数据的处理变得更加简化。VARIANT 类型旨在直接在 DataFrame 列中存储不符合固定模式的数据,例如 JSON 或 XML。
PySpark 中 VARIANT 的特性 - 灵活性:VARIANT 类型可以存储 JSON 或 XML 等数据结构,而无需预定义模式约束,为数据摄取和操作提供了高度灵活性。- 集成:提供了与使用半结构化数据的系统更好的集成,允许更直接的数据交换和查询。
使用 VARIANT 时的注意事项 - 性能:虽然 VARIANT 提供了灵活性,但由于其动态特性,可能会影响性能。测试和优化涉及 VARIANT 类型的数据操作非常重要。- 兼容性:如果您正在利用此数据类型,请确保数据管道的所有部分都支持 VARIANT,尤其是在将数据导出到外部系统时。
实际示例:使用 VARIANT 处理 JSON 数据 此示例演示了如何在 PySpark 中有效地使用 VARIANT 处理 JSON 数据
[27]:
from datetime import date, datetime
from decimal import Decimal
from pyspark.sql.functions import try_parse_json, try_variant_get, col
# Sample JSON data
data = [
'1234567890123456789',
'12345.6789',
'"Hello, World!"',
'true',
'{"id": 1, "attributes": {"key1": "value1", "key2": "value2"}}',
'{"id": 2, "attributes": {"key1": "value3", "key2": "value4"}}',
]
# Load data into DataFrame with VARIANT
df = spark.createDataFrame(data, StringType()).select(try_parse_json(col("value")).alias("variant_data"))
df.printSchema()
df.show(truncate=False)
# Accessing elements inside the VARIANT
df.select(
try_variant_get(col("variant_data"), "$", "long").alias("long_value"),
try_variant_get(col("variant_data"), "$.id", "int").alias("id"),
try_variant_get(col("variant_data"), "$.attributes.key1", "string").alias("key1"),
try_variant_get(col("variant_data"), "$.attributes.key2", "string").alias("key2"),
).show()
# Collect data and convert to Python objects
[row["variant_data"].toPython() for row in df.collect()]
root
|-- variant_data: variant (nullable = true)
+-------------------------------------------------------+
|variant_data |
+-------------------------------------------------------+
|1234567890123456789 |
|12345.6789 |
|"Hello, World!" |
|true |
|{"attributes":{"key1":"value1","key2":"value2"},"id":1}|
|{"attributes":{"key1":"value3","key2":"value4"},"id":2}|
+-------------------------------------------------------+
+-------------------+----+------+------+
| long_value| id| key1| key2|
+-------------------+----+------+------+
|1234567890123456789|NULL| NULL| NULL|
| 12345|NULL| NULL| NULL|
| NULL|NULL| NULL| NULL|
| 1|NULL| NULL| NULL|
| NULL| 1|value1|value2|
| NULL| 2|value3|value4|
+-------------------+----+------+------+
[27]:
[1234567890123456789,
Decimal('12345.6789'),
'Hello, World!',
True,
{'attributes': {'key1': 'value1', 'key2': 'value2'}, 'id': 1},
{'attributes': {'key1': 'value3', 'key2': 'value4'}, 'id': 2}]