<menu id="guoca"></menu>
<nav id="guoca"></nav><xmp id="guoca">
  • <xmp id="guoca">
  • <nav id="guoca"><code id="guoca"></code></nav>
  • <nav id="guoca"><code id="guoca"></code></nav>

    一口氣說出 6 種實現延時消息的方案

    VSole2022-05-29 16:07:26

    延時消息(定時消息)指的在分布式異步消息場景下,生產端發送一條消息,希望在指定延時或者指定時間點被消費端消費到,而不是立刻被消費。

    延時消息適用的業務場景非常的廣泛,在分布式系統環境下,延時消息的功能一般會在下沉到中間件層,通常是 MQ 中內置這個功能或者內聚成一個公共基礎服務。

    本文旨在探討常見延時消息的實現方案以及方案設計的優缺點。

    實現方案

    基于外部存儲實現的方案

    這里討論的外部存儲指的是在 MQ 本身自帶的存儲以外又引入的其他的存儲系統。

    基于外部存儲的方案本質上都是一個套路,將 MQ 和 延時模塊 區分開來,延時消息模塊是一個獨立的服務/進程。延時消息先保留到其他存儲介質中,然后在消息到期時再投遞到 MQ。當然還有一些細節性的設計,比如消息進入的延時消息模塊時已經到期則直接投遞這類的邏輯,這里不展開討論。

    下述方案不同的是,采用了不同的存儲系統。

    基于 數據庫(如MySQL)

    基于關系型數據庫(如MySQL)延時消息表的方式來實現。

    CREATE TABLE `delay_msg` (  
      `id` bigint unsigned NOT NULL AUTO_INCREMENT,  
      `delivery_time` DATETIME NOT NULL COMMENT '投遞時間',  
      `payloads` blob COMMENT '消息內容',  
      PRIMARY KEY (`id`),  
      KEY `time_index` (`delivery_time`)  
    )  
    

    通過定時線程定時掃描到期的消息,然后進行投遞。定時線程的掃描間隔理論上就是你延時消息的最小時間精度。

    優點:

    • 實現簡單;

    缺點:

    • B+Tree索引不適合消息場景的大量寫入;

    基于 RocksDB

    RocksDB 的方案其實就是在上述方案上選擇了比較合適的存儲介質。

    RocksDB 使用的是LSM Tree,LSM 樹更適合大量寫入的場景。滴滴開源的DDMQ中的延時消息模塊 Chronos 就是采用了這個方案。

    DDMQ 這個項目簡單來說就是在 RocketMQ 外面加了一層統一的代理層,在這個代理層就可以做一些功能維度的擴展。延時消息的邏輯就是代理層實現了對延時消息的轉發,如果是延時消息,會先投遞到 RocketMQ 中 Chronos 專用的 topic 中。延時消息模塊 Chronos 消費得到延時消息轉儲到 RocksDB,后面就是類似的邏輯了,定時掃描到期的消息,然后往 RocketMQ 中投遞。

    這個方案老實說是一個比較重的方案。因為基于 RocksDB 來實現的話,從數據可用性的角度考慮,你還需要自己去處理多副本的數據同步等邏輯。

    優點:

    • RocksDB LSM 樹很適合消息場景的大量寫入;

    缺點:

    • 實現方案較重,如果你采用這個方案,需要自己實現 RocksDB 的數據容災邏輯;

    基于Redis

    再來聊聊 Redis 的方案。下面放一個比較完善的方案。

    • Messages Pool 所有的延時消息存放,結構為KV結構,key為消息ID,value為一個具體的message(這里選擇Redis Hash結構主要是因為hash結構能存儲較大的數據量,數據較多時候會進行漸進式rehash擴容,并且對于HSET和HGET命令來說時間復雜度都是O(1))
    • Delayed Queue是16個有序隊列(隊列支持水平擴展),結構為ZSET,value 為 messages pool中消息ID,score為過期時間(分為多個隊列是為了提高掃描的速度)
    • Worker 代表處理線程,通過定時任務掃描 Delayed Queue 中到期的消息

    這個方案選用 Redis 存儲在我看來有幾點考慮。

    • Redis ZSET 很適合實現延時隊列
    • 性能問題,雖然 ZSET 插入是一個 O(logn) 的操作,但是Redis 基于內存操作,并且內部做了很多性能方面的優化。

    但是這個方案其實也有需要斟酌的地方,上述方案通過創建多個 Delayed Queue 來滿足對于并發性能的要求,但這也帶來了多個 Delayed Queue 如何在多個節點情況下均勻分配,并且很可能出現到期消息并發重復處理的情況,是否要引入分布式鎖之類的并發控制設計?

    在量不大的場景下,上述方案的架構其實可以蛻化成主從架構,只允許主節點來處理任務,從節點只做容災備份。實現難度更低更可控。

    定時線程檢查的缺陷與改進

    上述幾個方案中,都通過線程定時掃描的方案來獲取到期的消息。

    定時線程的方案在消息量較少的時候,會浪費資源,在消息量非常多的時候,又會出現因為掃描間隔設置不合理導致延時時間不準確的問題。可以借助 JDK Timer 類中的思想,通過 wait-notify 來節省 CPU 資源。

    獲取中最近的延時消息,然后wait(執行時間-當前時間),這樣就不需要浪費資源到達時間時會自動響應,如果有新的消息進入,并且比我們等待的消息還要小,那么直接notify喚醒,重新獲取這個更小的消息,然后又wait,如此循環。

    開源 MQ 中的實現方案

    再來講講目前自帶延時消息功能的開源MQ,它們是如何實現的

    RocketMQ

    RocketMQ 開源版本支持延時消息,但是只支持 18 個 Level 的延時,并不支持任意時間。只不過這個 Level 在 RocketMQ 中可以自定義的,所幸來說對普通業務算是夠用的。默認值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個level。另外,搜索公眾號Java后端棧后臺回復“面試”,獲取一份驚喜禮包。

    通俗的講,設定了延時 Level 的消息會被暫存在名為 SCHEDULE_TOPIC_XXXX的topic中,并根據 level 存入特定的queuequeueId = delayTimeLevel – 1,即一個queue只存相同延時的消息,保證具有相同發送延時的消息能夠順序消費。broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。

    下面是整個實現方案的示意圖,紅色代表投遞延時消息,紫色代表定時調度到期的延時消息:

    優點:

    • Level 數固定,每個 Level 有自己的定時器,開銷不大
    • 將 Level 相同的消息放入到同一個 Queue 中,保證了同一 Level 消息的順序性;不同 Level 放到不同的 Queue 中,保證了投遞的時間準確性;
    • 通過只支持固定的Level,將不同延時消息的排序變成了固定Level Topic 的追加寫操作

    缺點:

    • Level 配置的修改代價太大,固定 Level 不靈活
    • CommitLog 會因為延時消息的存在變得很大

    Pulsar

    Pulsar 支持“任意時間”的延時消息,但實現方式和 RocketMQ 不同。

    通俗的講,Pulsar 的延時消息會直接進入到客戶端發送指定的 Topic 中,然后在堆外內存中創建一個基于時間的優先級隊列,來維護延時消息的索引信息。延時時間最短的會放在頭上,時間越長越靠后。在進行消費邏輯時候,再判斷是否有到期需要投遞的消息,如果有就從隊列里面拿出,根據延時消息的索引查詢到對應的消息進行消費。

    如果節點崩潰,在這個 broker 節點上的 Topics 會轉移到其他可用的 broker 上,上面提到的這個優先級隊列也會被重建。

    下面是 Pulsar 公眾號中對于 Pulsar 延時消息的示意圖。

    乍一看會覺得這個方案其實非常簡單,還能支持任意時間的消息。但是這個方案有幾個比較大的問題:

    • 內存開銷:維護延時消息索引的隊列是放在堆外內存中的,并且這個隊列是以訂閱組(Kafka中的消費組)為維度的,比如你這個 Topic 有 N 個訂閱組,那么如果你這個 Topic 使用了延時消息,就會創建 N 個 隊列;并且隨著延時消息的增多,時間跨度的增加,每個隊列的內存占用也會上升。(是的,在這個方案下,支持任意的延時消息反而有可能讓這個缺陷更嚴重)
    • 故障轉移之后延時消息索引隊列的重建時間開銷:對于跨度時間長的大規模延時消息,重建時間可能會到小時級別。(摘自 Pulsar 官方公眾號文章)
    • 存儲開銷:延時消息的時間跨度會影響到 Pulsar 中已經消費的消息數據的空間回收。打個比方,你的 Topic 如果業務上要求支持一個月跨度的延時消息,然后你發了一個延時一個月的消息,那么你這個 Topic 中底層的存儲就會保留整整一個月的消息數據,即使這一個月中99%的正常消息都已經消費了。

    對于前面第一點和第二點的問題,社區也設計了解決方案,在隊列中加入時間分區,Broker 只加載當前較近的時間片的隊列到內存,其余時間片分區持久化磁盤,示例圖如下圖所示:

    但是目前,這個方案并沒有對應的實現版本。可以在實際使用時,規定只能使用較小時間跨度的延時消息,來減少前兩點缺陷的影響。

    另外,因為內存中存的并不是延時消息的全量數據,只是索引,所以可能要積壓上百萬條延時消息才可能對內存造成顯著影響,從這個角度來看,官方暫時沒有完善前兩個問題也可以理解了。

    至于第三個問題,估計是比較難解決的,需要在數據存儲層將延時消息和正常消息區分開來,單獨存儲延時消息。

    QMQ

    QMQ提供任意時間的延時/定時消息,你可以指定消息在未來兩年內(可配置)任意時間內投遞。

    把 QMQ 放到最后,是因為我覺得 QMQ 是目前開源 MQ 中延時消息設計最合理的。里面設計的核心簡單來說就是 多級時間輪 + 延時加載 + 延時消息單獨磁盤存儲。

    QMQ的延時/定時消息使用的是兩層 hash wheel 來實現的。

    第一層位于磁盤上,每個小時為一個刻度(默認為一個小時一個刻度,可以根據實際情況在配置里進行調整),每個刻度會生成一個日志文件(schedule log),因為QMQ支持兩年內的延時消息(默認支持兩年內,可以進行配置修改),則最多會生成2 * 366 * 24 = 17568個文件(如果需要支持的最大延時時間更短,則生成的文件更少)。

    第二層在內存中,當消息的投遞時間即將到來的時候,會將這個小時的消息索引(索引包括消息在schedule log中的offset和size)從磁盤文件加載到內存中的hash wheel上,內存中的hash wheel則是以500ms為一個刻度。

    總結一下設計上的亮點:

    • 時間輪算法適合延時/定時消息的場景,省去延時消息的排序,插入刪除操作都是 O(1) 的時間復雜度;
    • 通過多級時間輪設計,支持了超大時間跨度的延時消息;
    • 通過延時加載,內存中只會有最近要消費的消息,更久的延時消息會被存儲在磁盤中,對內存友好;
    • 延時消息單獨存儲(schedule log),不會影響到正常消息的空間回收;

    本文匯總了目前業界常見的延時消息方案,并且討論了各個方案的優缺點。希望對讀者有所啟發。

    topicrocksdb
    本作品采用《CC 協議》,轉載必須注明作者和本文鏈接
    目前業界常見的延時消息方案
    添加消息的任務我們稱為producer,而取出并使用消息的任務,我們稱之為consumer。kafka應運而生,它是專門設計用來做消息中間件的系統。這兩點也是kafka要解決的核心問題。為此,kafka提出了partition的概念。由于消息不會被刪除,因此可以等消費者明確告知kafka這條消息消費成功以后,再去更新游標。對于同一個topic,不同的消費組有各自的游標。
    Kafka只有在消息提交之后,才會將消息暴露給消費者。不論是producer端還是consumer端發往partition的請求,都通過leader數據副本所在的broker進行處理。這個過程由Kafka controller節點broker自動完成,主要是從Zookeeper讀取和修改受影響partition的一些元數據信息
    為了存放這些元數據,kafka 集群會為每一個 partition 在 zk 集群上創建一個節點,partition 的數量直接決定了 zk 上的節點數。當 partition 數量變多,意味著單個 broker 節點上的 partitiion Leader 切換時間變長。
    Kafka消息積壓的典型場景:1.實時/消費任務掛掉比如,我們寫的實時應用因為某種原因掛掉了,并且這個任務沒有被監控程序監控發現通知相關負責人,負責人又沒有寫自動拉起任務的腳本進行重啟。此外,Kafka分區數是Kafka并行度調優的最小單元,如果Kafka分區數設置的太少,會影響Kafka consumer消費的吞吐量。
    零信任的“問題”
    2022-03-24 18:32:08
    “問題”是一個多義詞,可以理解為負面的意義,如錯誤(Error)、麻煩和困難(Problem)、副作用(Side-effect),也可以理解為開放性的意義,如提問(Question)、題目和課題(Topic)。本文探討零信任的問題,是從宏觀的視角來分析:沒有特別關注零信任的技術細節,重點是試圖追問零信任根本的原理問題;沒有具體討論零信任的實現方法,但是卻希望解構其需求框架。文章重點不在于闡明或立論
    Apache Kafka是一個開源的分布式事件流平臺,被數千家公司用于高性能數據管道、流分析、數據集成和任務關鍵型應用程序。
    本文更多的是根據調試Windows Server 2003,分析漏洞成因。但是AD域并沒有對其進行強校驗。通過建立與域控同名卻不以\$結尾的機器賬戶,即DC,對域控進行欺騙。至此便得到了高權限ST。從上圖中可以很明確的看到域控的機器名為WINSRVSERVER$,之后會使用WINSRVSERVER作為機器賬戶名進行欺騙。攻擊準備工作相關準備工作不是本文重點,可以在noPac項目中學習。
    Kafka consumer為了及時消費消息,會以Consumer Group(消費組)的形式,啟動多個consumer消費消息。不同的消費組在消費消息時彼此互不影響,同一個消費組的consumer協調在一起消費訂閱的topic所有分區消息。
    VSole
    網絡安全專家
      亚洲 欧美 自拍 唯美 另类