从/到其他 DBMS¶
pandas API on Spark 中与其他 DBMS 交互的 API 与 pandas 中的 API 略有不同,因为 pandas API on Spark 利用 PySpark 中的 JDBC API 从/向其他 DBMS 读取和写入。
从/向外部 DBMS 读取/写入的 API 如下
|
将 SQL 数据库表读取到 DataFrame 中。 |
|
将 SQL 查询读取到 DataFrame 中。 |
|
将 SQL 查询或数据库表读取到 DataFrame 中。 |
pandas-on-Spark 需要 con
的规范 JDBC URL,并且能够为 PySpark JDBC API 中的选项 使用额外的关键字参数。
ps.read_sql(..., dbtable="...", driver="", keytab="", ...)
读取和写入 DataFrames¶
在下面的示例中,您将读取和写入 SQLite 中的一个表。
首先,通过 Python 的 SQLite 库创建如下所示的 example
数据库。稍后将读取到 pandas-on-Spark 中
import sqlite3
con = sqlite3.connect('example.db')
cur = con.cursor()
# Create table
cur.execute(
'''CREATE TABLE stocks
(date text, trans text, symbol text, qty real, price real)''')
# Insert a row of data
cur.execute("INSERT INTO stocks VALUES ('2006-01-05','BUY','RHAT',100,35.14)")
# Save (commit) the changes
con.commit()
con.close()
Spark 上的 Pandas API 需要 JDBC 驱动程序才能读取,因此需要您的特定数据库的驱动程序位于 Spark 的类路径上。对于 SQLite JDBC 驱动程序,您可以下载它,例如,如下所示
curl -O https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar
之后,您应该首先将其添加到您的 Spark 会话中。一旦您添加了它,Spark 上的 Pandas API 将自动检测到 Spark 会话并利用它。
import os
from pyspark.sql import SparkSession
(SparkSession.builder
.master("local")
.appName("SQLite JDBC")
.config(
"spark.jars",
"{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd()))
.config(
"spark.driver.extraClassPath",
"{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd()))
.getOrCreate())
现在,您已准备好读取该表
import pyspark.pandas as ps
df = ps.read_sql("stocks", con="jdbc:sqlite:{}/example.db".format(os.getcwd()))
df
date trans symbol qty price
0 2006-01-05 BUY RHAT 100.0 35.14
您也可以将其写回如下所示的 stocks
表
df.price += 1
df.spark.to_spark_io(
format="jdbc", mode="append",
dbtable="stocks", url="jdbc:sqlite:{}/example.db".format(os.getcwd()))
ps.read_sql("stocks", con="jdbc:sqlite:{}/example.db".format(os.getcwd()))
date trans symbol qty price
0 2006-01-05 BUY RHAT 100.0 35.14
1 2006-01-05 BUY RHAT 100.0 36.14