MQTT消息丢失问题处理

MQTT为什么会丢失消息?

Qos等级影响

MQTT消息有三种服务质量(QoS)等级,具体作用如下:

  • QoS 0:最多一次。消息传输不保证消息一定到达,只发送一次,可能丢失。

  • QoS 1:至少一次。保证消息至少送达一次,但可能会重复发送。

  • QoS 2:只有一次。保证消息只送达一次,不会重复。

如果发送消息时选择了QoS 0,由于网络波动等原因导致消息丢失是正常现象。

选择了QoS 1或QoS 2就能保证不丢失吗?

这涉及到消息降级的问题。假如发布者A发送一条QoS 2的消息,但订阅者B选择了QoS 0进行订阅,此时消息会降级,推送给B时采用的是QoS 0,这样发生消息丢失也是正常的情况。

MQTT服务器持久化消息失败

MQTT服务器接收到QoS 1或QoS 2消息后会对消息进行持久化。

当订阅客户端因网络波动掉线后,会尝试重新上线,向服务器发送CONNECT报文,服务器查询是否存在未完成的消息,并尝试重新发送。在这个过程中,如果消息持久化失败且没有重试机制,就可能导致消息丢失。

在这个过程中,假如消息持久失败没有重试处理,会导致很容易出现消息丢失。

MQTT节点路由

对于MQTT集群,客户端的连接分散在不同节点。当一个节点接收到消息时,需要将消息转发给其他节点。

怎么知道需要发送到那些节点?

EMQX采用的是Mria数据库来存储节点和客户端的关系,通过主题前缀树匹配主题,找到相关主题后,查找那些节点涉及到相关主题,这样就确定了需要发送节点。

知道发送的节点,怎么去发送了?

EMQX使用的是Erlang/OTP,将节点两两之间互相连接构成网状结构进行通讯。ThingsBoard在之前版本使用的是GRPC,为了保证稳定可靠,现在已经换成Kafka

综上,消息被错误路由或者路由失败,都可以导致消息丢失,这个是EMQX的文档感兴趣可以看下他们是怎么进行设计。

MQTT节点同步

对于MQTT集群,客户端的连接分散在不同节点。当一个节点接收到消息时,需要将消息转发给其他节点。这时可能会发生消息丢失,因此需要有重试机制来保证消息的可靠传输。

客户端掉线长时间未上线

消息持久化不可能永久保留,而且消息通常具有时效性。如果在发布消息后,订阅客户端掉线且长时间未上线,消息必然会丢失。MQTT 5引入了消息过期时间的概念,可以指定消息的过期时间,同时也要设置默认过期时间。

解决方式

监控和报警机制:设置消息丢失监控和报警机制,及时发现和处理消息丢失问题。

路由关系正确:采用合适的中间件,来维护路由关系。

使用消息队列:可以在MQTT服务器和客户端之间引入消息队列,如Kafka或RabbitMQ,增强消息传递的可靠性。

日志记录和分析:记录消息传递过程中的日志,分析消息丢失的原因,以便采取相应的措施进行优化。

增强持久化机制:确保消息持久化机制的可靠性,例如通过数据库集群、分布式存储等方式提升消息持久化的可靠性和容错性。

网络环境优化:优化网络环境,减少网络波动对消息传递的影响。