<menu id="guoca"></menu>
<nav id="guoca"></nav><xmp id="guoca">
  • <xmp id="guoca">
  • <nav id="guoca"><code id="guoca"></code></nav>
  • <nav id="guoca"><code id="guoca"></code></nav>

    解析SparkStreaming和Kafka集成的兩種方式

    VSole2022-08-04 16:32:08

    spark streaming是基于微批處理的流式計算引擎,通常是利用spark core或者spark core與spark sql一起來處理數據。在企業實時處理架構中,通常將spark streaming和kafka集成作為整個大數據處理架構的核心環節之一。

    針對不同的spark、kafka版本,集成處理數據的方式分為兩種:Receiver based Approach和Direct Approach,不同集成版本處理方式的支持,可參考下圖:

    Receiver based Approach

    基于receiver的方式是使用kafka消費者高階API實現的。

    對于所有的receiver,它通過kafka接收的數據會被存儲于spark的executors上,底層是寫入BlockManager中,默認200ms生成一個block(通過配置參數spark.streaming.blockInterval決定)。然后由spark streaming提交的job構建BlockRdd,最終以spark core任務的形式運行。

    關于receiver方式,有以下幾點需要注意:

    1. receiver作為一個常駐線程調度到executor上運行,占用一個cpu
    2. receiver個數由KafkaUtils.createStream調用次數決定,一次一個receiver
    3. kafka中的topic分區并不能關聯產生在spark streaming中的rdd分區
    4. 增加在KafkaUtils.createStream()中的指定的topic分區數,僅僅增加了單個receiver消費的topic的線程數,它不會增加處理數據中的并行的spark的數量
    5. 【topicMap[topic,num_threads]map的value對應的數值是每個topic對應的消費線程數】
    6. receiver默認200ms生成一個block,建議根據數據量大小調整block生成周期
    7. receiver接收的數據會放入到BlockManager,每個executor都會有一個BlockManager實例,由于數據本地性,那些存在receiver的executor會被調度執行更多的task,就會導致某些executor比較空閑
    8. 建議通過參數spark.locality.wait調整數據本地性。該參數設置的不合理,比如設置為10而任務2s就處理結束,就會導致越來越多的任務調度到數據存在的executor上執行,導致任務執行緩慢甚至失敗(要和數據傾斜區分開
    9. 多個kafka輸入的DStreams可以使用不同的groups、topics創建,使用多個receivers接收處理數據
    10. 兩種receiver
    11. 可靠的receiver:可靠的receiver在接收到數據并通過復制機制存儲在spark中時準確的向可靠的數據源發送ack確認
    12. 不可靠的receiver:不可靠的receiver不會向數據源發送數據已接收確認。這適用于用于不支持ack的數據源
    13. 當然,我們也可以自定義receiver。
    14. receiver處理數據可靠性默認情況下,receiver是可能丟失數據的
    15. 可以通過設置spark.streaming.receiver.writeAheadLog.enable為true開啟預寫日志機制,將數據先寫入一個可靠地分布式文件系統如hdfs,確保數據不丟失,但會失去一定性能
    16. 限制消費者消費的最大速率
    17. 涉及三個參數:
    18. spark.streaming.backpressure.enabled:默認是false,設置為true,就開啟了背壓機制
    19. spark.streaming.backpressure.initialRate:默認沒設置初始消費速率,第一次啟動時每個receiver接收數據的最大值
    20. spark.streaming.receiver.maxRate:默認值沒設置,每個receiver接收數據的最大速率(每秒記錄數)。每個流每秒最多將消費此數量的記錄,將此配置設置為0或負數將不會對最大速率進行限制
    21. 在產生job時,會將當前job有效范圍內的所有block組成一個BlockRDD,一個block對應一個分區
    22. kafka082版本消費者高階API中,有分組的概念,建議使消費者組內的線程數(消費者個數)和kafka分區數保持一致。如果多于分區數,會有部分消費者處于空閑狀態

    Direct Approach

    direct approach是spark streaming不使用receiver集成kafka的方式,一般在企業生產環境中使用較多。相較于receiver,有以下特點:

    1. 不使用receiver
    2. 不需要創建多個kafka streams并聚合它們
    3. 減少不必要的CPU占用
    4. 減少了receiver接收數據寫入BlockManager,然后運行時再通過blockId、網絡傳輸、磁盤讀取等來獲取數據的整個過程,提升了效率
    5. 無需wal,進一步減少磁盤IO操作
    6. direct方式生的rdd是KafkaRDD,它的分區數與kafka分區數保持一致一樣多的rdd分區來消費,更方便我們對并行度進行控制
    7. 注意:在shuffle或者repartition操作后生成的rdd,這種對應關系會失效
    8. 可以手動維護offset,實現exactly once語義
    9. 數據本地性問題。在KafkaRDD在compute函數中,使用SimpleConsumer根據指定的topic、分區、offset去讀取kafka數據。
    10. 但在010版本后,又存在假如kafka和spark處于同一集群存在數據本地性的問題
    11. 限制消費者消費的最大速率
    12. spark.streaming.kafka.maxRatePerPartition:從每個kafka分區讀取數據的最大速率(每秒記錄數)。這是針對每個分區進行限速,需要事先知道kafka分區數,來評估系統的吞吐量
    kafkaspark
    本作品采用《CC 協議》,轉載必須注明作者和本文鏈接
    在企業實時處理架構中,通常將spark streaming和kafka集成作為整個大數據處理架構的核心環節之一。
    假設Mysql中canal_test庫下有一張表policy_cred,需要統計實時統計policy_status狀態為1的mor_rate的的變化趨勢,并標注比率的風險預警等級。?本次安裝的canal版本為1.1.2,Canal版本最后在1.1.1之后。server端采用MQ模式,MQ選用Kafka。服務器系統為Centos
    本文主要介紹,SparkStreaming和Kafka使用Direct Approach方式處理任務時,如何自己管理offset?在調用該方法時,會先創建KafkaCluster:val kc = new KafkaCluster. | is a list of one or more kafka topics to consume from
    Log4Shell目前依然是一個廣泛且嚴重的安全威脅。4月26日,安全公司Rezilion發布了一份最新研究報告,顯示截至4月底,仍然有40%的易受攻擊的Log4j版本被運用。Rezilion 認為導致目前較差的更新狀況原因較為復雜,但包括了缺乏適當的漏洞管理流程,以及漏洞的可見性差等因素。因此,專家建議用戶掃描并確認系統環境,找到正在使用的版本,如果是過時或易受攻擊的Log4j版本,務必盡快制定升級計劃。
    前言Scala是以JVM為運行環境的面向對象的函數式編程語言,它可以直接訪問Java類庫并且與Java框架進行交互操作。正如之前所介紹,Spark是用Scala語言編寫的,Kafka server端也是,那么深入學習Scala對掌握SparkKafka是必備掌握技能。
    Kafka消息積壓的典型場景:1.實時/消費任務掛掉比如,我們寫的實時應用因為某種原因掛掉了,并且這個任務沒有被監控程序監控發現通知相關負責人,負責人又沒有寫自動拉起任務的腳本進行重啟。此外,Kafka分區數是Kafka并行度調優的最小單元,如果Kafka分區數設置的太少,會影響Kafka consumer消費的吞吐量。
    場景痛點介紹Cloud Native在消息隊列的使用過程中,由于其分布式特性難免會遇到消息丟失、消息重傳等問題。
    此外,PyDeequ 可以與 Pandas DataFrames 進行流暢的接口,而不是在 Apache Spark DataFrames 內進行限制。Deequ 負責導出要對數據進行計算的所需指標集。Deequ 生成數據質量報告,其中包含約束驗證的結果。包裝器將命令轉換為底層 Deequ 調用并返回它們的響應。
    近日,綠盟科技聯合廣州大學網絡空間先進技術研究院發布2021年《APT組織情報研究年鑒》(下文簡稱“年鑒”)。 年鑒借助網絡空間威脅建模知識圖譜和大數據復合語義追蹤技術,對全球372個APT組織知識進行了知識圖譜歸因建檔,形成APT組織檔案館,并對APT組織活動進行大數據追蹤,從而對本年度新增和活躍的攻擊組織的攻擊活動態勢進行分析。目前年鑒所涉及的相關情報和技術已經應用在綠盟科技的威脅情報平臺(
    VSole
    網絡安全專家
      亚洲 欧美 自拍 唯美 另类