<menu id="guoca"></menu>
<nav id="guoca"></nav><xmp id="guoca">
  • <xmp id="guoca">
  • <nav id="guoca"><code id="guoca"></code></nav>
  • <nav id="guoca"><code id="guoca"></code></nav>

    Kafka分區分配策略(Partition Assignment Strategy)

    VSole2022-08-04 16:39:22

    眾所周知,Apache Kafka是基于生產者和消費者模型作為開源的分布式發布訂閱消息系統(當然,目前Kafka定位于an open-source distributed event streaming platform),由Scala和Java編寫。

    Kafka提供了類似于JMS的特性,但設計上又有很大區別,它不是JMS規范的實現,如Kafka允許多個消費者主動拉取數據,而在JMS中只有點對點模式消費者才會主動拉取數據。

    Kafka producer在向Kafka集群發送消息時,需要指定topic,Kafka根據topic對消息進行歸類(邏輯劃分),而一個topic通常會有多個partition分區,落到磁盤上就是多個partition目錄。

    Kafka consumer為了及時消費消息,會以Consumer Group(消費組)的形式,啟動多個consumer消費消息。不同的消費組在消費消息時彼此互不影響,同一個消費組的consumer協調在一起消費訂閱的topic所有分區消息。這就引申一個問題:消費組中的consumer是如何確定自己該消費哪些分區的數據的?

    Kafka提供了多種分區策略如RoundRobin(輪詢)、Range(按范圍),可通過參數partition.assignment.strategy進行配置。

    一般情況下,在topic和消費組不發生變化,Kafka會根據topic分區、消費組情況等確定分區策略,但是當發生以下情況時,會觸發Kafka的分區重分配:

    1. Consumer Group中的consumer發生了新增或者減少

    1. 同一個Consumer Group新增consumer
    2. Consumer Group訂閱的topic分區發生變化如新增分區

    2. Consumer Group訂閱的topic分區發生變化如新增分區

    本文通過下面的場景,來分別闡述Kafka主要的分配策略RoundRobin和Range:

    Range Strategy

    Range策略是針對topic而言的,在進行分區分配時,為了盡可能保證所有consumer均勻的消費分區,會對同一個topic中的partition按照序號排序,并對consumer按照字典順序排序。

    然后為每個consumer劃分固定的分區范圍,如果不夠平均分配,那么排序靠前的消費者會被多分配分區。具體就是將partition的個數除于consumer線程數來決定每個consumer線程消費幾個分區。如果除不盡,那么前面幾個消費者線程將會多分配分區。

    通過下面公式更直觀:

    假設n = 分區數 / 消費者數量,m = 分區數 % 消費者線程數量,那么前m個消費者每個分配n+1個分區,后面的(消費者線程數量 - m)個消費者每個分配n個分區。

    舉個例子:

    一個消費組CG1中有C0和C1兩個consumer,消費Kafka中的主題t1。t1的分區數為10,并且C1的num.streams為1,C2的num.streams為2。

    經過排序后,分區為:0, 1, 2, 3, 4, 5, 6, 7, 8, 9;CG1中消費者線程為C0-0、C1-0、C1-1。然后因為 10除3除不盡,那么消費者線程C0-0將會多分配分區,所以分區分配之后結果如下:

    C0-0 將消費0、1、2、3分區
    C1-0 將消費4、5、6分區
    C1-1 將消費7、8、9分區
    

    當存在有2個Kafka topic(t1和t2),它們都有有10個partition,那么最后分區結果為:

    C0-0 將消費t1主題的0、1、2、3分區以及t2主題的0、1、2、3分區
    C1-0 將消費t1主題的4、5、6分區以及t2主題的4、5、6分區
    C2-1 將消費t1主題的7、8、9分區以及t2主題的7、8、9分區
    

    如上場景,隨著topic的增多,那么針對每個topic,消費者C0-0都將多消費1個分區,topic越多比如為N個,C0-0消費的分區會比其他消費者明顯多消費N個分區。

    可以明顯的看到這樣的分配并不均勻,如果將類似的情形擴大,有可能會出現部分消費者過載的情況,這就是Range分區策略的一個很明顯的弊端。

    RoundRobin Strategy

    RoundRobin策略的工作原理:將所有topic的partition組成TopicAndPartition列表,然后對TopicAndPartition列表按照hashCode進行排序:

    val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
      info("Consumer %s rebalancing the following partitions for topic %s: %s"
           .format(ctx.consumerId, topic, partitions))
      partitions.map(partition => {
        TopicAndPartition(topic, partition)
      })
    }.toSeq.sortWith((topicPartition1, topicPartition2) => {
      /*
       * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
       * up on one consumer (if it has a high enough stream count).
       */
      topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
    })
    

    最后按照RoundRobin風格將分區分別分配給不同的消費者。

    使用RoundRobin策略必須滿足以下條件:

    1.同一個Consumer Group里面的所有consumer的num.streams必須相等

    2.每個consumer訂閱的topic必須相同

    假設消費組CG1中有C0和C1兩個consumer的num.streams都為2。按照hashCode排序完的topic-partition組依次為t1-5, t1-3, t1-0, t1-8, t1-2, t1-1, t1-4, t1-7, t1-6, t1-9,我們的消費者排序為C0-0, C0-1, C1-0, C1-1,最后分區分配的結果為:

    C0-0將消費t1-5、t1-2、t1-6分區
    C0-1將消費t1-3、t1-1、t1-9分區
    C1-0將消費t1-0、t1-4分區
    C1-1將消費t1-8、t1-7分區
    

    多個主題的分區分配和單個主題類似,這里就不在介紹了。

    上面RoundRobin要求每個consumer訂閱的topic必須相同,當訂閱的topic不同時,那么在執行分區分配的時候就不是完全的輪詢分配,有可能會導致分區分配的不均勻。比如,某個consumer沒有訂閱消費組內的某個topic,那么在分配分區的時候,這個consumer將分配不到這個topic的分區。

    除了上述的介紹的RoundRobin和Range分配策略,Kafka還有Sticky分配策略,它主要有兩個目的:

    1. 分區的分配要盡可能的均勻
    2. 分區的分配盡可能的與上次分配的保持相同

    當兩者發生沖突時,第一個目標優先于第二個目標。鑒于這兩個目標,StickyAssignor策略的具體實現要比RangeAssignor和RoundRobinAssignor這兩種分配策略要復雜很多。

    topickafka
    本作品采用《CC 協議》,轉載必須注明作者和本文鏈接
    分布式流平臺Kafka
    2022-08-02 10:13:27
    無論消息是否被消費,Kafka集群都會持久的保存所有發布的消息,直到過期。Kafka中采用分區的設計主要有兩個目的:第一,當日志大小超過了單臺服務器的限制,允許日志進行擴展。在Kafka中實現消費的方式是將日志中的分區劃分到每一個消費者實例上,以便在任何時間,每個實例都是分區唯一的消費者。
    Kafka consumer為了及時消費消息,會以Consumer Group(消費組)的形式,啟動多個consumer消費消息。不同的消費組在消費消息時彼此互不影響,同一個消費組的consumer協調在一起消費訂閱的topic所有分區消息。
    Kafka只有在消息提交之后,才會將消息暴露給消費者。不論是producer端還是consumer端發往partition的請求,都通過leader數據副本所在的broker進行處理。這個過程由Kafka controller節點broker自動完成,主要是從Zookeeper讀取和修改受影響partition的一些元數據信息
    Apache Kafka是一個開源的分布式事件流平臺,被數千家公司用于高性能數據管道、流分析、數據集成和任務關鍵型應用程序。
    假設Mysql中canal_test庫下有一張表policy_cred,需要統計實時統計policy_status狀態為1的mor_rate的的變化趨勢,并標注比率的風險預警等級。?本次安裝的canal版本為1.1.2,Canal版本最后在1.1.1之后。server端采用MQ模式,MQ選用Kafka。服務器系統為Centos
    添加消息的任務我們稱為producer,而取出并使用消息的任務,我們稱之為consumer。kafka應運而生,它是專門設計用來做消息中間件的系統。這兩點也是kafka要解決的核心問題。為此,kafka提出了partition的概念。由于消息不會被刪除,因此可以等消費者明確告知kafka這條消息消費成功以后,再去更新游標。對于同一個topic,不同的消費組有各自的游標。
    GoReplay 是一款開源網絡監控工具,可以記錄您的實時流量,并將其用于shadowing、load testing、monitoring和detailed analysis。捕獲實時HTTP流量并將其重放到測試環境中,以便使用真實數據持續測試您的系統。
    安全設備在企業安全防護中起到攻擊監測告警和攻擊攔截作用,是安全日志的直接輸出者和防護策略生效者。統一日志平臺在Elastic Stack技術棧的基礎上,加入Kafka消息隊列,實現對安全設備告警日志的統一管理,系統架構如圖3所示。日志采集主要由Syslog、Beats、Kafka三部分組成,完成了從安全設備源端將告警日志采集至消息隊列的過程。
    為了存放這些元數據,kafka 集群會為每一個 partition 在 zk 集群上創建一個節點,partition 的數量直接決定了 zk 上的節點數。當 partition 數量變多,意味著單個 broker 節點上的 partitiion Leader 切換時間變長。
    Falco 由 Sysdig 于 2016 年創建,是第一個作為孵化級項目加入 CNCF 的運行時安全項目。
    VSole
    網絡安全專家
      亚洲 欧美 自拍 唯美 另类