如何閱讀spark源碼(spark源碼看什么書(shū))
本篇文章給大家談?wù)勅绾伍喿xspark源碼,以及spark源碼看什么書(shū)對(duì)應(yīng)的知識(shí)點(diǎn),希望對(duì)各位有所幫助,不要忘了收藏本站喔。
本文目錄一覽:
- 1、如何成為Spark高手
- 2、可能是全網(wǎng)最詳細(xì)的 Spark Sql Aggregate 源碼剖析
- 3、spark sql 2.3 源碼解讀 - Execute (7)
- 4、怎么用Eclipse搭建Spark源碼閱讀環(huán)境
- 5、怎么在Idea IDE里面打開(kāi)Spark源碼而不報(bào)錯(cuò)
- 6、Spark源碼分析之SparkSubmit的流程
如何成為Spark高手
第一階段:熟練掌握Scala語(yǔ)言
1,spark框架是采用scala語(yǔ)言寫(xiě)的,精致優(yōu)雅。想要成為spark高手,你就必須閱讀spark源碼,就必須掌握scala。?
2,雖然現(xiàn)在的spark可以使用多種語(yǔ)言開(kāi)發(fā),java,python,但是最快速和支持最好的API依然并將永遠(yuǎn)是Scala的API,所以必須掌握scala來(lái)編寫(xiě)復(fù)雜的和高性能的spark分布式程序。?
3尤其是熟練掌握Scala的trait,apply,函數(shù)式編程,泛型,逆變,與協(xié)變等。
第二階段:精通spark平臺(tái)本身提供給開(kāi)發(fā)折的API
1,掌握spark中面向RDD的開(kāi)發(fā)模式,掌握各種transformation和action函數(shù)的使用。?
2,掌握Spark中的款依賴(lài)和窄依賴(lài),lineage機(jī)制。?
3,掌握RDD的計(jì)算流程,如Stage的劃分,spark應(yīng)用程序提交給集群的基礎(chǔ)過(guò)程和Work節(jié)點(diǎn)基礎(chǔ)的工作原理。
第三階段:深入Spark內(nèi)核
此階段主要是通過(guò)Spark框架的源碼研讀來(lái)深入Spark內(nèi)核部分:?
1,通過(guò)源碼掌握Spark的任務(wù)提交,?
2,通過(guò)源碼掌握Spark的集群的任務(wù)調(diào)度,?
3,尤其要精通DAGScheduler,TaskScheduler和Worker節(jié)點(diǎn)內(nèi)部的工作的每一步細(xì)節(jié)。
第四階段:掌握Spark上的核心框架的使用
Spark作為云計(jì)算大數(shù)據(jù)時(shí)代的集大成者,在實(shí)時(shí)流式處理,圖技術(shù),機(jī)器學(xué)習(xí),nosql查詢(xún)等方面具有明顯的優(yōu)勢(shì),我們使用Spark的時(shí)候大部分時(shí)間都是在使用其框架:?
sparksql,spark streaming等?
1,spark streaming是出色的實(shí)時(shí)流失處理框架,要掌握,DStream,transformation和checkpoint等。?
2,spark sql是離線統(tǒng)計(jì)分析工具,shark已經(jīng)沒(méi)落。?
3,對(duì)于spark中的機(jī)器學(xué)習(xí)和Graphx等要掌握其原理和用法。
第五階段:做商業(yè)級(jí)的spark項(xiàng)目
通過(guò)一個(gè)完整的具有代表性的spark項(xiàng)目來(lái)貫穿spark的方方面面,包括項(xiàng)目的框架設(shè)計(jì),用到的技術(shù)的剖析,開(kāi)始實(shí)現(xiàn),運(yùn)維等,完善掌握其中的每一個(gè)階段和細(xì)節(jié),以后你就可以從容的面對(duì)絕大多數(shù)spark項(xiàng)目。
第六階段:提供spark解決方案
1,徹底掌握spark框架源碼的每一個(gè)細(xì)節(jié),?
2,根據(jù)步同的業(yè)務(wù)場(chǎng)景的需要提供spark在不同場(chǎng)景的解決方案,?
3,根據(jù)實(shí)際需要,在spark框架基礎(chǔ)上經(jīng)行2次開(kāi)發(fā),打造自己的spark框架。
可能是全網(wǎng)最詳細(xì)的 Spark Sql Aggregate 源碼剖析
縱觀 Spark Sql 源碼,聚合的實(shí)現(xiàn)是其中較為復(fù)雜的部分,本文希望能以例子結(jié)合流程圖的方式來(lái)說(shuō)清楚整個(gè)過(guò)程。這里僅關(guān)注 Aggregate 在物理執(zhí)行計(jì)劃相關(guān)的內(nèi)容,之前的 parse、analyze 及 optimize 階段暫不做分析。在 Spark Sql 中,有一個(gè)專(zhuān)門(mén)的 Aggregation strategy 用來(lái)處理聚合,我們先來(lái)看看這個(gè)策略。
本文暫不討論 distinct Aggregate 的實(shí)現(xiàn)(有興趣的可以看看另一篇博文 ),我們來(lái)看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理執(zhí)行計(jì)劃的
創(chuàng)建聚合分為兩個(gè)階段:
AggregateExpression 共有以下幾種 mode:
Q:是否支持使用 hash based agg 是如何判斷的?
摘自我另一篇文章:
為了說(shuō)明最常用也是最復(fù)雜的的 hash based agg,本小節(jié)暫時(shí)將示例 sql 改為
這樣就能進(jìn)入 HashAggregateExec 的分支
構(gòu)造函數(shù)主要工作就是對(duì) groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 進(jìn)行了初始化
在 enable code gen 的情況下,會(huì)調(diào)用 HashAggregateExec#inputRDDs 來(lái)生成 RDD,為了分析 HashAggregateExec 是如何生成 RDD 的,我們?cè)O(shè)置 spark.sql.codegen.wholeStage 為 false 來(lái) disable code gen,這樣就會(huì)調(diào)用 HashAggregateExec#doExecute 來(lái)生成 RDD,如下:
可以看到,關(guān)鍵的部分就是根據(jù) child.execute() 生成的 RDD 的每一個(gè) partition 的迭代器轉(zhuǎn)化生成一個(gè)新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各個(gè) partition。由于 TungstenAggregationIterator 涉及內(nèi)容非常多,我們單開(kāi)一大節(jié)來(lái)進(jìn)行介紹。
此迭代器:
注:UnsafeKVExternalSorter 的實(shí)現(xiàn)可以參考:
UnsafeRow 是 InternalRow(表示一行記錄) 的 unsafe 實(shí)現(xiàn),由原始內(nèi)存(byte array)而不是 Java 對(duì)象支持,由三個(gè)區(qū)域組成:
使用 UnsafeRow 的收益:
構(gòu)造函數(shù)的主要流程已在上圖中說(shuō)明,需要注意的是:當(dāng)內(nèi)存不足時(shí)(畢竟每個(gè) grouping 對(duì)應(yīng)的 agg buffer 直接占用內(nèi)存,如果 grouping 非常多,或者 agg buffer 較大,容易出現(xiàn)內(nèi)存用盡)會(huì)從 hash based aggregate 切換為 sort based aggregate(會(huì) spill 數(shù)據(jù)到磁盤(pán)),后文會(huì)進(jìn)行詳述。先來(lái)看看最關(guān)鍵的 processInputs 方法的實(shí)現(xiàn)
上圖中,需要注意的是:hashMap 中 get 一個(gè) groupingKey 對(duì)應(yīng)的 agg buffer 時(shí),若已經(jīng)存在該 buffer 則直接返回;若不存在,嘗試申請(qǐng)內(nèi)存新建一個(gè):
上圖中,用于真正處理一條 row 的 AggregationIterator#processRow 還需進(jìn)一步展開(kāi)分析。在此之前,我們先來(lái)看看 AggregateFunction 的分類(lèi)
AggregateFunction 可以分為 DeclarativeAggregate 和 ImperativeAggregate 兩大類(lèi),具體的聚合函數(shù)均為這兩類(lèi)的子類(lèi)。
DeclarativeAggregate 是一類(lèi)直接由 Catalyst 中的 Expressions 構(gòu)成的聚合函數(shù),主要邏輯通過(guò)調(diào)用 4 個(gè)表達(dá)式完成,分別是:
我們?cè)俅我匀菀桌斫獾? Count 來(lái)舉例說(shuō)明:
通常來(lái)講,實(shí)現(xiàn)一個(gè)基于 Expressions 的 DeclarativeAggregate 函數(shù)包含以下幾個(gè)重要的組成部分:
再來(lái)看看 AggregationIterator#processRow
AggregationIterator#processRow 會(huì)調(diào)用
生成用于處理一行數(shù)據(jù)(row)的函數(shù)
說(shuō)白了 processRow 生成了函數(shù)才是直接用來(lái)接受一條 input row 來(lái)更新對(duì)應(yīng)的 agg buffer,具體是根據(jù) mode 及 aggExpression 中的 aggFunction 的類(lèi)型調(diào)用其 updateExpressions 或 mergeExpressions 方法:
比如,對(duì)于 aggFunction 為 DeclarativeAggregate 類(lèi)型的 Partial 下的 Count 來(lái)說(shuō)就是調(diào)用其 updateExpressions 方法,即:
對(duì)于 Final 的 Count 來(lái)說(shuō)就是調(diào)用其 mergeExpressions 方法,即:
對(duì)于 aggFunction 為 ImperativeAggregate 類(lèi)型的 Partial 下的 Collect 來(lái)說(shuō)就是調(diào)用其 update 方法,即:
對(duì)于 Final 的 Collect 來(lái)說(shuō)就是調(diào)用其 merge 方法,即:
我們都知道,讀取一個(gè)迭代器的數(shù)據(jù),是要不斷調(diào)用 hasNext 方法進(jìn)行 check 是否還有數(shù)據(jù),當(dāng)該方法返回 true 的時(shí)候再調(diào)用 next 方法取得下一條數(shù)據(jù)。所以要知道如何讀取 TungstenAggregationIterator 的數(shù)據(jù),就得分析其這兩個(gè)方法。
分為兩種情況,分別是:
Agg 的實(shí)現(xiàn)確實(shí)復(fù)雜,本文雖然篇幅已經(jīng)很長(zhǎng),但還有很多方面沒(méi)有 cover 到,但基本最核心、最復(fù)雜的點(diǎn)都詳細(xì)介紹了,如果對(duì)于未 cover 的部分有興趣,請(qǐng)自行閱讀源碼進(jìn)行分析~
spark sql 2.3 源碼解讀 - Execute (7)
終于到了最后一步執(zhí)行了:
最關(guān)鍵的兩個(gè)函數(shù)便是 doPrepare和 doExecute了。
還是以上一章的sql語(yǔ)句為例,其最終生成的sparkplan為:
看一下SortExec的doPrepare 和 doExecute方法:
下面看child也就是ShuffleExchangeExec:
先看沒(méi)有exchangeCoordinator的情況,
首先執(zhí)行:
上面的方法會(huì)返回一個(gè)ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它決定了每一條InternalRow shuffle后的partition id:
接下來(lái):
返回結(jié)果是ShuffledRowRDD:
CoalescedPartitioner的邏輯:
再看有exchangeCoordinator的情況:
同樣返回的是ShuffledRowRDD:
再看doEstimationIfNecessary:
estimatePartitionStartIndices 函數(shù)得到了 partitionStartIndices:
有exchangeCoordinator的情況就生成了partitionStartIndices,從而對(duì)分區(qū)進(jìn)行了調(diào)整。
最后來(lái)一個(gè)例子:
未開(kāi)啟exchangeCoordinator的plan:
開(kāi)啟exchangeCoordinator的plan:
不同之處是 兩個(gè)Exchange都帶了coordinator,且都是同一個(gè)coordinator。
執(zhí)行withExchangeCoordinator前:
執(zhí)行withExchangeCoordinator后:
生成了coordinator,且執(zhí)行了 doPrepare后,可以看到兩個(gè)exchange都向其注冊(cè)了。
doExecute后:
原先的numPartitions是200,經(jīng)過(guò)執(zhí)行后,生成的partitionStartIndices為[1],也就是只有1個(gè)partition,顯然在測(cè)試數(shù)據(jù)量很小的情況下,1個(gè)partition是更為合理的。這就是ExchangeCoordinator的功勞。
execute 最終的輸出是rdd,剩下的結(jié)果便是spark對(duì)rdd的運(yùn)算了。其實(shí) spark sql 最終的目標(biāo)便也是生成rdd,交給spark core來(lái)運(yùn)算。
spark sql的介紹到這里就結(jié)束了。
怎么用Eclipse搭建Spark源碼閱讀環(huán)境
應(yīng)該說(shuō)這個(gè)和是不是Spark項(xiàng)目沒(méi)什么關(guān)系。
建議你使用intellij idea,在spark目錄下執(zhí)行"sbt/sbt gen-idea",會(huì)自動(dòng)生成.idea項(xiàng)目,導(dǎo)入即可。
idea我不熟,還需要做一些其他的插件配置(python, sbt等)和環(huán)境設(shè)置。
你也可以使用Eclipse看,Eclipse有scala IDE,把Spark項(xiàng)目當(dāng)maven工程導(dǎo)入。但是子項(xiàng)目之間的依賴(lài)會(huì)有點(diǎn)問(wèn)題,會(huì)報(bào)錯(cuò)。
推薦使用前者,向Databricks的開(kāi)發(fā)者看齊;我使用的是后者,我直接依賴(lài)了編譯好的包就不會(huì)報(bào)錯(cuò)了,純讀源碼的話也勉強(qiáng)可以跟蹤和調(diào)試。
另外,我也看有的Committer用vim看spark代碼的,所以怎么看源碼都無(wú)所謂,你熟悉就好,而且這和是不是Spark項(xiàng)目也沒(méi)什么關(guān)系。:)
怎么在Idea IDE里面打開(kāi)Spark源碼而不報(bào)錯(cuò)
首先我們先點(diǎn)擊一個(gè)工程的Project Structure菜單,這時(shí)候會(huì)彈出一個(gè)對(duì)話框,仔細(xì)的用戶(hù)肯定會(huì)發(fā)現(xiàn)里面列出來(lái)的模塊(Module)居然沒(méi)有yarn!就是這個(gè)原因?qū)е聐arn模塊相關(guān)的代碼老是報(bào)錯(cuò)!只需要將yarn模塊加入到這里即可。
步驟依次選擇 Add-Import Module-選擇pom.xml,然后一步一步點(diǎn)擊確定,這時(shí)候會(huì)在對(duì)話框里面多了spark-yarn_2.10模塊,
然后點(diǎn)擊Maven Projects里面的Reimport All Maven Projects,等yarn模塊里面的所有依賴(lài)全部下載完的時(shí)候,我們就可以看到這個(gè)模塊里面的代碼終于不再報(bào)錯(cuò)了!!
Spark源碼分析之SparkSubmit的流程
本文主要對(duì)SparkSubmit的任務(wù)提交流程源碼進(jìn)行分析。 Spark源碼版本為2.3.1。
首先閱讀一下啟動(dòng)腳本,看看首先加載的是哪個(gè)類(lèi),我們看一下 spark-submit 啟動(dòng)腳本中的具體內(nèi)容。
可以看到這里加載的類(lèi)是org.apache.spark.deploy.SparkSubmit,并且把啟動(dòng)相關(guān)的參數(shù)也帶過(guò)去了。下面我們跟一下源碼看看整個(gè)流程是如何運(yùn)作的...
SparkSubmit的main方法如下
這里我們由于我們是提交作業(yè),所有會(huì)走上面的submit(appArgs, uninitLog)方法
可以看到submit方法首先會(huì)準(zhǔn)備任務(wù)提交的環(huán)境,調(diào)用了prepareSubmitEnvironment,該方法會(huì)返回四元組,該方法中會(huì)調(diào)用doPrepareSubmitEnvironment,這里我們重點(diǎn)注意 childMainClass類(lèi)具體是什么 ,因?yàn)檫@里涉及到后面啟動(dòng)我們主類(lèi)的過(guò)程。
以下是doPrepareSubmitEnvironment方法的源碼...
可以看到該方法首先是解析相關(guān)的參數(shù),如jar包,mainClass的全限定名,系統(tǒng)配置,校驗(yàn)一些參數(shù),等等,之后的關(guān)鍵點(diǎn)就是根據(jù)我們 deploy-mode 參數(shù)來(lái)判斷是如何運(yùn)行我們的mainClass,這里主要是通過(guò)childMainClass這個(gè)參數(shù)來(lái)決定下一步首先啟動(dòng)哪個(gè)類(lèi)。
childMainClass根據(jù)部署模型有不同的值:
之后該方法會(huì)把準(zhǔn)備好的四元組返回,我們接著看之前的submit方法
可以看到這里最終會(huì)調(diào)用doRunMain()方法去進(jìn)行下一步。
doRunMain的實(shí)現(xiàn)如下...
doRunMain方法中會(huì)判斷是否需要一個(gè)代理用戶(hù),然后無(wú)論需不需要都會(huì)執(zhí)行runMain方法,我們接下來(lái)看看runMain方法是如何實(shí)現(xiàn)的。
這里我們只假設(shè)以集群模式啟動(dòng),首先會(huì)加載類(lèi),將我們的childMainClass加載為字節(jié)碼對(duì)象mainClass ,然后將mainClass 映射成SparkApplication對(duì)象,因?yàn)槲覀円约耗J絾?dòng),那么上一步返回四元組中的childMainClass的參數(shù)為ClientApp的全限定名,而這里會(huì)調(diào)用app實(shí)例的start方法因此,這里最終調(diào)用的是ClientApp的start方法。
ClientApp的start方法如下...
可以看到這里和之前我們的master啟動(dòng)流程有些相似。
可以參考我上一篇文章 Spark源碼分析之Master的啟動(dòng)流程 對(duì)這一流程加深理解。
首先是準(zhǔn)備rpcEnv環(huán)境,之后通過(guò)master的地址獲取masterEndpoints端點(diǎn)相關(guān)信息,因?yàn)檫@里運(yùn)行start方法時(shí)會(huì)將之前配置的相關(guān)參數(shù)都傳進(jìn)來(lái),之后就會(huì)通過(guò)rpcEnv注冊(cè)相關(guān)clientEndPoint端點(diǎn)信息,同時(shí)需要注意,這里會(huì)把masterEndpoints端點(diǎn)信息也作為構(gòu)造ClientEndpoint端點(diǎn)的參數(shù),也就是說(shuō)這個(gè)ClientEndpoint會(huì)和masterEndpoints通信。
而在我上一篇文章中說(shuō)過(guò),只要是setupEndpoint方法被調(diào)用,一定會(huì)調(diào)用相關(guān)端點(diǎn)的的onStart方法,而這會(huì)調(diào)用clientEndPoint的onStart方法。
ClientEndPoint類(lèi)中的onStart方法會(huì)匹配launch事件。源碼如下
onStart中匹配我們的launch的過(guò)程,這個(gè)過(guò)程是啟動(dòng)driverWrapper的過(guò)程,可以看到上面源碼中封裝了mainClass ,該參數(shù)對(duì)應(yīng)DriverWrapper類(lèi)的全限定名,之后將mainClass封裝到command中,然后封裝到driverDescription中,向Master申請(qǐng)啟動(dòng)Driver。
這個(gè)過(guò)程會(huì)向Mster發(fā)送消息,是通過(guò)rpcEnv來(lái)實(shí)現(xiàn)發(fā)射消息的,而這里就涉及到outbox信箱,會(huì)調(diào)用postToOutbox方法,向outbox信箱中添加消息,然后通過(guò)TransportClient的send或sendRpc方法發(fā)送消息。發(fā)件箱以及發(fā)送過(guò)程是在同一個(gè)線程中進(jìn)行。
而細(xì)心的同學(xué)會(huì)注意到這里調(diào)用的方法名為SendToMasterAndForwardReply,見(jiàn)名之意,發(fā)送消息到master并且期待回應(yīng)。
下面是rpcEnv來(lái)實(shí)現(xiàn)向遠(yuǎn)端發(fā)送消息的一個(gè)調(diào)用流程,最終會(huì)通過(guò)netty中的TransportClient來(lái)寫(xiě)出。
之后,Master端會(huì)觸發(fā)receiveAndReply函數(shù),匹配RequestSubmitDriver樣例類(lèi),完成模式匹配執(zhí)行后續(xù)流程。
可以看到這里首先將Driver信息封裝成DriverInfo,然后添加待調(diào)度列表waitingDrivers中,然后調(diào)用通用的schedule函數(shù)。
由于waitingDrivers不為空,則會(huì)走LaunchDriver的流程,當(dāng)前的application申請(qǐng)資源,這時(shí)會(huì)向worker發(fā)送消息,觸發(fā)Worker的receive方法。
Worker的receive方法中,當(dāng)Worker遇到LaunchDriver指令時(shí),創(chuàng)建并啟動(dòng)一個(gè)DriverRunner,DriverRunner啟動(dòng)一個(gè)線程,異步的處理Driver啟動(dòng)工作。這里說(shuō)啟動(dòng)的Driver就是剛才說(shuō)的org.apache.spark.deploy.worker.DriverWrapper
可以看到上面在DriverRunner中是開(kāi)辟線程異步的處理Driver啟動(dòng)工作,不會(huì)阻塞主進(jìn)程的執(zhí)行,而prepareAndRunDriver方法中最終調(diào)用 runDriver..
runDriver中主要先做了一些初始化工作,接著就開(kāi)始啟動(dòng)driver了。
上述Driver啟動(dòng)工作主要分為以下幾步:
下面我們直接看DriverWrapper的實(shí)現(xiàn)
DriverWrapper,會(huì)創(chuàng)建了一個(gè)RpcEndpoint與RpcEnv,RpcEndpoint為WorkerWatcher,主要目的為監(jiān)控Worker節(jié)點(diǎn)是否正常,如果出現(xiàn)異常就直接退出,然后當(dāng)前的ClassLoader加載userJar,同時(shí)執(zhí)行userMainClass,在執(zhí)行用戶(hù)的main方法后關(guān)閉workerWatcher。
以上就是SparkSubmit的流程,下一篇我會(huì)對(duì)SparkContext的源碼進(jìn)行解析。
歡迎關(guān)注...
如何閱讀spark源碼的介紹就聊到這里吧,感謝你花時(shí)間閱讀本站內(nèi)容,更多關(guān)于spark源碼看什么書(shū)、如何閱讀spark源碼的信息別忘了在本站進(jìn)行查找喔。
掃描二維碼推送至手機(jī)訪問(wèn)。
版權(quán)聲明:本文由飛速云SEO網(wǎng)絡(luò)優(yōu)化推廣發(fā)布,如需轉(zhuǎn)載請(qǐng)注明出處。