mqtt-plus 架构解析(二):一条 MQTT 消息如何到达你的 @MqttListener

张开发
2026/4/13 11:06:37 15 分钟阅读

分享文章

mqtt-plus 架构解析(二):一条 MQTT 消息如何到达你的 @MqttListener
mqtt-plus 架构解析二一条 MQTT 消息如何到达你的 MqttListener摘要很多人第一次使用MqttListener时感受到的是“写起来很顺”但框架内部到底做了哪些事往往并不直观。本文沿着MqttInboundMessageSink - MqttMessageRouter - MqttListenerRegistry - ListenerInvoker这条链路拆解一条 MQTT 消息是如何到达监听方法的并重点解释两个关键设计为什么反序列化发生在 listener 分发之后以及为什么用户代码不能直接跑在客户端回调线程里。项目地址项目地址https://github.com/mqttplus/mqtt-plus配套的示例工程https://github.com/mqttplus/mqtt-plus-examples如果你对这个方向感兴趣欢迎关注、试用也欢迎一起交流 issue 和 PR。如果这篇文章对你有帮助欢迎点赞、收藏也欢迎给项目一个 Star。假设你写了这样一段代码MqttListener(brokerprimary,topicsdrone//status,payloadTypeDroneStatus.class)publicvoidonStatus(DroneStatusstatus,MqttHeadersheaders,MqttTopicStringtopic){// business logic}消息来了topic 是drone/001/statuspayload 是一段 bytes。对使用者来说这件事看起来像“框架自动把消息送到了方法里”但从框架设计角度看这个过程至少要回答四个问题消息先交给谁topic 是怎么匹配到 listener 的payload 是在什么时机被转换的方法调用为什么不能直接跑在 MQTT 客户端回调线程里这一篇文章就是把这四个问题拆开讲清楚。一、这篇文章到底想回答什么这一篇只回答三个问题一条消息从 broker 进入框架后会经过哪些核心组件为什么一条消息可能匹配多个 listener而且每个 listener 都要独立转换和独立调用为什么 mqtt-plus 选择“先路由、再转换、再调用”而不是提前把 payload 统一反序列化好如果只先记住一句话可以记这句mqtt-plus 的路由链不是“收到消息就直接调方法”而是先用brokerId topic找到 listener再按 listener 的 payloadType 独立完成转换和调用。这个顺序看起来多了一层但它恰好决定了多 listener、错误处理和扩展点是否还能成立。二、先看整条消息流转链路先把主链路拉平。只要这张图看明白了后面的 topic 匹配、payload 转换和线程模型都会顺很多。ListenerListenerInvokerPayloadConverterInterceptorMqttListenerRegistryMqttMessageRouterMqttInboundMessageSinkAdapterBrokerListenerListenerInvokerPayloadConverterInterceptorMqttListenerRegistryMqttMessageRouterMqttInboundMessageSinkAdapterBrokerdeliver messageonMessage(brokerId, topic, payload, headers)route(...)resolve(brokerId, topic)matched listenersbeforeHandle()convert(payload, targetType)invoke(definition, convertedPayload, context)call MqttListener methodafterHandle()在源码里这条链的几个关键锚点很明确MqttInboundMessageSink定义了统一入站入口onMessage(String brokerId, String topic, byte[] payload, MqttHeaders headers)MqttMessageRouter只暴露一个核心动作route(...)DefaultMqttMessageRouter负责把“查 listener、执行 interceptor、转换 payload、调用方法、聚合错误动作”这些步骤串起来MqttListenerRegistry负责基于brokerId topic找匹配的 listenerListenerInvoker负责把最终参数真正送进方法也就是说mqtt-plus 不是把“监听方法调用”塞进 adapter 里完成的而是先把 adapter 收到的消息提升成一个框架内部统一的入站模型然后交给路由器处理。这一步很关键因为只有这样多个 adapter 才能共享同一套路由逻辑。三、topic 是怎么匹配到 listener 的topic 匹配发生在MqttListenerRegistry.resolve(brokerId, topic)这一层。从实现上看MqttListenerRegistry内部维护的是CopyOnWriteArrayListMqttListenerDefinition。每个MqttListenerDefinition里都带着 broker、topics、qos、payloadType、bean、method 等元信息。路由时它会先做两层过滤先看 broker 是否匹配。只有definition.getBroker().equals(brokerId)或者监听器写的是*才继续往下。再遍历这个 listener 声明的 topic pattern交给MqttTopicMatcher.matches(pattern, topic)去判断。MqttTopicMatcher的逻辑不复杂但它把几个关键语义都定死了只匹配单层 topic#只在最后一层时表示“后续全部匹配”如果 topic 以$开头而订阅模式不以$开头则直接不匹配可以用一张简单的对照图快速建立直觉订阅模式消息 topic是否匹配原因drone//statusdrone/001/status匹配匹配单层drone//statusdrone/001/telemetry不匹配最后一层不同alert/#alert/warn/device/battery匹配#匹配后续所有层alert/#device/alert/warn不匹配起始层不同这里还有一个容易忽略但对后面很重要的点resolve 的结果不是单个 listener而是一个列表。这意味着mqtt-plus 从一开始就把“同一条消息匹配多个 listener”视为正常情况而不是异常情况。也正因为如此后面才会有独立转换、独立调用和错误聚合这些设计。四、一条消息为什么可能会匹配多个 listener如果只从业务角度看很多人会下意识觉得“一条消息应该只交给一个处理器”。但在注解驱动模型下这个假设并不成立。比如同一条消息drone/001/status完全可能同时被下面几类 listener 消费一个用String接收原始文本做日志记录一个用DroneStatus接收结构化对象做业务处理一个用byte[]接收原始 payload 做审计或转发这也是为什么DefaultMqttMessageRouter.route(...)的第一步不是“先转换 payload”而是先拿到matches然后逐个 listener 去处理。Inbound MQTT Messagetopicdrone/001/statusMqttListenerRegistry.resolve()Listener ApayloadTypeStringListener BpayloadTypeDroneStatusListener CpayloadTypebyte[]String converterJSON converterRaw bytesinvoke Ainvoke Binvoke C这张图想表达的核心不是“框架多做了几步”而是同一条消息在进入不同 listener 时其实可能对应完全不同的参数形态。也正因为如此mqtt-plus 的路由器在循环每个MqttListenerDefinition时都会重新创建MqttContext、重新选择PayloadConverter、重新走一次ListenerInvoker.invoke(...)。这是独立分发模型的代价但也是它能支撑灵活监听签名的原因。设计决策反序列化不是在消息进入框架的那一刻统一做掉而是在 listener 分发之后按payloadType独立执行。因为在 mqtt-plus 的模型里同一条消息本来就可能被不同 payloadType 的 listener 同时消费。五、为什么真正的方法调用不是直接method.invoke(payload)从 core 的角度看ListenerInvoker只是一个抽象给我一个MqttListenerDefinition、一个已转换的 payload 和一个MqttContext我负责把它变成真正的方法调用。这个设计本身就说明了一件事路由器并不关心方法参数怎么拼出来它只关心“路由到谁”。在非 Spring 场景里测试里有一个很简单的ReflectiveListenerInvoker逻辑基本就是无参方法就直接invoke(bean)单参方法就把 payload 塞进去但在 Spring 环境里starter 默认装配的是SpringMqttListenerInvoker。它会把converted payload raw payload topic headers一起交给MqttListenerMethodArgumentResolver然后再决定每个参数位该填什么。这一步正好解释了为什么在MqttListener方法里可以同时拿到业务 payloadMqttHeadersMqttTopic标注的 topic原始byte[]也就是说ListenerInvoker的存在不是为了“包一层反射显得高级”而是为了把“路由”和“方法参数绑定”这两件事拆开。这条边界一旦立住路由器就不需要知道 Spring 方法参数解析细节而 Spring 侧也不需要碰消息匹配逻辑。六、为什么用户代码不能直接跑在客户端回调线程里这个问题经常被低估但它其实是整个消息链里最工程化的一层。在PahoMqttClientAdapter里真正的消息入口不是直接调用inboundMessageSink.onMessage(...)而是先把消息交给inboundExecutor.submit(...)messageArrived(...)handleMessage(topic, message)inboundExecutor.submit(() - inboundMessageSink.onMessage(...))这个设计非常直接地表达了一个原则用户代码不能跑在底层 MQTT 客户端的回调线程里。原因有三个如果 listener 处理慢直接阻塞客户端回调线程会影响后续消息接收。如果 listener 抛异常把客户端线程拖进不可控状态故障边界会变得很模糊。一旦要做线程池配置、削峰、隔离和观测单纯依赖客户端回调线程几乎没有回旋空间。starter 里之所以还要把inboundThreadPool放进MqttBrokerDefinition和配置属性里本质上也是在承认入站线程池是 broker 级别的工程配置而不是某个 adapter 的私有细节。设计决策用户 listener 绝不能直接运行在 MQTT 客户端回调线程里。mqtt-plus 选择先把消息移交给入站执行器再进入统一路由链这不是“多绕一步”而是把 IO 层和业务层真正隔离开。这里顺手也能理解一个对比PahoMqttClientAdapter把“线程切换”写得很显式而SpringIntegrationMqttClientAdapter把更多调度能力交给 Spring Integration 基础设施。但不管外表怎么不同进入MqttInboundMessageSink之后后面的路由链仍然保持统一。七、小结这一篇真正想讲清的是mqtt-plus 的消息路由不是一个“收到消息后立刻调方法”的黑盒而是一条明确的、可扩展的处理链。如果把结论压缩一下可以记住这几件事MqttInboundMessageSink是统一入站入口adapter 负责把底层客户端消息送到这里。DefaultMqttMessageRouter真正串起了resolve - interceptor - convert - invoke - aggregate这条主链。MqttListenerRegistry和MqttTopicMatcher共同决定“这条消息该交给哪些 listener”。ListenerInvoker把“路由到谁”和“方法参数怎么绑定”拆开给 Spring 风格的监听方法留下了空间。“先路由、再转换”与“不要跑在客户端回调线程里”这两个决定看起来像实现细节其实是后续错误处理、拦截器和多 adapter 统一行为的前提。下一篇会继续沿着这条主线往前走但聚焦的问题会更窄一些为什么 mqtt-plus 要把序列化和反序列化拆成两条独立的链以及这种拆分到底换来了什么。系列导航本文是mqtt-plus 架构解析系列的第 2/10 篇。#主题链接1总览分层架构与设计哲学链接2消息路由一条 MQTT 消息如何到达你的MqttListener本文3Payload 序列化与反序列化双链设计的取舍链接4拦截器链MqttMessageInterceptor的扩展点设计链接5错误处理ErrorAction聚合策略的设计逻辑链接6多 Broker 管理如何让一个应用同时连接多个 MQTT 服务链接7动态订阅与重连恢复Reconciler的协调机制链接8Spring Boot 自动装配零件是怎么被粘合起来的链接9测试体系MqttTestTemplate与EmbeddedBroker的设计链接10从内部项目到开源框架mqtt-plus 的抽取过程与决策链接上一篇总览分层架构与设计哲学下一篇Payload 序列化与反序列化双链设计的取舍

更多文章