SparkSQL

| 标签 Spark 

1. Spark SQL概述

1.1 SparkSQL 是什么

image.png Spark SQL 是Spark 用于结构化数据(structured data)处理的Spark 模块。

1.2 Hive and SparkSQL

SparkSQL 的前身是Shark,给熟悉RDBMS 但又不理解MapReduce 的技术人员提供快速上手的工具。 Hive 是早期唯一运行在Hadoop 上的SQL-on-Hadoop 工具。但是MapReduce 计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率,为了提高SQL-on-Hadoop 的效率,大量的SQL-on-Hadoop 工具开始产生,其中表现较为突出的是:

  • Drill
  • Impala
  • Shark

其中Shark 是伯克利实验室Spark 生态环境的组件之一,是基于Hive 所开发的工具,它修改了下图所示的右下角的内存管理、物理计划、执行三个模块,并使之能运行在Spark 引擎上。 image.png Shark 的出现,使得SQL-on-Hadoop 的性能比Hive 有了10-100 倍的提高。

image.png 但是,随着Spark 的发展,对于野心勃勃的Spark 团队来说,Shark 对于Hive 的太多依赖(如采用Hive 的语法解析器、查询优化器等等),制约了Spark 的One Stack Rule Them All 的既定方针,制约了Spark 各个组件的相互集成,所以提出了SparkSQL 项目。SparkSQL 抛弃原有Shark 的代码,汲取了Shark 的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive 兼容性等,重新开发了SparkSQL 代码;由于摆脱了对Hive 的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓“退一步,海阔天空”。

  • 数据兼容方面SparkSQL 不但兼容Hive,还可以从RDD、parquet 文件、JSON 文件中获取数据,未来版本甚至支持获取RDBMS 数据以及cassandra 等NOSQL 数据;
  • 性能优化方面除了采取In-Memory Columnar Storage、byte-code generation 等优化技术外、将会引进Cost Model 对查询进行动态评估、获取最佳物理计划等等;
  • 组件扩展方面无论是SQL 的语法解析器、分析器还是优化器都可以重新定义,进行扩展。

2014 年6 月1 日Shark 项目和SparkSQL 项目的主持人Reynold Xin 宣布:停止对Shark 的 开发,团队将所有资源放SparkSQL 项目上,至此,Shark 的发展画上了句话,但也因此发展出两个支线:SparkSQL 和Hive on Spark。 image.png 其中SparkSQL 作为Spark 生态的一员继续发展,而不再受限于Hive,只是兼容Hive;而Hive on Spark 是一个Hive 的发展计划,该计划将Spark 作为Hive 的底层引擎之一,也就是说,Hive 将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark 等引擎。 对于开发人员来讲,SparkSQL 可以简化RDD 的开发,提高开发效率,且执行效率非常快,所以实际工作中,基本上采用的就是SparkSQL。Spark SQL 为了简化RDD 的开发,提高开发效率,提供了2 个编程抽象,类似Spark Core 中的RDD

  • DataFrame
  • DataSet

1.3 SparkSQL 特点

1.3.1 易整合

无缝的整合了SQL 查询和Spark 编程 image.png

1.3.2 统一的数据访问

使用相同的方式连接不同的数据源 image.png

1.3.3 兼容Hive

在已有的仓库上直接运行SQL 或者HiveQL image.png

1.3.4 标准数据连接

通过JDBC 或者ODBC 来连接 image.png

1.4 DataFrame 是什么

在Spark 中,DataFrame 是一种以RDD 为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame 与RDD 的主要区别在于,前者带有schema 元信息,即DataFrame 所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL 得以洞察更多的结构信息,从而对藏于DataFrame 背后的数据源以及作用于DataFrame 之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core 只能在stage 层面进行简单、通用的流水线优化。 同时,与Hive 类似,DataFrame 也支持嵌套数据类型(struct、array 和map)。从API 易用性的角度上看,DataFrame API 提供的是一套高层的关系操作,比函数式的RDD API 要更加友好,门槛更低 image.png 上图直观地体现了DataFrame 和RDD 的区别。

左侧的RDD[Person]虽然以Person 为类型参数,但Spark 框架本身不了解Person 类的内部结构。而右侧的DataFrame 却提供了详细的结构信息,使得Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。 DataFrame 是为数据提供了Schema 的视图。可以把它当做数据库中的一张表来对待

DataFrame 也是懒执行的,但性能上比RDD 要高,主要原因:优化的执行计划,即查询计划通过Spark catalyst optimiser 进行优化。

1.5 DataSet 是什么

DataSet 是分布式数据集合。DataSet 是Spark 1.6 中添加的一个新抽象,是DataFrame的一个扩展。它提供了RDD 的优势(强类型,使用强大的lambda 函数的能力)以及Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换(操作map,flatMap,filter等等)

  • DataSet 是DataFrame API 的一个扩展,是SparkSQL 最新的数据抽象
  • 用户友好的API 风格,既具有类型安全检查也具有DataFrame 的查询优化特性;
  • 用样例类来对DataSet 中定义数据的结构信息,样例类中每个属性的名称直接映射到DataSet 中的字段名称;
  • DataSet 是强类型的。比如可以有DataSet[Car],DataSet[Person]。
  • DataFrame 是DataSet 的特列,DataFrame=DataSet[Row] ,所以可以通过as 方法将DataFrame 转换为DataSet。Row 是一个类型,跟Car、Person 这些的类型一样,所有的表结构信息都用Row 来表示。获取数据时需要指定顺序

2. SparkSQL 核心编程

2.1 新的起点

Spark Core 中,如果想要执行应用程序,需要首先构建上下文环境对象SparkContext,Spark SQL 其实可以理解为对Spark Core 的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。 在老的版本中,SparkSQL 提供两种SQL 查询起始点:一个叫SQLContext,用于Spark自己提供的SQL 查询;一个叫HiveContext,用于连接Hive 的查询。 SparkSession 是Spark 最新的SQL 查询起始点,实质上是SQLContext 和HiveContext 的组合,所以在SQLContex 和HiveContext 上可用的API 在SparkSession 上同样是可以使用的。SparkSession 内部封装了SparkContext,所以计算实际上是由sparkContext 完成的。当我们使用spark-shell 的时候, spark 框架会自动的创建一个名称叫做spark 的SparkSession 对象, 就像我们以前可以自动获取到一个sc 来表示SparkContext 对象一样

Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1648219626755).
Spark session available as 'spark'.
Welcome to
____              __
/  __/__  ___ _____/ /__
_\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 3.0.0
/_/

scala> spark
res1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@714ab3a1

scala> sc
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7ec02149

2.2 DataFrame

Spark SQL 的DataFrame API 允许我们使用DataFrame 而不用必须去注册临时表或者生成SQL 表达式。DataFrame API 既有transformation 操作也有action 操作。

2.2.1 创建DataFrame

在Spark SQL 中SparkSession 是创建DataFrame 和执行SQL 的入口,创建DataFrame有三种方式:通过Spark 的数据源进行创建;从一个存在的RDD 进行转换;还可以从HiveTable 进行查询返回。

  1. 从Spark 数据源进行创建
    • 查看Spark 支持创建文件的数据源格式
      scala> spark.read.
      csv      jdbc   load     options   parquet   table   textFile      
      format   json   option   orc       schema    text   
      
  • 由于我们是spark-yarn模式,所以在HDFS的/user/mhk/input目录中创建user.json 文件
    {"username":"zhangsan","age":30}
    {"username":"lisi","age":20}
    {"username":"wangwu","age":40}
    
  • 读取json 文件创建DataFrame ```shell scala> val df = spark.read.json(“input/user.json”) df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

scala> df.show +—+——–+ |age|username| +—+——–+ | 30|zhangsan| | 20| lisi| | 40| wangwu| +—+——–+

注意:如果从内存中获取数据,spark 可以知道数据类型具体是什么。如果是数字,默认作为Int 处理;但是从文件中读取的数字,不能确定是什么类型,所以用bigint 接收,可以和
Long 类型转换,但是和Int 不能进行转换

2. 从RDD 进行转换

在后续章节中讨论

3. 从Hive Table 进行查询返回

在后续章节中讨论

### 2.2.2 SQL 语法
SQL 语法风格是指我们查询数据的时候使用SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助

1. 读取JSON 文件创建DataFrame
```shell
scala> val df = spark.read.json("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
  1. 对DataFrame 创建一个临时表
    scala> df.createTempView("user")
    
  2. 通过SQL 语句实现查询全表 ```shell scala> spark.sql(“select * from user”).show +—+——–+ |age|username| +—+——–+ | 30|zhangsan| | 20| lisi| | 40| wangwu| +—+——–+

scala> spark.sql(“select age from user”).show +—+ |age| +—+ | 30| | 20| | 40| +—+

scala> spark.sql(“select avg(age) from user”).show +——–+
|avg(age)| +——–+ | 30.0| +——–+

注意:普通临时表是Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people

4. 对于DataFrame 创建一个全局表
```shell
scala> df.createOrReplaceGlobalTempView("people")
2022-03-29 09:05:29,299 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
  1. 通过SQL 语句实现查询全表
    scala> spark.newSession.sql("select * from global_temp.people").show
    +---+--------+
    |age|username|
    +---+--------+
    | 30|zhangsan|
    | 20|    lisi|
    | 40|  wangwu|
    +---+--------+
    

    2.2.3 DSL 语法

    DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在Scala, Java, Python 和R 中使用DSL,使用DSL 语法风格不必去创建临时视图了

  2. 创建一个DataFrame
    scala> df
    res6: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
    
  3. 查看DataFrame 的Schema 信息
    scala> df.printSchema
    root
     |-- age: long (nullable = true)
     |-- username: string (nullable = true)
    
  4. 只查看”username”列数据
    scala> df.select("username").show
    +--------+
    |username|
    +--------+
    |zhangsan|
    |    lisi|
    |  wangwu|
    +--------+
    
  5. 查看”username”列数据以及”age+1”数据

注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名

scala> df.select($"age" + 1).show
+---------+
|(age + 1)|
+---------+
|       31|
|       21|
|       41|
+---------+

scala> df.select($"username",$"age" + 1).show
+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan|       31|
|    lisi|       21|
|  wangwu|       41|
+--------+---------+


scala> df.select('age + 1).show
+---------+
|(age + 1)|
+---------+
|       31|
|       21|
|       41|
+---------+

scala> df.select('username,'age + 1).show
+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan|       31|
|    lisi|       21|
|  wangwu|       41|
+--------+---------+
  1. 查询年龄大于20的数据
    scala> df.filter('age>20).show
    +---+--------+
    |age|username|
    +---+--------+
    | 30|zhangsan|
    | 40|  wangwu|
    +---+--------+
    

    2.2.4 RDD 转换为DataFrame

    在IDEA 中开发程序时,如果需要RDD 与DF 或者DS 之间互相操作,那么需要引入 import spark.implicits._

这里的spark 不是Scala 中的包名,而是创建的sparkSession 对象的变量名称,所以必须先创建SparkSession 对象再导入。这里的spark 对象不能使用var 声明,因为Scala 只支持val 修饰的对象的引入。 spark-shell 中无需导入,自动完成此操作

scala> val rdd = sc.makeRDD(List(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at makeRDD at <console>:24

scala> val df = rdd.toDF("id")
df: org.apache.spark.sql.DataFrame = [id: int]

scala> df.show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+

实际开发中,一般通过样例类将RDD 转换为DataFrame

scala> case class User(name:String, age:Int)
defined class User

scala> val list = List(User("zhangsan",30),User("lisi",40))
list: List[User] = List(User(zhangsan,30), User(lisi,40))

scala> list.toDF.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 30|
|    lisi| 40|
+--------+---+

2.2.5 DataFrame 转换为RDD DataFrame 其实就是对RDD 的封装,所以可以直接获取内部的RDD

scala> val rdd = sc.makeRDD(List(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at makeRDD at <console>:24

scala> val df = rdd.toDF("id")
df: org.apache.spark.sql.DataFrame = [id: int]

scala> df.show
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+

scala> df.rdd
res18: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[57] at rdd at <console>:26

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[57] at rdd at <console>:26

scala> rdd.collect
res21: Array[org.apache.spark.sql.Row] = Array([1], [2], [3], [4])

注意:此时得到的RDD 存储类型为Row

2.3 DataSet

DataSet 是具有强类型的数据集合,需要提供对应的类型信息。

2.3.1 创建DataSet

  1. 使用样例类序列创建DataSet ```shell scala> case class User(name:String, age:Int) defined class User

scala> val list = List(User(“zhangsan”,30),User(“lisi”,40)) list: List[User] = List(User(zhangsan,30), User(lisi,40))

scala> list.to to toDF toIterable toMap toSet toTraversable
toArray toDS toIterator toParArray toStream toVector
toBuffer toIndexedSeq toList toSeq toString

scala> val ds = list.toDS ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> ds.show +——–+—+ | name|age| +——–+—+ |zhangsan| 30| | lisi| 40| +——–+—+


2. 使用基本类型的序列创建DataSet
```shell
scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show
+-----+
|value|
+-----+
|		 1|
|	   2|
|	   3|
|	   4|
|	   5|
+-----+

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD 来得到DataSet

2.3.2 RDD 转换为DataSet

SparkSQL 能够自动将包含有case 类的RDD 转换成DataSet,case 类定义了table 的结构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如Seq 或者Array 等复杂的结构。

scala> val rdd = sc.makeRDD(List(User(30,"zhangsan"), User(40,"lisi")))
rdd: org.apache.spark.rdd.RDD[User] = ParallelCollectionRDD[12] at makeRDD at <console>:26

scala> rdd.toDS
res5: org.apache.spark.sql.Dataset[User] = [age: bigint, username: string]
把数据封装成样例类

直接变也行,没有上一种方便
scala> val rdd = sc.makeRDD(List(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at makeRDD at <console>:24

scala> rdd.toDS
res6: org.apache.spark.sql.Dataset[Int] = [value: int]

2.3.3 DataSet 转换为RDD

DataSet 其实也是对RDD 的封装,所以可以直接获取内部的RDD

scala> val rdd = sc.makeRDD(List(User(30,"zhangsan"), User(40,"lisi")))
rdd: org.apache.spark.rdd.RDD[User] = ParallelCollectionRDD[14] at makeRDD at <console>:26

scala> val ds = rdd.toDS
ds: org.apache.spark.sql.Dataset[User] = [age: bigint, username: string]

scala> val rdd1 = ds.rdd
rdd1: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[17] at rdd at <console>:25

2.4 DataFrame 和DataSet 转换

DataFrame 其实是DataSet 的特例,所以它们之间是可以互相转换的。

  • DataFrame 转换为DataSet ```shell scala> val df = spark.read.json(“input/user.json”) df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

scala> df res1: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

scala> case class User(age:Long, username:String) defined class User

scala> val ds = df.as[User] ds: org.apache.spark.sql.Dataset[User] = [age: bigint, username: string]

scala> ds.show +—+——–+
|age|username| +—+——–+ | 30|zhangsan| | 20| lisi| | 40| wangwu| +—+——–+


- DataSet 转换为DataFrame
```shell
scala> ds.toDF
res4: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

2.5 RDD、DataFrame、DataSet 三者的关系

在SparkSQL 中Spark 为我们提供了两个新的抽象,分别是DataFrame 和DataSet。他们和RDD 有什么区别呢?首先从版本的产生上来看:

  • Spark1.0 => RDD
  • Spark1.3 => DataFrame
  • Spark1.6 => Dataset

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的 Spark 版本中,DataSet 有可能会逐步取代RDD和DataFrame 成为唯一的API 接口。

2.3.1 三者的共性

  • RDD、DataFrame、DataSet 全都是spark 平台下的分布式弹性数据集,为处理超大型数据提供便利;
  • 三者都有惰性机制,在进行创建、转换,如map 方法时,不会立即执行,只有在遇到Action 如foreach 时,三者才会开始遍历运算;
  • 三者有许多共同的函数,如filter,排序等;
  • 在对DataFrame 和Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession 对象后尽量直接导入)
  • 三者都会根据Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
  • 三者都有partition 的概念
  • DataFrame 和DataSet 均可使用模式匹配获取各个字段的值和类型

    2.3.2 三者的区别

    1) RDD

  • RDD 一般和spark mllib 同时使用
  • RDD 不支持sparksql 操作

2) DataFrame

  • 与RDD 和Dataset 不同,DataFrame 每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
  • DataFrame 与DataSet 一般不与spark mllib 同时使用
  • DataFrame 与DataSet 均支持SparkSQL 的操作,比如select,groupby 之类,还能注册临时表/视窗,进行sql 语句操作
  • DataFrame 与DataSet 支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)

3) DataSet

  • Dataset 和DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame 其实就是DataSet 的一个特例 type DataFrame = Dataset[Row]
  • DataFrame 也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS 方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset 中,每一行是什么类型是不一定的,在自定义了case class 之后可以很自由的获得每一行的信息

    2.3.1 三者的互相转换

    image.png

    2.6 IDEA 开发SparkSQL

    实际开发中,都是使用IDEA 进行开发的。

    2.6.1 添加依赖

    ```xml

org.apache.spark spark-sql_2.12 3.0.0
### 2.6.2 代码实现
```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Spark01_SparkSQL_Basic {
  def main(args: Array[String]): Unit = {

    //TODO 创建SparkSQL的运行环境
    val sparkConf = new SparkConf().setAppName("sparkSQL").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //TODO 执行逻辑操作

    // DataFrame
    //val df: DataFrame = spark.read.json("datas/user.json")
    //df.show()

    //  DataFrame => SQL
    //df.createOrReplaceTempView("user")

    //spark.sql("select * from user").show()
    //spark.sql("select age,username from user").show()
    //spark.sql("select avg(age) from user").show()

    //  DataFrame => DSL
    //  在使用DataFrame时,如果涉及到转换操作,需要引入转换规则
    //  注意这里的spark是对象名不是包名
    import spark.implicits._
    //df.select("age", "username").show()
    //df.select($"age" + 1).show()
    //df.select('age + 1).show()
    //df.select('username,'age + 1).show()

    // DataSet
    // DataFrame其实是特定泛型的DataSet   type DataFrame = Dataset[Row]
    //val seq = Seq(1, 2, 3, 4)
    //val ds: Dataset[Int] = seq.toDS()
    //ds.show()


    // RDD <=> DataFrame
    val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "zhangsan", 30), (2, "lisi", 40)))
    val df: DataFrame = rdd.toDF("id", "name", "age")
    val rowRDD: RDD[Row] = df.rdd


    // DataFrame <=> DataSet
    val ds: Dataset[User] = df.as[User]

    val df1: DataFrame = ds.toDF()

    // RDD <=> DataSet
    val ds1: Dataset[User] = rdd.map {
      case (id, name, age) => {
        User(id, name, age)
      }
    }.toDS()

    val userRDD: RDD[User] = ds1.rdd

    //TODO 关闭环境
    spark.close()
  }

  case class User(id: Int, name: String, age: Int)
}

2.7 数据的加载和保存

2.7.1 通用的加载和保存方式

SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式为parquet

1) 加载数据 spark.read.load 是加载数据的通用方法

scala> spark.read.
csv      jdbc   load     options   parquet   table   textFile      
format   json   option   orc       schema    text 

image.png

[mhk@hadoop102 resources]$ pwd
/opt/module/spark-yarn/examples/src/main/resources
[mhk@hadoop102 resources]$ ll
总用量 44
drwxr-xr-x. 3 mhk mhk   57 6月   6 2020 dir1
-rw-r--r--. 1 mhk mhk  130 6月   6 2020 employees.json
-rw-r--r--. 1 mhk mhk  240 6月   6 2020 full_user.avsc
-rw-r--r--. 1 mhk mhk 5812 6月   6 2020 kv1.txt
-rw-r--r--. 1 mhk mhk   49 6月   6 2020 people.csv
-rw-r--r--. 1 mhk mhk   73 6月   6 2020 people.json
-rw-r--r--. 1 mhk mhk   32 6月   6 2020 people.txt
-rw-r--r--. 1 mhk mhk  185 6月   6 2020 user.avsc
-rw-r--r--. 1 mhk mhk  334 6月   6 2020 users.avro
-rw-r--r--. 1 mhk mhk  547 6月   6 2020 users.orc
-rw-r--r--. 1 mhk mhk  615 6月   6 2020 users.parquet

[mhk@hadoop102 resources]$ hadoop fs -put users.parquet /user/mhk
2022-03-29 18:13:32,224 INFO  [Thread-7] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

scala> val df = spark.read.load("input/users.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> df.show
+------+--------------+----------------+                                        
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

如果读取不同格式的数据,可以对不同的数据格式进行设定

scala> spark.read.format("…")[.option("…")].load("…")
scala> val df = spark.read.format("json").load("input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

scala> df.show
+---+--------+
|age|username|
+---+--------+
| 30|zhangsan|
| 20|    lisi|
| 40|  wangwu|
+---+--------+
  • format(“…”):指定加载的数据类型,包括”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”。
  • load(“…”):在”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”格式下需要传入加载数据的路径。
  • option(“…”):在”jdbc”格式下需要传入JDBC 相应参数,url、user、password 和dbtable

我们前面都是使用read API 先把文件加载到 DataFrame 然后再查询,其实,我们也可以直接在文件上进行查询: 文件格式.文件路径

scala> spark.sql("select * from json.`input/user.json`").show
2022-03-29 18:30:22,496 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2022-03-29 18:30:22,498 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2022-03-29 18:30:32,093 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
2022-03-29 18:30:32,093 WARN metastore.ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore mhk@10.211.55.11
2022-03-29 18:30:33,079 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
2022-03-29 18:30:33,177 WARN metastore.ObjectStore: Failed to get database json, returning NoSuchObjectException
+---+--------+
|age|username|
+---+--------+
| 30|zhangsan|
| 20|    lisi|
| 40|  wangwu|
+---+--------+

2) 保存数据 df.write.save 是保存数据的通用方法

scala> df.write.save("output_SparkSQL")

DDF72B0F9D37D98D3035DB9B2D0FE1E2.jpg

如果保存不同格式的数据,可以对不同的数据格式进行设定

scala>df.write.format("…")[.option("…")].save("…")
  • format(“…”):指定保存的数据类型,包括”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”。
  • save (“…”):在”csv”、”orc”、”parquet”和”textFile”格式下需要传入保存数据的路径。
  • option(“…”):在”jdbc”格式下需要传入JDBC 相应参数,url、user、password 和dbtable

保存操作可以使用SaveMode, 用来指明如何处理数据,使用mode()方法来设置。有一点很重要: 这些SaveMode 都是没有加锁的, 也不是原子操作。 SaveMode 是一个枚举类,其中的常量包括:

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) “error”(default) 如果文件已经存在则抛出异常
SaveMode.Append “append” 如果文件已经存在则追加
SaveMode.Overwrite “overwrite” 如果文件已经存在则覆盖
SaveMode.Ignore “ignore” 如果文件已经存在则忽略
scala> df.write.format("json").save("output1_SparkSQL")

scala> df.write.format("json").mode("append").save("output1_SparkSQL")

image.png

2.8.2 Parquet

Spark SQL 的默认数据源为Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式存储格式。 数据源为Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可修改默认数据源格式。

2.8.3 JSON

Spark SQL 能够自动推测JSON 数据集的结构,并将它加载为一个Dataset[Row]. 可以通过SparkSession.read.json()去加载JSON 文件。 注意:Spark 读取的JSON 文件不是传统的JSON 文件,每一行都应该是一个JSON 串。格式如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
[{"name":"Justin", "age":19},{"name":"Justin", "age":19}]

1) 导入隐式转换

import spark.implicits._

2) 加载JSON 文件

val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path)

3) 创建临时表

peopleDF.createOrReplaceTempView("people")

4) 数据查询

val teenagerNamesDF = spark.sql("SELECT	name	FROM	people	WHERE	age	BETWEEN	13 AND 19")
teenagerNamesDF.show()
+------+
|  name|
+------+
|Justin|
+------+

2.8.4 CSV

Spark SQL 可以配置CSV 文件的列表信息,读取CSV 文件,CSV 文件的第一行设置为数据列

[mhk@hadoop102 resources]$ hadoop fs -put people.csv /user/mhk/input
2022-03-29 18:59:21,263 INFO  [Thread-7] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

scala> val df = spark.read.format("csv").option("sep", ";").option("inferSchema","true").option("header", "true").load("input/people.csv")
df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> df.show
+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+

2.8.5 MySQL

Spark SQL 可以通过JDBC 从关系型数据库中读取数据的方式创建DataFrame,通过对

DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。如果使用spark-shell 操作,可在启动shell 时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark 的类路径下。

bin/spark-shell --jars mysql-connector-java-5.1.27-bin.jar

我们这里只演示在Idea 中通过JDBC 对Mysql 进行操作 1) 导入依赖

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

2) 操作

object Spark04_SparkSQL_JDBC {
  def main(args: Array[String]): Unit = {

    //TODO 创建SparkSQL的运行环境
    val sparkConf = new SparkConf().setAppName("sparkSQL").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //读取MySQL数据
    val df = spark.read
        .format("jdbc")
        .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("user", "root")
        .option("password", "000000")
        .option("dbtable", "user")
        .load()

    //df.show()

    //保存数据
    df.write
        .format("jdbc")
        .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("user", "root")
        .option("password", "000000")
        .option("dbtable", "user1")
        .mode(SaveMode.Append)
        .save()


    //TODO 关闭环境
    spark.close()
  }
}

2.8.6 Hive

Apache Hive 是Hadoop 上的SQL 引擎,Spark SQL 编译时可以包含Hive 支持,也可以不包含。包含Hive 支持的Spark SQL 可以支持Hive 表访问、UDF (用户自定义函数)以及Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在Spark SQL 中包含Hive 的库,并不需要事先安装Hive。一般来说,最好还是在编译Spark SQL 时引入Hive 支持,这样就可以使用这些特性了。如果你下载的是二进制版本的Spark,它应该已经在编译时添加了Hive 支持。 若要把Spark SQL 连接到一个部署好的Hive 上,你必须把hive-site.xml 复制到Spark 的配置文件目录中($SPARK_HOME/conf)。即使没有部署好Hive,Spark SQL 也可以运行。需要注意的是,如果你没有部署好Hive,Spark SQL 会在当前的工作目录中创建出自己的Hive 元数据仓库,叫作metastore_db。此外,如果你尝试使用HiveQL 中的CREATE TABLE (并非CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的/user/hive/warehouse 目录中(如果你的classpath 中有配好的 hdfs-site.xml,默认的文件系统就是HDFS,否则就是本地文件系统)。

spark-shell 默认是Hive 支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。

1) 内嵌的HIVE

如果使用Spark 内嵌的Hive, 则什么都不用做, 直接使用即可. Hive 的元数据存储在derby 中, 默认仓库地址:$SPARK_HOME/spark-warehouse

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

[mhk@hadoop102 spark-yarn]$ ll
总用量 132
drwxr-xr-x. 3 mhk mhk  4096 3月  25 23:04 bin
drwxr-xr-x. 2 mhk mhk   178 3月  17 21:44 conf
drwxr-xr-x. 5 mhk mhk    50 6月   6 2020 data
-rw-rw-r--. 1 mhk mhk   689 3月  29 18:30 derby.log
drwxr-xr-x. 4 mhk mhk    29 6月   6 2020 examples
drwxr-xr-x. 2 mhk mhk 12288 6月   6 2020 jars
drwxr-xr-x. 4 mhk mhk    38 6月   6 2020 kubernetes
-rw-r--r--. 1 mhk mhk 23312 6月   6 2020 LICENSE
drwxr-xr-x. 2 mhk mhk  4096 6月   6 2020 licenses
drwxrwxr-x. 2 mhk mhk  4096 3月  18 09:11 logs
drwxrwxr-x. 5 mhk mhk   133 3月  29 18:30 metastore_db
-rw-r--r--. 1 mhk mhk 57677 6月   6 2020 NOTICE
drwxr-xr-x. 7 mhk mhk   275 6月   6 2020 python
drwxr-xr-x. 3 mhk mhk    17 6月   6 2020 R
-rw-r--r--. 1 mhk mhk  4488 6月   6 2020 README.md
-rw-r--r--. 1 mhk mhk   183 6月   6 2020 RELEASE
drwxr-xr-x. 2 mhk mhk  4096 6月   6 2020 sbin
drwxrwxr-x. 2 mhk mhk     6 3月  17 20:13 work
drwxr-xr-x. 2 mhk mhk    42 6月   6 2020 yarn

scala> spark.read.json("input/user.json")
res21: org.apache.spark.sql.DataFrame = [age: bigint, username: string]         

scala> df.createOrReplaceTempView("user")

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |     user|       true|
+--------+---------+-----------+

scala> spark.sql("create table ids(id int)")
2022-03-29 20:49:29,007 WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
2022-03-29 20:49:29,459 WARN conf.HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
2022-03-29 20:49:29,459 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
2022-03-29 20:49:29,460 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist
2022-03-29 20:49:29,578 WARN metastore.HiveMetaStore: Location: file:/opt/module/spark-yarn/spark-warehouse/ids specified for non-external table:ids
res24: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|      ids|      false|
|        |     user|       true|
+--------+---------+-----------+


scala> spark.sql("load data local inpath '/home/mhk/id.txt' into table ids")
res26: org.apache.spark.sql.DataFrame = []

[mhk@hadoop102 spark-yarn]$ ll
总用量 132
drwxr-xr-x. 3 mhk mhk  4096 3月  25 23:04 bin
drwxr-xr-x. 2 mhk mhk   178 3月  17 21:44 conf
drwxr-xr-x. 5 mhk mhk    50 6月   6 2020 data
-rw-rw-r--. 1 mhk mhk   689 3月  29 18:30 derby.log
drwxr-xr-x. 4 mhk mhk    29 6月   6 2020 examples
drwxr-xr-x. 2 mhk mhk 12288 6月   6 2020 jars
drwxr-xr-x. 4 mhk mhk    38 6月   6 2020 kubernetes
-rw-r--r--. 1 mhk mhk 23312 6月   6 2020 LICENSE
drwxr-xr-x. 2 mhk mhk  4096 6月   6 2020 licenses
drwxrwxr-x. 2 mhk mhk  4096 3月  18 09:11 logs
drwxrwxr-x. 5 mhk mhk   133 3月  29 18:30 metastore_db
-rw-r--r--. 1 mhk mhk 57677 6月   6 2020 NOTICE
drwxr-xr-x. 7 mhk mhk   275 6月   6 2020 python
drwxr-xr-x. 3 mhk mhk    17 6月   6 2020 R
-rw-r--r--. 1 mhk mhk  4488 6月   6 2020 README.md
-rw-r--r--. 1 mhk mhk   183 6月   6 2020 RELEASE
drwxr-xr-x. 2 mhk mhk  4096 6月   6 2020 sbin
drwxr-xr-x. 3 mhk mhk    17 3月  29 20:49 spark-warehouse
drwxrwxr-x. 2 mhk mhk     6 3月  17 20:13 work
drwxr-xr-x. 2 mhk mhk    42 6月   6 2020 yarn

scala> spark.sql("select * from ids").show
+----+
|  id|
+----+
|1002|
|1003|
+----+

在实际使用中, 几乎没有任何人会使用内置的Hive

2) 外部的HIVE 如果想连接外部已经部署好的Hive,需要通过以下几个步骤:

  • Spark 要接管Hive 需要把hive-site.xml 拷贝到conf/目录下
  • 把Mysql 的驱动copy 到jars/目录下
  • 如果访问不到hdfs,则需要把core-site.xml 和hdfs-site.xml 拷贝到conf/目录下
  • 重启spark-shell ```shell [mhk@hadoop102 conf]$ cp hive-site.xml /opt/module/spark-yarn/conf/

[mhk@hadoop102 spark-yarn]$ cd conf/ [mhk@hadoop102 conf]$ ll 总用量 40 -rw-r–r–. 1 mhk mhk 1105 6月 6 2020 fairscheduler.xml.template -rw-rw-r–. 1 mhk mhk 2113 3月 29 20:57 hive-site.xml -rw-r–r–. 1 mhk mhk 2023 6月 6 2020 log4j.properties.template -rw-r–r–. 1 mhk mhk 9141 6月 6 2020 metrics.properties.template -rw-r–r–. 1 mhk mhk 865 6月 6 2020 slaves.template -rw-r–r–. 1 mhk mhk 1452 3月 17 21:44 spark-defaults.conf -rwxr-xr-x. 1 mhk mhk 4603 3月 17 21:44 spark-env.sh

scala> spark.sql(“show tables”).show 2022-03-29 21:01:35,782 WARN conf.HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 2022-03-29 21:01:35,881 WARN client.HiveClientImpl: Detected HiveConf hive.execution.engine is ‘tez’ and will be reset to ‘mr’ to disable useless hive logic 2022-03-29 21:01:37,630 WARN conf.HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 2022-03-29 21:01:37,630 WARN conf.HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist 2022-03-29 21:01:37,634 WARN conf.HiveConf: HiveConf of name hive.stats.retries.wait does not exist 2022-03-29 21:01:39,562 WARN conf.HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 2022-03-29 21:01:42,125 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException +——–+———+———–+ |database|tableName|isTemporary| +——–+———+———–+ | default| student| false| | default| student1| false| | default| student2| false| | default| student4| false| | default| student5| false| | default| student6| false| | default| test| false| | default| test1| false| +——–+———+———–+

**3) 运行Spark SQL CLI**
Spark SQL CLI 可以很方便的在本地运行Hive 元数据服务以及从命令行执行查询任务。在Spark 目录下执行如下命令启动 Spark SQL CLI,直接执行 SQL 语句,类似一Hive 窗口
```shell
bin/spark-sql

4)运行 Spark beeline Spark Thrift Server 是Spark 社区基于HiveServer2 实现的一个Thrift 服务。旨在无缝兼容HiveServer2。因为Spark Thrift Server 的接口和协议都和HiveServer2 完全一致,因此我们部署好Spark Thrift Server 后,可以直接使用hive 的beeline 访问Spark Thrift Server 执行相关语句。Spark Thrift Server 的目的也只是取代HiveServer2,因此它依旧可以和Hive Metastore 进行交互,获取到hive 的元数据。

如果想连接Thrift Server,需要通过以下几个步骤:

  • Spark 要接管Hive 需要把hive-site.xml 拷贝到conf/目录下
  • 把Mysql 的驱动copy 到jars/目录下
  • 如果访问不到hdfs,则需要把core-site.xml 和hdfs-site.xml 拷贝到conf/目录下
  • 启动Thrift Server ```shell [mhk@hadoop102 spark-yarn]$ sbin/start-thriftserver.sh starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /opt/module/spark-yarn/logs/spark-mhk-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-hadoop102.out

[mhk@hadoop102 spark-yarn]$ bin/beeline -u jdbc:hive2://hadoop102:10000 -n mhk Connecting to jdbc:hive2://hadoop102:10000 2022-03-29 21:22:51,928 INFO jdbc.Utils: Supplied authorities: hadoop102:10000 2022-03-29 21:22:51,929 INFO jdbc.Utils: Resolved authority: hadoop102:10000 Connected to: Spark SQL (version 3.0.0) Driver: Hive JDBC (version 2.3.7) Transaction isolation: TRANSACTION_REPEATABLE_READ Beeline version 2.3.7 by Apache Hive 0: jdbc:hive2://hadoop102:10000>

0: jdbc:hive2://hadoop102:10000> show tables . . . . . . . . . . . . . . . .> ; +———–+————+————–+ | database | tableName | isTemporary | +———–+————+————–+ | default | student | false | | default | student1 | false | | default | student2 | false | | default | student4 | false | | default | student5 | false | | default | student6 | false | | default | test | false | | default | test1 | false | +———–+————+————–+

0: jdbc:hive2://hadoop102:10000> show databases; +————+ | namespace | +————+ | default | | hive | | hive1 | | hive2 | +————+ 4 rows selected (0.452 seconds)


**5)代码操作Hive**
1) 导入依赖
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

2) 将hive-site.xml 文件拷贝到项目的resources 目录中,代码实现

//创建 SparkSession
val spark: SparkSession = SparkSession
.builder()
.enableHiveSupport()
.master("local[*]")
.appName("sql")
.getOrCreate()

上一篇     下一篇