<menu id="guoca"></menu>
<nav id="guoca"></nav><xmp id="guoca">
  • <xmp id="guoca">
  • <nav id="guoca"><code id="guoca"></code></nav>
  • <nav id="guoca"><code id="guoca"></code></nav>

    分布式任務 + 消息隊列框架 go-queue

    VSole2022-08-04 11:11:15

    為什么要寫這個庫?

    在開始自研 go-queue 之前,針對以下我們調研目前的開源隊列方案:

    beanstalkd

    beanstalkd 有一些特殊好用功能:支持任務 priority、延時 (delay)、超時重發 (time-to-run) 和預留 (buried),能夠很好的支持分布式的后臺任務和定時任務處理。如下是 beanstalkd 基本部分:

    • job:任務單元;
    • tube:任務隊列,存儲統一類型 job。producer 和 consumer 操作對象;
    • producerjob 生產者,通過 put 將 job 加入一個 tube;
    • consumerjob 消費者,通過 reserve/release/bury/delete 來獲取 job 或改變 job 的狀態;

    很幸運的是官方提供了 go client:https://github.com/beanstalkd/go-beanstalk。

    但是這對不熟悉 beanstalkd 操作的 go 開發者而言,需要學習成本。

    kafka

    類似基于 kafka 消息隊列作為存儲的方案,存儲單元是消息,如果要實現延時執行,可以想到的方案是以延時執行的時間作為 topic,這樣在大型的消息系統中,充斥大量一次性的 topicdq_1616324404788, dq_1616324417622),當時間分散,會容易造成磁盤隨機寫的情況。

    而且在 go 生態中,

    同時考慮以下因素:

    • 支持延時任務
    • 高可用,保證數據不丟失
    • 可擴展資源和性能

    所以我們自己基于以上兩個基礎組件開發了 go-queue

    1. 基于 beanstalkd 開發了 dq,支持定時和延時操作。同時加入 redis 保證消費唯一性。
    2. 基于 kafka 開發了 kq,簡化生產者和消費者的開發 API,同時在寫入 kafka 使用批量寫,節省 IO。

    整體設計如下:

    應用場景

    首先在消費場景來說,一個是針對任務隊列,一個是消息隊列。而兩者最大的區別:

    • 任務是沒有順序約束;消息需要;
    • 任務在加入中,或者是等待中,可能存在狀態更新(或是取消);消息則是單一的存儲即可;

    所以在背后的基礎設施選型上,也是基于這種消費場景。

    • dq:依賴于beanstalkd ,適合延時、定時任務執行;
    • kq:依賴于 kafka ,適用于異步、批量任務執行;

    而從其中 dq的 API 中也可以看出:

    // 延遲任務執行
    - dq.Delay(msg, delayTime);
    // 定時任務執行
    - dq.At(msg, atTime);
    

    而在我們內部:

    • 如果是 異步消息消費/推送 ,則會選擇使用kqkq.Push(msg)
    • 如果是 15 分鐘提醒/ 明天中午發送短信 等,則使用 dq

    如何使用

    分別介紹dq和 kq的使用方式:

    dq

    // [Producer]
    producer := dq.NewProducer([]dq.Beanstalk{
        {
            Endpoint: "localhost:11300",
            Tube:     "tube",
        },
        {
            Endpoint: "localhost:11301",
            Tube:     "tube",
        },
    })  
    for i := 1000; i < 1005; i++ {
        _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
        if err != nil {
            fmt.Println(err)
        }
    }
    // [Consumer]
    consumer := dq.NewConsumer(dq.DqConf{
      Beanstalks: []dq.Beanstalk{
        {
          Endpoint: "localhost:11300",
          Tube:     "tube",
        },
        {
          Endpoint: "localhost:11301",
          Tube:     "tube",
        },
      },
      Redis: redis.RedisConf{
        Host: "localhost:6379",
        Type: redis.NodeType,
      },
    })
    consumer.Consume(func(body []byte) {
      // your consume logic
      fmt.Println(string(body))
    })
    

    和普通的 生產者 - 消費者 模型類似,開發者也只需要關注以下:

    1. 開發者只需要關注自己的任務類型「延時/定時」
    2. 消費端的消費邏輯

    kq

    producer.go:

    // message structure
    type message struct {
        Key     string `json:"key"`
        Value   string `json:"value"`
        Payload string `json:"message"`
    }
    pusher := kq.NewPusher([]string{
        "127.0.0.1:19092",
        "127.0.0.1:19093",
        "127.0.0.1:19094",
    }, "kq")
    ticker := time.NewTicker(time.Millisecond)
    for round := 0; round < 3; round++ {
        select {
        case <-ticker.C:
            count := rand.Intn(100)
        // 準備消息
            m := message{
                Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
                Value:   fmt.Sprintf("%d,%d", round, count),
                Payload: fmt.Sprintf("%d,%d", round, count),
            }
            body, err := json.Marshal(m)
            if err != nil {
                log.Fatal(err)
            }
            fmt.Println(string(body))
        // push to kafka broker
            if err := pusher.Push(string(body)); err != nil {
                log.Fatal(err)
            }
        }
    }
    

    config.yaml

    Name: kq
    Brokers:
      - 127.0.0.1:19092
      - 127.0.0.1:19092
      - 127.0.0.1:19092
    Group: adhoc
    Topic: kq
    Offset: first
    Consumers: 1
    

    consumer.go:

    var c kq.KqConf
    conf.MustLoad("config.yaml", &c)
    // WithHandle: 具體的處理msg的logic
    // 這也是開發者需要根據自己的業務定制化
    q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
      fmt.Printf("=> %s\n", v)
      return nil
    }))
    defer q.Stop()
    q.Start()
    

    和 dq 不同的是:開發者不需要關注任務類型(在這里也沒有任務的概念,傳遞的都是 message data)。

    其他操作和 dq 類似,只是將 業務處理函數 當成配置直接傳入消費者中。

    總結 

    在我們目前的場景中,kq 大量使用在我們的異步消息服務;而延時任務,我們除了 dq,還可以使用內存版的 TimingWheel「go-zero 生態組件」。

    關于 go-queue 更多的設計和實現文章,可以持續關注我們。歡迎大家去關注和使用。

    消息隊列
    本作品采用《CC 協議》,轉載必須注明作者和本文鏈接
    之前,針對以下我們調研目前的開源隊列方案:beanstalkdbeanstalkd?消費者,通過 reserve/release/bury/delete 來獲取 job 或改變 job 的狀態;很幸運的是官方提供了 go client:https://github.com/beanstalkd/go-beanstalk。但是這對不熟悉 beanstalkd 操作的 go 開發者而言,需要學習成本
    場景痛點介紹Cloud Native在消息隊列的使用過程中,由于其分布式特性難免會遇到消息丟失、消息重傳等問題。
    添加消息的任務我們稱為producer,而取出并使用消息的任務,我們稱之為consumer。kafka應運而生,它是專門設計用來做消息中間件的系統。這兩點也是kafka要解決的核心問題。為此,kafka提出了partition的概念。由于消息不會被刪除,因此可以等消費者明確告知kafka這條消息消費成功以后,再去更新游標。對于同一個topic,不同的消費組有各自的游標。
    死信:無法被消費的消息,稱為死信。 如果死信一直留在隊列中,會導致一直被消費,卻從不消費成功。 所以我們專門開辟了一個來存放死信的隊列,叫死信隊列(DLX,dead-letter-exchange)。 死信的幾種來源: 消息 TTL 過期(time to live,存活時間,可以用在限時支付消息隊列達到最大長度(隊列滿了,無法路由到該隊列消息被拒絕( basic.reject / b
    windows消息機制詳解
    2022-06-10 16:15:55
    要想深入理解windows,消息機制的知識是必不可少的。
    安全研究人員和專家警告稱,Windows 消息隊列 中間件服務中存在一個高危漏洞 CVE-2023-21554。Windows 消息隊列 在所有Windows版本里都可用,主要用于為應用程序提供“消息傳遞保證”網絡功能、啟動 PowerShell 或控制面板。
    代表的a的二進制位的修改。對應的ASCII碼是97,轉換為二進制數據是01100001. 因為bit非常節省空間,可以用來做大數據量的統計。BITOPNOTdestkeykey ,對給定 key 求邏輯非,并將結果保存到 destkey 。獲取今天點擊最多的15條:zrevrange hotNews:20190926 0 15 withscores
    分布式流平臺Kafka
    2022-08-02 10:13:27
    無論消息是否被消費,Kafka集群都會持久的保存所有發布的消息,直到過期。Kafka中采用分區的設計主要有兩個目的:第一,當日志大小超過了單臺服務器的限制,允許日志進行擴展。在Kafka中實現消費的方式是將日志中的分區劃分到每一個消費者實例上,以便在任何時間,每個實例都是分區唯一的消費者。
    在Windows大部分應用都是基于消息機制,他們都擁有一個消息過程函數,根據不同消息完成不同功能,windows通過鉤子機制來截獲和監視系統中的這些消息。一般鉤子分局部鉤子與全局鉤子,局部鉤子一般用于某個線程,而全局鉤子一般通過dll文件實現相應的鉤子函數。
    全局鉤子注入在Windows大部分應用都是基于消息機制,他們都擁有一個消息過程函數,根據不同消息完成不同功能,windows通過鉤子機制來截獲和監視系統中的這些消息。一般鉤子分局部鉤子與全局鉤子,局部鉤子一般用于某個線程,而全局鉤子一般通過dll文件實現相應的鉤子函數。
    VSole
    網絡安全專家
      亚洲 欧美 自拍 唯美 另类