<menu id="guoca"></menu>
<nav id="guoca"></nav><xmp id="guoca">
  • <xmp id="guoca">
  • <nav id="guoca"><code id="guoca"></code></nav>
  • <nav id="guoca"><code id="guoca"></code></nav>

    Kafka集群消息積壓問題及處理策略

    VSole2022-08-04 16:45:53

    通常情況下,企業中會采取輪詢或者隨機的方式,通過Kafka的producer向Kafka集群生產數據,來盡可能保證Kafk分區之間的數據是均勻分布的。

    在分區數據均勻分布的前提下,如果我們針對要處理的topic數據量等因素,設計出合理的Kafka分區數量。對于一些實時任務,比如Spark Streaming/Structured-Streaming、Flink和Kafka集成的應用,消費端不存在長時間"掛掉"的情況即數據一直在持續被消費,那么一般不會產生Kafka數據積壓的情況。

    但是這些都是有前提的,當一些意外或者不合理的分區數設置情況的發生,積壓問題就不可避免。

    Kafka消息積壓的典型場景:

    1.實時/消費任務掛掉

    比如,我們寫的實時應用因為某種原因掛掉了,并且這個任務沒有被監控程序監控發現通知相關負責人,負責人又沒有寫自動拉起任務的腳本進行重啟。

    那么在我們重新啟動這個實時應用進行消費之前,這段時間的消息就會被滯后處理,如果數據量很大,可就不是簡單重啟應用直接消費就能解決的。

    2.Kafka分區數設置的不合理(太少)和消費者"消費能力"不足

    Kafka單分區生產消息的速度qps通常很高,如果消費者因為某些原因(比如受業務邏輯復雜度影響,消費時間會有所不同),就會出現消費滯后的情況。

    此外,Kafka分區數是Kafka并行度調優的最小單元,如果Kafka分區數設置的太少,會影響Kafka consumer消費的吞吐量。

    3.Kafka消息的key不均勻,導致分區間數據不均衡

    在使用Kafka producer消息時,可以為消息指定key,但是要求key要均勻,否則會出現Kafka分區間數據不均衡。

    那么,針對上述的情況,有什么好的辦法處理數據積壓呢?

    一般情況下,針對性的解決辦法有以下幾種:

    1.實時/消費任務掛掉導致的消費滯后

    a.任務重新啟動后直接消費最新的消息,對于"滯后"的歷史數據采用離線程序進行"補漏"。

    此外,建議將任務納入監控體系,當任務出現問題時,及時通知相關負責人處理。當然任務重啟腳本也是要有的,還要求實時框架異常處理能力要強,避免數據不規范導致的不能重新拉起任務。

    b.任務啟動從上次提交offset處開始消費處理

    如果積壓的數據量很大,需要增加任務的處理能力,比如增加資源,讓任務能盡可能的快速消費處理,并趕上消費最新的消息

    2.Kafka分區少了

    如果數據量很大,合理的增加Kafka分區數是關鍵。如果利用的是Spark流和Kafka direct approach方式,也可以對KafkaRDD進行repartition重分區,增加并行度處理。

    3.由于Kafka消息key設置的不合理,導致分區數據不均衡

    可以在Kafka producer處,給key加隨機后綴,使其均衡。

    kafka
    本作品采用《CC 協議》,轉載必須注明作者和本文鏈接
    如果是在消費端丟失數據,那么多次消費結果完全一模一樣的幾率很低。這時已經fetch的數據還沒有處理完成但已經被commit掉,因此沒有機會再次被處理,數據丟失。網絡負載很高或者磁盤很忙寫入失敗的情況下,沒有自動重試重發消息。
    Kafka消息積壓的典型場景:1.實時/消費任務掛掉比如,我們寫的實時應用因為某種原因掛掉了,并且這個任務沒有被監控程序監控發現通知相關負責人,負責人又沒有寫自動拉起任務的腳本進行重啟。此外,Kafka分區數是Kafka并行度調優的最小單元,如果Kafka分區數設置的太少,會影響Kafka consumer消費的吞吐量。
    Kafka consumer為了及時消費消息,會以Consumer Group(消費組)的形式,啟動多個consumer消費消息。不同的消費組在消費消息時彼此互不影響,同一個消費組的consumer協調在一起消費訂閱的topic所有分區消息。
    隨機讀寫會導致尋址時間延長,從而影響磁盤的讀寫速度。而Kafka在將數據持久化到磁盤時,采用只追加的順序寫,有效降低了尋址時間,提高效率。當讀操作發生時,先從PageCache中查找,如果發生缺頁才進行磁盤調度,最終返回需要的數據。對應到Kafka生產和消費消息中:producer把消息發到broker后,數據并不是直接落入磁盤的,而是先進入PageCache。
    Apache Kafka是一個開源的分布式事件流平臺,被數千家公司用于高性能數據管道、流分析、數據集成和任務關鍵型應用程序。
    分布式流平臺Kafka
    2022-08-02 10:13:27
    無論消息是否被消費,Kafka集群都會持久的保存所有發布的消息,直到過期。Kafka中采用分區的設計主要有兩個目的:第一,當日志大小超過了單臺服務器的限制,允許日志進行擴展。在Kafka中實現消費的方式是將日志中的分區劃分到每一個消費者實例上,以便在任何時間,每個實例都是分區唯一的消費者。
    為了存放這些元數據,kafka 集群會為每一個 partition 在 zk 集群上創建一個節點,partition 的數量直接決定了 zk 上的節點數。當 partition 數量變多,意味著單個 broker 節點上的 partitiion Leader 切換時間變長。
    假設Mysql中canal_test庫下有一張表policy_cred,需要統計實時統計policy_status狀態為1的mor_rate的的變化趨勢,并標注比率的風險預警等級。?本次安裝的canal版本為1.1.2,Canal版本最后在1.1.1之后。server端采用MQ模式,MQ選用Kafka。服務器系統為Centos
    Kafka只有在消息提交之后,才會將消息暴露給消費者。不論是producer端還是consumer端發往partition的請求,都通過leader數據副本所在的broker進行處理。這個過程由Kafka controller節點broker自動完成,主要是從Zookeeper讀取和修改受影響partition的一些元數據信息
    在企業實時處理架構中,通常將spark streaming和kafka集成作為整個大數據處理架構的核心環節之一。
    VSole
    網絡安全專家
      亚洲 欧美 自拍 唯美 另类