0%

DataFrame编程

DataFrame编程全解析:结构化数据处理的核心抽象

DataFrame 是 Spark SQL 的核心抽象,它将关系型数据库的结构化查询能力与 Spark 的分布式计算能力相结合,为开发者提供了高效、灵活的数据处理方式。本文将深入探讨 DataFrame 的创建、转换、操作及性能优化,帮助你全面掌握这一强大工具。

DataFrame 基础概念与核心特性

DataFrame 与 RDD 的本质区别

特性 RDD DataFrame
数据结构 无 schema,仅存储对象集合 有 schema,明确列名和类型
优化机制 依赖手动优化,缺乏查询分析 自动优化执行计划(Catalyst 优化器)
计算方式 基于函数式编程(map、filter 等) 基于关系代数(select、join、groupBy)
序列化方式 对象序列化,开销大 列式存储(如 Parquet),节省空间
适用场景 非结构化数据(如日志、文本) 结构化数据(如数据库表、CSV)

Schema 元信息的重要性

Schema 定义了 DataFrame 各列的名称和类型,使 Spark 能够:

  • 优化内存使用:通过列式存储和类型推断,减少内存占用;
  • 加速查询执行:直接定位所需列,避免全量扫描;
  • 提供类型安全:编译时检查列名和类型,提前发现错误。

DataFrame 的创建与转换

创建 DataFrame 的主要方式

从文件数据源读取

支持 CSV、JSON、Parquet、ORC 等多种格式:

1
2
3
4
5
6
7
8
9
10
11
// 读取 JSON 文件  
val jsonDF = spark.read.json("path/to/data.json")

// 读取 CSV 文件(带表头,自动推断类型)
val csvDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("path/to/data.csv")

// 读取 Parquet 文件(Spark 默认格式)
val parquetDF = spark.read.parquet("path/to/data.parquet")
从 RDD 转换

需通过 toDF() 方法或手动指定 Schema:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 方式一:使用样例类(Case Class)自动推断 Schema  
case class Person(name: String, age: Int)
val personRDD = sc.parallelize(Seq(
Person("Alice", 25),
Person("Bob", 30)
))
val personDF = personRDD.toDF() // 自动使用样例类字段名作为列名

// 方式二:手动指定 Schema
val rdd = sc.parallelize(Seq(
(1, "Alice"),
(2, "Bob")
))
val df = rdd.toDF("id", "name") // 手动指定列名

// 方式三:通过 StructType 定义复杂 Schema
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("id", IntegerType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("info", MapType(StringType, StringType), nullable = true)
))
val rowRDD = sc.parallelize(Seq(
Row(1, "Alice", Map("city" -> "Beijing", "job" -> "Engineer"))
))
val complexDF = spark.createDataFrame(rowRDD, schema)
从 Hive 表或 JDBC 数据源读取
1
2
3
4
5
6
// 读取 Hive 表(需启用 Hive 支持)  
val hiveDF = spark.table("hive_db.table_name")

// 读取 JDBC 数据源(如 MySQL)
val jdbcDF = spark.read
.jdbc("jdbc:mysql://localhost/test", "table_name", connectionProperties)

DataFrame 与 RDD 的互相转换

DataFrame → RDD

DataFrame 转为 RDD 后,每行数据变为 Row 类型:

1
2
3
4
5
val rdd = df.rdd  // RDD[Row]  
rdd.foreach(row => {
val id = row.getInt(0) // 通过索引访问列(从 0 开始)
val name = row.getString(1)
})
RDD → DataFrame

通过 toDF()createDataFrame() 方法:

1
2
3
4
5
6
// 方式一:简单转换(需导入隐式转换)  
import spark.implicits._
val df = rdd.toDF("col1", "col2")

// 方式二:复杂 Schema 转换
val df = spark.createDataFrame(rdd, schema)

DataFrame 的常用操作

基本查询操作

数据展示与 Schema 查看
1
2
3
4
df.show(10)  // 显示前 10 行(默认 20 行)  
df.printSchema() // 打印 Schema 结构
df.columns // 获取列名数组
df.count() // 统计行数
列选择与过滤
1
2
3
4
5
6
// 选择列(等价于 SQL 的 SELECT)  
df.select("name", "age").show()

// 过滤行(等价于 SQL 的 WHERE)
df.filter($"age" > 18).show() // 使用 $"列名" 语法
df.where("age > 18 AND gender = 'F'").show() // 直接写 SQL 表达式
数据排序与聚合
1
2
3
4
5
6
7
// 排序  
df.sort($"age".desc).show() // 按年龄降序排列

// 聚合(等价于 SQL 的 GROUP BY)
df.groupBy("gender")
.agg(avg("age"), sum("salary")) // 计算各性别平均年龄和总薪资
.show()

表连接与集合操作

表连接(Join)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val employeesDF = spark.read.parquet("employees")  
val departmentsDF = spark.read.parquet("departments")

// 内连接
employeesDF.join(departmentsDF, "dept_id").show()

// 左外连接
employeesDF.join(departmentsDF, Seq("dept_id"), "left_outer").show()

// 多条件连接
employeesDF.join(
departmentsDF,
employeesDF("dept_id") === departmentsDF("id") &&
employeesDF("year") === departmentsDF("founded_year")
).show()
集合操作
1
2
3
4
5
6
7
val df1 = spark.createDataFrame(Seq((1, "A"), (2, "B"))).toDF("id", "name")  
val df2 = spark.createDataFrame(Seq((2, "B"), (3, "C"))).toDF("id", "name")

df1.union(df2).show() // 并集(包含重复行)
df1.union(df2).distinct().show() // 并集(去重)
df1.intersect(df2).show() // 交集
df1.except(df2).show() // 差集(df1 有但 df2 没有的)

与 SQL 的结合使用

创建临时视图
1
2
3
4
5
df.createTempView("people")  // 创建临时视图(Session 级别)  
df.createGlobalTempView("people") // 创建全局临时视图(跨 Session)

// 查询全局临时视图时需加前缀
spark.sql("SELECT * FROM global_temp.people").show()
执行 SQL 查询
1
2
3
4
5
6
7
8
9
// 复杂查询示例  
val resultDF = spark.sql("""
SELECT gender, AVG(age) AS avg_age, COUNT(*) AS count
FROM people
WHERE age > 18
GROUP BY gender
HAVING count > 100
ORDER BY avg_age DESC
""")

DataFrame 的高级特性与优化

列表达式与函数

Spark SQL 提供丰富的内置函数(如字符串、数学、日期等):

1
2
3
4
5
6
7
8
9
10
11
import org.apache.spark.sql.functions._  

// 字符串操作
df.select(concat($"firstName", lit(" "), $"lastName").as("fullName")).show()

// 日期操作
df.select(year($"birthDate"), month($"birthDate")).show()

// 自定义聚合函数
val myAverage = new MyAverageUDAF() // 自定义 UDAF 实现
df.groupBy("dept").agg(myAverage($"salary")).show()

性能优化技巧

缓存频繁使用的 DataFrame
1
2
df.cache()  // 缓存到内存(等价于 persist(StorageLevel.MEMORY_ONLY))  
df.persist(StorageLevel.MEMORY_AND_DISK) // 内存不足时溢写到磁盘
分区控制
1
2
3
4
// 重新分区  
df.repartition(10) // 增加或减少分区数
df.repartition($"deptId") // 按列值哈希分区(提升 join 性能)
df.coalesce(5) // 合并分区(减少数据移动)
谓词下推(Predicate Pushdown)

确保过滤条件尽早应用,减少数据传输:

1
2
3
4
5
6
7
// 正确方式:先过滤后 join  
val filteredDF = df.filter($"age" > 18)
val joinedDF = filteredDF.join(otherDF, "id")

// 错误方式:先 join 后过滤(数据传输量大)
val joinedDF = df.join(otherDF, "id")
val filteredJoinedDF = joinedDF.filter($"age" > 18)

数据写入与存储格式

写入文件
1
2
3
4
5
6
7
8
9
10
// 写入 Parquet(推荐格式)  
df.write.parquet("path/to/output.parquet")

// 写入 CSV
df.write
.option("header", "true")
.csv("path/to/output.csv")

// 写入 JSON
df.write.json("path/to/output.json")
写入模式控制
1
2
3
4
5
6
df.write  
.mode("append") // 追加模式(默认)
// .mode("overwrite") // 覆盖模式
// .mode("ignore") // 表存在则忽略
// .mode("error") // 表存在则报错(默认)
.parquet("path/to/output")

DataFrame 与 Dataset 的对比

特性 DataFrame Dataset
类型安全 否(Row 类型,运行时检查) 是(编译时检查)
适用语言 所有语言(Scala、Java、Python、R) 仅 Scala 和 Java
性能 高(依赖 Catalyst 优化) 更高(类型信息优化代码生成)
使用场景 通用结构化数据处理 需类型安全的复杂领域对象处理

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10

域名更新通知

您好!我们的官方域名已更新为 zhhll.com.cn。 请收藏新域名以获取最佳访问体验。