入门

起点:SparkSession

Spark 中所有功能的入口点是 SparkSession 类。 要创建一个基本的 SparkSession,只需使用 SparkSession.builder

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
在 Spark 仓库中的 "examples/src/main/python/sql/basic.py" 查找完整的示例代码。

Spark 中所有功能的入口点是 SparkSession 类。 要创建一个基本的 SparkSession,只需使用 SparkSession.builder()

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 查找完整的示例代码。

Spark 中所有功能的入口点是 SparkSession 类。 要创建一个基本的 SparkSession,只需使用 SparkSession.builder()

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 查找完整的示例代码。

Spark 中所有功能的入口点是 SparkSession 类。 要初始化一个基本的 SparkSession,只需调用 sparkR.session()

sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))
在 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R" 查找完整的示例代码。

请注意,首次调用时,sparkR.session() 会初始化一个全局 SparkSession 单例实例,并始终为后续调用返回对该实例的引用。 这样,用户只需初始化一次 SparkSession,然后像 read.df 这样的 SparkR 函数就可以隐式访问此全局实例,用户无需传递 SparkSession 实例。

Spark 2.0 中的 SparkSession 提供了对 Hive 功能的内置支持,包括使用 HiveQL 编写查询、访问 Hive UDF 以及从 Hive 表中读取数据的功能。 要使用这些功能,你无需拥有现有的 Hive 设置。

创建 DataFrames

使用 SparkSession,应用程序可以从 现有 RDD、从 Hive 表或从 Spark 数据源 创建 DataFrames。

例如,以下代码基于 JSON 文件的内容创建一个 DataFrame

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
在 Spark 仓库中的 "examples/src/main/python/sql/basic.py" 查找完整的示例代码。

使用 SparkSession,应用程序可以从 现有 RDD、从 Hive 表或从 Spark 数据源 创建 DataFrames。

例如,以下代码基于 JSON 文件的内容创建一个 DataFrame

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 查找完整的示例代码。

使用 SparkSession,应用程序可以从 现有 RDD、从 Hive 表或从 Spark 数据源 创建 DataFrames。

例如,以下代码基于 JSON 文件的内容创建一个 DataFrame

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 查找完整的示例代码。

使用 SparkSession,应用程序可以从本地 R data.frame、从 Hive 表或从 Spark 数据源 创建 DataFrames。

例如,以下代码基于 JSON 文件的内容创建一个 DataFrame

df <- read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame
head(df)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

# Another method to print the first few rows and optionally truncate the printing of long values
showDF(df)
## +----+-------+
## | age|   name|
## +----+-------+
## |null|Michael|
## |  30|   Andy|
## |  19| Justin|
## +----+-------+
在 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R" 查找完整的示例代码。

非类型化 Dataset 操作(也称为 DataFrame 操作)

DataFrames 为 ScalaJavaPythonR 中的结构化数据操作提供了一种特定于领域的语言。

如上所述,在 Spark 2.0 中,DataFrames 只是 Scala 和 Java API 中 Row 的 Dataset。 这些操作也称为“非类型化转换”,与具有强类型 Scala/Java Datasets 的“类型化转换”形成对比。

这里我们包括一些使用 Datasets 进行结构化数据处理的基本示例

在 Python 中,可以通过属性(df.age)或通过索引(df['age'])访问 DataFrame 的列。 虽然前者对于交互式数据探索很方便,但强烈建议用户使用后一种形式,这种形式在未来具有更好的兼容性,并且不会因为也是 DataFrame 类上的属性的列名而中断。

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+
在 Spark 仓库中的 "examples/src/main/python/sql/basic.py" 查找完整的示例代码。

有关可以在 DataFrame 上执行的操作类型的完整列表,请参阅 API 文档

除了简单的列引用和表达式之外,DataFrames 还拥有丰富的函数库,包括字符串操作、日期算术、常用数学运算等等。 完整的列表可在 DataFrame 函数参考 中找到。

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 查找完整的示例代码。

有关可以在 Dataset 上执行的操作类型的完整列表,请参阅 API 文档

除了简单的列引用和表达式之外,Datasets 还拥有丰富的函数库,包括字符串操作、日期算术、常用数学运算等等。 完整的列表可在 DataFrame 函数参考 中找到。

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 查找完整的示例代码。

有关可以在 Dataset 上执行的操作类型的完整列表,请参阅 API 文档

除了简单的列引用和表达式之外,Datasets 还拥有丰富的函数库,包括字符串操作、日期算术、常用数学运算等等。 完整的列表可在 DataFrame 函数参考 中找到。

# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
head(df)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin


# Print the schema in a tree format
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
head(select(df, "name"))
##      name
## 1 Michael
## 2    Andy
## 3  Justin

# Select everybody, but increment the age by 1
head(select(df, df$name, df$age + 1))
##      name (age + 1.0)
## 1 Michael          NA
## 2    Andy          31
## 3  Justin          20

# Select people older than 21
head(where(df, df$age > 21))
##   age name
## 1  30 Andy

# Count people by age
head(count(groupBy(df, "age")))
##   age count
## 1  19     1
## 2  NA     1
## 3  30     1
在 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R" 查找完整的示例代码。

有关可以在 DataFrame 上执行的操作类型的完整列表,请参阅 API 文档

除了简单的列引用和表达式之外,DataFrames 还拥有丰富的函数库,包括字符串操作、日期算术、常用数学运算等等。 完整的列表可在 DataFrame 函数参考 中找到。

以编程方式运行 SQL 查询

SparkSession 上的 sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 DataFrame 返回。

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
在 Spark 仓库中的 "examples/src/main/python/sql/basic.py" 查找完整的示例代码。

SparkSession 上的 sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 DataFrame 返回。

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 查找完整的示例代码。

SparkSession 上的 sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 Dataset<Row> 返回。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 查找完整的示例代码。

sql 函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame 返回。

df <- sql("SELECT * FROM table")
在 Spark 仓库中的 "examples/src/main/r/RSparkSQLExample.R" 查找完整的示例代码。

全局临时视图

Spark SQL 中的临时视图是会话范围的,如果创建它的会话终止,则会消失。 如果你想要一个在所有会话之间共享的临时视图,并保持活动状态直到 Spark 应用程序终止,你可以创建一个全局临时视图。 全局临时视图绑定到一个系统保留的数据库 global_temp,我们必须使用限定名称来引用它,例如 SELECT * FROM global_temp.view1

# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+
在 Spark 仓库中的 "examples/src/main/python/sql/basic.py" 查找完整的示例代码。
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 查找完整的示例代码。
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 查找完整的示例代码。
CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl

SELECT * FROM global_temp.temp_view

创建 Datasets

Datasets 类似于 RDD,但是它们没有使用 Java 序列化或 Kryo,而是使用专门的 Encoder 来序列化对象,以便进行处理或通过网络传输。 虽然 encoders 和标准序列化都负责将对象转换为字节,但 encoders 是动态生成的代码,并使用一种允许 Spark 执行许多操作(如过滤、排序和哈希)而无需将字节反序列化为对象的方式。

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 查找完整的示例代码。
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private long age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public long getAge() {
    return age;
  }

  public void setAge(long age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Long> longEncoder = Encoders.LONG();
Dataset<Long> primitiveDS = spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder);
Dataset<Long> transformedDS = primitiveDS.map(
    (MapFunction<Long, Long>) value -> value + 1L,
    longEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 查找完整的示例代码。

与 RDD 互操作

Spark SQL 支持两种不同的方法将现有 RDD 转换为 Datasets。 第一种方法使用反射来推断包含特定类型对象的 RDD 的 schema。 这种基于反射的方法可以生成更简洁的代码,并且在你编写 Spark 应用程序时已经知道 schema 时效果很好。

创建 Datasets 的第二种方法是通过一个编程接口,该接口允许你构造一个 schema,然后将其应用于现有的 RDD。 虽然这种方法更加冗长,但它允许你在列及其类型在运行时才知道时构造 Datasets。

使用反射推断 Schema

Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,并推断数据类型。 通过将键/值对列表作为 kwargs 传递给 Row 类来构造 Rows。 此列表的键定义了表的列名,并且通过对整个数据集进行采样来推断类型,类似于对 JSON 文件执行的推断。

from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin
在 Spark 仓库中的 "examples/src/main/python/sql/basic.py" 查找完整的示例代码。

Spark SQL 的 Scala 接口支持自动将包含 case 类别的 RDD 转换为 DataFrame。 case 类定义了表的 schema。 使用反射读取 case 类的参数名称,并将其作为列的名称。 case 类也可以是嵌套的或包含复杂类型,例如 Seqs 或 Arrays。 此 RDD 可以隐式转换为 DataFrame,然后注册为表。 表可以在后续 SQL 语句中使用。

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 查找完整的示例代码。

Spark SQL 支持自动将 JavaBeans 的 RDD 转换为 DataFrame。 使用反射获得的 BeanInfo 定义了表的模式。 目前,Spark SQL 不支持包含 Map 字段的 JavaBeans。 但是,支持嵌套的 JavaBeans 和 ListArray 字段。 您可以通过创建一个实现 Serializable 接口并且具有所有字段的 getter 和 setter 方法的类来创建一个 JavaBean。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
    stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 查找完整的示例代码。

以编程方式指定 Schema

当 kwargs 的字典无法提前定义时(例如,记录的结构编码在字符串中,或者文本数据集将被解析,并且字段将为不同的用户以不同的方式投影),可以分三个步骤以编程方式创建一个 DataFrame

  1. 从原始 RDD 创建元组或列表的 RDD;
  2. 创建一个由 StructType 表示的模式,该模式与步骤 1 中创建的 RDD 中的元组或列表的结构相匹配。
  3. 通过 SparkSession 提供的 createDataFrame 方法将该模式应用于 RDD。

例如

# Import data types
from pyspark.sql.types import StringType, StructType, StructField

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+
在 Spark 仓库中的 "examples/src/main/python/sql/basic.py" 查找完整的示例代码。

当 case class 无法提前定义时(例如,记录的结构编码在字符串中,或者文本数据集将被解析,并且字段将为不同的用户以不同的方式投影),可以分三个步骤以编程方式创建一个 DataFrame

  1. 从原始 RDD 创建 Row 的 RDD;
  2. 创建一个由 StructType 表示的模式,该模式与步骤 1 中创建的 RDD 中的 Row 的结构相匹配。
  3. 通过 SparkSession 提供的 createDataFrame 方法将该模式应用于 Row 的 RDD。

例如

import org.apache.spark.sql.Row

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
在 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" 查找完整的示例代码。

当 JavaBean 类无法提前定义时(例如,记录的结构编码在字符串中,或者文本数据集将被解析,并且字段将为不同的用户以不同的方式投影),可以分三个步骤以编程方式创建一个 Dataset<Row>

  1. 从原始 RDD 创建 Row 的 RDD;
  2. 创建一个由 StructType 表示的模式,该模式与步骤 1 中创建的 RDD 中的 Row 的结构相匹配。
  3. 通过 SparkSession 提供的 createDataFrame 方法将该模式应用于 Row 的 RDD。

例如

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
在 Spark 仓库中的 "examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java" 查找完整的示例代码。

标量函数

标量函数是每行返回单个值的函数,而不是聚合函数,聚合函数为一组行返回一个值。 Spark SQL 支持各种内置标量函数。它还支持用户定义的标量函数

聚合函数

聚合函数是在一组行上返回单个值的函数。内置聚合函数提供了常见的聚合,例如 count()count_distinct()avg()max()min() 等。用户不限于预定义的聚合函数,并且可以创建自己的聚合函数。有关用户定义的聚合函数的更多详细信息,请参阅用户定义的聚合函数的文档。