0%

DataFrame编程

DataFrame编程

DataFrame是一种以RDD为基础的分布式数据集,类似于关系型数据库的Table表,与RDD的主要区别在于,DataFrame带有schema元信息,表示数据集的每一列都带有名称和类型,而RDD不知道所存数据元素的具体内部结构,只能进行简单、通用的流水线优化。

SparkSession

在Spark Core中使用的是上下文环境SparkContext,而在SparkSQL中使用的是SparkSession,SparkSession是由SparkContext和HiveContext的组合

spark-shell会自动创建SparkContext和SparkSession对象

1
2
Spark context available as 'sc' (master = local[*], app id = local-1621927578517).
Spark session available as 'spark'
1
org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@15c882e8

在自己编写程序构建环境时

1
2
3
4
5
val sparkConf = new SparkConf().setMaster("local").setAppName("sparksql")

val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 隐式转换 这个spark是上面声明的SparkSession对象实例
import spark.implicits._

DataFrame操作

创建DataFrame

SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式

  • 通过spark的数据源进行创建
  • 从RDD进行转换
  • 从Hive的Table进行查询返回
通过spark的数据源进行创建

spark支持很多种数据源格式

1
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

以json为例

1
2
3
4
5
// 参数为文件的路径(hdfs路径)
val json = spark.read.json("/spark/local/user.json")
--
可以看到返回的是一个DataFrame,且可以看到字段信息
json: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
从RDD进行转换

可以在RDD和DataFrame之间进行互相转换

1
2
3
val rdd = sc.textFile("data/id.txt")
// rdd转DataFrame
rdd.toDF("id").show

DataFrame同样也可以转成RDD

1
2
3
val json = spark.read.json("/spark/local/user.json")
// org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD
val rdd = json.rdd

创建临时表

使用DataFrame来创建临时视图来辅助使得可以使用SQL语句来查询数据

1
2
3
4
5
6
7
8
9
10
11
// 有以下四种方式
// 1.新建临时视图
// 普通的临时视图是Session范围的,其他session是无法获取到这张表的
json.createTempView("user")
// 2.新建或者替换临时视图
json.createOrReplaceTempView("user")
// 3.新建全局临时视图
// 全局临时视图是应用范围的,在访问时需要全路径访问,global_temp.user
json.createGlobalTempView("user")
// 4.新建或者替换全局临时视图
json.createOrReplaceGlobalTempView("user")

sql查询表数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 查询临时视图
spark.sql("select * from user").show
+---+----+
|age|name|
+---+----+
| 18|张三|
| 20|李四|
+---+----+

// 查询全局临时视图 注意global_temp
spark.sql("select * from global_temp.user").show
+---+----+
|age|name|
+---+----+
| 18|张三|
| 20|李四|
+---+----+