DataFrame编程
DataFrame是一种以RDD为基础的分布式数据集,类似于关系型数据库的Table表,与RDD的主要区别在于,DataFrame带有schema元信息,表示数据集的每一列都带有名称和类型,而RDD不知道所存数据元素的具体内部结构,只能进行简单、通用的流水线优化。
SparkSession
在Spark Core中使用的是上下文环境SparkContext,而在SparkSQL中使用的是SparkSession,SparkSession是由SparkContext和HiveContext的组合
spark-shell会自动创建SparkContext和SparkSession对象
1 | Spark context available as 'sc' (master = local[*], app id = local-1621927578517). |
1 | org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@15c882e8 |
在自己编写程序构建环境时
1 | val sparkConf = new SparkConf().setMaster("local").setAppName("sparksql") |
DataFrame操作
创建DataFrame
SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式
- 通过spark的数据源进行创建
- 从RDD进行转换
- 从Hive的Table进行查询返回
通过spark的数据源进行创建
spark支持很多种数据源格式
1 | csv format jdbc json load option options orc parquet schema table text textFile |
以json为例
1 | // 参数为文件的路径(hdfs路径) |
从RDD进行转换
可以在RDD和DataFrame之间进行互相转换
1 | val rdd = sc.textFile("data/id.txt") |
DataFrame同样也可以转成RDD
1 | val json = spark.read.json("/spark/local/user.json") |
创建临时表
使用DataFrame创建临时视图来辅助使得可以使用SQL语句来查询数据
1 | // 有以下四种方式 |
sql查询表数据
1 | // 查询临时视图 |