這里介紹一下RabbitMQ重復消費的場景,以及如何解決消息重復消費的問題。
注:本文只做粗略邏輯實現借鑒,實際業務場景需根據實際情況再做細化處理。
目錄
- 消息重復消費
- MQ的一條消息被消費者消費了多次
- 重復消費場景重現測試
- 如何解決消息重復消費的問題
- 編碼
- 解決消息重復消費測試
消息重復消費
什么是消息重復消費?
首先我們來看一下消息的傳輸流程。消息生產者-->MQ-->消息消費者;消息生產者發送消息到MQ服務器,MQ服務器存儲消息,消息消費者監聽MQ的消息,發現有消息就消費消息。
所以消息重復也就出現在兩個階段:
1、生產者多發送了消息給MQ;
2、MQ的一條消息被消費者消費了多次。
第一種場景很好控制,只要保證消息生成者不重復發送消息給MQ即可。
我們著重來看一下第二個場景。
MQ的一條消息被消費者消費了多次
在保證MQ消息不重復的情況下,消費者消費消息成功后,在給MQ發送消息確認的時候出現了網絡異常(或者是服務中斷),MQ沒有接收到確認,此時MQ不會將發送的消息刪除,為了保證消息被消費,當消費者網絡穩定后,MQ就會繼續給消費者投遞之前的消息。這時候消費者就接收到了兩條一樣的消息。
重復消費場景重現測試
1、消息發送者發送1萬條消息給MQ
@GetMapping("/rabbitmq/sendToClient")
public String sendToClient() {
String message = "server message sendToClient";
for (int i = 0; i < 10000; i++) {
amqpTemplate.convertAndSend("queueName3",message+": "+i);
}
return message;
}
啟動消息發送服務,調用接口發送消息,mq成功收到1萬條消息。
2、消費者監聽消費消息
@RabbitListener(queues = "queueName3")//發送的隊列名稱 @RabbitListener注解到類和方法都可以
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("接收者2--接收到queueName3隊列的消息為:"+message);
}
啟動消費者服務,然后中斷消費服務,此時消費到了第7913個消息:

此時查看MQ的消息,現在MQ隊列中應該還有2087個消息,但還有2088個消息,說明最后一個消息被消費了沒有被MQ服務確認。

再次啟動消費者服務,消息從第7913個消息開始消費,而不是第7914個消息

如何解決消息重復消費的問題
為了保證消息不被重復消費,首先要保證每個消息是唯一的,所以可以給每一個消息攜帶一個全局唯一的id,流程如下:
- 消費者監聽到消息后獲取id,先去查詢這個id是否存中
- 如果不存在,則正常消費消息,并把消息的id存入 數據庫或者redis中(下面的編碼示例使用redis)
- 如果存在則丟棄此消息
編碼
消息生產者服務:
/**
* @Description: 發送消息 模擬消息重復消費
* 消息重復消費情景:消息生產者已把消息發送到mq,消息消費者在消息消費的過程中突然因為網絡原因或者其他原因導致消息消費中斷
* 消費者消費成功后,在給MQ確認的時候出現了網絡波動,MQ沒有接收到確認,
* 為了保證消息被消費,MQ就會繼續給消費者投遞之前的消息。這時候消費者就接收到了兩條一樣的消息
* @param:
* @return: java.lang.String
* @Author: chenping
*/
@GetMapping("/rabbitmq/sendMsgNoRepeat")
public String sendMsgNoRepeat() {
String message = "server message sendMsgNoRepeat";
for (int i = 0; i <10000 ; i++) {
Message msg = MessageBuilder.withBody((message+"--"+i).getBytes()).setMessageId(UUID.randomUUID()+"").build();
amqpTemplate.convertAndSend("queueName4",msg);
}
return message;
}
消息消費者服務:
方案1:將id存入string中(單消費者場景):
這樣一個隊列,redis數據只有一條,每次消息過來都覆蓋之前的消息,但是消費者多的情況不適用,可能會存在問題--一個消息被多個消費者消費
@RabbitListener(queues = "queueName4")//發送的隊列名稱 @RabbitListener注解到類和方法都可以
@RabbitHandler
public void receiveMessage(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");
String messageRedisValue = redisUtil.get("queueName4","");
if (messageRedisValue.equals(messageId)) {
return;
}
System.out.println("消息:"+msg+", id:"+messageId);
redisUtil.set("queueName4",messageId);//以隊列為key,id為value
}
方案2:將id存入list中(多消費者場景)
這個方案可以解決多消費者的問題,但是隨著mq的消息增加,redis數據越來越多,需要去清除redis數據。
@RabbitListener(queues = "queueName4")//發送的隊列名稱 @RabbitListener注解到類和方法都可以
@RabbitHandler
public void receiveMessage1(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");
List messageRedisValue = redisUtil.lrange("queueName4");
if (messageRedisValue.contains(messageId)) {
return;
}
System.out.println("消息:"+msg+", id:"+messageId);
redisUtil.lpush("queueName4",messageId);//存入list
}
方案3:將id以key值增量存入string中并設置過期時間:
以消息id為key,消息內容為value存入string中,設置過期時間(可承受的redis服務器異常時間,比如設置過期時間為10分鐘,如果redis服務器斷了20分鐘,那么未消費的數據都會丟了)
@RabbitListener(queues = "queueName4")//發送的隊列名稱 @RabbitListener注解到類和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");
String messageRedisValue = redisUtil.get(messageId,"");
if (msg.equals(messageRedisValue)) {
return;
}
System.out.println("消息:"+msg+", id:"+messageId);
redisUtil.set(messageId,msg,10L);//以id為key,消息內容為value,過期時間10分鐘
}
解決消息重復消費測試:
首先,啟動消息生成服務,發送一萬條消息:

啟動消息消費服務,然后中斷服務,消費了1934條消息:

查看未被消費的消息條數為8067條,多了一條(10000-1934=8066 ):

再次啟動消費者服務,消費者舍棄了已被消費的第1934條消息

安全俠
007bug
上官雨寶
RacentYY
ManageEngine卓豪
Andrew
Anna艷娜
Andrew
X0_0X