Spark SQL编程指南


1 概述

1、Spark SQL

SparkSQL的一个用途是直接执行SQL查询语句,您可以使用最基本的SQL语法或HiveQL语法。SparkSQL能够从已经存在的Hive中读取数据。SparkSQL将返回DataFrame的结果,如果SQL是用其他编程语言运行的。您也可以使用SparkSQL,也可以在命令行中使用JDBC/ODBC。

2、DataFrames

DataFrame是一个分布式的数据集合,其中的每个数据包含若干命名段。从概念上讲,她与关系型数据库的表或R,以及Python中的dataframe等效,只是在底层DataFrame做了更多优化。DataFrame能够从许多数据源(sources)装载和构建数据,例如:结构化数据文件、Hive中的表、外部数据库或已经存在的RDD。DataFrameAPI支持Scala、Java、Python、andR。

3、Datasets

Dataset是Spark-1.6中新增加的API,目前还处于试验阶段。Dataset希望将RDD的优点(强类型,可使用lambda表达式函数)与SparkSQL的优化执行引擎相结合。Dataset可以通过JVM对象构建(constructed)获得,而Dataset则可以使用各种transformation算子(map、flatMap、filter等等)。

DatasetAPI对于Scala和Java的支持接口是一致的,但是目前还不支持Python,尽管Python本身具有语言动态特性的优点(例如,您可以使用字段名称访问数据,row.columnName)。对于Python的完全支持将在以后的版本中添加。

2 入口:SQLContext

Spark SQL所有的功能入口都是SQLContext 类,及其子类。不过要创建一个SQLContext对象,首先需要有一个SparkContext对象。

val sc: SparkContext // 假设已经有一个 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 用于包含RDD到DataFrame隐式转换操作
import sqlContext.implicits._

除了SQLContext之外,还可以创建HiveContext,这是SQLContext的一个超集。

除了SQLContext的功能之外,HiveContext还提供HiveQL完整语法、Hive表中UDF使用和数据访问。要使用HiveContext,不需要安装HiveContext,而且SQLContext可以使用的数据源和HiveContext一样可用。HiveContext是分开包装的,所以Spark的默认发行版可以包含所有的Hive依赖关系。如果这些依赖项没有问题(不会造成依赖冲突等),建议在Spark-1.3之前使用HiveContext。SQLContext在随后的Spark版本中会逐渐升级,以达到类似HiveContext功能的状态。

spark.sql.dialect选项可以指定其他SQL(或者SQL语言)。该参数可在SparkContext.setConf或SQL语句的SETkey=value命令指定。对于SQLContext,当前配置的惟一可选值是”sql”,它使用SparkSQL自带的简单SQL解析器。对于HiveContext,spark.sql.dialect的缺省值为”hiveql”,当然您也可以将其值返回sql。HiveSQL解析器现在支持更完整的SQL语法,所以建议大部分时间使用HiveContext。

3 创建DataFrame

Spark应用可以用SparkContext创建DataFrame,所需的数据来源可以是已有的RDD(existing RDD),或者Hive表,或者其他数据源(data sources

以下是一个从JSON文件创建DataFrame的小栗子:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 将DataFrame内容打印到stdout
df.show()

4 DataFrame操作

DataFrame提供了结构化数据的领域专用语言支持,包括Scala, Java, Python and R。
这里我们给出一个结构化数据处理的基本示例:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 创建一个 DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 展示 DataFrame 的内容
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// 打印数据树形结构
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// select "name" 字段
df.select("name").show()
// name
// Michael
// Andy
// Justin

// 展示所有人,但所有人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// 筛选出年龄大于21的人
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// 计算各个年龄的人数
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

除了简单的字段引用和表达式支持之外,DataFrame还提供了丰富的工具函数库,包括字符串组装,日期处理,常见的数学函数等。

5 编程方式执行SQL查询

SQLContext.sql可以执行一个SQL查询,并返回DataFrame结果。

val sqlContext = ... // 已有一个 SQLContext 对象
val df = sqlContext.sql("SELECT * FROM table")

6 创建Dataset

Dataset API和RDD类似,不过Dataset不使用Java序列化或者Kryo,而是使用专用的编码器(Encoder )来序列化对象和跨网络传输通信。如果这个编码器和标准序列化都能把对象转字节,那么编码器就可以根据代码动态生成,并使用一种特殊数据格式,这种格式下的对象不需要反序列化回来,就能允许Spark进行操作,如过滤、排序、哈希等。

// 对普通类型数据的Encoder是由 importing sqlContext.implicits._ 自动提供的
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // 返回: Array(2, 3, 4)

// 以下这行不仅定义了case class,同时也自动为其创建了Encoder
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrame 只需提供一个和数据schema对应的class即可转换为 Dataset。Spark会根据字段名进行映射。
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]

7 和RDD互操作

Spark SQL有两种方法将RDD转为DataFrame。

  • 1. 使用反射机制,推导包含指定类型对象RDD的schema。这种基于反射机制的方法使代码更简洁,而且如果你事先知道数据schema,推荐使用这种方式;
  • 2. 编程方式构建一个schema,然后应用到指定RDD上。这种方式更啰嗦,但如果你事先不知道数据有哪些字段,或者数据schema是运行时读取进来的,那么你很可能需要用这种方式。

1、利用反射推导schema

Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名通过反射,映射为表的字段名。case class还可以嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,可以进一步注册成表。随后,你就可以对表中数据使用SQL语句查询了。

// sc 是已有的 SparkContext 对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 为了支持RDD到DataFrame的隐式转换
import sqlContext.implicits._

// 定义一个case class.
// 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,
// 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema
case class Person(name: String, age: Int)

// 创建一个包含Person对象的RDD,并将其注册成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法可以直接执行SQL语句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子
// 查询结果中每行的字段可以按字段索引访问:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按字段名访问:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回结果: Map("name" -> "Justin", "age" -> 19)

2、编程方式定义Schema

如果不能事先通过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,需要先解析,又或者字段对不同用户有所不同),那么你可能需要按以下三个步骤,以编程方式的创建一个DataFrame:

  • 从已有的RDD创建一个包含Row对象的RDD
  • 用StructType创建一个schema,和步骤1中创建的RDD的结构相匹配
  • 把得到的schema应用于包含Row对象的RDD,调用这个方法来实现这一步:SQLContext.createDataFrame

例如:

// sc 是已有的SparkContext对象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 创建一个RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// 数据的schema被编码与一个字符串中
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL 各个数据类型
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// 基于前面的字符串生成schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 将RDD[people]的各个记录转换为Rows,即:得到一个包含Row对象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 将schema应用到包含Row对象的RDD上,得到一个DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// 将DataFrame注册为table
peopleDataFrame.registerTempTable("people")

// 执行SQL语句
val results = sqlContext.sql("SELECT name FROM people")

// SQL查询的结果是DataFrame,且能够支持所有常见的RDD算子
// 并且其字段可以以索引访问,也可以用字段名访问
results.map(t => "Name: " + t(0)).collect().foreach(println)