快速入门:Spark 上的 Pandas API#

这是对 Spark 上的 pandas API 的简要介绍,主要面向新用户。本笔记展示了 pandas 和 Spark 上的 pandas API 之间的一些关键区别。您可以在快速入门页面的“实时笔记:Spark 上的 pandas API”中自行运行这些示例。

通常,我们按如下方式导入 Spark 上的 pandas API

[1]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

对象创建#

通过传入值列表创建 Spark 上的 pandas Series,让 Spark 上的 pandas API 创建默认的整数索引

[2]:
s = ps.Series([1, 3, 5, np.nan, 6, 8])
[3]:
s
[3]:
0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

通过传入可转换为 Series 类似对象的字典来创建 Spark 上的 pandas DataFrame。

[4]:
psdf = ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])
[5]:
psdf
[5]:
a b c
10 1 100 one
20 2 200 two
30 3 300 three
40 4 400 four
50 5 500 five
60 6 600 six

通过传入带有日期时间索引和标签列的 numpy 数组来创建 pandas DataFrame

[6]:
dates = pd.date_range('20130101', periods=6)
[7]:
dates
[7]:
DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')
[8]:
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
[9]:
pdf
[9]:
A B C D
2013-01-01 0.912558 -0.795645 -0.289115 0.187606
2013-01-02 -0.059703 -1.233897 0.316625 -1.226828
2013-01-03 0.332871 -1.262010 -0.434844 -0.579920
2013-01-04 0.924016 -1.022019 -0.405249 -1.036021
2013-01-05 -0.772209 -1.228099 0.068901 0.896679
2013-01-06 1.485582 -0.709306 -0.202637 -0.248766

现在,这个 pandas DataFrame 可以转换为 Spark 上的 pandas DataFrame

[10]:
psdf = ps.from_pandas(pdf)
[11]:
type(psdf)
[11]:
pyspark.pandas.frame.DataFrame

它的外观和行为与 pandas DataFrame 相同。

[12]:
psdf
[12]:
A B C D
2013-01-01 0.912558 -0.795645 -0.289115 0.187606
2013-01-02 -0.059703 -1.233897 0.316625 -1.226828
2013-01-03 0.332871 -1.262010 -0.434844 -0.579920
2013-01-04 0.924016 -1.022019 -0.405249 -1.036021
2013-01-05 -0.772209 -1.228099 0.068901 0.896679
2013-01-06 1.485582 -0.709306 -0.202637 -0.248766

此外,也可以轻松地从 Spark DataFrame 创建 Spark 上的 pandas DataFrame。

从 pandas DataFrame 创建 Spark DataFrame

[13]:
spark = SparkSession.builder.getOrCreate()
[14]:
sdf = spark.createDataFrame(pdf)
[15]:
sdf.show()
+--------------------+-------------------+--------------------+--------------------+
|                   A|                  B|                   C|                   D|
+--------------------+-------------------+--------------------+--------------------+
|    0.91255803205208|-0.7956452608556638|-0.28911463069772175| 0.18760566615081622|
|-0.05970271470242...| -1.233896949308984|  0.3166246451758431| -1.2268284000402265|
| 0.33287106947536615|-1.2620100816441786| -0.4348444277082644| -0.5799199651437185|
|  0.9240158461589916|-1.0220190956326003| -0.4052488880650239| -1.0360212104348547|
| -0.7722090016558953|-1.2280986385313222|  0.0689011451939635|  0.8966790729426755|
|  1.4855822995785612|-0.7093056426018517| -0.2026366848847041|-0.24876619876451092|
+--------------------+-------------------+--------------------+--------------------+

从 Spark DataFrame 创建 Spark 上的 pandas DataFrame。

[16]:
psdf = sdf.pandas_api()
[17]:
psdf
[17]:
A B C D
0 0.912558 -0.795645 -0.289115 0.187606
1 -0.059703 -1.233897 0.316625 -1.226828
2 0.332871 -1.262010 -0.434844 -0.579920
3 0.924016 -1.022019 -0.405249 -1.036021
4 -0.772209 -1.228099 0.068901 0.896679
5 1.485582 -0.709306 -0.202637 -0.248766

具有特定的dtypes。目前支持 Spark 和 pandas 共同的类型。

[18]:
psdf.dtypes
[18]:
A    float64
B    float64
C    float64
D    float64
dtype: object

以下是如何显示下面框中的前几行。

请注意,Spark DataFrame 中的数据默认不保留自然顺序。可以通过设置compute.ordered_head选项来保留自然顺序,但这会带来内部排序的性能开销。

[19]:
psdf.head()
[19]:
A B C D
0 0.912558 -0.795645 -0.289115 0.187606
1 -0.059703 -1.233897 0.316625 -1.226828
2 0.332871 -1.262010 -0.434844 -0.579920
3 0.924016 -1.022019 -0.405249 -1.036021
4 -0.772209 -1.228099 0.068901 0.896679

显示索引、列以及底层的 numpy 数据。

[20]:
psdf.index
[20]:
Index([0, 1, 2, 3, 4, 5], dtype='int64')
[21]:
psdf.columns
[21]:
Index(['A', 'B', 'C', 'D'], dtype='object')
[22]:
psdf.to_numpy()
[22]:
array([[ 0.91255803, -0.79564526, -0.28911463,  0.18760567],
       [-0.05970271, -1.23389695,  0.31662465, -1.2268284 ],
       [ 0.33287107, -1.26201008, -0.43484443, -0.57991997],
       [ 0.92401585, -1.0220191 , -0.40524889, -1.03602121],
       [-0.772209  , -1.22809864,  0.06890115,  0.89667907],
       [ 1.4855823 , -0.70930564, -0.20263668, -0.2487662 ]])

显示数据的快速统计摘要

[23]:
psdf.describe()
[23]:
A B C D
计数 6.000000 6.000000 6.000000 6.000000
平均值 0.470519 -1.041829 -0.157720 -0.334542
标准差 0.809428 0.241511 0.294520 0.793014
最小值 -0.772209 -1.262010 -0.434844 -1.226828
25% -0.059703 -1.233897 -0.405249 -1.036021
50% 0.332871 -1.228099 -0.289115 -0.579920
75% 0.924016 -0.795645 0.068901 0.187606
最大值 1.485582 -0.709306 0.316625 0.896679

转置数据

[24]:
psdf.T
[24]:
0 1 2 3 4 5
A 0.912558 -0.059703 0.332871 0.924016 -0.772209 1.485582
B -0.795645 -1.233897 -1.262010 -1.022019 -1.228099 -0.709306
C -0.289115 0.316625 -0.434844 -0.405249 0.068901 -0.202637
D 0.187606 -1.226828 -0.579920 -1.036021 0.896679 -0.248766

按索引排序

[25]:
psdf.sort_index(ascending=False)
[25]:
A B C D
5 1.485582 -0.709306 -0.202637 -0.248766
4 -0.772209 -1.228099 0.068901 0.896679
3 0.924016 -1.022019 -0.405249 -1.036021
2 0.332871 -1.262010 -0.434844 -0.579920
1 -0.059703 -1.233897 0.316625 -1.226828
0 0.912558 -0.795645 -0.289115 0.187606

按值排序

[26]:
psdf.sort_values(by='B')
[26]:
A B C D
2 0.332871 -1.262010 -0.434844 -0.579920
1 -0.059703 -1.233897 0.316625 -1.226828
4 -0.772209 -1.228099 0.068901 0.896679
3 0.924016 -1.022019 -0.405249 -1.036021
0 0.912558 -0.795645 -0.289115 0.187606
5 1.485582 -0.709306 -0.202637 -0.248766

缺失数据#

Spark 上的 Pandas API 主要使用值np.nan来表示缺失数据。默认情况下,它不包含在计算中。

[27]:
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
[28]:
pdf1.loc[dates[0]:dates[1], 'E'] = 1
[29]:
psdf1 = ps.from_pandas(pdf1)
[30]:
psdf1
[30]:
A B C D E
2013-01-01 0.912558 -0.795645 -0.289115 0.187606 1.0
2013-01-02 -0.059703 -1.233897 0.316625 -1.226828 1.0
2013-01-03 0.332871 -1.262010 -0.434844 -0.579920 NaN
2013-01-04 0.924016 -1.022019 -0.405249 -1.036021 NaN

删除任何包含缺失数据的行。

[31]:
psdf1.dropna(how='any')
[31]:
A B C D E
2013-01-01 0.912558 -0.795645 -0.289115 0.187606 1.0
2013-01-02 -0.059703 -1.233897 0.316625 -1.226828 1.0

填充缺失数据。

[32]:
psdf1.fillna(value=5)
[32]:
A B C D E
2013-01-01 0.912558 -0.795645 -0.289115 0.187606 1.0
2013-01-02 -0.059703 -1.233897 0.316625 -1.226828 1.0
2013-01-03 0.332871 -1.262010 -0.434844 -0.579920 5.0
2013-01-04 0.924016 -1.022019 -0.405249 -1.036021 5.0

操作#

统计#

执行描述性统计

[33]:
psdf.mean()
[33]:
A    0.470519
B   -1.041829
C   -0.157720
D   -0.334542
dtype: float64

Spark 配置#

PySpark 中的各种配置都可以在 Spark 上的 pandas API 内部应用。例如,您可以启用 Arrow 优化以大幅加速内部 pandas 转换。另请参阅 PySpark 文档中关于 PySpark 使用 Apache Arrow 的 Pandas 指南。

[34]:
prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")  # Keep its default value.
ps.set_option("compute.default_index_type", "distributed")  # Use default index prevent overhead.
import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.
[35]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
%timeit ps.range(300000).to_pandas()
900 ms ± 186 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[36]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
%timeit ps.range(300000).to_pandas()
3.08 s ± 227 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[37]:
ps.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev)  # Set its default value back.

分组#

“分组(group by)”是指涉及以下一个或多个步骤的过程

  • 根据某些标准将数据分成组

  • 独立地将函数应用于每个组

  • 将结果组合成数据结构

[38]:
psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
                          'foo', 'bar', 'foo', 'foo'],
                    'B': ['one', 'one', 'two', 'three',
                          'two', 'two', 'one', 'three'],
                    'C': np.random.randn(8),
                    'D': np.random.randn(8)})
[39]:
psdf
[39]:
A B C D
0 foo one 1.039632 -0.571950
1 bar one 0.972089 1.085353
2 foo two -1.931621 -2.579164
3 bar three -0.654371 -0.340704
4 foo two -0.157080 0.893736
5 bar two 0.882795 0.024978
6 foo one -0.149384 0.201667
7 foo three -1.355136 0.693883

分组,然后将sum()函数应用于结果组。

[40]:
psdf.groupby('A').sum()
[40]:
B C D
A
bar onethreetwo 1.200513 0.769627
foo onetwotwoonethree -2.553589 -1.361828

按多列分组会形成一个分层索引,我们仍然可以应用 sum 函数。

[41]:
psdf.groupby(['A', 'B']).sum()
[41]:
C D
A B
foo one 0.890248 -0.370283
two -2.088701 -1.685428
bar three -0.654371 -0.340704
foo three -1.355136 0.693883
bar two 0.882795 0.024978
one 0.972089 1.085353

绘图#

[42]:
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))
[43]:
psser = ps.Series(pser)
[44]:
psser = psser.cummax()
[45]:
psser.plot()

在 DataFrame 上,plot()方法是一种方便的方式来绘制所有带有标签的列

[46]:
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
                   columns=['A', 'B', 'C', 'D'])
[47]:
psdf = ps.from_pandas(pdf)
[48]:
psdf = psdf.cummax()
[49]:
psdf.plot()

更多详细信息,请参阅绘图文档。

数据导入/导出#

CSV#

CSV 简单易用。请参阅此处写入 CSV 文件,以及此处读取 CSV 文件。

[50]:
psdf.to_csv('foo.csv')
ps.read_csv('foo.csv').head(10)
[50]:
A B C D
0 -1.187097 -0.134645 0.377094 -0.627217
1 0.331741 0.166218 0.377094 -0.627217
2 0.331741 0.439450 0.377094 0.365970
3 0.621620 0.439450 1.190180 0.365970
4 0.621620 0.439450 1.190180 0.365970
5 2.169198 1.069183 1.395642 0.365970
6 2.755738 1.069183 1.395642 1.045868
7 2.755738 1.069183 1.395642 1.045868
8 2.755738 1.069183 1.395642 1.045868
9 2.755738 1.508732 1.395642 1.556933

Parquet#

Parquet 是一种高效紧凑的文件格式,可实现更快的读写。请参阅此处写入 Parquet 文件,以及此处读取 Parquet 文件。

[51]:
psdf.to_parquet('bar.parquet')
ps.read_parquet('bar.parquet').head(10)
[51]:
A B C D
0 -1.187097 -0.134645 0.377094 -0.627217
1 0.331741 0.166218 0.377094 -0.627217
2 0.331741 0.439450 0.377094 0.365970
3 0.621620 0.439450 1.190180 0.365970
4 0.621620 0.439450 1.190180 0.365970
5 2.169198 1.069183 1.395642 0.365970
6 2.755738 1.069183 1.395642 1.045868
7 2.755738 1.069183 1.395642 1.045868
8 2.755738 1.069183 1.395642 1.045868
9 2.755738 1.508732 1.395642 1.556933

Spark IO#

此外,Spark 上的 pandas API 完全支持 Spark 的各种数据源,例如 ORC 和外部数据源。请参阅此处将其写入指定数据源,以及此处从数据源读取。

[52]:
psdf.spark.to_spark_io('zoo.orc', format="orc")
ps.read_spark_io('zoo.orc', format="orc").head(10)
[52]:
A B C D
0 -1.187097 -0.134645 0.377094 -0.627217
1 0.331741 0.166218 0.377094 -0.627217
2 0.331741 0.439450 0.377094 0.365970
3 0.621620 0.439450 1.190180 0.365970
4 0.621620 0.439450 1.190180 0.365970
5 2.169198 1.069183 1.395642 0.365970
6 2.755738 1.069183 1.395642 1.045868
7 2.755738 1.069183 1.395642 1.045868
8 2.755738 1.069183 1.395642 1.045868
9 2.755738 1.508732 1.395642 1.556933

更多详细信息,请参阅输入/输出文档。