入门

起点:SparkSession

Spark 中所有功能的入口点是 SparkSession 类。要创建一个基本的 SparkSession,只需使用 SparkSession.builder

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
完整的示例代码位于 Spark 仓库中的“examples/src/main/python/sql/basic.py”文件。

Spark 中所有功能的入口点是 SparkSession 类。要创建一个基本的 SparkSession,只需使用 SparkSession.builder()

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
完整的示例代码位于 Spark 仓库中的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”文件。

Spark 中所有功能的入口点是 SparkSession 类。要创建一个基本的 SparkSession,只需使用 SparkSession.builder()

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();
完整的示例代码位于 Spark 仓库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”文件。

Spark 中所有功能的入口点是 SparkSession 类。要初始化一个基本的 SparkSession,只需调用 sparkR.session()

sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))
完整的示例代码位于 Spark 仓库中的“examples/src/main/r/RSparkSQLExample.R”文件。

请注意,当首次调用时,sparkR.session() 会初始化一个全局的 SparkSession 单例实例,并且在后续调用中始终返回此实例的引用。通过这种方式,用户只需初始化 SparkSession 一次,然后像 read.df 这样的 SparkR 函数将能够隐式地访问此全局实例,用户无需在各处传递 SparkSession 实例。

Spark 2.0 中的 SparkSession 内置支持 Hive 功能,包括使用 HiveQL 编写查询、访问 Hive UDFs 以及从 Hive 表读取数据。要使用这些功能,您无需拥有现有的 Hive 设置。

创建 DataFrame

通过 SparkSession,应用程序可以从现有的 RDD、Hive 表或Spark 数据源创建 DataFrame。

例如,以下示例根据 JSON 文件的内容创建了一个 DataFrame

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/python/sql/basic.py”文件。

通过 SparkSession,应用程序可以从现有的 RDD、Hive 表或Spark 数据源创建 DataFrame。

例如,以下示例根据 JSON 文件的内容创建了一个 DataFrame

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”文件。

通过 SparkSession,应用程序可以从现有的 RDD、Hive 表或Spark 数据源创建 DataFrame。

例如,以下示例根据 JSON 文件的内容创建了一个 DataFrame

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”文件。

通过 SparkSession,应用程序可以从本地 R data.frame、Hive 表或Spark 数据源创建 DataFrame。

例如,以下示例根据 JSON 文件的内容创建了一个 DataFrame

df <- read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame
head(df)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

# Another method to print the first few rows and optionally truncate the printing of long values
showDF(df)
## +----+-------+
## | age|   name|
## +----+-------+
## |null|Michael|
## |  30|   Andy|
## |  19| Justin|
## +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/r/RSparkSQLExample.R”文件。

非类型化 Dataset 操作 (即 DataFrame 操作)

DataFrames 在 PythonScalaJavaR 中提供了用于结构化数据操作的领域特定语言。

如上所述,在 Spark 2.0 中,DataFrame 在 Scala 和 Java API 中只是 Row 的 Dataset。这些操作也被称为“非类型化转换”,与强类型 Scala/Java Dataset 附带的“类型化转换”形成对比。

这里我们提供了一些使用 Dataset 进行结构化数据处理的基本示例

在 Python 中,可以通过属性(df.age)或索引(df['age'])来访问 DataFrame 的列。虽然前者方便进行交互式数据探索,但强烈建议用户使用后一种形式,因为它更具前瞻性,并且不会因列名与 DataFrame 类中的属性重名而导致问题。

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+
完整的示例代码位于 Spark 仓库中的“examples/src/main/python/sql/basic.py”文件。

有关可在 DataFrame 上执行的操作类型的完整列表,请参阅API 文档

除了简单的列引用和表达式之外,DataFrame 还拥有丰富的函数库,包括字符串操作、日期运算、常见数学运算等。完整列表可在DataFrame 函数参考中找到。

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
完整的示例代码位于 Spark 仓库中的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”文件。

有关可在 Dataset 上执行的操作类型的完整列表,请参阅API 文档

除了简单的列引用和表达式之外,Dataset 还拥有丰富的函数库,包括字符串操作、日期运算、常见数学运算等。完整列表可在DataFrame 函数参考中找到。

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
完整的示例代码位于 Spark 仓库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”文件。

有关可在 Dataset 上执行的操作类型的完整列表,请参阅API 文档

除了简单的列引用和表达式之外,Dataset 还拥有丰富的函数库,包括字符串操作、日期运算、常见数学运算等。完整列表可在DataFrame 函数参考中找到。

# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
head(df)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin


# Print the schema in a tree format
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
head(select(df, "name"))
##      name
## 1 Michael
## 2    Andy
## 3  Justin

# Select everybody, but increment the age by 1
head(select(df, df$name, df$age + 1))
##      name (age + 1.0)
## 1 Michael          NA
## 2    Andy          31
## 3  Justin          20

# Select people older than 21
head(where(df, df$age > 21))
##   age name
## 1  30 Andy

# Count people by age
head(count(groupBy(df, "age")))
##   age count
## 1  19     1
## 2  NA     1
## 3  30     1
完整的示例代码位于 Spark 仓库中的“examples/src/main/r/RSparkSQLExample.R”文件。

有关可在 DataFrame 上执行的操作类型的完整列表,请参阅API 文档

除了简单的列引用和表达式之外,DataFrame 还拥有丰富的函数库,包括字符串操作、日期运算、常见数学运算等。完整列表可在DataFrame 函数参考中找到。

以编程方式运行 SQL 查询

SparkSession 上的 sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 DataFrame 返回。

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/python/sql/basic.py”文件。

SparkSession 上的 sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 DataFrame 返回。

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”文件。

SparkSession 上的 sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 Dataset<Row> 返回。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”文件。

sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame 返回。

df <- sql("SELECT * FROM table")
完整的示例代码位于 Spark 仓库中的“examples/src/main/r/RSparkSQLExample.R”文件。

全局临时视图

Spark SQL 中的临时视图是会话范围的,如果创建它的会话终止,它们将消失。如果您希望有一个在所有会话之间共享并持续到 Spark 应用程序终止的临时视图,您可以创建一个全局临时视图。全局临时视图与系统保留的数据库 global_temp 相关联,我们必须使用限定名称来引用它,例如 SELECT * FROM global_temp.view1

# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/python/sql/basic.py”文件。
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”文件。
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”文件。
CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl

SELECT * FROM global_temp.temp_view

创建 Dataset

Dataset 类似于 RDD,然而,它们不使用 Java 序列化或 Kryo,而是使用专门的编码器(Encoder)来序列化对象,以便进行处理或通过网络传输。虽然编码器和标准序列化都负责将对象转换为字节,但编码器是动态生成的代码,并使用一种格式,允许 Spark 在不将字节反序列化回对象的情况下执行许多操作,例如过滤、排序和哈希。

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”文件。
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private long age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public long getAge() {
    return age;
  }

  public void setAge(long age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Long> longEncoder = Encoders.LONG();
Dataset<Long> primitiveDS = spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder);
Dataset<Long> transformedDS = primitiveDS.map(
    (MapFunction<Long, Long>) value -> value + 1L,
    longEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”文件。

与 RDD 互操作

Spark SQL 支持两种不同的方法将现有 RDD 转换为 Dataset。第一种方法使用反射来推断包含特定类型对象的 RDD 的 Schema。这种基于反射的方法使得代码更简洁,并且在编写 Spark 应用程序时已知 Schema 的情况下效果很好。

创建 Dataset 的第二种方法是通过编程接口,该接口允许您构建 Schema,然后将其应用于现有 RDD。虽然这种方法更冗长,但它允许您在运行时才知道列及其类型的情况下构建 Dataset。

使用反射推断 Schema

Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,并推断数据类型。通过将键/值对列表作为 kwargs 传递给 Row 类来构造 Row。此列表的键定义了表的列名,并且类型是通过对整个数据集进行采样来推断的,类似于对 JSON 文件执行的推断。

from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
完整的示例代码位于 Spark 仓库中的“examples/src/main/python/sql/basic.py”文件。

Spark SQL 的 Scala 接口支持自动将包含 Case Class 的 RDD 转换为 DataFrame。Case Class 定义了表的 Schema。使用反射读取 Case Class 参数的名称,并将其作为列名。Case Class 也可以是嵌套的,或者包含复杂类型,例如 SeqArray。此 RDD 可以隐式转换为 DataFrame,然后注册为表。表可以在后续的 SQL 语句中使用。

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder: Encoder[Map[String, Any]] =
  org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
完整的示例代码位于 Spark 仓库中的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”文件。

Spark SQL 支持自动将 JavaBeans 的 RDD 转换为 DataFrame。通过反射获得的 BeanInfo 定义了表的 Schema。目前,Spark SQL 不支持包含 Map 字段的 JavaBean。但是,支持嵌套的 JavaBean 和 ListArray 字段。您可以通过创建一个实现 Serializable 接口并为其所有字段提供 getter 和 setter 方法的类来创建 JavaBean。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
    stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”文件。

以编程方式指定 Schema

当无法预先定义 kwargs 字典时(例如,记录结构以字符串形式编码,或者文本数据集将被解析,并且字段将针对不同用户以不同方式投影),可以通过三个步骤以编程方式创建 DataFrame

  1. 从原始 RDD 创建一个元组或列表的 RDD;
  2. 创建一个由 StructType 表示的 Schema,该 Schema 与在步骤 1 中创建的 RDD 中的元组或列表的结构匹配。
  3. 通过 SparkSession 提供的 createDataFrame 方法将 Schema 应用于 RDD。

例如

# Import data types
from pyspark.sql.types import StringType, StructType, StructField

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/python/sql/basic.py”文件。

当无法预先定义 Case Class 时(例如,记录结构以字符串形式编码,或者文本数据集将被解析,并且字段将针对不同用户以不同方式投影),可以通过三个步骤以编程方式创建 DataFrame

  1. 从原始 RDD 创建一个 Row 的 RDD;
  2. 创建一个由 StructType 表示的 Schema,该 Schema 与在步骤 1 中创建的 RDD 中的 Row 的结构匹配。
  3. 通过 SparkSession 提供的 createDataFrame 方法将 Schema 应用于 Row 的 RDD。

例如

import org.apache.spark.sql.{Encoder, Row}

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”文件。

当无法预先定义 JavaBean 类时(例如,记录结构以字符串形式编码,或者文本数据集将被解析,并且字段将针对不同用户以不同方式投影),可以通过三个步骤以编程方式创建 Dataset<Row>

  1. 从原始 RDD 创建一个 Row 的 RDD;
  2. 创建一个由 StructType 表示的 Schema,该 Schema 与在步骤 1 中创建的 RDD 中的 Row 的结构匹配。
  3. 通过 SparkSession 提供的 createDataFrame 方法将 Schema 应用于 Row 的 RDD。

例如

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
完整的示例代码位于 Spark 仓库中的“examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java”文件。

标量函数

标量函数是为每行返回单个值的函数,与为一组行返回值的聚合函数相对。Spark SQL 支持多种内置标量函数。它还支持用户定义的标量函数

聚合函数

聚合函数是对一组行返回单个值的函数。内置聚合函数提供了常见的聚合操作,例如 count()count_distinct()avg()max()min() 等。用户不限于预定义的聚合函数,还可以创建自己的聚合函数。有关用户定义的聚合函数的更多详细信息,请参阅用户定义聚合函数的文档。