Twitter把Kafka當(dāng)作存儲(chǔ)系統(tǒng)使用,使用kafka的公司Twitter把Kafka當(dāng)作存儲(chǔ)系統(tǒng)使用當(dāng)開發(fā)者通過(guò)API消費(fèi)Twitter的公共數(shù)據(jù)時(shí),他們需要獲得可靠性、速度和穩(wěn)定性方面的保證。因此,在不久前,我們推出了Account Activity Replay API幫助開發(fā)者們提升他們系統(tǒng)的穩(wěn)定性。這個(gè)A......
當(dāng)開發(fā)者通過(guò)API消費(fèi)Twitter的公共數(shù)據(jù)時(shí),他們需要獲得可靠性、速度和穩(wěn)定性方面的保證。因此,在不久前,我們推出了Account Activity Replay API幫助開發(fā)者們提升他們系統(tǒng)的穩(wěn)定性。這個(gè)API是一個(gè)數(shù)據(jù)恢復(fù)工具,開發(fā)者可以用它來(lái)檢索最早發(fā)生在5天前的事件,恢復(fù)由于各種原因(包括在實(shí)時(shí)傳遞時(shí)突然發(fā)生的服務(wù)器中斷)沒有被傳遞的事件。
除了構(gòu)建API來(lái)提升開發(fā)者體驗(yàn),我們還做了一些優(yōu)化:
·提高Twitter內(nèi)部工程師的生產(chǎn)力。
·保持系統(tǒng)的可維護(hù)性。具體來(lái)說(shuō),就是盡量減少開發(fā)人員、站點(diǎn)可靠性工程師和其他與系統(tǒng)交互的人員的上下文切換。
基于這些原因,在構(gòu)建這個(gè)API所依賴的回放系統(tǒng)時(shí),我們利用了Account Activity API現(xiàn)有的實(shí)時(shí)系統(tǒng)設(shè)計(jì)。這有助于我們重用現(xiàn)有的工作,并最小化上下文切換負(fù)擔(dān)和培訓(xùn)工作。
實(shí)時(shí)系統(tǒng)采用了發(fā)布和訂閱架構(gòu)。為了保持架構(gòu)的一致性,構(gòu)建一個(gè)可以讀取數(shù)據(jù)的存儲(chǔ)層,我們想到了傳統(tǒng)的流式技術(shù)——Kafka。
1
背景
兩個(gè)數(shù)據(jù)中心產(chǎn)生實(shí)時(shí)事件,事件被寫入到跨數(shù)據(jù)中心的主題上,實(shí)現(xiàn)數(shù)據(jù)冗余。
但并不是所有的事件都需要被傳遞,所以會(huì)有一個(gè)內(nèi)部應(yīng)用程序負(fù)責(zé)對(duì)事件進(jìn)行篩選。這個(gè)應(yīng)用程序消費(fèi)來(lái)自這些主題的事件,根據(jù)保存在鍵值存儲(chǔ)中的一組規(guī)則來(lái)檢查每一個(gè)事件,并決定是否應(yīng)該通過(guò)我們的公共API將消息傳遞給特定的開發(fā)者。事件是通過(guò)Webhook傳遞的,每個(gè)Webhook URL都有一個(gè)開發(fā)人員負(fù)責(zé)維護(hù),并有唯一的ID標(biāo)識(shí)。
圖1:數(shù)據(jù)生成管道
2
存儲(chǔ)和分區(qū)
通常,在構(gòu)建一個(gè)需要存儲(chǔ)層的回放系統(tǒng)時(shí),人們可能會(huì)使用基于Hadoop和HDFS的架構(gòu)。但我們選擇了Kafka,主要基于以下兩個(gè)原因:
·已有的實(shí)時(shí)系統(tǒng)采用了發(fā)布和訂閱架構(gòu);
·回放系統(tǒng)存儲(chǔ)的事件量不是PB級(jí)的,我們存儲(chǔ)的數(shù)據(jù)不會(huì)超過(guò)幾天。此外,執(zhí)行Hadoop的MapReduce作業(yè)比在Kafka上消費(fèi)數(shù)據(jù)成本更高、速度更慢,達(dá)不到開發(fā)者的期望。
要利用實(shí)時(shí)管道來(lái)構(gòu)建回放管道,首先要確保事件被存儲(chǔ)在Kafka中。我們把Kafka主題叫作deliverylog,每個(gè)數(shù)據(jù)中心都有一個(gè)這樣的主題。然后,這些主題被交叉復(fù)制,實(shí)現(xiàn)數(shù)據(jù)冗余,以便支持來(lái)自數(shù)據(jù)中心之外的重放請(qǐng)求。事件在被傳遞之前經(jīng)過(guò)去重操作。
在這個(gè)Kafka主題上,我們使用默認(rèn)的分區(qū)機(jī)制創(chuàng)建了多個(gè)分區(qū),分區(qū)與WebhookId的散列值(事件記錄的鍵)一一對(duì)應(yīng)。我們考慮過(guò)使用靜態(tài)分區(qū),但最終決定不使用它,因?yàn)槿绻渲幸粋€(gè)開發(fā)人員生成的事件多于其他開發(fā)人員,那么這個(gè)分區(qū)包含的數(shù)據(jù)將多于其他分區(qū),造成了分區(qū)的不均衡。相反,我們選擇固定數(shù)量的分區(qū),然后使用默認(rèn)分區(qū)策略來(lái)分布數(shù)據(jù),這樣就降低了分區(qū)不均衡的風(fēng)險(xiǎn),并且不需要讀取Kafka主題的所有分區(qū)。重放服務(wù)基于請(qǐng)求的WebhookId來(lái)確定要讀取哪個(gè)分區(qū),并為該分區(qū)啟動(dòng)一個(gè)新的Kafka消費(fèi)者。主題的分區(qū)數(shù)量不會(huì)發(fā)生變化,因?yàn)檫@會(huì)影響鍵的散列和事件的分布。
我們使用了固態(tài)磁盤,根據(jù)每個(gè)時(shí)間段讀取的事件數(shù)量來(lái)分配空間。我們選擇這種磁盤而不是傳統(tǒng)的硬盤驅(qū)動(dòng)器,以此來(lái)獲得更快的處理速度,并減少與查找和訪問(wèn)操作相關(guān)的開銷。因?yàn)槲覀冃枰L問(wèn)低頻數(shù)據(jù),無(wú)法獲得頁(yè)面緩存優(yōu)化的好處,所以最好是使用固態(tài)磁盤。
為了最小化存儲(chǔ)空間,我們使用了snappy壓縮算法。我們知道大部分處理工作都在消費(fèi)端,之所以選擇snappy,是因?yàn)樗诮鈮簳r(shí)比其他Kafka所支持的壓縮算法(如gzip和lz4)更快。
3
請(qǐng)求和處理
在我們?cè)O(shè)計(jì)的這個(gè)系統(tǒng)中,通過(guò)API調(diào)用來(lái)發(fā)快遞重放請(qǐng)求。我們從請(qǐng)求消息體中獲取WebhookId和要重放的事件的日期范圍。這些請(qǐng)求被持久化到MySQL中,相當(dāng)于進(jìn)入了隊(duì)列,直到它們被重放服務(wù)讀取。請(qǐng)求中的日期范圍用于確定要讀取的分區(qū)的偏移量。消費(fèi)者對(duì)象的offsetForTimes函數(shù)用于獲取偏移量。
圖2:重放系統(tǒng)接收請(qǐng)求,并將請(qǐng)求發(fā)快遞給配置服務(wù)(數(shù)據(jù)訪問(wèn)層),然后被持久化到數(shù)據(jù)庫(kù)中。
重放服務(wù)處理每個(gè)重放請(qǐng)求,它們通過(guò)MySQL相互協(xié)調(diào),處理數(shù)據(jù)庫(kù)中的下一個(gè)需要重放的記錄。重放進(jìn)程定期輪詢MySQL,獲取需要被處理的掛起作業(yè)。一個(gè)請(qǐng)求會(huì)在各種狀態(tài)之間轉(zhuǎn)換。等待被處理的請(qǐng)求處于開放狀態(tài)(OPEN STATE),剛退出隊(duì)列的請(qǐng)求處于啟動(dòng)狀態(tài)(STARTED STATE),正在被處理的請(qǐng)求處于進(jìn)行中狀態(tài)(ONGOING STATE),已處理完成的請(qǐng)求將轉(zhuǎn)換到已完成狀態(tài)(COMPLETED STATE)。一個(gè)重放進(jìn)程只會(huì)選擇一個(gè)尚未啟動(dòng)的請(qǐng)求(即處于打開狀態(tài)的請(qǐng)求)。
每隔一段時(shí)間,當(dāng)一個(gè)工作進(jìn)程將一個(gè)請(qǐng)求退出隊(duì)列后,它會(huì)在MySQL表中寫入時(shí)間戳,表示正在處理當(dāng)前的重放作業(yè)。如果重放進(jìn)程在處理請(qǐng)求時(shí)死掉了,相應(yīng)的作業(yè)將被重新啟動(dòng)。因此,除了將處于打開狀態(tài)的請(qǐng)求退出隊(duì)列之外,重放操作還將讀取處于已開始或正在進(jìn)行中的、在預(yù)定義的分鐘數(shù)內(nèi)沒有心跳的作業(yè)。
圖3:數(shù)據(jù)傳遞層:重放服務(wù)通過(guò)輪詢MySQL來(lái)讀取作業(yè),消費(fèi)來(lái)自Kafka的消息,并通過(guò)Webhook服務(wù)傳遞事件。
在讀取事件時(shí)會(huì)進(jìn)行去重操作,然后事件被發(fā)布到消費(fèi)者端的Webhook URL上。去重是通過(guò)維護(hù)被讀取事件的散列值緩存來(lái)實(shí)現(xiàn)的。如果遇到具有相同散列值的事件,就不傳遞這個(gè)事件。
總的來(lái)說(shuō),我們的解決方案與傳統(tǒng)的將Kafka作為實(shí)時(shí)、流式系統(tǒng)的用法不一樣。我們成功地將Kafka作為存儲(chǔ)系統(tǒng),構(gòu)建了一個(gè)API,在進(jìn)行事件恢復(fù)時(shí)提升了用戶體驗(yàn)和數(shù)據(jù)訪問(wèn)能力。我們利用已有的實(shí)時(shí)系統(tǒng)設(shè)計(jì)讓系統(tǒng)的維護(hù)變得更加容易。此外,客戶數(shù)據(jù)的恢復(fù)速度達(dá)到了我們的預(yù)期。
原文鏈接
https://blog.twitter.com/engineering/enus/topics/infrastructure/2020/kafkaasastoragesystem.html
特別聲明:以上文章內(nèi)容僅代表作者本人觀點(diǎn),不代表ESG跨境電商觀點(diǎn)或立場(chǎng)。如有關(guān)于作品內(nèi)容、版權(quán)或其它問(wèn)題請(qǐng)于作品發(fā)表后的30日內(nèi)與ESG跨境電商聯(lián)系。
二維碼加載中...
使用微信掃一掃登錄
使用賬號(hào)密碼登錄
平臺(tái)顧問(wèn)
微信掃一掃
馬上聯(lián)系在線顧問(wèn)
小程序
ESG跨境小程序
手機(jī)入駐更便捷
返回頂部