利用 MQTT 通信的行为模型来优化物联网部署
MQTT作为一种高度灵活的协议,专为物联网应用中的高效轻量级通信而设计。它基于发布-订阅模型,天然支持客户端之间的解耦关系。然而,在实际使用MQTT时,数据生产者和消费者通常依赖于一套预先定义的指南。
例如:
1) 包含多个MQTT连接的设备必须遵循特定的初始化序列,才能被识别为在线;
2) 触发数据生产者完全功能的准备状态需要预定的消息序列;
3) 客户端引起的合理资源使用。在本文中,我们将介绍基于有限状态机的行为模型。
在本文的最后,将展示一个现已可用的现实世界示例。
MQTT通信上行为策略的需求
由于 MQTT 允许灵活地实现任何这些通信方案,因此数据生产者通常会实现一些控制逻辑来验证这些通信方案。然而,这种方法有一些缺点,因为数据传输是不必要的,并且必须在所有数据消费者中实现相应的控制逻辑。即使数据生成器是根据商定的标准实施的,也需要进行检查来验证这些标准,以克服无意中的错误部署。应用行为策略克服了检查消费者方面的需要。
模型
不论MQTT版本如何,MQTT协议都实现了一个定义好的状态机来处理客户端连接。例如,最初需要一个CONNECT包来建立客户端与代理的连接。一旦连接建立,就会实现特定的状态,并可以发送更多包。接下来,客户端可以发送SUBSCRIBE包订阅一个MQTT主题。之后,客户端发送PUBLISH包将载荷发布到代理。最终,客户端发送DISCONNECT包断开与代理的连接——达到最终状态。
以下示意图展示了我们刚刚描述的客户端行为。请记住,这只是演示客户端连接到代理时随时间进展的情况,并不反映实际实现。
MQTT协议在状态机上的使用示例
一个有限状态机可以简单地指定为由有限的状态和两个状态之间的有限转换组成。在上面的示例中,显示了以下状态和转换:
状态:
已连接(Connected)、已订阅(Subscribed)、已发布(Published)、开始(Start)、结束(End)
转换:
开始(Start)-> CONNECT -> 已连接(Connected)
已连接(Connected)-> SUBSCRIBE -> 已订阅(Subscribed)
已订阅(Subscribed)-> PUBLISH -> 已发布(Published)
已发布(Published)-> DISCONNECT -> 结束(End)
注:以上状态和转换是客户端与MQTT代理交互的简单示例。
目的驱动的协议
大多数用例在MQTT协议提供的状态和转换之上定义了它们的目的驱动协议,主要是基于MQTT负载中的实际数据。
例如:一台设备被认为完全在线,当且仅当以下MQTT包按特定顺序发送:CONNECT
、SUBSCRIBE
、PUBLISH+
、DISCONNECT
,其中PUBLISH+
表示至少发送了一次发布。重点是SUBSCRIBE
包必须在PUBLISH
消息之前发送,以确保设备正确初始化。
为了验证这种序列,定义了一个特定的协议,要求这个特定序列 — 即定义了一个目的驱动协议。
其他场景包括:
1) 必须定义Last Will
以确保有效,以及
2) 在一个小时内不得向目的驱动协议发送重复的负载。
通常,这种客户端行为可以通过自定义代码实现作为额外的微服务来验证,或者通过实现HiveMQ扩展来验证。
行为模型作为目的驱动协议检查器
在本节中,我们将介绍行为模型。行为模型是一个在任何MQTT包之上指定目的驱动协议的有限状态机。状态机具有以下属性:
- 状态:
- 初始状态:行为模型一旦客户端开始与MQTT代理建立连接时的起始状态。
- 终止状态:有两种子类型的状态:
Success
表示客户端成功通过了行为模型检查,Failed
则相反。 - 中间状态:所有其他状态都是在初始和终止之间的中间状态。
- 转换:
- 某个特定事件可能会引起状态转换。这包括至少MQTT包(如
CONNECT
、PUBLISH
等),或更复杂的事件,如时间或甚至外部触发器。 - 一个转换由一个起始状态、一个条件事件和一个目标状态组成。
- 条件事件包括一个事件(如1) MQTT包(如
CONNECT
或PUBLISH
)或一个事件和一个返回true
或false
的条件。
- 某个特定事件可能会引起状态转换。这包括至少MQTT包(如
- 操作:
- 转换可能有额外的行为要执行,例如在到达任何消费者之前修改被发送的负载,或构建自定义指标。
- 记忆:
- 它还具有一个有限的内存来存储和从中加载数据,操作为
store(var, value)
和load(var)
,后者返回存储的值。
- 它还具有一个有限的内存来存储和从中加载数据,操作为
示例:DUPLICATE_COUNTER
为了让概念更加直观,我们来看一个具体的例子:定义一个名为DUPLICATE_COUNTER的行为模型,用于计算两个连续且相同的负载的数量。该行为模型可以如下建模:
MQTT行为模型:DUPLICATE_COUNTER
- 状态:
- 模型有三个状态:
Start
、CONNECTED
和End
。 Start
是一个初始状态,Connected
是一个中间状态,End
是一个成功的终止状态。
- 模型有三个状态:
- 转换和操作:
- 从
Start
到Connected
的转换:当客户端通过MQTT包CONNECT
启动连接时触发。执行转换时,变量counter
初始化为0并存储在内存中。 - 从
Connected
到Connected
的循环:当客户端通过MQTTPUBLISH
包发送负载且条件isIdentical
为true
时触发条件事件。随后执行的操作将变量counter
增加1。 - 从
Connected
到End
的转换:当客户端发送MQTTDISCONNECT
包时,触发转换,状态机达到成功的终止状态。
- 从
计数器在每次连续发布两条相同消息时增加。注意,为了简化,这里没有进一步介绍isIdentical
函数。
行为模型的形式主义允许我们检查MQTT协议的使用是否定义良好。模型可能有两个结果:要么MQTT客户端正确实现了状态机(行为模型以成功的终止状态结束),要么MQTT客户端行为不当(以失败的终止状态结束)。特别是不当的行为可以相应地处理,例如首先在大规模IoT部署中使这些客户端可见,或者通过在状态转换中引入额外的逻辑来纠正行为。特别是后一种情况对于由于缺乏更新能力而难以修复的客户端来说很有趣。
在数据中心的实现
在HiveMQ平台版本4.20中,我们推出了HiveMQ数据中心,它实现了新的行为政策和预定义的数据中心行为模型。行为政策实例化了一个可选择的客户端连接的行为模型检查器。每个在代理中接收到的MQTT包都会被传递给检查器,以确定实例化行为模型的状态转换。此外,状态转换可以导致更多的操作。当客户端行为不当时,甚至可以断开客户端连接或丢弃消息。
下图展示了该架构的总体视图。连接的客户端发来的MQTT包通过代理处理,并由实例化的行为模型检查器(称为策略引擎)检查。策略引擎返回要采取的进一步行动,例如丢弃消息、断开客户端连接或仅记录消息以供进一步检查。
HiveMQ数据中心中的行为模型检查器
因此,每个客户端连接都有一些关于实例化行为模型状态和变量的额外信息。记得我们在上面的示例中创建了一个计数器变量。
然而,使客户端行为可见是一个重要方面。为此,可以通过REST API请求每个客户端连接的当前状态 — 甚至返回存储在内存中的状态变量以进行进一步的调试。
Publish.duplicate
考虑以下描述的行为模型,这是HiveMQ数据中心中一个实际可用的行为模型,类似于上面介绍的示例模型。该行为模型的目的是跟踪实际重复消息的情况,它有两个终端状态。
下面是“Publish.duplicate”行为模型中状态
状态 | 类型 | 描述 |
---|---|---|
初始 Initial | 初始,非终端 | 模型的起始点,一旦客户端符合政策就进入此状态 |
已连接 Connected | 中间,非终端 | 表示客户端已成功连接到代理 |
未重复 NotDuplicated | 中间,非终端 | 表明客户端已发送其第一条消息,或两个连续消息不同 |
重复 Duplicated | 中间,非终端 | 表明客户端已发送与前一条消息相同的消息 |
失败 Violated | 失败,终端 | 当客户端在任何时间点发送两条相同的连续消息并断开连接时,失败状态成为终止状态 |
断开 Disconnected | 成功,终端 | 当客户端始终发送不同的连续消息并断开连接时,断开状态成为终止状态 |
下面显示了该行为模型的状态机。
如您在状态机中看到的,有一个状态用于指示客户端是否发送了重复消息——“Duplicated”状态。
下面的列表显示了准备好使用的完整行为政策:
{
"id": "drop-duplicate-messages",
"createdAt": "2023-11-22T18:02:06.853Z",
"lastUpdatedAt": "2023-11-22T18:38:01.714Z",
"matching": {
"clientIdRegex": ".*"
},
"behavior": {
"id": "Publish.duplicate",
"arguments": {}
},
"onTransitions": [
{
"fromState": "Any.*",
"toState": "Duplicated",
"Mqtt.OnInboundPublish": {
"pipeline": [
{
"id": "operation-GDgvk",
"functionId": "Mqtt.drop",
"arguments": {
"reasonString": "The message you sent was a duplicate message caught by policy ${policyId}."
}
}
]
}
}
]
}
该政策为配置在匹配字段中的每个连接客户端实例化行为模型。在这个例子中,使用了Publish.duplicate行为模型。接下来,在onTransitions字段中定义了额外的操作。在这种特定情况下,每次MQTT客户端发送PUBLISH包导致状态机从Any(通配符状态)进入Duplicated状态时,将执行管道。在这种情况下,执行预定义且可用的Mqtt.drop函数,意味着传入的MQTT PUBLISH包将被丢弃。最终,根据MQTT客户端的版本,向发送客户端提供了一个原因字符串。
这个行为政策已经准备好在您的IoT部署中与HiveMQ数据中心一起使用,以避免在消费者端出现消息重复。
您还可以使用HiveMQ控制中心来创建上述政策。请参见下面的截图,展示了在HiveMQ控制中心创建行为政策的相关部分。
HiveMQ控制中心用于创建行为政策
展示案例
在本节中,我们想展示在HiveMQ数据中心注册行为政策的客户端的语义。下面的动画插图显示了其中的三个部分:
左上角的控制台显示了一个MQTT客户端使用mqtt cli向HiveMQ代理发布数据。客户端以客户端ID“testclient”连接代理。一旦建立连接,就会按以下顺序向主题“test”发布消息:
{ "temperature": 123 }
{ "temperature": 123 }
{ "temperature": 124 }
{ "temperature": 124 }
{ "temperature": 123 }
左下角的控制台显示了一个订阅通配主题的MQTT客户端,以消费所有数据。如您在输出中看到的,客户端只消费不同的连续消息:
{ "temperature": 123 }
{ "temperature": 124 }
{ "temperature": 123 }
右侧的控制台显示了对客户端“testclient”的REST API请求状态。每次客户端发送重复消息时,行为模型就会进入“Duplicated”状态。响应还显示了更多信息。
实际影响
行为模型Publish.duplicate在多个方面展示了其优势,如下所列:
- 提高效率:减少不必要的数据传输提高了网络效率,确保只有相关且及时的数据被传输。
- 降低运营成本:较少的数据传输意味着更低的带宽使用,可能降低与数据存储和处理相关的成本。
- 增强系统性能:系统和网络负担减轻,提高了性能和可靠性。
- 促进更好的数据管理:数据流更加流畅,便于更有效地管理、分析和利用数据。
- 支持可扩展性:在不断增长的物联网网络中,有效的数据传输对于可扩展性至关重要,尤其是在设备数量和数据点可能呈指数级增长的情况下。
结论
将行为模型集成到MQTT通信中对于优化IoT部署至关重要。引入有限状态机确保了对MQTT客户端的系统性验证,强制执行预定义的序列和标准。DUPLICATE_COUNTER行为模型是实际应用的一个例子,用于跟踪如重复消息出现的条件。
随着IoT领域的发展,利用MQTT中的行为模型成为确保大规模IoT场景中的可靠性、效率和成本效益的一种积极策略,这标志着在实现明确定义和优化的基于MQTT的IoT部署方面的重大进步。
回复