查看原文
其他

阿里云Spark Shuffle的优化

辰石 大数据技术与架构 2021-10-21

大数据技术与架构点击右侧关注,大数据开发领域最强公众号!

暴走大数据点击右侧关注,暴走大数据!
本次分享者:辰石,来自阿里巴巴计算平台事业部EMR团队技术专家,目前从事大数据存储以及Spark相关方面的工作。
  • Spark Shuffle介绍

  • Smart Shuffle设计

  • 性能分析

Spark Shuffle流程

  • Spark 0.8及以前 Hash Based Shuffle 

  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制 

  • Spark 0.9 引入ExternalAppendOnlyMap 

  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle 

  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle 

  • Spark 1.4 引入Tungsten-Sort Based Shuffle 

  • Spark 1.6 Tungsten-sort并入Sort Based Shuffle 

  • Spark 2.0 Hash Based Shuffle退出历史舞台

总结一下, 就是最开始的时候使用的是 Hash Based Shuffle, 这时候每一个Mapper会根据Reducer的数量创建出相应的bucket,bucket的数量是M x R ,其中M是Map的个数,R是Reduce的个数。这样会产生大量的小文件,对文件系统压力很大,而且也不利于IO吞吐量。后面忍不了了就做了优化,把在同一core上运行的多个Mapper 输出的合并到同一个文件,这样文件数目就变成了 cores R 个了。

Spark Shuffle实现

Sort-based shuffle介绍

这个方式的选择是在org.apache.spark.SparkEnv完成的:
// Let the user specify short names forshuffle managers
val shortShuffleMgrNames = Map(
"hash" ->"org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" ->"org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName =conf.get("spark.shuffle.manager", "sort") //获得Shuffle Manager的type,sort为默认
val shuffleMgrClass =shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager =instantiateClass[ShuffleManager](shuffleMgrClass)
Hashbased shuffle的每个mapper都需要为每个reducer写一个文件,供reducer读取,即需要产生M x R个数量的文件,如果mapper和reducer的数量比较大,产生的文件数会非常多。Hash based shuffle设计的目标之一就是避免不需要的排序(Hadoop Map Reduce被人诟病的地方,很多不需要sort的地方的sort导致了不必要的开销)。但是它在处理超大规模数据集的时候,产生了大量的DiskIO和内存的消耗,这无疑很影响性能。
Hash based shuffle也在不断的优化中,正如前面讲到的Spark 0.8.1引入的file consolidation在一定程度上解决了这个问题。为了更好的解决这个问题,Spark 1.1 引入了Sort based shuffle。首先,每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个文件里,同时会生成一个index文件,Reducer可以通过这个index文件取得它需要处理的数据。
避免产生大量的文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件对系统带来的压力。
目前writer的实现分为三种, 分为 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter。

SortShuffleManager只有BlockStoreShuffleReader这一种ShuffleReader。

Spark-shuffle存在的问题

同步操作
Shuffle数据只有等map task任务结束后可能会触发多路归并生成最终数据。
大量的磁盘IO
Shuffle的数据在Merge阶段存在大量的磁盘读写IO,在sort-merge阶段对磁盘IO带宽要求较高。
计算与网络的串行
Task任务计算和网络IO的串行操作。

Smart Shuffle

shuffle数据的pipeline
shuffle数据在map端累积到一定数量发送到reduce端。
避免不必要的网络IO
根据partition数量的位置,可以调度该reduce任务到相应的节点。
计算和网络IO的异步化
shuffle数据的生成和shuffle数据的发送可以并行执行。
避免sort-merge减少磁盘IO
shuffle数据是按照partition进行分区,shuffle数据无需sort-merge

Smart Shuffle使用

  • 配置spark.shuffle.manager : org.apache.spark.shuffle.hash.HashShuffleManager

  • 配置spark.shuffle.smart.spill.memorySizeForceSpillThreshold:控制shuffle数据占用内存的大小,默认为128M

  • 配置spark.shuffle.smart.transfer.blockSize:控制shuffle在网络传输数据块的大小

性能分析

硬件及软件资源:

TPC-DS性能:

Smart shuffle TPC-DS性能提升28%:
  • Smart shuffle没有打来单个query性能的下降

  • 单个query最大能够带来最大2倍的性能提升

提取Q2和Q49查询性能分析:
  • Q2在两种shuffle性能保持一致

  • Q49在Smart shuffle下性能有很大提升

单个查询对比:

左侧为sorted shuffle,右边为smart shuffle。 Q2查询相对简单,shuffle数据也比较少,smart shuffle性能保持不变。
Q2 CPU对比: 左侧为sorted shuffle,右侧是smart shuffle

磁盘对比:
左侧为sorted shuffle,右侧是smart shuffle


欢迎点赞+收藏+转发朋友圈素质三连


文章不错?点个【在看】吧! 👇

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存