about云開發

 找回密碼
 立即注冊

QQ登錄

只需一步,快速開始

掃一掃,訪問微社區

打印 上一主題 下一主題

[介紹解說] Spark精細深度比較:為何ShuffleManager改成了SortShuffleManager【面試必備】

[復制鏈接]
跳轉到指定樓層
樓主
BGnv5 發表于 2019-6-15 10:08:23 | 只看該作者 |只看大圖 回帖獎勵 |倒序瀏覽 |閱讀模式
問題導讀:

1.HashShuffleManager 運行原理是什么?
2.SortShuffleManager 運行機制及其原理是什么?
3.shuffle 相關參數如何使用?


在 Spark 的源碼中,負責 shuffle 過程的執行、計算、處理的組件主要是 ShuffleManager。

在 Spark 1.2 以前,默認的 shuffle 計算引擎是 HashShuffleManager。該 ShuffleMananger 有一個非常嚴重的弊端,就是會產生大量的磁盤文件,進而有大量的磁盤 IO 操作,比較影響性能。

因此在 Spark 1.2 之后,默認的 ShuffleManager 改成了 SortShuffleManager。SortShuffleManager 相對來說,有了一定的改進。主要就在于,每個 Task 在 Shuffle Write 操作時,雖然也會產生較大的磁盤文件,但最后會將所有的臨時文件合并 (merge) 成一個磁盤文件,因此每個 Task 就只有一個磁盤文件。在下一個 Stage 的 Shuffle Read Task 拉取自己數據的時候,只要根據索引拉取每個磁盤文件中的部分數據即可。

一、HashShuffleManager 運行原理

普通模式下,在 Shuffle Write 階段,每個 Task 將數據按照 Key 進行 Hash 計算,然后按照計算結果,將相同的 Key 對應的數據寫入內存緩沖區,當內存緩沖區寫滿之后會直接溢寫到磁盤文件。這里需要寫多少個磁盤文件,和下一個 stage 的 Shuffle Read Task 的數量一致。

然后,Shuffle Read 階段的每個 Task 會拉取 Shuffle Write 階段所有相同 Key 的文件,一遍拉取一遍聚合。每個 Shuffle Read 階段的 Task 都有自己的緩沖區,每次只能拉取與緩沖區大小一致的數據,然后通過內存中的 Map 進行聚合等操作,聚合完一批再取下一批數據。

比如,當前 Stage 有 5 個 Executor,每個 Executor 分配一個 cpu core,有 50 個 task,每個 Executor 執行 10 個 task;下一個 stage 有100 個 task。那么在 Shuffle Write 階段每個 task 要創建 100 個磁盤文件,每個 Executor 進程要創建 1000 個文件,一共要創建 1000 * 5 = 5000 個磁盤文件,數量很多。

具體執行原理圖如下圖所示:



針對 HashShuffleManager 我們可以設置一個參數:spark.shuffle.consolidateFiles。這個參數的值默認是 fasle,如果設置成 true 之后就會開啟優化機制。

當開啟這個參數之后,在 Shuffle Write 階段寫文件的時候會復用文件,每個 task 不會為 Shuffle Read 階段的 task 都創建一份文件。此時會出現一個 shuffleFileGroup 的概念,每個 shuffleFileGroup 會對應一批磁盤文件,磁盤文件的數量和 Shuffle Read 階段的 task 數量一致。每個 Executor 上有多少個 cpu core 就會并行執行幾個 task,每個 task 會創建一個 shuffleFileGroup,然后后續并行執行的 task 會復用前面生成的這個 shuffleFileGroup。

比如,當前 stage 有 5 個 Executor,每個 Executor 分配 3 個 cpu core,一共有 50 個 task,每個 Executor 執行 10 個 task,Shuffle Read 階段有 100 個 task。那么此時,每個 Executor 進程會創建 3 * 100 個文件,一共會創建 5 * 3 * 100 個文件。
具體原理如圖示:



二、SortShuffleManager 運行原理

SortShuffleManager 運行機制有兩種,一種是普通運行機制,另一種是 bypass 運行機制。當 shuffle read task 的數量小于等于 spark.shuffle.sort.bypassMergeThreshold 參數值時 (默認是 200 ) ,就會啟用 bypass 機制。

1、普通機制

在該模式下,Shuffle Write 階段會將數據寫入一個內存的數據結構中,此時根據不同的算子會有不同的數據結構。比如是 reduceByKey 這種聚合類的 shuffle 算子,會選用 Map 數據結構,一遍用 Map 進行聚合(HashShuffleManager 聚合操作是放在 Shuffle Read 階段),一遍寫入內存;如果是 join 相關的普通 shuffle 算子的話,會用 Array 數據結構,直接寫入內存。當內存達到臨界閾值之后,會將內存中的數據進行排序,然后分批次寫入磁盤 (默認每批次有 1W 條數據),在寫入磁盤的時候不會像 HashShuffleManager 那樣直接寫入磁盤,這里會先寫入內存緩沖流,當緩沖流滿溢之后一次性寫入磁盤。

此時也會生成大批量的文件,最后會將之前所有的臨時磁盤文件進行合并,這就是 merge 過程 (就是將所有的臨時磁盤文件中的數據讀取出來,然后依次寫入最終的文件中)。每個 task 最終會生成一份磁盤文件和一份索引文件,索引文件中標示了下游每個 task 的數據在文件中的 start offset 和 end offset。

比如,當前 stage 有 5 個 Executor,每個 Executor 分配 1 個 cpu core,共有 50 個 task,每個 Executor 執行 10 個 task;下一個 stage 有 100 個 task。那么每個 Executor 創建 10 個磁盤文件,一共有 50 個磁盤文件。

具體如下圖所示:



2、bypass 機制

觸發該機制的條件:

  • shuffle reduce 端的 task 數量小于 spark.shuffle.sort.bypassMergeThreshold 參數值的時候;
  • 不是聚合類的shuffle算子(比如reduceByKey);

該機制下,當前 stage 的每個 task 會將數據的 key 進行 hash,然后將相同 hash 的 key 鎖對應的數據寫入到同一個內存緩沖區,緩沖寫滿后會溢寫到磁盤文件,這里和 HashShuffleManager一致。然后會進入 merge 階段,將所有的磁盤文件合并成一個磁盤文件,并創建一個索引文件。

相比較于普通機制,這里有兩個地方不同:

  • 將數據寫入內存時候,普通模式是將數據寫入 Map 或者 Array 這樣的內存數據結構中,這里是根據 key 的 Hash 值直接寫入內存;
  • 該模式下在寫入磁盤之前不會排序;
  • 磁盤寫機制不同。

具體如圖示:



三、shuffle 相關的參數

spark.shuffle.file.buffer

  • 默認值:32k
  • 參數說明:該參數用于設置 shuffle write task 的 BufferedOutputStream 的 buffer 緩沖大小。將數據寫到磁盤文件之前,會先寫入 buffer 緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。
  • 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如 64k),從而減少 shuffle write 過程中溢寫磁盤文件的次數,也就可以減少磁盤 IO 次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有 1%~5% 的提升。

spark.reducer.maxSizeInFlight

  • 默認值:48m
  • 參數說明:該參數用于設置 shuffle read task 的 buffer 緩沖大小,而這個 buffer 緩沖決定了每次能夠拉取多少數據。
  • 調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如 96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有 1%~5% 的提升。

spark.shuffle.io.maxRetries

  • 默認值:3
  • 參數說明:shuffle read task 從 shuffle write task 所在節點拉取屬于自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。
  • 調優建議:對于那些包含了特別耗時的 shuffle 操作的作業,建議增加重試最大次數(比如 60 次),以避免由于 JVM 的 full gc 或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對于針對超大數據量(數十億~上百億)的 shuffle 過程,調節該參數可以大幅度提升穩定性。

spark.shuffle.io.retryWait

  • 默認值:5s
  • 參數說明:具體解釋同上,該參數代表了每次重試拉取數據的等待間隔,默認是 5s。
  • 調優建議:建議加大間隔時長(比如 60s),以增加 shuffle 操作的穩定性。

spark.shuffle.memoryFraction

  • 默認值:0.2
  • 參數說明:該參數代表了 Executor 內存中,分配給 shuffle read task 進行聚合操作的內存比例,默認是 20%。
  • 調優建議:在資源參數調優中講解過這個參數。如果內存充足,而且很少使用持久化操作,建議調高這個比例,給 shuffle read 的聚合操作更多內存,以避免由于內存不足導致聚合過程中頻繁讀寫磁盤。在實踐中發現,合理調節該參數可以將性能提升 10% 左右。

spark.shuffle.manager

  • 默認值:sort
  • 參數說明:該參數用于設置 ShuffleManager 的類型。Spark 1.5 以后,有三個可選項:hash、sort 和 tungsten-sort。HashShuffleManager 是 Spark 1.2 以前的默認選項,但是 Spark 1.2 以及之后的版本默認都是 SortShuffleManager 了。tungsten-sort 與 sort 類似,但是使用了 tungsten 計劃中的堆外內存管理機制,內存使用效率更高。
  • 調優建議:由于 SortShuffleManager 默認會對數據進行排序,因此如果你的業務邏輯中需要該排序機制的話,則使用默認的 SortShuffleManager 就可以;而如果你的業務邏輯不需要對數據進行排序,那么建議參考后面的幾個參數調優,通過 bypass 機制或優化的 HashShuffleManager 來避免排序操作,同時提供較好的磁盤讀寫性能。這里要注意的是,tungsten-sort 要慎用,因為之前發現了一些相應的 bug。

spark.shuffle.sort.bypassMergeThreshold

  • 默認值:200
  • 參數說明:當 ShuffleManager 為 SortShuffleManager 時,如果 shuffle read task 的數量小于這個閾值(默認是200),則 shuffle write 過程中不會進行排序操作,而是直接按照未經優化的 HashShuffleManager 的方式去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合并成一個文件,并會創建單獨的索引文件。
  • 調優建議:當你使用 SortShuffleManager 時,如果的確不需要排序操作,那么建議將這個參數調大一些,大于 shuffle read task 的數量。那么此時就會自動啟用 bypass 機制,map-side 就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件,因此 shuffle write 性能有待提高。

spark.shuffle.consolidateFiles

  • 默認值:false
  • 參數說明:如果使用 HashShuffleManager,該參數有效。如果設置為 true,那么就會開啟 consolidate 機制,會大幅度合并 shuffle write 的輸出文件,對于 shuffle read task 數量特別多的情況下,這種方法可以極大地減少磁盤 IO 開銷,提升性能。
  • 調優建議:如果的確不需要 SortShuffleManager 的排序機制,那么除了使用 bypass 機制,還可以嘗試將 spark.shffle.manager 參數手動指定為 hash,使用 HashShuffleManager,同時開啟 consolidate 機制。在實踐中嘗試過,發現其性能比開啟了 bypass 機制的 SortShuffleManager 要高出 10%~30%。





最新經典文章,歡迎關注公眾號




作者:stonezhu
原文鏈接:https://juejin.im/post/5cff6461f265da1b6d401bdc


本帖被以下淘專輯推薦:

您需要登錄后才可以回帖 登錄 | 立即注冊

本版積分規則

關閉

推薦上一條 /3 下一條

QQ|小黑屋|about云開發-學問論壇|社區 ( 京ICP備12023829號 )

GMT+8, 2019-8-18 17:43 , Processed in 1.218750 second(s), 31 queries , Gzip On.

Powered by Discuz! X3.4 Licensed

© 2018 Comsenz Inc.Designed by u179

快速回復 返回頂部 返回列表
排球比赛场地