本文主要介紹了達達集團使用基于開源的Flink Stream SQL開發(fā)的Dada Flink SQL進行實時計算任務SQL化過程中的實踐經(jīng)驗。
時間回到2018年,在數(shù)據(jù)平臺和數(shù)據(jù)團隊的共同努力下,我們已經(jīng)有了完整的離線計算流程,完善的離線數(shù)倉模型,也上線了很多的數(shù)據(jù)產(chǎn)品和大量的數(shù)據(jù)報表。隨著業(yè)務的發(fā)展,我們也逐漸面臨著越來越多的實時計算方面的需求。隨著Flink在國內(nèi)的逐漸流行,實時計算也越來越多地進入我們的視野。當時,F(xiàn)link的SQL功能還不完善,大量數(shù)據(jù)開發(fā)需要的功能無法使用SQL表達。因此,我們的選擇和很多公司的選擇類似,通過對Flink的框架和API進行封裝,降低我們的數(shù)據(jù)開發(fā)人員進行實時任務開發(fā)的難度。針對這些需求我們計劃通過一些封裝,使得數(shù)據(jù)開發(fā)同學無需開發(fā)Java或者Scala代碼,專注于業(yè)務邏輯的開發(fā)。由于開發(fā)資源有限,我們傾向于通過引進一些開源的框架并進行定制性的開發(fā)來完成這個任務。通過一些調(diào)研,我們鎖定了袋鼠云的Flink Stream SQL(以下簡稱FSL)和Uber的AthenaX。對比后,F(xiàn)SL的豐富的插件、開發(fā)的活躍度和支持的相對完善對于我們更有吸引力。因此,我們引進了袋鼠云的FSL,并基于FSL開發(fā)了達達的SQL計算引擎Dada Flink SQL(以下簡稱DFL),并以此進行實時計算任務的SQL化。
首先介紹一下DFL的架構(gòu)。DFL中的主要組件為launcher、core、source插件、sink插件、Flink Siddhi插件以及side插件,其中Flink Siddhi為我們根據(jù)開源的Flink Siddhi接入的基于Siddhi的規(guī)則引擎,后面我們會有專門的文章介紹Flink Siddhi相關(guān)的內(nèi)容和我們做的封裝。launcher負責加載必要的source/side/sink插件,并將Flink program提交到Flink集群,支持session cluster模式和single job模式。core模塊負責解析SQL語句,生成SQLTree,并根據(jù)解析的source、sink、Flink Siddhi和side內(nèi)容加載相應的插件,生成必要的組件并注冊進Flink TableEnvironment。之后,根據(jù)SQL是否使用了維表JOIN的功能 ,會選擇直接調(diào)用TableEnvironment.sqlUpdate()或者進行維表JOIN的處理。除維表JOIN之外,根據(jù)我們數(shù)據(jù)開發(fā)同學的需求,我們還加入了INTERVAL JOIN的支持。使用流程表示,DFL的整體流程如下圖所示。
2.1 Parser
DFL使用Parser來解析SQL語句,解析為相應的數(shù)據(jù)結(jié)構(gòu),并放入SqlTree進行管理以便后續(xù)使用。Parser定義了良好的接口,易于通過增加新的實現(xiàn)類來增加對新的SQL語法的支持。Parser的接口定義如下:
其中match用于判斷一個具體的Parser的實現(xiàn)能否實現(xiàn)對給定的SQL語句的解析,verifySyntax為我們新增加的接口功能,用于驗證給定SQL的語法是否正確,并將相關(guān)的錯誤信息放入errorInfo中供調(diào)用方使用,parserSql實現(xiàn)具體的SQL語法的解析工作。我們?yōu)镮Parser增加了很多的實現(xiàn)以實現(xiàn)新的功能,例如增加對Flink Siddhi的支持等。
2.2 維表JOIN
DFL中包含兩種維表JOIN的實現(xiàn)方式:ALL及SIDE方式。ALL方式會將需要JOIN的數(shù)據(jù)一次性讀取并緩存到Task的內(nèi)存中,并可以設(shè)置定期刷新緩存;SIDE方式則在需要進行JOIN時從相應的數(shù)據(jù)源中讀取相應的數(shù)據(jù),并根據(jù)設(shè)置決定是否將讀取到的數(shù)據(jù)緩存在內(nèi)存中。ALL和SIDE模式相應的抽象類的定義分別為AllReqRow和AsyncReqRow,他們都實現(xiàn)了共同的接口ISideReqRow,ISideReqRow中定義了用于將事實表的數(shù)據(jù)和維表讀取的數(shù)據(jù)進行JOIN的方法Row fillData(Row input, Object sideInput)。AllReqRow和AsyncReqRow的定義分別如下:
可以看到其中使用了模板方法的設(shè)計模式。
AsyncSideReqRow主要提供了初始化LRU緩存,從LRU緩存中獲取數(shù)據(jù)以及從數(shù)據(jù)源或者LRU緩存中無法找到需要JOIN的數(shù)據(jù)時的默認處理方法。
開發(fā)DFL的過程中,根據(jù)一些業(yè)務相關(guān)的需求及簡化數(shù)據(jù)開發(fā)人員使用DFL的需要,我們在原生FSL的基礎(chǔ)上進行了大量的改進和擴展的工作,下面介紹一些我們在DFL上做的工作。
3.1 Flink HA模式下,SESSION模式提交任務超時
為了Flink任務有較好的容錯性,我們?yōu)镕link集群配置了基于ZooKeper的HA。出于任務管理和維護的需要,我們的一些Flink任務使用了session模式,在將這些任務遷移到DFL后,發(fā)現(xiàn)提交任務時,會報超時的錯誤。查閱Flink的官方文檔也沒有發(fā)現(xiàn)線索。后面經(jīng)過我們的探索,發(fā)現(xiàn)了在YARN session模式下,配置了HA時,進行任務提交需要指定high-availability.cluster-id。添加了如下代碼后,SESSION模式下,任務可以正常提交了。
3.2 Kafka支持使用SQL關(guān)鍵字作為JSON的字段名
當在Flink中使用了SQL關(guān)鍵字作字段名時,即使將字段名用反引號包起來,依然會報如下的錯誤:
這個是Flink的bug,已經(jīng)在1.10.1中作了修復,詳見這個issue:https://issues.apache.org/jira/browse/FLINK-16526。我們使用的版本為Flink 1.6.2,無法使用這個修復。我們的做法是支持將Kafka中JSON的字段名和引用這個JSON字段的列名作解耦,即在Flink SQL中使用指定的列名引用該JSON字段,而用于JSON解析的還是原始的JSON字段名。具體來說,我們在元數(shù)據(jù)系統(tǒng)中,支持為Kafka類型的表注冊一個可選的sourceName。如果注冊了sourceName,F(xiàn)link Stream SQL將使用sourceName去JSON中解析對應的字段。
3.3 元數(shù)據(jù)整合
DFL上線后,通過添加必要的功能,使用純SQL開發(fā)已經(jīng)滿足我們的很多實時任務開發(fā)的需求。但是在DFL運行一段時間后,我們注意到了管理各種上下游存儲的信息給我們的數(shù)據(jù)開發(fā)人員帶來的困擾。我們線上使用的存儲系統(tǒng)包括了Kafka、HBase、ElasticSearch、Redis和MySQL(之后又引入了ClickHouse)。這些數(shù)據(jù)源基本都是異構(gòu)的,連接及用戶信息各異,而且在不同的任務中使用相同的數(shù)據(jù)源,每次都需要使用CREATE TABLE
元數(shù)據(jù)管理系統(tǒng)開發(fā)完成后,我們將Flink Stream SQL和元數(shù)據(jù)管理系統(tǒng)進行了深度集成。通過引入USE TABLE <> AS <> WITH ()的語法,我們的數(shù)據(jù)開發(fā)人員只需要將數(shù)據(jù)源在元數(shù)據(jù)管理系統(tǒng)中進行注冊 ,之后在Flink Stream SQL中引用注冊后的表就無需再填寫任何連接信息,而且如果需要引用所有的字段的話,也無需再填寫字段信息。如果不想要引用所有的子段,有兩種辦法可以做到。第一種方法是在USE TABLE的WITH里面使用columns表達需要引用的字段,第二種方法是在元數(shù)據(jù)系統(tǒng)里注冊一張只包含了要引用的字段的表。
3.4 Redis hash/set數(shù)據(jù)類型的支持
FSL已經(jīng)內(nèi)置了對Redis作為sink table和side table的支持,但是FSL只支持Redis的String類型的數(shù)據(jù),而我們的場景會使用到Redis的hash和set類型的數(shù)據(jù),因此我們需要添加對Redis這兩種數(shù)據(jù)類型的支持。首先介紹一下將Redis中的數(shù)據(jù)映射到Flink中的表的方法,在我們的Redis的key中包含了兩部分的內(nèi)容(使用":"分隔),兩部分分別為固定的keyPrefix和由一到多個字段的值使用":"拼接的primaryKey,其中keyPrefix模擬表的概念,也方便Redis中存儲的內(nèi)容的管理。對String類型的數(shù)據(jù),Redis的key會在上面介紹的key的基礎(chǔ)上拼接上字段名稱(使用":"作為分隔符),并以字段的值作為該key對應的value寫入Redis中;對Hash類型的數(shù)據(jù),Redis的完整的key就為上面介紹的key,hash的key則由用戶指定的字段的值使用":"拼接而成,類似的,hash的value由用戶指定的字段的值拼接而成。除了Redis hash和set數(shù)據(jù)類型的支持之外,我們還為Redis增加了setnx和hsetnx以及TTL的功能。
3.5 ClickHouse sink的支持
FSL內(nèi)置了對Kafka、MySQL、Redis、Elasticsearch和HBbase等數(shù)據(jù)源作為目標表的支持,但是我們在使用的過程中也遇到了一些新的數(shù)據(jù)源作為目標寫入端的要求,為此我們開發(fā)了新的sink插件來支持這種需求。我們開發(fā)和維護的sink插件包括了ClickHouse和HdfsFile。下面以ClickHouse的sink為例介紹一下我們在這方面所做的一些工作。
對于ClickHouse,我們開發(fā)了實現(xiàn)了RichSinkFunction和CheckpointedFunction的ClickhouseSink。通過實現(xiàn)CheckpointedFunction并在snapshotState()方法中將數(shù)據(jù)刷寫到ClickHouse來確保數(shù)據(jù)不會丟失。為了處理不同的輸入數(shù)據(jù)類型,我們提供接口ClickhouseMapper
不同于通常情況下由用戶提供sink表的schema的方式,我們通過執(zhí)行DESC
的方式從ClickHouse獲取表的schema。為了處理ClickHouse中的特殊數(shù)據(jù)類型,例如nullable(String),Int32等,我們使用正則表達式提取出實際的類型進行寫入,相關(guān)的代碼如下。為了寫入數(shù)據(jù)的過程不阻塞正常的數(shù)據(jù)處理流程,我們使用了將數(shù)據(jù)寫入任務放入線程池的方式。同時為了在Flink任務失敗的情況下不發(fā)生數(shù)據(jù)丟失的情況,在snapshotState()方法中等待線程池中的任務完成。
3.6 BINLOG表達的簡化
為了處理線上數(shù)據(jù)的更新,我們采用了阿里巴巴開源的Canal采集MySQL binlog并發(fā)送到Kafka的方式。由于binlog特殊的數(shù)據(jù)組織形式,處理binlog的數(shù)據(jù)需要做很多繁雜的工作,例如從binlog的columnValues或者updatedValues字段中使用udf取出實際增加或者更新的字段。由于我們將Flink Stream SQL和元數(shù)據(jù)系統(tǒng)進行了對接,因此我們可以拿到MySQL表的schema信息,從而我們可以提供語法封裝來幫助數(shù)據(jù)開發(fā)人員減少這種重復性的SQL表達。為此,我們引入一種新的SQL語法:USE BINLOG TABLE,這種語法的格式如下。
我們會將這種語法展開為如下的內(nèi)容。
在DFL上線后,由于可以使用純SQL進行開發(fā),符合數(shù)據(jù)開發(fā)同學的開發(fā)習慣,而且我們提供了很多的語法封裝,加上元數(shù)據(jù)管理帶來的便利,數(shù)據(jù)開發(fā)同學逐步將一些實時計算任務遷移到了DFL上,這為部門帶來了極大的效率提升。截止到目前,DFL已經(jīng)應用到了達達集團的各個數(shù)據(jù)應用系統(tǒng)中,系統(tǒng)中運行的實時計算任務已經(jīng)達到70多個,涵蓋達達快送、京東到家的各個業(yè)務及流量模塊,而且實時計算任務數(shù)量和SQL化占比還在穩(wěn)步增加中。隨著大數(shù)據(jù)部門的計算基礎(chǔ)設(shè)施開放,現(xiàn)在我們的實時計算能力也在集團其它部門中得到了越來越廣泛的應用。
當前Flink的社區(qū)版本已經(jīng)發(fā)展到了1.10,F(xiàn)link Table/SQL本身已經(jīng)支持了DFL提供的多數(shù)功能,出于降低維護組件復雜度的考慮,我們計劃后續(xù)引入Flink 1.10,并逐步推廣Flink 1.10的使用,以期最后將所有的任務都遷移到最新的Flink版本上。
公司內(nèi)部在逐步推廣私有云的使用,考慮到社區(qū)在Flink on K8s上的進展,我們后續(xù)在引入新版本的Flink時,將嘗試在公司的私有云上進行部署。
作者簡介:馬陽陽 達達集團數(shù)據(jù)平臺高級開發(fā)工程師,負責達達集團計算引擎相關(guān)的維護和開發(fā)工作
順豐、中通、圓通、韻達、申通、極兔的高管工資獎金有多高?
1778 閱讀鳴鳴很忙VS三只松鼠 ,誰的供應鏈更抗打?
1603 閱讀從規(guī)模到質(zhì)量,韻達開啟2025年增長之路
1476 閱讀Gartner 2025 WMS魔力象限看倉儲管理系統(tǒng)發(fā)展趨勢
1402 閱讀河南首輛跨境電商TIR國際卡班發(fā)運
983 閱讀商家朋友們注意了,抖音電商再次升級物流服務
788 閱讀Shopee一季度GMV達286億美元
756 閱讀中儲智運以數(shù)智物流賦能鹽湖產(chǎn)業(yè)鏈
770 閱讀順心捷達上線“承諾達”產(chǎn)品
687 閱讀《馬士基亞太區(qū)5月市場資訊》發(fā)布:以變應變,破局前行
705 閱讀