
    How Spark Streaming and Kafka Manage Offsets with the Direct Approach

    VSole 2022-08-03 16:33:42

    This article explains how to manage offsets yourself when Spark Streaming consumes from Kafka with the Direct Approach.

    The entry point through which Spark Streaming receives data in the Direct Approach is

    KafkaUtils.createDirectStream. When this method is called, a KafkaCluster is created first:

    KafkaCluster: val kc = new KafkaCluster(kafkaParams)

    KafkaCluster is responsible for communicating with Kafka: it fetches Kafka partition metadata and is used when creating the DirectKafkaInputDStream. Each DirectKafkaInputDStream corresponds to one topic, and each DirectKafkaInputDStream also holds its own KafkaCluster instance.
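    The KafkaCluster operations relied on in this article are roughly the following. This is a paraphrased, signatures-only sketch for orientation; the trait name KafkaClusterOps is invented here, the real class is org.apache.spark.streaming.kafka.KafkaCluster, and Err is its alias for an ArrayBuffer[Throwable] of accumulated failures:

    package org.apache.spark.streaming.kafka

    import kafka.common.TopicAndPartition
    import org.apache.spark.streaming.kafka.KafkaCluster.{Err, LeaderOffset}

    // Signatures only, paraphrased for orientation; bodies omitted.
    trait KafkaClusterOps {
      // topic/partition metadata
      def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]]
      // earliest/latest offsets currently available on the partition leaders
      def getEarliestLeaderOffsets(tps: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, LeaderOffset]]
      def getLatestLeaderOffsets(tps: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, LeaderOffset]]
      // consumer-group offsets stored in ZooKeeper
      def getConsumerOffsets(groupId: String, tps: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, Long]]
      def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Either[Err, Map[TopicAndPartition, Short]]
    }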

    At each batch interval, the compute method of DirectKafkaInputDStream is invoked, which performs the following steps (a paraphrased sketch follows the list):

    1. Obtain the untilOffset of each corresponding Kafka partition, to determine the offset range of the data to fetch
    2. Build a KafkaRDD instance. Within a single batch interval, DirectKafkaInputDStream and KafkaRDD correspond one to one
    3. Report the relevant offset information to the InputInfoTracker
    4. Return the RDD
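    For orientation, here is a condensed, paraphrased sketch of what DirectKafkaInputDStream.compute does in the Spark 1.x / Kafka 0.8 integration. It is not the verbatim Spark source (error handling and rate-limit details are omitted), but it mirrors the four steps above:

    // Condensed paraphrase of DirectKafkaInputDStream.compute (Spark 1.x, Kafka 0.8 integration).
    // Not verbatim source code; error handling and rate-limit details omitted.
    override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
      // 1. Ask the partition leaders for their latest offsets (clamped by the configured
      //    rate limit) to obtain this batch's untilOffsets.
      val untilOffsets = clamp(latestLeaderOffsets(maxRetries))

      // 2. Build one KafkaRDD covering [currentOffsets, untilOffsets) -- one RDD per batch.
      val rdd = KafkaRDD[K, V, U, T, R](
        context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

      // 3. Report this batch's offset ranges to the InputInfoTracker.
      val offsetRanges = currentOffsets.map { case (tp, from) =>
        OffsetRange(tp.topic, tp.partition, from, untilOffsets(tp).offset)
      }
      ssc.scheduler.inputInfoTracker.reportInfo(
        validTime, StreamInputInfo(id, rdd.count, Map("offsets" -> offsetRanges.toList)))

      // 4. Advance the stream's own offsets and return the RDD.
      currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
      Some(rdd)
    }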

    For the correspondence between KafkaRDD partitions and Kafka partitions, see the article

    《重要 | Spark分區并行度決定機制》

    Hands-on code for integrating Spark Streaming with Kafka via the Direct approach while managing offsets yourself:

    1. Business logic processing

    // Imports assumed for this example (Spark 1.x with the spark-streaming-kafka 0.8 integration):
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaManager

    object SparkStreamingKafkaDirect {
      def main(args: Array[String]) {
        if (args.length < 3) {
          System.err.println(
            s"""
               |Usage: SparkStreamingKafkaDirect <brokers> <topics> <groupid>
               |  <brokers> is a list of one or more Kafka brokers
               |  <topics> is a list of one or more kafka topics to consume from
               |  <groupid> is a consume group
               |
            """.stripMargin)
          System.exit(1)
        }
        val Array(brokers, topics, groupId) = args
        val sparkConf = new SparkConf().setAppName("DirectKafka")
        sparkConf.setMaster("local[*]")
        sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10")
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val ssc = new StreamingContext(sparkConf, Seconds(6))
        val topicsSet = topics.split(",").toSet
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> brokers,
          "group.id" -> groupId,
          "auto.offset.reset" -> "smallest"
        )
        val km = new KafkaManager(kafkaParams)
        val streams = km.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)
        streams.foreachRDD(rdd => {
          if (!rdd.isEmpty()) {
            // process the messages first
            // do something... (placeholder for the actual business logic)
            // then update the offsets in ZooKeeper
            km.updateZKOffsets(rdd)
          }
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    2. Core offset management logic

    2.1 Using ZooKeeper

    Note: the custom KafkaManager must live in the org.apache.spark.streaming.kafka package, because in the relevant Spark versions KafkaCluster is not accessible from outside that package.
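    In those versions the class is declared package-private, roughly like this (paraphrased, not verbatim):

    // Paraphrased from the Spark 1.x spark-streaming-kafka module (not verbatim):
    package org.apache.spark.streaming.kafka

    private[spark] class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable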

    package org.apache.spark.streaming.kafka

    // Imports assumed for this class (Spark 1.x / Kafka 0.8 integration):
    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.Decoder
    import org.apache.spark.SparkException
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.KafkaCluster.{Err, LeaderOffset}
    import scala.collection.mutable
    import scala.reflect.ClassTag

    /**
    * @Author: WeChat official account - 大數據學習與分享
    *  Spark Streaming and Kafka Direct approach: manage offsets yourself
    */
    class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {
      private val kc = new KafkaCluster(kafkaParams)
      def createDirectStream[
      K: ClassTag,
      V: ClassTag,
      KD <: Decoder[K] : ClassTag,
      VD <: Decoder[V] : ClassTag](ssc: StreamingContext,
                                   kafkaParams: Map[String, String],
                                   topics: Set[String]): InputDStream[(K, V)] = {
        val groupId = kafkaParams("group.id")
        // before reading offsets from ZooKeeper, first update them according to the actual situation
        setOrUpdateOffsets(topics, groupId)
        // read the offsets from ZooKeeper and start consuming messages from there
        val messages = {
          // get the partitions; Either is used for error handling: Left carries the error, Right the result
          val partitionsE: Either[Err, Set[TopicAndPartition]] = kc.getPartitions(topics)
          if (partitionsE.isLeft) throw new SparkException(s"get kafka partition failed:${partitionsE.left.get}")
          val partitions = partitionsE.right.get
          val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
          if (consumerOffsetsE.isLeft) throw new SparkException(s"get kafka consumer offsets failed:${consumerOffsetsE.left.get}")
          val consumerOffsets = consumerOffsetsE.right.get
          KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
        }
        messages
      }
      /** Before creating the stream, update the consumer offsets according to the actual situation */
      def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
        topics.foreach { topic =>
          var hasConsumed = true
          // get the partitions of each topic
          val partitionsE = kc.getPartitions(Set(topic))
          if (partitionsE.isLeft) throw new SparkException(s"get kafka partition failed:${partitionsE.left.get}")
          // partitions fetched successfully
          val partitions = partitionsE.right.get
          // get the consumer offsets of this group
          val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
          if (consumerOffsetsE.isLeft) hasConsumed = false
          if (hasConsumed) {
            val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
            if (earliestLeaderOffsetsE.isLeft) throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
            val earliestLeaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = earliestLeaderOffsetsE.right.get
            val consumerOffsets: Map[TopicAndPartition, Long] = consumerOffsetsE.right.get
            var offsets: mutable.HashMap[TopicAndPartition, Long] = mutable.HashMap[TopicAndPartition, Long]()
            consumerOffsets.foreach { case (tp, n) =>
              val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
              //offsets += (tp -> n)
              if (n < earliestLeaderOffset) {
                println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition + "offsets已過時,更新為:" + earliestLeaderOffset)
                offsets += (tp -> earliestLeaderOffset)
              }
              println(n, earliestLeaderOffset, kc.getLatestLeaderOffsets(partitions).right)
            }
            println("map...." + offsets)
            if (offsets.nonEmpty) kc.setConsumerOffsets(groupId, offsets.toMap)
            //        val cs = consumerOffsetsE.right.get
            //        val lastest = kc.getLatestLeaderOffsets(partitions).right.get
            //        val earliest = kc.getEarliestLeaderOffsets(partitions).right.get
            //        var newCS: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()
            //        cs.foreach { f =>
            //          val max = lastest.get(f._1).get.offset
            //          val min = earliest.get(f._1).get.offset
            //          newCS += (f._1 -> f._2)
            //          // if the offset recorded in ZooKeeper no longer exists in Kafka (it has expired), start consuming from Kafka's current smallest offset
            //          if (f._2 < min) {
            //            newCS += (f._1 -> min)
            //          }
            //          println(max + "-----" + f._2 + "--------" + min)
            //        }
            //        if (newCS.nonEmpty) kc.setConsumerOffsets(groupId, newCS)
          } else {
            println("沒有消費過....")
            val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
            val leaderOffsets: Map[TopicAndPartition, LeaderOffset] = if (reset == Some("smallest")) {
              val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
              if (leaderOffsetsE.isLeft) throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
              leaderOffsetsE.right.get
            } else {
              //largest
              val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
              if (leaderOffsetsE.isLeft) throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
              leaderOffsetsE.right.get
            }
            val offsets = leaderOffsets.map { case (tp, lo) => (tp, lo.offset) }
            kc.setConsumerOffsets(groupId, offsets)
            /*
            val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        val result = for {
          topicPartitions <- kc.getPartitions(topics).right
          leaderOffsets <- (if (reset == Some("smallest")) {
            kc.getEarliestLeaderOffsets(topicPartitions)
          } else {
            kc.getLatestLeaderOffsets(topicPartitions)
          }).right
        } yield {
          leaderOffsets.map { case (tp, lo) =>
              (tp, lo.offset)
          }
        }
            */
          }
        }
      }
      /** Update the consumer offsets stored in ZooKeeper */
      def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
        val groupId = kafkaParams("group.id")
        val offsetList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetList.foreach { offset =>
          val topicAndPartition = TopicAndPartition(offset.topic, offset.partition)
          val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offset.untilOffset)))
          if (o.isLeft) println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
        }
      }
    }
    

    2.2 Without ZooKeeper
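
    The demo below keeps offsets in a MySQL table named res. The original does not show the table definition; the following is a hypothetical schema and setup sketch inferred from the INSERT and SELECT statements in the code (the column names come from the demo, while the column types and connection settings are assumptions):

    import java.sql.DriverManager

    object CreateResTable {
      def main(args: Array[String]): Unit = {
        // Placeholder connection settings -- replace with your own MySQL URL and credentials.
        val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
        val stmt = conn.createStatement()
        stmt.execute(
          """CREATE TABLE IF NOT EXISTS res (
            |  id     BIGINT AUTO_INCREMENT PRIMARY KEY,  -- used by "select max(id) from res"
            |  word   VARCHAR(255),
            |  count  BIGINT,
            |  offset TEXT,        -- JSON array of this batch's offset ranges
            |  time   TIMESTAMP
            |)""".stripMargin)
        stmt.close()
        conn.close()
      }
    }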

    /**
      * @author 大數據學習與分享
      * Spark Streaming with Kafka 0.8.2: maintaining offsets in MySQL.
      * Note: SystemProperties and MysqlUtil used below are the author's own helper classes;
      * JSONArray/JSONObject appear to come from json-lib (net.sf.json).
      */
    object SaveOffset2Mysql {
      def getLastOffsets(database: String, sql: String, jdbcOptions:Map[String,String]): HashMap[TopicAndPartition, Long] = {
        val getConnection: () => Connection = JdbcUtils.createConnectionFactory(new JDBCOptions(jdbcOptions))
        val conn = getConnection()
        val pst = conn.prepareStatement(sql)
        val res = pst.executeQuery()
        var map: HashMap[TopicAndPartition, Long] = HashMap()
        while (res.next()) {
          val o = res.getString(1)
          val jSONArray = JSONArray.fromObject(o)
          jSONArray.toArray.foreach { offset =>
            val json = JSONObject.fromObject(offset)
            val topicAndPartition = TopicAndPartition(json.getString("topic"), json.getInt("partition"))
            map += topicAndPartition -> json.getLong("untilOffset")
          }
        }
        pst.close()
        conn.close()
        map
      }
      def offsetRanges2Json(offsetRanges: Array[OffsetRange]): JSONArray = {
        val jSONArray = new JSONArray
        offsetRanges.foreach { offsetRange =>
          val jSONObject = new JSONObject()
          jSONObject.accumulate("topic", offsetRange.topic)
          jSONObject.accumulate("partition", offsetRange.partition)
          jSONObject.accumulate("fromOffset", offsetRange.fromOffset)
          jSONObject.accumulate("untilOffset", offsetRange.untilOffset)
          jSONArray.add(jSONObject)
        }
        jSONArray
      }
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("test").setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(5))
        val kafkaParams = Map("metadata.broker.list" -> SystemProperties.BROKERS,
    "zookeeper.connect" -> SystemProperties.ZK_SERVERS,
    "zookeeper.connection.timeout.ms" -> "10000")
        val topics = Set("pv")
        val tpMap = getLastOffsets("test", "select offset from res where id = (select max(id) from res)")
    var messages: InputDStream[(String, String)] = null
    if (tpMap.nonEmpty) {
          messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
            ssc, kafkaParams, tpMap, (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))
        } else {
          kafkaParams + ("auto.offset.reset" -> "largest")
          messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        }
        //    var oRanges = Array[OffsetRange]()
        //    messages.transform { rdd =>
        //      oRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        //      rdd
        //    }.foreachRDD { rdd =>
        //      val offset = offsetRanges2Json(oRanges).toString
        //    }
        messages.foreachRDD { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // serialize this batch's offset ranges to JSON so they can be stored together with the results
          val offset = offsetRanges2Json(offsetRanges).toString
          rdd.map(_._2).flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).repartition(1)
           .foreachPartition { par =>
             if (par.nonEmpty) {
               val conn = MysqlUtil.getConnection("test")
               conn.setAutoCommit(false)
               val pst = conn.prepareStatement("INSERT INTO res (word,count,offset,time) VALUES (?,?,?,?)")
               par.foreach { case (word, count) =>
                 pst.setString(1, word)
                 pst.setLong(2, count)
                 pst.setString(3, offset)
                 pst.setTimestamp(4, new Timestamp(System.currentTimeMillis()))
                 pst.addBatch()
               }
               pst.executeBatch()
               conn.commit()
               pst.close()
               conn.close()
             }
           }
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }
    // Spark Streaming integration with Kafka 0.10: maintaining offsets
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> SystemProperties.BROKERS,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    val messages = KafkaUtils.createDirectStream[String, String](ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe(topicSet, kafkaParams, getLastOffsets(kafkaParams, topicSet)))
        messages.foreachRDD { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreachPartition { iter =>
            val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
            println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
            iter.foreach { each =>
              // placeholder for per-record processing
              println(s"Do Something with $each")
            }
          }
          // commit the offsets back to Kafka only after the batch has been processed
          messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }
       
      def getLastOffsets(kafkaParams: Map[String, Object], topicSet: Set[String]): Map[TopicPartition, Long] = {
        val props = new Properties()
        props.putAll(kafkaParams.asJava)
        val consumer = new KafkaConsumer[String, String](props)
        consumer.subscribe(topicSet.asJavaCollection)
        paranoidPoll(consumer)
        val consumerAssign = consumer.assignment().asScala.map(tp => tp -> consumer.position(tp)).toMap
        consumer.close()
        consumerAssign
      }
    /** To think about: how should offsets be corrected when messages were already consumed but the offset commit failed? */
      def paranoidPoll(consumer: KafkaConsumer[String, String]): Unit = {
        val msg = consumer.poll(Duration.ZERO)
        if (!msg.isEmpty) {
          // position should be the minimum offset per topic partition
          //      val x: ((Map[TopicPartition, Long], ConsumerRecord[String, String]) => Map[TopicPartition, Long]) => Map[TopicPartition, Long] = msg.asScala.foldLeft(Map[TopicPartition, Long]())
          msg.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
            val tp = new TopicPartition(m.topic(), m.partition())
            val off = acc.get(tp).map(o => Math.min(o, m.offset())).getOrElse(m.offset())
            acc + (tp -> off)
          }.foreach { case (tp, off) =>
            // rewind the consumer to the smallest un-committed offset seen in this poll
            consumer.seek(tp, off)
          }
        }
      }
    

    The above is only a demo-level approach. In real production you still need to combine it with the concrete business scenario and handle different situations accordingly.

    This work is licensed under the 《CC 協議》; reposts must credit the author and link to this article.