阿里云消息隊列 Kafka-消息檢索實踐
場景痛點介紹
Cloud Native
在消息隊列的使用過程中,由于其分布式特性難免會遇到消息丟失、消息重傳等問題。
- 例如在日志聚合場景中,通常是多個異構數據源生產數據到 Kafka 中以提供給下游的 Spark 等計算引擎消費。而當某些日志缺失時,由于消息數據的發送方式、數據結構等種類繁雜,導致難以直接從客戶端的日志來排查。
- 再例如消息轉發的過程中,消費端可能會重復消費到同樣的數據,這就需要根據內容從消息隊列中檢索數據以判斷消息是否重復生產,而消息隊列通常只能按照分區和消費位點遍歷掃描,并不能靈活的實現消息檢索。
業內現有的消息隊列產品都沒有較好的工具和方式來實現對消息內容的檢索,這將使得排查難度和投入成本大大增加。
Kafka 消息檢索組件
Cloud Native
檢索組件介紹
消息隊列 Kafka 「檢索組件」是一個全托管、高彈性、交互式的檢索組件,具備萬億級消息內容檢索的秒級響應能力,旨在解決業內消息產品不支持檢索消息內容的難題。消息隊列 Kafka 「檢索組件」是通過 Kafka Connector 將 Topic 中的消息數據轉存到表格存儲(Tablestore)中,基于表格存儲的多元索引功能提供消息檢索能力。能夠支持通過消息的分區、位點、發送的時間范圍等一個或多個條件組合檢索,還支持根據消息 Key、Value 全文檢索消息。
案例實踐
Cloud Native
案例背景
假設某運維團隊需要監控線上集群的運行情況,采集進程級別的日志導入到 Kafka 中,下游使用 Flink 消費,實時計算各進程資源消耗情況。當在 Flink 中發現某個進程的某個時間段的日志數據丟失時,需要使用消息隊列 Kafka 「檢索組件」,基于消息 Value 和時間范圍檢索消息數據,判斷日志是否已經成功推送到了消息隊列 Kafka 中。
例如采集的日志數據為 JSON 結構,某一條日志數據格式為:
key = 276value = {"PID":"276","COMMAND":"Google Chrom","CPU_USE":"7.2","TIME":"00:01:44","MEM":"8836K","STATE":"sleeping","UID":"0","IP":"164.29.0.1"}
開通消息檢索
1. 首先需要登錄到阿里云消息隊列 Kafka 控制臺中,選擇對應的 topic,開通消息檢索服務。

- 消息檢索服務開通后,將自動創建一個 Tablestore 實例,之后將消息數據轉存到 Tablestore,并創建索引提供消息檢索能力。每一個 topic 對應了 Tablestore 中的一張數據表。可以在消息隊列 Kafka 控制臺上查看每個 topic 的消息檢索組件詳情。

消息檢索實踐
- 消息檢索服務開通后,就可以使用消息中的多個搜索項檢索消息,實現上述案例。例如指定一個時間范圍,并且檢索消息 Value 中包含 PID = 276 的消息。

- 返回結果示例
能力擴展
Cloud Native
表格存儲 Tablestore 介紹
表格存儲 Tablestore 是基于底層飛天平臺構建的結構化數據存儲,能夠提供千億級規模數據存儲、毫秒級數據檢索的服務能力。消息隊列 Kafka 轉存消息到 Tablestore 后,支持通過 Tablestore 原生的數據訪問方式來檢索消息,Tablestore 支持更復雜的檢索邏輯,同時支持通過 SQL 語法檢索消息。下面列舉兩種消息檢索方式:
多元索引搜索
- 登錄到表格存儲 Tablestore 控制臺中,進入 Kafka 消息數據轉存對應的 Tablestore 實例和數據表中,在索引管理頁面選擇多元索引搜索消息。

- 例如需要檢索消息 Value 中包含 PID=276 或者 PID=277 的消息。

- 返回結果

SQL 檢索消息
- 表格存儲 Tablestore 支持基于 SQL 語法來檢索消息,首先需要在消息轉存的數據表上創建一張 SQL 映射表。

- 基于 Tablestore SQL 檢索 PID=276 的消息。

總結
Cloud Native
阿里云消息隊列 Kafka 「檢索組件」是消息隊列領域率先支持交互式消息內容檢索的組件,基于數據轉存表格存儲 Tablestore 提供消息檢索服務能力,支持根據 Key、Value、分區等任意個條件自由組合檢索消息,同時支持 Key、Value 全文檢索消息,具備免開發、免運維、高彈性的特點。同時也可以直接通過表格存儲 Tablestore 索引或者 SQL 來檢索消息,極大地提高了日常排查消息存在或正確性的速度。