手把手教你入门AIoT(八)
在这一课里面我们来学习一下 Retained 消息和 LWT。本节课核心内容:
- Retained 消息
- 代码实践:发布和接收 Retained 消息
- LWT
- 代码实践:监控 Client 连接状态
8.1 Retained 消息
让我们来看一下这个场景:
你有一个温度传感器,它每三个小时向一个 Topic 发布当前的温度。那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候能才能收到温度消息?
对的,它必须等到三个小时以后,温度传感器再次发布消息的时候才能收到。在这之前,这个新的订阅者对传感器的温度数据一无所知。
怎么来解决这个问题呢?
这个时候就轮到 Retained 消息出场解决这个问题了。Retained 消息是指在 PUBLISH 数据包中 Retain 标识设为 1 的消息,Broker 收到这样的 PUBLISH 包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker 会马上将这个消息发送给订阅者。
Retain 消息有以下一些特点:
- 一个 Topic 只能有 1 条 Retained 消息,发布新的 Retained 消息将覆盖老的 Retained 消息;
- 如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的 Retained 消息;
- 只有新的订阅者才会收到 Retained 消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到 Retained 消息;
- Retained 消息发送到订阅者时,消息的 Retain 标识仍然是 1,订阅者可以判断这个消息是否是 Retained 消息,以做相应的处理。
注意:Retained 消息和持久性会话没有任何关系,Retained 消息是 Broker 为每一个 Topic 单独存储的,而持久性会话是 Broker 为每一个 Client 单独存储的。
如果你想删除一个 Retained 消息也很简单,只要向这个主题发布一个 Payload 长度为 0 的 Retained 消息就可以了。
那么开头我们提到的那个场景的解决方案就很简单了,温度传感器每 3 个小时发布当前的温度的 Retained 消息,那么无论新的订阅者什么时候进行订阅,它都能收到温度传感器上一次发布的数据。
8.2 代码实践:发布和接收 Retained 消息
在这里我们编写一个发布 Retained 消息的发布者,和一个接受消息的订阅端,订阅端在接收消息的时候将消息的 Retain 标识和内容打印出来。
完整的代码 publish_retained.js 如下:
var mqtt = require('mqtt') |
完整的代码 subscribe_retained.js 如下:
var mqtt = require('mqtt') |
我们首先运行 node publish_retained.js,再运行 node subscribe_retained.js,会得到以下输出:
retained: true, temperature: 25
可见我们在 Publisher 发布之后再订阅主题也能收到 Retained 消息。
然后我们再运行一次 node publish_retained.js,在运行 subscribe_retained.js 的终端会有以下输出:
retained: false, temperature: 25
Broker 收到 Retained 消息以后,只是保存这个消息,然后按照正常的转发逻辑转发给订阅者,所以对订阅者来说,这个是一个普通的 MQTT 消息,所以 Retain 标识为 0。
然后 Ctrl+C 关闭掉 subscribe_retained.js,然后重新运行,终端不会有任何输出,可见 Retained 消息只对新订阅的订阅者有效。
8.3 LWT
LWT 全称为 Last Will and Testament,也就是我们在连接到 Broker 时提到的遗愿,包括遗愿主题、遗愿 QoS、遗愿消息等。
顾名思义,当 Broker 检测到 Client 非正常地断开连接的时候,就会向遗愿主题里面发布一条消息。遗愿相关的设置是在建立连接的时候,在 CONNECT 数据包里面指定的。
- Will Flag:是否使用 LWT
- Will Topic:遗愿主题名,不可使用通配符
- Will Qos:发布遗愿消息时使用的 QoS
- Will Retain:遗愿消息的 Retain 标识
- Will Message:遗愿消息内容
Broker 在以下情况下认为 Client 是非正常断开连接的:
- Broker 检测到底层的 I/O 异常;
- Client 未能在 Keep Alive 的间隔内和 Broker 之间有消息交互;
- Client 在关闭底层 TCP 连接前没有发送 DISCONNECT 数据包;
- Broker 因为协议错误关闭和 Client 的连接,比如 Client 发送了一个格式错误的 MQTT 数据包。
如果 Client 通过发布 DISCONNECT 数据包断开连接,这个属于正常断开连接,不会触发 LWT 的机制,同时,Broker 还会丢弃掉这个 Client 在连接时指定的 LWT 参数。
通常,如果我们关心一个设备,比如传感器的连接状态,可以使用 LWT。在接下来的代码实践里面,我们会使用 LWT 和 Retained 消息来实现对一个 Client 的连接状态监控。
8.4 代码实践:监控 Client 连接状态
实现 Client 连接状态监控的原理很简单:
- Client 在连接的时候指定 Will Topic 为“client/status”,遗愿消息为“offline”,Will Retain=1;
- Client 在连接成功以后向同一个主题“client/status”,发布一个内容为“online”的 Retained 消息。
那么订阅者在任何时候订阅“client/status”,都会获取 Client 当前的连接状态。
client.js 代码如下:
var mqtt = require('mqtt') |
monitor.js 代码如下:
var mqtt = require('mqtt') |
在 monitor.js 中,我们每次连接的时候都重新订阅 “client/status”,这样的话每次运行都能收到关于 Client 连接状态的 Retained 消息。
首先运行 node client.js,然后运行 node monitor.js,会得到以下输出:
client is online |
在运行 client.js 的终端上,使用 Ctrl+C 终止 client.js,之后在运行 monitor.js 的终端上会得到以下输出:
client is offline |
然后重新运行 node client.js,在运行 monitor.js 的终端上会得到以下输出:
client is offline |
Ctrl+C 终止 monitor.js,然后重新运行 node monitor.js,会得到以下输出:
client is offline |
这样我们就完美地监控了 Client 的连接状态。
8.5 小结
在这一课我们学习了 Retained 消息和 LWT,并利用这两个特性完成了对 Client 连接状态进行监控的功能,下一课我们将学习 Keep Alive 和在移动端的连接保活。