入门
- 起点:SparkSession
- 创建 DataFrame
- 无类型 Dataset 操作(也称为 DataFrame 操作)
- 以编程方式运行 SQL 查询
- 全局临时视图
- 创建 Dataset
- 与 RDD 交互
- 标量函数
- 聚合函数
起点:SparkSession
Spark 中所有功能的入口点是 SparkSession
类。要创建一个基本的 SparkSession
,只需使用 SparkSession.builder
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Spark 中所有功能的入口点是 SparkSession
类。要创建一个基本的 SparkSession
,只需使用 SparkSession.builder()
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
Spark 中所有功能的入口点是 SparkSession
类。要创建一个基本的 SparkSession
,只需使用 SparkSession.builder()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Spark 中所有功能的入口点是 SparkSession
类。要初始化一个基本的 SparkSession
,只需调用 sparkR.session()
sparkR.session(appName = "R Spark SQL basic example", sparkConfig = list(spark.some.config.option = "some-value"))
请注意,当第一次调用时,sparkR.session()
会初始化一个全局的 SparkSession
单例实例,并且始终返回对该实例的引用,以便在后续调用中使用。这样,用户只需要初始化一次 SparkSession
,然后 SparkR 函数(如 read.df
)就可以隐式地访问该全局实例,用户无需传递 SparkSession
实例。
SparkSession
在 Spark 2.0 中提供了对 Hive 功能的内置支持,包括使用 HiveQL 编写查询、访问 Hive UDF 以及从 Hive 表读取数据的能力。要使用这些功能,您无需拥有现有的 Hive 设置。
创建 DataFrame
使用 SparkSession
,应用程序可以从 现有的 RDD
、Hive 表或 Spark 数据源 创建 DataFrame。
例如,以下代码基于 JSON 文件的内容创建 DataFrame
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
使用 SparkSession
,应用程序可以从 现有的 RDD
、Hive 表或 Spark 数据源 创建 DataFrame。
例如,以下代码基于 JSON 文件的内容创建 DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
使用 SparkSession
,应用程序可以从 现有的 RDD
、Hive 表或 Spark 数据源 创建 DataFrame。
例如,以下代码基于 JSON 文件的内容创建 DataFrame
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
使用 SparkSession
,应用程序可以从本地 R 数据框、Hive 表或 Spark 数据源 创建 DataFrame。
例如,以下代码基于 JSON 文件的内容创建 DataFrame
df <- read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame
head(df)
## age name
## 1 NA Michael
## 2 30 Andy
## 3 19 Justin
# Another method to print the first few rows and optionally truncate the printing of long values
showDF(df)
## +----+-------+
## | age| name|
## +----+-------+
## |null|Michael|
## | 30| Andy|
## | 19| Justin|
## +----+-------+
无类型 Dataset 操作(也称为 DataFrame 操作)
DataFrame 为 Scala、Java、Python 和 R 中的结构化数据操作提供了领域特定语言。
如上所述,在 Spark 2.0 中,DataFrame 只是 Scala 和 Java API 中的 Row
的 Dataset。这些操作也称为“无类型转换”,与具有强类型 Scala/Java Dataset 的“类型化转换”形成对比。
这里我们包含了一些使用 Dataset 进行结构化数据处理的基本示例
在 Python 中,可以通过属性 (df.age
) 或索引 (df['age']
) 访问 DataFrame 的列。虽然前者便于交互式数据探索,但强烈建议用户使用后者,后者是面向未来的,并且不会因 DataFrame 类中也是属性的列名而失效。
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Select only the "name" column
df.select("name").show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# | name|(age + 1)|
# +-------+---------+
# |Michael| null|
# | Andy| 31|
# | Justin| 20|
# +-------+---------+
# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+
# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+
有关可以在 DataFrame 上执行的操作类型的完整列表,请参阅 API 文档。
除了简单的列引用和表达式之外,DataFrame 还拥有丰富的函数库,包括字符串操作、日期运算、常用数学运算等等。完整的列表可以在 DataFrame 函数参考 中找到。
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
有关可以在 Dataset 上执行的操作类型的完整列表,请参阅 API 文档。
除了简单的列引用和表达式之外,Dataset 还拥有丰富的函数库,包括字符串操作、日期运算、常用数学运算等等。完整的列表可以在 DataFrame 函数参考 中找到。
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
有关可以在 Dataset 上执行的操作类型的完整列表,请参阅 API 文档。
除了简单的列引用和表达式之外,Dataset 还拥有丰富的函数库,包括字符串操作、日期运算、常用数学运算等等。完整的列表可以在 DataFrame 函数参考 中找到。
# Create the DataFrame
df <- read.json("examples/src/main/resources/people.json")
# Show the content of the DataFrame
head(df)
## age name
## 1 NA Michael
## 2 30 Andy
## 3 19 Justin
# Print the schema in a tree format
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
# Select only the "name" column
head(select(df, "name"))
## name
## 1 Michael
## 2 Andy
## 3 Justin
# Select everybody, but increment the age by 1
head(select(df, df$name, df$age + 1))
## name (age + 1.0)
## 1 Michael NA
## 2 Andy 31
## 3 Justin 20
# Select people older than 21
head(where(df, df$age > 21))
## age name
## 1 30 Andy
# Count people by age
head(count(groupBy(df, "age")))
## age count
## 1 19 1
## 2 NA 1
## 3 30 1
有关可以在 DataFrame 上执行的操作类型的完整列表,请参阅 API 文档。
除了简单的列引用和表达式之外,DataFrame 还拥有丰富的函数库,包括字符串操作、日期运算、常用数学运算等等。完整的列表可以在 DataFrame 函数参考 中找到。
以编程方式运行 SQL 查询
SparkSession
上的 sql
函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 DataFrame
返回。
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
SparkSession
上的 sql
函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 DataFrame
返回。
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
SparkSession
上的 sql
函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 Dataset<Row>
返回。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
sql
函数使应用程序能够以编程方式运行 SQL 查询,并将结果作为 SparkDataFrame
返回。
df <- sql("SELECT * FROM table")
全局临时视图
Spark SQL 中的临时视图是会话范围的,如果创建它的会话终止,则会消失。如果您想拥有一个在所有会话之间共享并一直存在到 Spark 应用程序终止的临时视图,则可以创建一个全局临时视图。全局临时视图与系统保留的数据库 global_temp
绑定,我们必须使用限定名来引用它,例如 SELECT * FROM global_temp.view1
。
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
创建 Dataset
Dataset 与 RDD 类似,但是,它们不使用 Java 序列化或 Kryo,而是使用专门的 Encoder 来序列化对象,以便在网络上进行处理或传输。虽然编码器和标准序列化都负责将对象转换为字节,但编码器是动态生成的,并使用一种格式,使 Spark 能够在不将字节反序列化回对象的情况下执行许多操作,例如过滤、排序和哈希。
case class Person(name: String, age: Long)
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
public static class Person implements Serializable {
private String name;
private long age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getAge() {
return age;
}
public void setAge(long age) {
this.age = age;
}
}
// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// Encoders for most common types are provided in class Encoders
Encoder<Long> longEncoder = Encoders.LONG();
Dataset<Long> primitiveDS = spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder);
Dataset<Long> transformedDS = primitiveDS.map(
(MapFunction<Long, Long>) value -> value + 1L,
longEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
与 RDD 交互
Spark SQL 支持两种不同的方法将现有 RDD 转换为 Dataset。第一种方法使用反射来推断包含特定类型对象的 RDD 的模式。这种基于反射的方法可以使代码更简洁,并且在您在编写 Spark 应用程序时已经知道模式的情况下效果很好。
创建 Dataset 的第二种方法是通过编程接口,该接口允许您构建模式,然后将其应用于现有 RDD。虽然这种方法更冗长,但它允许您在直到运行时才知道列及其类型的情况下构建 Dataset。
使用反射推断模式
Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,并推断数据类型。Row 是通过将键/值对列表作为 kwargs 传递给 Row 类来构建的。此列表的键定义表的列名,类型是通过对整个数据集进行采样来推断的,类似于对 JSON 文件执行的推断。
from pyspark.sql import Row
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
print(name)
# Name: Justin
Spark SQL 的 Scala 接口支持将包含 case 类 的 RDD 自动转换为 DataFrame。case 类定义表的模式。case 类参数的名称使用反射读取,并成为列的名称。case 类也可以嵌套或包含复杂类型,例如 Seq
或 Array
。此 RDD 可以隐式转换为 DataFrame,然后注册为表。表可以在后续的 SQL 语句中使用。
// For implicit conversions from RDDs to DataFrames
import spark.implicits._
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
Spark SQL 支持将 JavaBean 的 RDD 自动转换为 DataFrame。使用反射获得的 BeanInfo
定义表的模式。目前,Spark SQL 不支持包含 Map
字段的 JavaBean。嵌套的 JavaBean 以及 List
或 Array
字段是支持的。您可以通过创建一个实现 Serializable 并为其所有字段提供 getter 和 setter 的类来创建 JavaBean。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
.textFile("examples/src/main/resources/people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
});
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
以编程方式指定模式
当无法提前定义 kwargs 字典时(例如,记录的结构编码在字符串中,或者将解析文本数据集,并且不同用户将以不同的方式投影字段),可以通过三个步骤以编程方式创建 DataFrame
。
- 从原始 RDD 创建元组或列表的 RDD;
- 创建由
StructType
表示的模式,该模式与步骤 1 中创建的 RDD 中的元组或列表的结构匹配。 - 通过
SparkSession
提供的createDataFrame
方法将模式应用于 RDD。
例如
# Import data types
from pyspark.sql.types import StringType, StructType, StructField
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")
# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
当无法提前定义 case 类时(例如,记录的结构编码在字符串中,或者文本数据集将被解析,并且字段将针对不同的用户进行不同的投影),可以通过三个步骤以编程方式创建 DataFrame
。
- 从原始 RDD 创建一个
Row
的 RDD; - 创建由
StructType
表示的模式,该模式与步骤 1 中创建的 RDD 中Row
的结构匹配。 - 通过
SparkSession
提供的createDataFrame
方法将模式应用于Row
的 RDD。
例如
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
当无法提前定义 JavaBean 类时(例如,记录的结构编码在字符串中,或者文本数据集将被解析,并且字段将针对不同的用户进行不同的投影),可以通过三个步骤以编程方式创建 Dataset<Row>
。
- 从原始 RDD 创建一个
Row
的 RDD; - 创建由
StructType
表示的模式,该模式与步骤 1 中创建的 RDD 中Row
的结构匹配。 - 通过
SparkSession
提供的createDataFrame
方法将模式应用于Row
的 RDD。
例如
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("examples/src/main/resources/people.txt", 1)
.toJavaRDD();
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
标量函数
标量函数是按行返回单个值的函数,与聚合函数相反,聚合函数返回一组行的值。Spark SQL 支持各种 内置标量函数。它还支持 用户定义的标量函数。
聚合函数
聚合函数是返回一组行的单个值的函数。 内置聚合函数 提供常见的聚合,例如 count()
、count_distinct()
、avg()
、max()
、min()
等。用户不限于预定义的聚合函数,可以创建自己的函数。有关用户定义的聚合函数的更多详细信息,请参阅 用户定义的聚合函数 的文档。