RabbitMq(五) -- SpringBoot整合 RabbitMQ 完整实现

张开发
2026/4/10 17:13:09 15 分钟阅读

分享文章

RabbitMq(五) -- SpringBoot整合 RabbitMQ 完整实现
1. 概述RabbitMQ 是基于 AMQP 协议的消息中间件在分布式系统中常用于解耦、异步处理、流量削峰等场景。SpringBoot 通过 spring-boot-starter-amqp 提供了对 RabbitMQ 的深度整合让开发者能够以声明式或编程式的方式轻松操作消息队列。本文将详细讲解队列创建的两种方式Bean 方式与 AmqpAdmin 方式消息接收的两种方式RabbitListener 与 RabbitHandler每个知识点均配有详细的文字解释和完整的代码示例参数含义会逐一说明。2. 环境准备2.1 添加依赖dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency2.2 配置连接信息spring:rabbitmq:host:127.0.0.1# RabbitMQ 服务地址port:5672# 连接端口注意不是管理界面端口 15672username:guest# 用户名password:guest# 密码virtual-host:/# 虚拟主机默认为 /# 可选连接超时、重试等配置connection-timeout:5000publisher-confirm-type:correlated# 开启发送确认publisher-returns:true# 开启消息返回3. 队列创建的两种方式3.1 方式一使用 Bean 声明式创建通过在 Configuration 类中定义 Queue、Exchange、Binding 的 BeanSpring 容器启动时会自动在 RabbitMQ 服务端创建这些组件。3.1.1 基本示例importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;ConfigurationpublicclassRabbitMQBeanConfig{// 1. 创建队列BeanpublicQueuemyQueue(){// 参数详解// name: 队列名称必须唯一// durable: 是否持久化true: 重启后队列依然存在// exclusive: 是否独占true: 仅当前连接可用连接关闭后队列删除// autoDelete: 是否自动删除最后一个消费者取消后删除队列// 还有另一种构造方式new Queue(name, durable, exclusive, autoDelete, arguments)returnnewQueue(myQueue,true,false,false);}// 2. 创建交换机以 DirectExchange 为例BeanpublicDirectExchangemyExchange(){// 参数详解// name: 交换机名称// durable: 是否持久化// autoDelete: 是否自动删除// arguments: 扩展参数returnnewDirectExchange(myExchange,true,false);}// 3. 创建绑定关系BeanpublicBindingmyBinding(){returnBindingBuilder.bind(myQueue())// 绑定哪个队列.to(myExchange())// 绑定到哪个交换机.with(myRoutingKey);// 路由键}}3.1.2 带参数的队列死信队列、TTL 等BeanpublicQueuettlQueue(){MapString,ObjectargsnewHashMap();// 设置队列中消息的 TTL毫秒args.put(x-message-ttl,60000);// 设置死信交换机args.put(x-dead-letter-exchange,deadLetterExchange);// 设置死信路由键args.put(x-dead-letter-routing-key,deadLetterKey);// 设置队列最大长度条数args.put(x-max-length,1000);// 设置队列最大容量字节args.put(x-max-length-bytes,10485760);returnnewQueue(ttlQueue,true,false,false,args);}3.2 方式二使用 AmqpAdmin 动态创建AmqpAdmin 是 Spring 提供的用于管理 RabbitMQ 组件的接口可以在运行时动态声明队列、交换机和绑定关系适合需要根据业务逻辑动态创建临时队列的场景。3.2.1 AmqpAdmin 常用方法importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;ServicepublicclassDynamicQueueService{AutowiredprivateAmqpAdminamqpAdmin;// 动态创建队列publicvoidcreateQueue(StringqueueName,booleandurable,booleanexclusive,booleanautoDelete){QueuequeuenewQueue(queueName,durable,exclusive,autoDelete);amqpAdmin.declareQueue(queue);}// 动态创建交换机publicvoidcreateExchange(StringexchangeName,Stringtype,booleandurable){AbstractExchangeexchange;switch(type){casedirect:exchangenewDirectExchange(exchangeName,durable,false);break;casetopic:exchangenewTopicExchange(exchangeName,durable,false);break;casefanout:exchangenewFanoutExchange(exchangeName,durable,false);break;caseheaders:exchangenewHeadersExchange(exchangeName,durable,false);break;default:thrownewIllegalArgumentException(不支持的类型: type);}amqpAdmin.declareExchange(exchange);}// 动态创建绑定publicvoidcreateBinding(StringqueueName,StringexchangeName,StringroutingKey){BindingbindingnewBinding(queueName,Binding.DestinationType.QUEUE,exchangeName,routingKey,null// arguments);amqpAdmin.declareBinding(binding);}// 删除队列publicbooleandeleteQueue(StringqueueName){returnamqpAdmin.deleteQueue(queueName);}// 清空队列消息不删除队列publicvoidpurgeQueue(StringqueueName){amqpAdmin.purgeQueue(queueName,false);}}3.2.2 动态创建带参数的队列publicvoidcreateQueueWithArgs(StringqueueName){MapString,ObjectargsnewHashMap();args.put(x-message-ttl,30000);// 30秒过期args.put(x-max-priority,10);// 支持优先级QueuequeuenewQueue(queueName,true,false,false,args);amqpAdmin.declareQueue(queue);}3.3 两种方式的对比4. 消息发送RabbitTemplate在讲解消息接收之前先了解如何发送消息以便测试接收功能。importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;ServicepublicclassMessageSender{AutowiredprivateRabbitTemplaterabbitTemplate;// 简单发送字符串publicvoidsend(Stringexchange,StringroutingKey,Stringmessage){rabbitTemplate.convertAndSend(exchange,routingKey,message);}// 发送带消息属性的消息publicvoidsendWithProperties(Stringexchange,StringroutingKey,Stringmessage){MessagePropertiespropertiesnewMessageProperties();properties.setContentType(text/plain);properties.setPriority(5);// 优先级properties.setExpiration(60000);// 消息过期时间毫秒properties.setHeader(customKey,customValue);MessagemsgMessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8)).andProperties(properties).build();rabbitTemplate.send(exchange,routingKey,msg);}// 发送 Java 对象需要配置 Jackson 转换器publicvoidsendObject(Stringexchange,StringroutingKey,Objectobj){rabbitTemplate.convertAndSend(exchange,routingKey,obj);}}5. 消息接收的两种方式5.1 方式一RabbitListener 注解RabbitListener 可以标注在方法或类上用于声明一个消息监听容器自动监听指定队列。5.1.1 基础用法importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;ComponentpublicclassBasicListener{// 监听单个队列自动 ACK默认RabbitListener(queuesmyQueue)publicvoidlistenSimple(Stringmessage){System.out.println(收到消息: message);}// 监听多个队列RabbitListener(queues{queueA,queueB})publicvoidlistenMultiple(Stringmessage){System.out.println(从多个队列收到: message);}// 使用路由键动态创建队列需要交换机已存在RabbitListener(bindingsQueueBinding(valueQueue(valueautoQueue,durabletrue),exchangeExchange(valuemyExchange,typeExchangeTypes.DIRECT),keymyRoutingKey))publicvoidlistenWithBinding(Stringmessage){System.out.println(通过 QueueBinding 自动创建: message);}}5.1.2 RabbitListener 常用参数详解5.1.3 手动确认消息ACK默认情况下消息消费成功后会自动 ACK。若需要手动确认需配置 acknowledge-mode 为 MANUAL并在监听方法中引入 Channel 和 deliveryTag。importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;ComponentpublicclassManualAckListener{RabbitListener(queuesmyQueue,containerFactorymanualAckContainerFactory)publicvoidlistenManual(Messagemessage,Channelchannel)throwsException{longdeliveryTagmessage.getMessageProperties().getDeliveryTag();try{// 业务处理System.out.println(处理消息: newString(message.getBody()));// 手动确认单条channel.basicAck(deliveryTag,false);}catch(Exceptione){// 拒绝并重新入队第二个参数 true 表示重新入队channel.basicNack(deliveryTag,false,true);}}}需要配置 containerFactoryBeanpublicRabbitListenerContainerFactory?manualAckContainerFactory(ConnectionFactoryconnectionFactory){SimpleRabbitListenerContainerFactoryfactorynewSimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手动确认returnfactory;}5.2 方式二RabbitListener RabbitHandler 组合当一个队列中包含多种类型或格式的消息时可以在类上标注 RabbitListener然后在多个方法上标注 RabbitHandler根据消息类型自动路由到对应方法。5.2.1 根据消息类型分发importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;ComponentRabbitListener(queuesmultiTypeQueue)publicclassMultiTypeListener{// 处理 String 类型RabbitHandlerpublicvoidhandleString(Stringmessage){System.out.println(字符串消息: message);}// 处理 Integer 类型RabbitHandlerpublicvoidhandleInteger(Integernumber){System.out.println(整数消息: number);}// 处理自定义 User 对象需要配置 Jackson 转换器RabbitHandlerpublicvoidhandleUser(Useruser){System.out.println(用户对象: user.getName(), user.getAge());}// 处理 byte[] 类型原始消息体RabbitHandlerpublicvoidhandleBytes(byte[]bytes){System.out.println(字节消息: newString(bytes));}// 处理 Message 原始对象包含属性RabbitHandlerpublicvoidhandleRawMessage(Messagemessage){StringbodynewString(message.getBody());StringcontentTypemessage.getMessageProperties().getContentType();System.out.println(原始消息 - 类型: contentType, 内容: body);}}5.2.2 RabbitHandler 方法参数匹配规则根据方法参数的 Java 类型进行匹配需要 MessageConverter 能将消息体转换成对应类型如果存在多个匹配的方法会按照最具体的类型进行选择可以通过 Payload 和 Headers 注解精确控制RabbitHandlerpublicvoidhandleWithAnnotations(PayloadUseruser,HeadersMapString,Objectheaders){System.out.println(Payload: user);System.out.println(Headers: headers);}5.2.3 配置 Jackson 消息转换器支持对象importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.amqp.support.converter.MessageConverter;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;ConfigurationpublicclassRabbitMQConverterConfig{BeanpublicMessageConverterjacksonMessageConverter(){returnnewJackson2JsonMessageConverter();}}配置后发送对象时会自动转为 JSON接收时会自动反序列化为对应的 Java 类型。6. 完整示例综合运用6.1 配置类使用 Bean 创建队列ConfigurationpublicclassFullConfig{BeanpublicQueueorderQueue(){returnnewQueue(order.queue,true);}BeanpublicDirectExchangeorderExchange(){returnnewDirectExchange(order.exchange,true,false);}BeanpublicBindingorderBinding(){returnBindingBuilder.bind(orderQueue()).to(orderExchange()).with(order.create);}}6.2 发送者ServicepublicclassOrderSender{AutowiredprivateRabbitTemplaterabbitTemplate;publicvoidsendOrder(Orderorder){rabbitTemplate.convertAndSend(order.exchange,order.create,order);}}6.3 接收者混合使用两种接收方式ComponentpublicclassOrderListener{// 方式一简单接收RabbitListener(queuesorder.queue)publicvoidhandleOrder(Orderorder){System.out.println(收到订单: order.getOrderId());}// 方式二多类型处理器ComponentRabbitListener(queuesorder.queue)publicstaticclassMultiHandler{RabbitHandlerpublicvoidonOrder(Orderorder){System.out.println(Handler 处理订单: order.getOrderId());}RabbitHandlerpublicvoidonString(Stringstr){System.out.println(意外收到字符串: str);}}}7. 常见问题与注意事项队列是否存在使用 RabbitListener 时若队列不存在且没有 bindings 自动创建启动会报错。建议使用 Bean 预先声明或 bindings 属性。消息转换器如果发送和接收使用不同的消息转换器可能导致类型转换异常。全局统一配置 MessageConverter 为 Jackson2JsonMessageConverter 可避免大部分问题。并发消费concurrency 参数可以增加消费者实例提高吞吐量但要注意业务幂等性。消息确认生产环境建议使用手动 ACK避免业务异常导致消息丢失。死信队列务必为重要业务队列配置死信交换机防止消息堆积或异常丢失后无法追溯。8. 总结本文详细介绍了 SpringBoot 整合 RabbitMQ 的核心操作队列创建Bean 方式适合固定结构AmqpAdmin 适合动态需求消息接收RabbitListener 简单直接配合 RabbitHandler 可处理多类型消息

更多文章