亚洲精品无码乱码成人|最近中文字幕免费大全|日韩欧美卡一卡二卡新区|熟妇性饥渴一区二区三区|久久久久无码精品国产AV|欧美日韩国产va在线观看|久久精品一本到99热动态图|99国产精品欧美一区二区三区

您現(xiàn)在的位置是: 汽車 > > 正文

RabbitMQ快速使用代碼手冊(cè)

時(shí)間:2023-06-16 19:50:45 來源:博客園 發(fā)布者:DN032

本篇博客的內(nèi)容為RabbitMQ在開發(fā)過程中的快速上手使用,側(cè)重于代碼部分,幾乎沒有相關(guān)概念的介紹,相關(guān)概念請(qǐng)參考以下csdn博客,兩篇都是我找的精華帖,供大家學(xué)習(xí)。本篇博客也持續(xù)更新~~~內(nèi)容代碼部分由于word轉(zhuǎn)md格式有些問題,可以直接查看我的有道云筆記,鏈接:https://note.youdao.com/s/Ab7Cjiu

參考文檔

csdn博客:


(相關(guān)資料圖)

基礎(chǔ)部分:https://blog.csdn.net/qq_35387940/article/details/100514134

高級(jí)部分:https://blog.csdn.net/weixin_49076273/article/details/124991012

application.yml
server:port: 8021spring:#給項(xiàng)目來個(gè)名字application:name: rabbitmq-provider#配置rabbitMq 服務(wù)器rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: root#虛擬host 可以不設(shè)置,使用server默認(rèn)hostvirtual-host: JCcccHost#確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)#publisher-confirms: truepublisher-confirm-type: correlated#確認(rèn)消息已發(fā)送到隊(duì)列(Queue)publisher-returns: true

完善更多信息

spring:rabbitmq:host: localhostport: 5672virtual-host: /username: guestpassword: guestpublisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: trueretry:#發(fā)布重試,默認(rèn)falseenabled: true#重試時(shí)間 默認(rèn)1000msinitial-interval: 1000#重試最大次數(shù) 最大3max-attempts: 3#重試最大間隔時(shí)間max-interval: 10000#重試的時(shí)間隔乘數(shù),比如配2,0第一次等于10s,第二次等于20s,第三次等于40smultiplier: 1listener:\# 默認(rèn)配置是simpletype: simplesimple:\# 手動(dòng)ack Acknowledge mode of container. auto noneacknowledge-mode: manual#消費(fèi)者調(diào)用程序線程的最小數(shù)量concurrency: 10#消費(fèi)者最大數(shù)量max-concurrency: 10#限制消費(fèi)者每次只處理一條信息,處理完在繼續(xù)下一條prefetch: 1#啟動(dòng)時(shí)是否默認(rèn)啟動(dòng)容器auto-startup: true#被拒絕時(shí)重新進(jìn)入隊(duì)列default-requeue-rejected: true
相關(guān)注解說明

@RabbitListener 注解是指定某方法作為消息消費(fèi)的方法,例如監(jiān)聽某 Queue里面的消息。

@RabbitListener標(biāo)注在方法上,直接監(jiān)聽指定的隊(duì)列,此時(shí)接收的參數(shù)需要與發(fā)送市類型一致。

\@Componentpublic class PointConsumer {//監(jiān)聽的隊(duì)列名\@RabbitListener(queues = \"point.to.point\")public void processOne(String name) {System.out.println(\"point.to.point:\" + name);}}

@RabbitListener 可以標(biāo)注在類上面,需配合 @RabbitHandler 注解一起使用

@RabbitListener 標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候,就交給@RabbitHandler 的方法處理,根據(jù)接受的參數(shù)類型進(jìn)入具體的方法中。

\@Component\@RabbitListener(queues = \"consumer_queue\")public class Receiver {\@RabbitHandlerpublic void processMessage1(String message) {System.out.println(message);}\@RabbitHandlerpublic void processMessage2(byte\[\] message) {System.out.println(new String(message));}}

@Payload

可以獲取消息中的 body 信息

\@RabbitListener(queues = \"debug\")public void processMessage1(@Payload String body) {System.out.println(\"body:\"+body);}

@Header,@Headers

可以獲得消息中的 headers 信息

\@RabbitListener(queues = \"debug\")public void processMessage1(@Payload String body, \@Header String token){System.out.println(\"body:\"+body);System.out.println(\"token:\"+token);}\@RabbitListener(queues = \"debug\")public void processMessage1(@Payload String body, \@HeadersMap\ headers) {System.out.println(\"body:\"+body);System.out.println(\"Headers:\"+headers);}
快速使用配置xml文件
\org.springframework.boot\\spring-boot-starter-amqp\\
配置exchange、queue注解快速創(chuàng)建版本
\@Configurationpublic class RabbitmqConfig {//創(chuàng)建交換機(jī)//通過ExchangeBuilder能創(chuàng)建direct、topic、Fanout類型的交換機(jī)\@Bean(\"bootExchange\")public Exchange bootExchange() {returnExchangeBuilder.topicExchange(\"zx_topic_exchange\").durable(true).build();}//創(chuàng)建隊(duì)列\(zhòng)@Bean(\"bootQueue\")public Queue bootQueue() {return QueueBuilder.durable(\"zx_queue\").build();}/\*\*\* 將隊(duì)列與交換機(jī)綁定\*\* \@param queue\* \@param exchange\* \@return\*/\@Beanpublic Binding bindQueueExchange(@Qualifier(\"bootQueue\") Queue queue,\@Qualifier(\"bootExchange\") Exchange exchange) {returnBindingBuilder.bind(queue).to(exchange).with(\"boot.#\").noargs();}}
Direct
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@Configurationpublic class DirectRabbitConfig {//隊(duì)列 起名:TestDirectQueue\@Beanpublic Queue TestDirectQueue() {//durable:是否持久化,默認(rèn)是false,持久化隊(duì)列:會(huì)被存儲(chǔ)在磁盤上,當(dāng)消息代理重啟時(shí)仍然存在,暫存隊(duì)列:當(dāng)前連接有效//exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級(jí)高于durable//autoDelete:是否自動(dòng)刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會(huì)自動(dòng)刪除。// return new Queue(\"TestDirectQueue\",true,true,false);//一般設(shè)置一下隊(duì)列的持久化就好,其余兩個(gè)就是默認(rèn)falsereturn new Queue(\"TestDirectQueue\",true);}//Direct交換機(jī) 起名:TestDirectExchange\@BeanDirectExchange TestDirectExchange() {// return new DirectExchange(\"TestDirectExchange\",true,true);return new DirectExchange(\"TestDirectExchange\",true,false);}//綁定 將隊(duì)列和交換機(jī)綁定, 并設(shè)置用于匹配鍵:TestDirectRouting\@BeanBinding bindingDirect() {returnBindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(\"TestDirectRouting\");}\@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange(\"lonelyDirectExchange\");}}
Fanout
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@Configurationpublic class FanoutRabbitConfig {/\*\*\* 創(chuàng)建三個(gè)隊(duì)列 :fanout.A fanout.B fanout.C\* 將三個(gè)隊(duì)列都綁定在交換機(jī) fanoutExchange 上\* 因?yàn)槭巧刃徒粨Q機(jī), 路由鍵無需配置,配置也不起作用\*/\@Beanpublic Queue queueA() {return new Queue(\"fanout.A\");}\@Beanpublic Queue queueB() {return new Queue(\"fanout.B\");}\@Beanpublic Queue queueC() {return new Queue(\"fanout.C\");}\@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(\"fanoutExchange\");}\@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}\@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}\@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}}
Topic
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@Configurationpublic class TopicRabbitConfig {//綁定鍵public final static String man = \"topic.man\";public final static String woman = \"topic.woman\";\@Beanpublic Queue firstQueue() {return new Queue(TopicRabbitConfig.man);}\@Beanpublic Queue secondQueue() {return new Queue(TopicRabbitConfig.woman);}\@BeanTopicExchange exchange() {return new TopicExchange(\"topicExchange\");}//將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man//這樣只要是消息攜帶的路由鍵是topic.man,才會(huì)分發(fā)到該隊(duì)列\(zhòng)@BeanBinding bindingExchangeMessage() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);}//將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規(guī)則topic.#// 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會(huì)分發(fā)到該隊(duì)列\(zhòng)@BeanBinding bindingExchangeMessage2() {returnBindingBuilder.bind(secondQueue()).to(exchange()).with(\"topic.#\");}}
生產(chǎn)者發(fā)送消息

直接發(fā)送給隊(duì)列

//指定消息隊(duì)列的名字,直接發(fā)送消息到消息隊(duì)列中\(zhòng)@Testpublic void testSimpleQueue() {// 隊(duì)列名稱String queueName = \"simple.queue\";// 消息String message = \"hello, spring amqp!\";// 發(fā)送消息rabbitTemplate.convertAndSend(queueName, message);}

發(fā)送給交換機(jī),然后走不同的模式

////指定交換機(jī)的名字,將消息發(fā)送給交換機(jī),然后不同模式下,消息隊(duì)列根據(jù)key得到消息\@Testpublic void testSendDirectExchange() {// 交換機(jī)名稱,有三種類型String exchangeName = \"itcast.direct\";// 消息String message =\"紅色警報(bào)!日本亂排核廢水,導(dǎo)致海洋生物變異,驚現(xiàn)哥斯拉!\";// 發(fā)送消息,red為隊(duì)列的key,因此此隊(duì)列會(huì)得到消息rabbitTemplate.convertAndSend(exchangeName, \"red\", message);}

也可以將發(fā)送的消息封裝到HashMap中然后發(fā)送給交換機(jī)

import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.HashMap;import java.util.Map;import java.util.UUID;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@RestControllerpublic class SendMessageController {\@AutowiredRabbitTemplate rabbitTemplate;//使用RabbitTemplate,這提供了接收/發(fā)送等等方法\@GetMapping(\"/sendDirectMessage\")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = \"test message, hello!\";String createTime =LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-ddHH:mm:ss\"));Map\ map=new HashMap\<\>();map.put(\"messageId\",messageId);map.put(\"messageData\",messageData);map.put(\"createTime\",createTime);//將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機(jī)TestDirectExchangerabbitTemplate.convertAndSend(\"TestDirectExchange\",\"TestDirectRouting\", map);return \"ok\";}}
消費(fèi)者接收消息
//使用注解@RabbitListener定義當(dāng)前方法監(jiān)聽RabbitMQ中指定名稱的消息隊(duì)列。\@Componentpublic class MessageListener {\@RabbitListener(queues = \"direct_queue\")public void receive(String id){System.out.println(\"已完成短信發(fā)送業(yè)務(wù)(rabbitmq direct),id:\"+id);}}參數(shù)用Map接收也可以\@Component\@RabbitListener(queues = \"TestDirectQueue\")//監(jiān)聽的隊(duì)列名稱TestDirectQueuepublic class DirectReceiver {\@RabbitHandlerpublic void process(Map testMessage) {System.out.println(\"DirectReceiver消費(fèi)者收到消息 : \" +testMessage.toString());}}
高級(jí)特性消息可靠性傳遞

有confirm和return兩種

在application.yml中添加以下配置項(xiàng):

server:port: 8021spring:#給項(xiàng)目來個(gè)名字application:name: rabbitmq-provider#配置rabbitMq 服務(wù)器rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: root#虛擬host 可以不設(shè)置,使用server默認(rèn)hostvirtual-host: JCcccHost#確認(rèn)消息已發(fā)送到交換機(jī)(Exchange)#publisher-confirms: truepublisher-confirm-type: correlated#確認(rèn)消息已發(fā)送到隊(duì)列(Queue)publisher-returns: true

有兩種配置方法:

寫到配置類中

寫到工具類或者普通類中,但是這個(gè)類得實(shí)現(xiàn)那兩個(gè)接口

寫法一

編寫消息確認(rèn)回調(diào)函數(shù)

import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;\@Configurationpublic class RabbitConfig {\@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//設(shè)置開啟Mandatory,才能觸發(fā)回調(diào)函數(shù),無論消息推送結(jié)果怎么樣都強(qiáng)制調(diào)用回調(diào)函數(shù)rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {\@Overridepublic void confirm(CorrelationData correlationData, boolean ack, Stringcause) {System.out.println(\"ConfirmCallback:\"+\"相關(guān)數(shù)據(jù):\"+correlationData);System.out.println(\"ConfirmCallback: \"+\"確認(rèn)情況:\"+ack);System.out.println(\"ConfirmCallback: \"+\"原因:\"+cause);}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {\@Overridepublic void returnedMessage(Message message, int replyCode, StringreplyText, String exchange, String routingKey) {System.out.println(\"ReturnCallback: \"+\"消息:\"+message);System.out.println(\"ReturnCallback: \"+\"回應(yīng)碼:\"+replyCode);System.out.println(\"ReturnCallback: \"+\"回應(yīng)信息:\"+replyText);System.out.println(\"ReturnCallback: \"+\"交換機(jī):\"+exchange);System.out.println(\"ReturnCallback: \"+\"路由鍵:\"+routingKey);}});return rabbitTemplate;}}
寫法二
\@Component\@Slf4jpublic class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {\@Resourceprivate RedisTemplate\ redisTemplate;\@Resourceprivate RabbitTemplate rabbitTemplate;private String finalId = null;private SmsDTO smsDTO = null;/\*\*\* 發(fā)布者確認(rèn)的回調(diào)\*\* \@param correlationData 回調(diào)的相關(guān)數(shù)據(jù)。\* \@param b ack為真,nack為假\* \@param s 一個(gè)可選的原因,用于nack,如果可用,否則為空。\*/\@Overridepublic void confirm(CorrelationData correlationData, boolean b, Strings) {// 消息發(fā)送成功,將redis中消息的狀態(tài)(status)修改為1if (b) {redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX +finalId, \"status\", 1);} else {// 發(fā)送失敗,放入redis失敗集合中,并刪除集合數(shù)據(jù)log.error(\"短信消息投送失?。簕}\--\>{}\", correlationData, s);redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,this.smsDTO);}}/\*\*\* 發(fā)生異常時(shí)的消息返回提醒\*\* \@param returnedMessage\*/\@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error(\"發(fā)生異常,返回消息回調(diào):{}\", returnedMessage);// 發(fā)送失敗,放入redis失敗集合中,并刪除集合數(shù)據(jù)redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,this.smsDTO);}\@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}}
消息確認(rèn)機(jī)制

手動(dòng)確認(rèn)

yml配置#手動(dòng)確認(rèn) manuallistener:simple:acknowledge-mode: manual
寫法一

首先在消費(fèi)者項(xiàng)目中創(chuàng)建MessageListenerConfig

import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;\@Configurationpublic class MessageListenerConfig {\@Autowiredprivate CachingConnectionFactory connectionFactory;\@Autowiredprivate MyAckReceiver myAckReceiver;//消息接收處理類\@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //RabbitMQ默認(rèn)是自動(dòng)確認(rèn),這里改為手動(dòng)確認(rèn)消息//設(shè)置一個(gè)隊(duì)列container.setQueueNames(\"TestDirectQueue\");//如果同時(shí)設(shè)置多個(gè)如下: 前提是隊(duì)列都是必須已經(jīng)創(chuàng)建存在的//container.setQueueNames(\"TestDirectQueue\",\"TestDirectQueue2\",\"TestDirectQueue3\");//另一種設(shè)置隊(duì)列的方法,如果使用這種情況,那么要設(shè)置多個(gè),就使用addQueues//container.setQueues(new Queue(\"TestDirectQueue\",true));//container.addQueues(new Queue(\"TestDirectQueue2\",true));//container.addQueues(new Queue(\"TestDirectQueue3\",true));container.setMessageListener(myAckReceiver);return container;}}

然后創(chuàng)建手動(dòng)確認(rèn)監(jiān)聽類MyAckReceiver(手動(dòng)確認(rèn)模式需要實(shí)現(xiàn)ChannelAwareMessageListener)

import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;import java.util.Map;\@Componentpublic class MyAckReceiver implements ChannelAwareMessageListener {\@Overridepublic void onMessage(Message message, Channel channel) throws Exception{long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte\[\] body = message.getBody();ObjectInputStream ois = new ObjectInputStream(newByteArrayInputStream(body));Map\ msgMap = (Map\) ois.readObject();String messageId = msgMap.get(\"messageId\");String messageData = msgMap.get(\"messageData\");String createTime = msgMap.get(\"createTime\");ois.close();System.out.println(\" MyAckReceiver messageId:\"+messageId+\"messageData:\"+messageData+\" createTime:\"+createTime);System.out.println(\"消費(fèi)的主題消息來自:\"+message.getMessageProperties().getConsumerQueue());channel.basicAck(deliveryTag, true);//第二個(gè)參數(shù),手動(dòng)確認(rèn)可以被批處理,當(dāng)該參數(shù)為 true 時(shí),則可以一次性確認(rèn)delivery_tag 小于等于傳入值的所有消息//channel.basicReject(deliveryTag,true);//第二個(gè)參數(shù),true會(huì)重新放回隊(duì)列,所以需要自己根據(jù)業(yè)務(wù)邏輯判斷什么時(shí)候使用拒絕} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}}

如果想實(shí)現(xiàn)不同的隊(duì)列,有不同的監(jiān)聽確認(rèn)處理機(jī)制,做不同的業(yè)務(wù)處理,那么這樣做:

首先需要在配置類中綁定隊(duì)列,然后只需要根據(jù)消息來自不同的隊(duì)列名進(jìn)行區(qū)分處理即可

import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;import java.util.Map;\@Componentpublic class MyAckReceiver implements ChannelAwareMessageListener {\@Overridepublic void onMessage(Message message, Channel channel) throws Exception{long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte\[\] body = message.getBody();ObjectInputStream ois = new ObjectInputStream(newByteArrayInputStream(body));Map\ msgMap = (Map\) ois.readObject();String messageId = msgMap.get(\"messageId\");String messageData = msgMap.get(\"messageData\");String createTime = msgMap.get(\"createTime\");ois.close();if(\"TestDirectQueue\".equals(message.getMessageProperties().getConsumerQueue())){System.out.println(\"消費(fèi)的消息來自的隊(duì)列名為:\"+message.getMessageProperties().getConsumerQueue());System.out.println(\"消息成功消費(fèi)到 messageId:\"+messageId+\"messageData:\"+messageData+\" createTime:\"+createTime);System.out.println(\"執(zhí)行TestDirectQueue中的消息的業(yè)務(wù)處理流程\...\...\");}if(\"fanout.A\".equals(message.getMessageProperties().getConsumerQueue())){System.out.println(\"消費(fèi)的消息來自的隊(duì)列名為:\"+message.getMessageProperties().getConsumerQueue());System.out.println(\"消息成功消費(fèi)到 messageId:\"+messageId+\"messageData:\"+messageData+\" createTime:\"+createTime);System.out.println(\"執(zhí)行fanout.A中的消息的業(yè)務(wù)處理流程\...\...\");}channel.basicAck(deliveryTag, true);//channel.basicReject(deliveryTag, true);//為true會(huì)重新放回隊(duì)列} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}}
寫法二
\@Component\@Slf4jpublic class SendSmsListener {\@Resourceprivate RedisTemplate\ redisTemplate;\@Resourceprivate SendSmsUtils sendSmsUtils;/\*\*\* 監(jiān)聽發(fā)送短信普通隊(duì)列\(zhòng)* \@param smsDTO\* \@param message\* \@param channel\* \@throws IOException\*/\@RabbitListener(queues = SMS_QUEUE_NAME)public void sendSmsListener(SmsDTO smsDTO, Message message, Channelchannel) throws IOException {String messageId = message.getMessageProperties().getMessageId();int retryCount = (int)redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX +messageId, \"retryCount\");if (retryCount \> 3) {//重試次數(shù)大于3,直接放到死信隊(duì)列l(wèi)og.error(\"短信消息重試超過3次:{}\", messageId);//basicReject方法拒絕deliveryTag對(duì)應(yīng)的消息,第二個(gè)參數(shù)是否requeue,true則重新入隊(duì)列,否則丟棄或者進(jìn)入死信隊(duì)列。//該方法reject后,該消費(fèi)者還是會(huì)消費(fèi)到該條被reject的消息。channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);return;}try {String phoneNum = smsDTO.getPhoneNum();String code = smsDTO.getCode();if(StringUtils.isAnyBlank(phoneNum,code)){throw new RuntimeException(\"sendSmsListener參數(shù)為空\");}// 發(fā)送消息SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum,code);SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet();SendStatus sendStatus = sendStatusSet\[0\];if(!\"Ok\".equals(sendStatus.getCode()) \|\|!\"sendsuccess\".equals(sendStatus.getMessage())){throw new RuntimeException(\"發(fā)送驗(yàn)證碼失敗\");}//手動(dòng)確認(rèn)消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info(\"短信發(fā)送成功:{}\",smsDTO);redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);} catch (Exception e) {redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,\"retryCount\",retryCount+1);channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}/\*\*\* 監(jiān)聽到發(fā)送短信死信隊(duì)列\(zhòng)* \@param sms\* \@param message\* \@param channel\* \@throws IOException\*/\@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)public void smsDelayQueueListener(SmsDTO sms, Message message, Channelchannel) throws IOException {try{log.error(\"監(jiān)聽到死信隊(duì)列消息==\>{}\",sms);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
消費(fèi)端限流
#配置RabbitMQspring:rabbitmq:host: 192.168.126.3port: 5672username: guestpassword: guestvirtual-host: /#開啟自動(dòng)確認(rèn) none 手動(dòng)確認(rèn) manuallistener:simple:#消費(fèi)端限流機(jī)制必須開啟手動(dòng)確認(rèn)acknowledge-mode: manual#消費(fèi)端最多拉取的消息條數(shù),簽收后不滿該條數(shù)才會(huì)繼續(xù)拉取prefetch: 5
消息存活時(shí)間TTL

可以設(shè)置隊(duì)列的存活時(shí)間,也可以設(shè)置具體消息的存活時(shí)間

設(shè)置隊(duì)列中所有消息的存活時(shí)間

return QueueBuilder

.durable(QUEUE_NAME)//隊(duì)列持久化

.ttl(10000)//設(shè)置隊(duì)列的所有消息存活10s

.build();

即在創(chuàng)建隊(duì)列時(shí),設(shè)置存活時(shí)間

設(shè)置某條消息的存活時(shí)間

//發(fā)送消息,并設(shè)置該消息的存活時(shí)間

\@Testpublic void testSendMessage(){//1.創(chuàng)建消息屬性MessageProperties messageProperties = new MessageProperties();//2.設(shè)置存活時(shí)間messageProperties.setExpiration(\"10000\");//3.創(chuàng)建消息對(duì)象Message message = newMessage(\"sendMessage\...\".getBytes(),messageProperties);//4.發(fā)送消息rabbitTemplate.convertAndSend(\"my_topic_exchange1\",\"my_routing\",message);}

若設(shè)置中間的消息的存活時(shí)間,當(dāng)過期時(shí),該消息不會(huì)被移除,但是該消息已經(jīng)不會(huì)被消費(fèi)了,需要等到該消息到隊(duì)里頂端才會(huì)被移除。因?yàn)殛?duì)列是頭出,尾進(jìn),故而要移除它需要等到它在頂端時(shí)才可以。

在隊(duì)列設(shè)置存活時(shí)間,也在單條消息設(shè)置存活時(shí)間,則以時(shí)間短的為準(zhǔn)

死信隊(duì)列

死信隊(duì)列和普通隊(duì)列沒有任何區(qū)別,只需要將普通隊(duì)列需要綁定死信交換機(jī)和死信隊(duì)列就能夠?qū)崿F(xiàn)功能

import org.springframework.amqp.core.\*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;\@Configuration//Rabbit配置類public class RabbitConfig4 {private final String DEAD_EXCHANGE = \"dead_exchange\";private final String DEAD_QUEUE = \"dead_queue\";private final String NORMAL_EXCHANGE = \"normal_exchange\";private final String NORMAL_QUEUE = \"normal_queue\";//創(chuàng)建死信交換機(jī)\@Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE)//交換機(jī)類型 ;參數(shù)為名字topic為通配符模式的交換機(jī).durable(true)//是否持久化,true即存到磁盤,false只在內(nèi)存上.build();}//創(chuàng)建死信隊(duì)列\(zhòng)@Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE)//隊(duì)列持久化//.maxPriority(10)//設(shè)置隊(duì)列的最大優(yōu)先級(jí),最大可以設(shè)置255,但官網(wǎng)推薦不超過10,太高比較浪費(fèi)資源.build();}//死信交換機(jī)綁定死信隊(duì)列\(zhòng)@Bean//@Qualifier注解,使用名稱裝配進(jìn)行使用public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchangeexchange, \@Qualifier(DEAD_QUEUE) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(\"dead_routing\").noargs();}//創(chuàng)建普通交換機(jī)\@Bean(NORMAL_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE)//交換機(jī)類型 ;參數(shù)為名字topic為通配符模式的交換機(jī).durable(true)//是否持久化,true即存到磁盤,false只在內(nèi)存上.build();}//創(chuàng)建普通隊(duì)列\(zhòng)@Bean(NORMAL_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE)//隊(duì)列持久化//.maxPriority(10)//設(shè)置隊(duì)列的最大優(yōu)先級(jí),最大可以設(shè)置255,但官網(wǎng)推薦不超過10,太高比較浪費(fèi)資源.deadLetterExchange(DEAD_EXCHANGE)//綁定死信交換機(jī).deadLetterRoutingKey(\"dead_routing\")//死信隊(duì)列路由關(guān)鍵字.ttl(10000)//消息存活10s.maxLength(10)//隊(duì)列最大長(zhǎng)度為10.build();}//普通交換機(jī)綁定普通隊(duì)列\(zhòng)@Bean//@Qualifier注解,使用名稱裝配進(jìn)行使用public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchangeexchange, \@Qualifier(NORMAL_QUEUE) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(\"my_routing\").noargs();}}
延遲隊(duì)列

RabbitMQ并未實(shí)現(xiàn)延遲隊(duì)列功能,所以可以通過死信隊(duì)列實(shí)現(xiàn)延遲隊(duì)列的功能

即給普通隊(duì)列設(shè)置存活時(shí)間30分鐘,過期后發(fā)送至死信隊(duì)列,在死信消費(fèi)者監(jiān)聽死信隊(duì)列消息,查看訂單狀態(tài),是否支付,未支付則取消訂單,回退庫存即可。

消費(fèi)者監(jiān)聽延遲隊(duì)列

\@Componentpublic class ExpireOrderConsumer {//監(jiān)聽過期訂單隊(duì)列\(zhòng)@RabbitListener(queues = \"expire_queue\")public void listenMessage(String orderId){//模擬處理數(shù)據(jù)庫等業(yè)務(wù)System.out.println(\"查詢\"+orderId+\"號(hào)訂單的狀態(tài),如果已支付無需處理,如果未支付則回退庫存\");}}控制層代碼\@RestControllerpublic class OrderController {\@Autowiredprivate RabbitTemplate rabbitTemplate;\@RequestMapping(value = \"/place/{orderId}\",method =RequestMethod.GET)public String placeOrder(@PathVariable String orderId){//模擬service層處理System.out.println(\"處理訂單數(shù)據(jù)\...\");//將訂單id發(fā)送到訂單隊(duì)列rabbitTemplate.convertAndSend(\"order_exchange\",\"order_routing\",orderId);return \"下單成功,修改庫存\";}}

標(biāo)簽:

搶先讀

相關(guān)文章

熱文推薦

精彩放送

關(guān)于我們| 聯(lián)系我們| 投稿合作| 法律聲明| 廣告投放

版權(quán)所有© 2011-2023  產(chǎn)業(yè)研究網(wǎng)  m.www-332159.com

所載文章、數(shù)據(jù)僅供參考.本站不作任何非法律允許范圍內(nèi)服務(wù)!

聯(lián)系我們:39 60 29 14 2 @qq.com

皖I(lǐng)CP備2022009963號(hào)-13