DataFrame编程全解析:结构化数据处理的核心抽象 DataFrame 是 Spark SQL 的核心抽象,它将关系型数据库的结构化查询能力与 Spark 的分布式计算能力相结合,为开发者提供了高效、灵活的数据处理方式。本文将深入探讨 DataFrame 的创建、转换、操作及性能优化,帮助你全面掌握这一强大工具。
DataFrame 基础概念与核心特性 DataFrame 与 RDD 的本质区别
特性
RDD
DataFrame
数据结构
无 schema,仅存储对象集合
有 schema,明确列名和类型
优化机制
依赖手动优化,缺乏查询分析
自动优化执行计划(Catalyst 优化器)
计算方式
基于函数式编程(map、filter 等)
基于关系代数(select、join、groupBy)
序列化方式
对象序列化,开销大
列式存储(如 Parquet),节省空间
适用场景
非结构化数据(如日志、文本)
结构化数据(如数据库表、CSV)
Schema 元信息的重要性 Schema 定义了 DataFrame 各列的名称和类型,使 Spark 能够:
优化内存使用 :通过列式存储和类型推断,减少内存占用;
加速查询执行 :直接定位所需列,避免全量扫描;
提供类型安全 :编译时检查列名和类型,提前发现错误。
DataFrame 的创建与转换 创建 DataFrame 的主要方式 从文件数据源读取 支持 CSV、JSON、Parquet、ORC 等多种格式:
1 2 3 4 5 6 7 8 9 10 11 val jsonDF = spark.read.json("path/to/data.json" ) val csvDF = spark.read .option("header" , "true" ) .option("inferSchema" , "true" ) .csv("path/to/data.csv" ) val parquetDF = spark.read.parquet("path/to/data.parquet" )
从 RDD 转换 需通过 toDF()
方法或手动指定 Schema:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 case class Person (name: String , age: Int ) val personRDD = sc.parallelize(Seq ( Person ("Alice" , 25 ), Person ("Bob" , 30 ) )) val personDF = personRDD.toDF() val rdd = sc.parallelize(Seq ( (1 , "Alice" ), (2 , "Bob" ) )) val df = rdd.toDF("id" , "name" ) import org.apache.spark.sql.types._ val schema = StructType (Seq ( StructField ("id" , IntegerType , nullable = false ), StructField ("name" , StringType , nullable = true ), StructField ("info" , MapType (StringType , StringType ), nullable = true ) )) val rowRDD = sc.parallelize(Seq ( Row (1 , "Alice" , Map ("city" -> "Beijing" , "job" -> "Engineer" )) )) val complexDF = spark.createDataFrame(rowRDD, schema)
从 Hive 表或 JDBC 数据源读取 1 2 3 4 5 6 val hiveDF = spark.table("hive_db.table_name" ) val jdbcDF = spark.read .jdbc("jdbc:mysql://localhost/test" , "table_name" , connectionProperties)
DataFrame 与 RDD 的互相转换 DataFrame → RDD DataFrame 转为 RDD 后,每行数据变为 Row
类型:
1 2 3 4 5 val rdd = df.rdd rdd.foreach(row => { val id = row.getInt(0 ) val name = row.getString(1 ) })
RDD → DataFrame 通过 toDF()
或 createDataFrame()
方法:
1 2 3 4 5 6 import spark.implicits._ val df = rdd.toDF("col1" , "col2" ) val df = spark.createDataFrame(rdd, schema)
DataFrame 的常用操作 基本查询操作 数据展示与 Schema 查看 1 2 3 4 df.show(10 ) df.printSchema() df.columns df.count()
列选择与过滤 1 2 3 4 5 6 df.select("name" , "age" ).show() df.filter($"age" > 18 ).show() df.where("age > 18 AND gender = 'F'" ).show()
数据排序与聚合 1 2 3 4 5 6 7 df.sort($"age" .desc).show() df.groupBy("gender" ) .agg(avg("age" ), sum("salary" )) .show()
表连接与集合操作 表连接(Join) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 val employeesDF = spark.read.parquet("employees" ) val departmentsDF = spark.read.parquet("departments" ) employeesDF.join(departmentsDF, "dept_id" ).show() employeesDF.join(departmentsDF, Seq ("dept_id" ), "left_outer" ).show() employeesDF.join( departmentsDF, employeesDF("dept_id" ) === departmentsDF("id" ) && employeesDF("year" ) === departmentsDF("founded_year" ) ).show()
集合操作 1 2 3 4 5 6 7 val df1 = spark.createDataFrame(Seq ((1 , "A" ), (2 , "B" ))).toDF("id" , "name" ) val df2 = spark.createDataFrame(Seq ((2 , "B" ), (3 , "C" ))).toDF("id" , "name" ) df1.union(df2).show() df1.union(df2).distinct().show() df1.intersect(df2).show() df1.except(df2).show()
与 SQL 的结合使用 创建临时视图 1 2 3 4 5 df.createTempView("people" ) df.createGlobalTempView("people" ) spark.sql("SELECT * FROM global_temp.people" ).show()
执行 SQL 查询 1 2 3 4 5 6 7 8 9 val resultDF = spark.sql("" " SELECT gender, AVG(age) AS avg_age, COUNT(*) AS count FROM people WHERE age > 18 GROUP BY gender HAVING count > 100 ORDER BY avg_age DESC " "" )
DataFrame 的高级特性与优化 列表达式与函数 Spark SQL 提供丰富的内置函数(如字符串、数学、日期等):
1 2 3 4 5 6 7 8 9 10 11 import org.apache.spark.sql.functions._ df.select(concat($"firstName" , lit(" " ), $"lastName" ).as("fullName" )).show() df.select(year($"birthDate" ), month($"birthDate" )).show() val myAverage = new MyAverageUDAF () df.groupBy("dept" ).agg(myAverage($"salary" )).show()
性能优化技巧 缓存频繁使用的 DataFrame 1 2 df.cache() df.persist(StorageLevel .MEMORY_AND_DISK )
分区控制 1 2 3 4 df.repartition(10 ) df.repartition($"deptId" ) df.coalesce(5 )
谓词下推(Predicate Pushdown) 确保过滤条件尽早应用,减少数据传输:
1 2 3 4 5 6 7 val filteredDF = df.filter($"age" > 18 ) val joinedDF = filteredDF.join(otherDF, "id" ) val joinedDF = df.join(otherDF, "id" ) val filteredJoinedDF = joinedDF.filter($"age" > 18 )
数据写入与存储格式 写入文件 1 2 3 4 5 6 7 8 9 10 df.write.parquet("path/to/output.parquet" ) df.write .option("header" , "true" ) .csv("path/to/output.csv" ) df.write.json("path/to/output.json" )
写入模式控制 1 2 3 4 5 6 df.write .mode("append" ) .parquet("path/to/output" )
DataFrame 与 Dataset 的对比
特性
DataFrame
Dataset
类型安全
否(Row 类型,运行时检查)
是(编译时检查)
适用语言
所有语言(Scala、Java、Python、R)
仅 Scala 和 Java
性能
高(依赖 Catalyst 优化)
更高(类型信息优化代码生成)
使用场景
通用结构化数据处理
需类型安全的复杂领域对象处理
v1.3.10