spark优化
算法性能优化
- 使用
reduceByKey
/aggregateByKey
替代groupByKey
- 使用带 Partitions 的 API 进行计算(一次函数调用处理一个 Partition),如
mapPartitions
替代map
- 使用手动添加前缀的方式优化由于数据倾斜带来的性能问题
- 使用
并行度优化
- 使用带有numPartitions参数的,可以配置分区数量
- 通过配置spark.default.parallelism来控制Shuffle过程的默认任务数量。spark官方推荐cpu num*2数量的并行度
缓存优化
- 调用
cache
persist
及unpersist
以便控制哪一个RDD
需要缓存 - 缓存
RDD
时考虑使用序列化缓存,进一步考虑压缩
- 调用
内存优化
- 使用更节约内存的数据结构:如避免使用 java 的包装类型(boxed),避免使用内置的
Map
List
等数据结构(会创建额外的Entry
对象)等 - 使用广播变量:对于某个只读的大对象,在一个
Executor
内部共享,而不是每个task
都复制一份 - 调整 spark 管理的内存大小:配置
spark.memory
相关参数 - 调整 JVM 的新生代和老生代内存比例
- gc 优化:使用
G1
垃圾收集器
- 使用更节约内存的数据结构:如避免使用 java 的包装类型(boxed),避免使用内置的
其他有用的优化方式
- 资源:配置
executor
的数量,每个executor
的核数及内存,driver
的核数和内存 - 调度:配置是否重启一个较慢的任务,设置
spark.speculation
相关参数 - IO:使用节约空间的序列化方式,如配置
kryo
序列化,调整本地化程度等待时间spark.locality.wait
参数
- 资源:配置
分区问题
小分区合并问题 使用filter进行频繁过滤或过滤掉的数据量过大会造成大量的小分区产生。由于spark是每个数据分区都会分配一个任务执行,如果任务过大,每个任务处理的数据量很小,会造成线程切换开销大,很多任务等待执行,并行度不高。可以使用RDD中重分区repartition进行数据紧缩,减少分区数,将小分区合并为大分区。
倾斜问题 在个别分区上,任务执行时间过长。当少量任务处理的数据量和其他任务差异过大时,任务进度长时间维持在99%。
产生数据倾斜的原因有几种
- key的数据分布不均匀
- 业务数据本身就会产生数据倾斜
- 结构化数据表设计问题
- 某些SQL语句会产生数据倾斜
产生任务倾斜的原因不太好确定,一般是那台机器正在执行的Executor执行时间过长,因为服务器架构,或JVM,也可能是来自线程池的问题。