MQTT简介
MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,设计用于在低带宽和不稳定的网络环境中传输消息。它最初由IBM开发,用于连接远程设备和传感器到网络,并支持发布/订阅模型的消息通信。
MQTT被广泛用于物联网(IoT)领域,其中大量的设备需要进行实时通信和数据交换。它采用了一种发布/订阅(publish/subscribe)模型,其中消息的发送者(发布者)将消息发布到特定的主题(topic),而订阅者可以选择性地订阅感兴趣的主题,以接收相应的消息。
MQTT特点
以下是MQTT的一些关键特点:
轻量级:MQTT的设计非常轻量,协议头部非常小,传输的数据量很小,适用于带宽有限的网络环境,如低速、高延迟或不稳定的网络。
简单:MQTT的协议规范相对简单,易于实现和部署。它定义了少量的消息类型和协议操作,使得开发人员可以快速上手。
异步通信:MQTT使用异步通信模式,发布者发送消息后,不需要等待接收者的响应,可以继续执行其他操作。这种异步通信模式适合在资源有限的设备和网络中工作。
可靠性:MQTT支持三种不同的消息传递质量(QoS)级别:QoS 0(至多一次),QoS 1(至少一次)和QoS 2(只有一次)。这使得可以根据应用程序的要求选择适当的消息交付保证级别。
网络状况适应性:MQTT可以适应不稳定的网络状况,如网络中断、重连等。它具有断开连接后自动重连的机制,可以确保消息的可靠传输。
订阅和发布模型
Publisher(发布者):发布者是消息的发送者,它将消息发布到特定的主题(topic)上。可以有一个或多个发布者。
Subscriber(订阅者):订阅者是对消息感兴趣的实体,它选择性地订阅一个或多个主题。一旦订阅了主题,它就会接收到相应的消息。
MQTT Broker(MQTT代理):MQTT代理是中间件,负责接收发布者发送的消息,并将其路由到对应的订阅者。它维护着主题和订阅关系的注册表,并确保消息的可靠传递。
工作流程如下:
发布者将消息发布到特定的主题上。
MQTT代理接收到消息后,根据订阅者的注册信息,将消息路由到对应的订阅者。
订阅者接收到发布者发布的消息,并进行相应的处理。
通过发布/订阅模型,MQTT允许实现解耦和灵活性,发布者和订阅者之间不需要直接的点对点连接,而是通过MQTT代理进行中转和路由。这种模型非常适合在物联网中进行大规模设备间的通信和数据交换。
MQTT QoS
MQTT(Message Queuing Telemetry Transport)协议支持三种不同的QoS(Quality of Service)级别,用于控制消息的可靠性和传输保证。以下是MQTT的三个QoS级别:
QoS 0(至多一次):
在QoS 0级别下,消息以“至多一次”传输,没有确认机制。消息被发布后,发布者不会接收到关于消息是否成功传输或交付的确认。MQTT代理会尽最大努力将消息传输给订阅者,但可能会出现消息丢失或重复的情况。此级别适用于对消息传输的可靠性要求不高的场景,如传感器数据的临时更新等。
QoS 1(至少一次):
在QoS 1级别下,消息以“至少一次”传输,确保至少传输一次。发布者发送消息后,会等待MQTT代理发送确认消息(PUBACK)来确认消息的接收。
如果发布者没有收到确认消息,它会再次发送相同的消息,直到收到确认为止。MQTT代理会确保消息至少传输一次给订阅者,但可能会出现重复传输的情况。此级别适用于对消息传输的可靠性要求较高的场景,如控制指令的传递。
QoS 2(只有一次):
在QoS 2级别下,消息以“只有一次”传输,确保仅传输一次。发布者发送消息后,会等待MQTT代理发送两个确认消息(PUBREC和PUBCOMP)来确认消息的接收和完成。MQTT代理会确保消息仅传输一次给订阅者,没有重复传输的情况。
此级别提供了最高的消息传输可靠性,但也伴随着更高的网络开销。此级别适用于对消息传输的可靠性要求非常高的场景,如金融交易或严格的数据同步。选择合适的QoS级别取决于应用程序对消息传输可靠性和网络开销的要求。更高的QoS级别提供了更可靠的传输,但同时也增加了网络开销。因此,需要根据具体场景的需求来选择适当的级别。
OpenWrt中使用mosquitto
插件安装
默认是没有包含mosquitto客户端和broker的,我们可以手动安装相关插件,为了测试我们需要安装broker和client
首先更新openwrt软件源
opkg update
然后调用以下命令分别安装mosquitto broker和client,这里我们选用nossl版本,也就是不需要ssl加密,方便测试
opkg install mosquitto-nossl
opkg install mosquitto-client-nossl
mosquitto服务
安装完成后就可以使用broker和client了,首先我们需要启动mosquitto broker服务,
mosquitto broker服务配置文件在/etc/mosquitto/目录中,我们可以修改服务器相关信息,比如监听端口号、接口、ip地址等。
root@OpenWrt:~# ls /etc/mosquitto/mosquitto.conf
/etc/mosquitto/mosquitto.conf
root@OpenWrt:~#
mosquitto客户端
mosquitto客户端包含sub和pub两部分,分别用于订阅和发布
订阅主题:
mosquitto_sub -h <MQTT Broker IP> -p <MQTT Broker Port> -t <Topic>
其中,是MQTT Broker的IP地址,是MQTT Broker的端口号,是要订阅的主题名称。
示例:
mosquitto_sub -h 192.168.1.1 -p 1883 -t test/topic
发布主题:
mosquitto_pub -h <MQTT Broker IP> -p <MQTT Broker Port> -t <Topic> -m <Message>
其中,是MQTT Broker的IP地址,是MQTT Broker的端口号,是要发布的主题名称,是要发布的消息内容。
示例:
mosquitto_pub -h 192.168.1.1 -p 1883 -t test/topic -m "Hello, MQTT!"
运行结果: 由于订阅和发布客户端都在本地,ip使用localhost地址127.0.0.1
root@OpenWrt:~# mosquitto_sub -h 127.0.0.1 -p 1883 -t test/topic &
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t test/topic -m "hello MQTT."
root@OpenWrt:~# hello MQTT.
在订阅主题时,我们还可以使用通配符,最常用就是通配符"#",通过"#"可以匹配多级topic
比如订阅了主题"test/#",则可以收到"test/"开头的所有topic,比如"test/topic1"、"test/hello"等
以下实例中分别订阅了"test/#"和"test/topic1",当发布"test/topic1"消息时,二者都可以收到,而发布"test/topic2"时只有一个可以收到。
除了通配符"#"之外,还有"$"、"+"等通配符,不在这里详解。
使用云端公共broker测试
emqx提供了公共免费的broker供开发者测试,注意不要在生产环境使用,仅供测试 我们可以准备两台不同的设备,都连接broker.emqx.io,这两台设备可以在不同区域,通过公网broker可以轻松实现两台设备通信。
●客户端1
mosquitto_sub -h broker.emqx.io -p 1883 -i client_001 -t openwrt/topic
●客户端2
发送一条消息到主题openwrt/topic,这样客户端1就可以收到该消息
mosquitto_pub -h broker.emqx.io -p 1883 -t openwrt/topic -i client_002 -m "hello client1, i am froms client2"
OpenWrt中基于libmosquitto开发
前面给大家演示了mosquitto客户端的使用,但命令行客户端仅供测试使用,我们在开发过程中需要自定义消息并且能够实时解析消息,而通过命令行就没那么方便消息的处理了,需要调用mosquitto底层api接口实现想要的功能。
libmosquitto库
在openwrt系统中默认集成了mosquitto库,可以直接依赖调用。 对应依赖的库为:
libmosquitto-nossl 不支持ssl加密
libmosquitto 支持ssl加密。
api接口详解
mosquitto_lib_init:初始化libmosquitto库。在使用其他libmosquitto函数之前,应该首先调用此函数。
mosquitto_lib_version:获取libmosquitto库的版本号信息。
mosquitto_new:创建一个新的mosquitto对象(MQTT客户端)。
mosquitto_connect:与MQTT代理服务器建立连接。
mosquitto_disconnect:断开与MQTT代理服务器的连接。
mosquitto_publish:向指定主题发布消息。
mosquitto_subscribe:订阅一个或多个主题。
mosquitto_unsubscribe:取消订阅一个或多个主题。
mosquitto_loop_start:启动一个线程来处理MQTT消息循环。
mosquitto_loop_forever:开始一个阻塞的循环,处理MQTT消息。
mosquitto_loop:在非阻塞模式下处理MQTT消息。
mosquitto_message_callback_set:设置用于接收订阅消息的回调函数。
mosquitto_username_pw_set:设置连接时使用的用户名和密码。
mosquitto_tls_set:为MQTT连接启用SSL/TLS加密。
mosquitto_tls_opts_set:设置SSL/TLS选项,如CA证书、客户端证书和私钥等。
mosquitto_tls_insecure_set:设置是否允许SSL/TLS连接中的不安全选项。
mosquitto_will_set:设置遗嘱消息,即在客户端异常断开时发布的消息。
基于libmosquitto实现一个消息订阅程序
源码
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>
#include <sys/time.h>
#include <sys/sysinfo.h>
struct mosquitto *g_test_mosq = NULL;
void mqtt_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
printf("connect to mqtt server ok\n");
if (MOSQ_ERR_SUCCESS != mosquitto_subscribe(mosq, NULL, "openwrt/#", 0))
{
printf("sub topic openwrt/# failed...\n");
}
else{
printf("sub topic openwrt/# failed...\n");
}
}
void mqtt_disconnect_callback(struct mosquitto *mosq, void *userdata, int result)
{
if (result)
printf("disconnect %s\n", mosquitto_connack_string(result));
else
printf("disconnect from mqtt server.\n");
}
void mqtt_sub_callback(struct mosquitto *mosq,
void *userdata,
int mid,
int qos_count,
const int *granted_qos)
{
printf("sub callback\n");
}
void mqtt_msg_callback(struct mosquitto *mosq,
void *userdata,
struct mosquitto_message *message)
{
printf("callback recv mqtt msg, topic = %s, payload = %s\n", message->topic, message->payload);
}
struct mosquitto *connect_to_mqtt_server(char *server_ip)
{
struct mosquitto *mosq = NULL;
int rc;
char mqtt_user[128] = {0};
char mqtt_pwd[128] = {0};
char client_id[128] = {0};
struct timeval tv;
gettimeofday(&tv, NULL);
mosquitto_lib_init();
snprintf(client_id, sizeof(client_id), "test_%d", tv.tv_sec);
printf("connect to mqtt server..client_id=%s\n", client_id);
mosq = mosquitto_new(client_id, true, NULL);
if (!mosq)
{
return NULL;
}
#if 0
rc = mosquitto_username_pw_set(mosq, "test", "test");
if (rc)
{
mosquitto_destroy(mosq);
return NULL;
}
#endif
mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
mosquitto_message_callback_set(mosq, mqtt_msg_callback);
mosquitto_subscribe_callback_set(mosq, mqtt_sub_callback);
mosquitto_disconnect_callback_set(mosq, mqtt_disconnect_callback);
rc = mosquitto_connect(mosq, server_ip, 1883, 30);
if (rc)
{
printf("Unable to connect mqtt server rc=%d\n", rc);
mosquitto_destroy(mosq);
return NULL;
}
return mosq;
}
int mqtt_bcast_msg(char *api, char *data, int len)
{
char topic[128] = {0};
int mid;
if (!api || !data || len == 0)
return -1;
if (!g_test_mosq)
return -1;
sprintf(topic, "openwrt/%s", api);
return mosquitto_publish(g_test_mosq, &mid, topic, len, data, 0, 0);
}
int main(int argc, char *argv[]){
char *host = NULL;
if (argc < 2){
host = "127.0.0.1";
printf("use default ip: 127.0.0.1\n");
}
else{
host = argv[1];
printf("use ip: %s\n", host);
}
g_test_mosq = connect_to_mqtt_server(host);
if (!g_test_mosq){
printf("connect to server %s failed\n", host);
exit(0);
}
mosquitto_loop_forever(g_test_mosq, -1, 1);
mosquitto_destroy(g_test_mosq);
mosquitto_lib_cleanup();
return 0;
}
实例源码编译
将源码包拷贝到openwrt源码package目录
开启mqtt_test宏并生成默认依赖配置
echo "CONFIG_PACKAGE_mqtt_test=y" >>.config
make defconfig
编译
make package/mqtt_test/compile V=s
插件安装: mqtt_test依赖了libmosquitto库,而libmosquitto依赖了libcares,所以需要安装三个插件
●libcares
●libmosquitto-nossl
●mqtt_test
将插件通过winscp或其他工具上传到openwrt系统中,执行以下命令安装(以X86为例)
opkg install libcares_1.18.1-1_x86_64.ipk
opkg install libmosquitto-nossl_2.0.15-1_x86_64.ipk
opkg install mqtt_test_1.0-1_x86_64.ipk
运行:
mqtt_test默认连接本地broker,也可以指定ip
运行如果出现错误,表示服务器没有启动或者参数异常,请先确认mosquitto服务已经启动。
use default ip: 127.0.0.1
connect to mqtt server..client_id=test_1686993389
Unable to connect mqtt server rc=14
connect to server 127.0.0.1 failed
运行成功
root@OpenWrt:~# mqtt_test
use default ip: 127.0.0.1
connect to mqtt server..client_id=test_1686993598
connect to mqtt server ok
sub callback
通过mosquitto_pub工具可以发送指令到该客户端,客户端当前处理方式是输出收到的消息,当然实际开发是解析指令并执行对应的命令,比如接收到reboot命令后执行重启。
pub命令如下:
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "hello openwrt"
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "你好"
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "你好"
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "reboot"
root@OpenWrt:~# mosquitto_pub -h 127.0.0.1 -p 1883 -t openwrt/send_msg -m "reboot"
客户端输出如下:
callback recv mqtt msg, topic = openwrt/send_msg, payload = hello openwrt
callback recv mqtt msg, topic = openwrt/send_msg, payload = 你好
callback recv mqtt msg, topic = openwrt/send_msg, payload = 你好
callback recv mqtt msg, topic = openwrt/send_msg, payload = reboot
callback recv mqtt msg, topic = openwrt/send_msg, payload = reboot
如果客户端连接云端的broker,就可以实现远程操作设备,比如远程重启设备、配置下发等。
总结
在物联网开发中我们会经常用的MQTT协议,常见的就是边缘设备和云端通信,上报实时状态、远程管理等,当然也可以局域网间通信,实现节点间通信,比如可以通过MQTT协议实现mesh数据同步、AC集中管理等,有了MQTT协议我们不需要自己通过底层socket实现私有协议,可以只关注业务处理,可大大提高程序稳定性。
您必须登录才能发表评论。