MQTT消息丢失问题处理
MQTT为什么会丢失消息?
Qos等级影响
MQTT消息有三种服务质量(QoS)等级,具体作用如下:
-
QoS 0:最多一次。消息传输不保证消息一定到达,只发送一次,可能丢失。
-
QoS 1:至少一次。保证消息至少送达一次,但可能会重复发送。
-
QoS 2:只有一次。保证消息只送达一次,不会重复。
如果发送消息时选择了QoS 0,由于网络波动等原因导致消息丢失是正常现象。
选择了QoS 1或QoS 2就能保证不丢失吗?
这涉及到消息降级的问题。假如发布者A发送一条QoS 2的消息,但订阅者B选择了QoS 0进行订阅,此时消息会降级,推送给B时采用的是QoS 0,这样发生消息丢失也是正常的情况。
MQTT服务器持久化消息失败
MQTT服务器接收到QoS 1或QoS 2消息后会对消息进行持久化。
当订阅客户端因网络波动掉线后,会尝试重新上线,向服务器发送CONNECT
报文,服务器查询是否存在未完成的消息,并尝试重新发送。在这个过程中,如果消息持久化失败且没有重试机制,就可能导致消息丢失。
在这个过程中,假如消息持久失败没有重试处理,会导致很容易出现消息丢失。
MQTT节点路由
对于MQTT集群,客户端的连接分散在不同节点。当一个节点接收到消息时,需要将消息转发给其他节点。
怎么知道需要发送到那些节点?
EMQX采用的是Mria
数据库来存储节点和客户端的关系,通过主题前缀树匹配主题,找到相关主题后,查找那些节点涉及到相关主题,这样就确定了需要发送节点。
知道发送的节点,怎么去发送了?
EMQX使用的是Erlang/OTP
,将节点两两之间互相连接构成网状结构进行通讯。ThingsBoard在之前版本使用的是GRPC
,为了保证稳定可靠,现在已经换成Kafka
。
综上,消息被错误路由或者路由失败,都可以导致消息丢失,这个是EMQX的文档感兴趣可以看下他们是怎么进行设计。
MQTT节点同步
对于MQTT集群,客户端的连接分散在不同节点。当一个节点接收到消息时,需要将消息转发给其他节点。这时可能会发生消息丢失,因此需要有重试机制来保证消息的可靠传输。
客户端掉线长时间未上线
消息持久化不可能永久保留,而且消息通常具有时效性。如果在发布消息后,订阅客户端掉线且长时间未上线,消息必然会丢失。MQTT 5引入了消息过期时间的概念,可以指定消息的过期时间,同时也要设置默认过期时间。
解决方式
监控和报警机制:设置消息丢失监控和报警机制,及时发现和处理消息丢失问题。
路由关系正确:采用合适的中间件,来维护路由关系。
使用消息队列:可以在MQTT服务器和客户端之间引入消息队列,如Kafka或RabbitMQ,增强消息传递的可靠性。
日志记录和分析:记录消息传递过程中的日志,分析消息丢失的原因,以便采取相应的措施进行优化。
增强持久化机制:确保消息持久化机制的可靠性,例如通过数据库集群、分布式存储等方式提升消息持久化的可靠性和容错性。
网络环境优化:优化网络环境,减少网络波动对消息传递的影响。
评论