Preface
MQ(Message Queue, 消息队列)是一种应用系统之间的通信方法. 是通过读写出入队列的消息来通信(RPC则是通过直接调用彼此来通信的).
AMQP, 即Advanced Message Queuing Protocol, 高级消息队列协议, 是应用层协议的一个开放标准, 为面向消息的中间件设计. 消息中间件主要用于组件之间的解耦, 消息的发送者无需知道消息使用者的存在, 反之亦然.
AMQP的主要特征是面向消息, 队列, 路由(包括点对点和发布/订阅), 可靠性, 安全.RabbitMQ是一个开源的AMQP实现, 服务器端用Erlang语言编写.
启动
这里使用Docker启动.
docker-compose:
1 | version: '3.7' |
基本概念
Hello World
先来利用原生的 RabbitMQ Java Client 来运行一个 Hello World 程序:
1 | package com.yangbingdong.rabbitmq.basic; |
工具类:
1 | package com.yangbingdong.rabbitmq.basic; |
运行结果:
Connection
ConnectionFactory
, Connection
, Channel
这三个都是RabbitMQ对外提供的API中最基本的对象, 不管是服务器端还是客户端都会首先创建这三类对象.ConnectionFactory
为Connection
的制造工厂.Connection
是与RabbitMQ服务器的socket链接, 它封装了socket协议及身份验证相关部分逻辑.Channel
是我们与RabbitMQ打交道的最重要的一个接口, 大部分的业务操作是在Channel这个接口中完成的, 包括定义Queue, 定义Exchange, 绑定Queue与Exchange, 发布消息等.
Queue
Queue是RabbitMQ的内部对象, 用于存储消息, RabbitMQ中的消息都只能存储在Queue中, 生产者生产消息并最终投递到Queue中, 消费者可以从Queue中获取消息并消费. 队列是有Channel声明的, 而且这个操作是幂等的, 同名的队列多次声明也只会创建一次.
Exchange
RabbitMQ消息模式的核心理念是: 生产者没有直接发送任何消费到队列. 实际上, 生产者都不知道这个消费是发送给哪个队列的.
相反, 生产者只能发送消息给转发器, 转发器是非常简单的. 一方面它接受生产者的消息, 另一方面向队列推送消息. 转发器必须清楚的知道如何处理接收到的消息. 附加一个特定的队列吗? 附加多个队列? 或者是否丢弃? 这些规则通过转发器的类型进行定义.
类型有:
Direct
,Topic
,Headers
和Fanout
fanout exchange
发送到该交换器的所有消息, 会被路由到其绑定的所有队列. 该交换器不需要指定routingKey.
direct exchange
发送到该交换器的消息, 会通过路由键完全匹配, 匹配成功就会路由到指定队列.
发送到 direct exchange
的消息, 会通过消息的 routing key
路由:
- 如果
routing key
值为queue.direct.key1
, 会路由到QUEUE-1
- 如果
routing key
值为queue.direct.key2
, 会路由到QUEUE-2
- 如果
routing key
值为其他, 不会路由到任何队列
topic exchange
发送到该交换器的消息, 会通过路由键模糊匹配, 匹配成功就会路由到指定队列, 路由键通过 .
来划分为多个单词, *
匹配一个单词, #
匹配零个或多个单词.
发送到 topic exchange
的消息, 会通过消息的 routing key
模糊匹配再路由:
- 如果
routing key
值为queue.topic.key1
, 会路由到QUEUE-1
和QUEUE-2
- 如果
routing key
值为test.topic.key2
, 会路由到QUEUE-1
- 如果
routing key
值为queue
, 会路由到QUEUE-2
- 如果
routing key
值为queue.hello
, 会路由到QUEUE-2
- 如果
routing key
值为test.test.test
, 不会路由到任何队列
header exchange
发送到该交换器的消息, 会通过消息的 header
信息匹配, 匹配成功就会路由到指定队列.
消息的 header
信息是 key-value
的形式, 每条消息可以包含多条 header
信息, 路由规则是通过 header
信息的 key
来匹配的, Spring Boot 封装的匹配规则有三种:
where(key).exists()
:匹配单个key
whereAll(keys).exist()
:同时匹配多个key
whereAny(keys).exist()
:匹配多个key
中的一个或多个
发送到 headers exchange
的消息, 会通过消息的 header
匹配:
1 |
|
- 如果
header
信息存在one=XXXX
, 会路由到QUEUE-1
- 如果
header
信息存在all1=XXXX
和all2=XXXX
, 会路由到QUEUE-2
- 如果
header
信息存在any1=XXXX
或any2=XXXX
, 会路由到QUEUE-3
header
不能以x-
开头, 参考官方文档:https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-headers
Spring AMQP的几个参数说明
1 | spring: |
ListenerContainer线程池配置
默认一个消费者对应一个新的线程, 可配置共享线程池节约线程.
Spring AMQP
Spring Boot 的相关配置在 RabbitAutoConfiguration
-> RabbitAnnotationDrivenConfiguration
.
ListernContainer的线程池可以配置在 SimpleRabbitListenerContainerFactory
中:
1 | "rabbitListenerContainerFactory") (name = |
Spring Cloud Stream
对于 Spring Cloud Stream, 可以实现 ListenerContainerCustomizer
接口定制化配置:
1 | public class CustomListenerContainerCustomizer implements ListenerContainerCustomizer { |
持久化
开启消息持久化可在RabbitMQ重启后不丢失消息.
在Docker中, 数据存放在
/var/lib/rabbitmq
.
Spring AMQP
在Spring AMQP中, 通过Queue构造器可指定持久化是否开启:
1 |
|
第二个参数指的是是否开启持久化:
ExChange 指定持久化也一样:
Spring Cloud Stream
在Spring Cloud Stream中指定Queue与Exchange持久化只需要通过以下两个参数配置, 默认值都为 true
:
1 | spring.cloud.stream.rabbit.bindings.<channelName>.consumer.durableSubscription= |
手动ACK
在Spring AMQP中ACK是自动完成的, 如果报错了, 消息不会丢失, 但是会无限循环消费, 一直报错, 如果开启了错误日志很容易就把磁盘空间耗完.
在Spring Cloud Stream中默认情况下会自动重试3次, 再自动ACK. 可通过 maxAttempts
参数指定重试次数.
配置
Spring Cloud Stream
1 | spring.cloud.stream.rabbit.bindings.<channelName>.consumer.acknowledgeMode=MANUAL |
Spring AMQP
1 | spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL |
代码示例
Spring Cloud Stream
1 | .class) (AckTopic |
- 使用
channel.basicAck(deliveryTag, false)
进行ACK. Channel
也可以通过message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
获取,deliveryTag
也一样.
Spring AMQP
1 |
|
Spring Boot 中使用方式
创建队列交换机
通过@RabbitListener创建
1 | ( |
通过声明Bean创建
1 |
|
通过RabbitAdmin动态注册
1 |
|
1 |
|
topic.yml
:
1 | messaging: |
监听
1 |
|
@RabbitListener
注解的消费者监听方法, 默认有几个可以自动注入的参数对象:
org.springframework.amqp.core.Message
消息原始对象com.rabbitmq.client.Channel
接收消息所所在的channel
org.springframework.messaging.Message
amqp的原始消息对象转换为messaging后的消息对象, 该消息包含自定义消息头和标准的amqp消息头
此外, 非以上参数, 自定义参数对象可以通过@Header
/@Headers
/@Payload
标注为消息头或消息体接受对象.
DLQ队列
通过下面参数开启DLQ转发:
1 | spring.cloud.stream.rabbit.bindings.<channelName>.consumer.auto-bind-dlq=true |
当消息消费失败后, 消息会原封不动地转发到 error-topic.test.dlq
这个死信队列中.
点击进入死信队列, 可以使用 Get Message
查看消息, Move message
可以将消息移动到原先的队列中继续消费.
设置死信队列消息过期时间:
如果某些消息存在时效性, 可通过一下参数配置过期时间, 超过时间后, 消息会自动移除掉:
1 | spring.cloud.stream.rabbit.bindings.<channelName>.consumer.dlq-ttl=10000 |
将异常信息放到消息header中:
1 | spring.cloud.stream.rabbit.bindings.<channelName>.consumer.republish-to-dlq=true |
重新入队
重新入队是指消息消费失败了之后, 消息将不会被抛弃, 而是重新放入队列中.
可以通过以下参数开启:
1 | spring.cloud.stream.rabbit.bindings.<channelName>.consumer.requeue-rejected=true |
这样会导致一个问题就是, 业务代码的缺陷导致的异常, 无论消费多少次, 这个消息总是失败的. 那么会导致消息堆积越来越大, 那么可以通过配合DLQ来避免这个情况:
1 | spring.cloud.stream.rabbit.bindings.<channelName>.consumer.auto-bind-dlq=true |
然后到达一定重试次数之后抛出 AmqpRejectAndDontRequeueException
这个指定的异常, 消息就会被推到死信队列中了:
1 | (TestTopic.INPUT) |
总结:
上面介绍了几种Spring Cloud Stream RabbitMQ中的重试策略, 个人认为比较适合实际业务场景的做法是, 失败后, 将消息持久化到数据库中, 后续再通过邮件或钉钉等方式通知开发人员进行处理. 因为一般场景下 , 绝大部分的异常消息都是由于业务代码的缺陷导致的, 所以怎么重试都会失败, 并且消费逻辑中一定要做好幂等校验.
延迟队列
实现方式
RabbitMQ的延迟队列可以通过死信队列来实现, 但这种方式显得比较臃肿并且有致命的缺陷(设置了不同的过期时间, 队列并不会按照这些过期时间来顺序消费), 具体请参考: springboot整合rabbitmq实现延时队列之TTL方式
比较优雅的方式是通过 rabbitmq_delayed_message_exchange
插件来实现延迟队列. 插件介绍可查看官网: https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
流程大概是这样的:
1: 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
2: 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它
3: 队列(queue)再把消息发送给监听它的消费者(customer)
插件安装
只有RabbitMQ 3.6.x以上才支持
这里使用Docker部署, rabbitmq_delayed_message_exchange
插件需要到 官网下载.
Dockerfile:
1 | FROM rabbitmq:3.7-management |
enable_plugins:
1 | [rabbitmq_delayed_message_exchange,rabbitmq_management]. |
注意: 插件需要解压放到Dockerfile根目录.
或者这个Dockerfile也可以:
1 | FROM rabbitmq:3.7-management |
构建:
1 | docker build -t my-rabbitmq . |
代码示例
Spring Cloud Stream
application.yml
:
1 | spring: |
application-stream-rabbitmq-delay.yml
:
1 | spring: |
delayedExchange
设置为true
表示将exchange
声明为Delayed Message Exchange
. 生产者以及消费者都需要配置这个, 否则会报以下错误:
1 | Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'delay-topic' in vhost '/': received 'topic' but current is ''x-delayed-message'', class-id=40, method-id=10) |
代码:
1 | public interface DelayTopic { |
1 | .class) (DelayTopic |
发送延迟消息:
1 | public class XDelaySender extends SpringBootRabbitmqApplicationTests { |
运行结果, 可以看到发送与接受之间差了5秒:
Spring AMQP
Configuration:
1 |
|
常量类:
1 | public final class MqConstant { |
生产者:
1 |
|
消费者:
1 |
|
查看延迟消息数量
这个可以通过RabbitMQ的管理页面查看:
独占队列
某些场景下, 我们对消息的处理具有严格的顺序依赖性, 比如下一个消息的处理需要基于上一个消息的处理结果.
这时候, 一般比较暴力的做法就是只部署一台消费者. 还有另外一种做法便是独占队列.
以RabbitMQ为例, 使用注解的话只需要多加一个 exclusive = true
的参数:
1 | "${items.updated.queue}", exclusive = true) (queues = |
Spring Cloud Stream 配置:
1 | spring: |
recoveryInterval
: 由于RabbitMq的独占队列只有一个消费者能成功订阅, 后面的消费者都会失败并不断地重试, 我们可以将重试时间调大一点(默认为5000ms).
并且会有一个 WARN 级别的日志打印出来, 我们可以自己实现 ConditionalExceptionLogger
接口将日志改为 INFO 级别:
1 | public class CustomConditionalExceptionLogger implements ConditionalExceptionLogger { |
注入到 SimpleMessageListenerContainer
:
1 | public class CustomListenerContainerCustomizer implements ListenerContainerCustomizer { |
注册到Spring容器:
1 |
|
启用了独占模式的队列中, 可以看到这个:
附录
多Binder配置
spring.cloud.stream.bindings.{channel-name}.binder
:设定指定通道binder名称,完全自定义;spring.cloud.stream.binders.{binder-name}.type
:对自定义的binder设定其类型,rabbit或者kafka;spring.cloud.stream.binders.{binder-name}.environment.{*}
:对自定义的binder设定其配置项,如host等;spring.cloud.stream.default-binder
:除了特殊的通道需要设定binder,其他的channel需要从所有自定义的binder选择一个作为默认binder,即所有非指定binder的通道均采用此default-binder
Finally
参考:
https://www.kancloud.cn/longxuan/rabbitmq-arron
http://blog.didispace.com/spring-cloud-starter-finchley-7-7/
https://blog.csdn.net/eumenides_/article/details/86025773
https://blog.csdn.net/songhaifengshuaige/article/details/79266444