随着物联网 (IoT) 和微服务架构的日益普及,MQTT 已经成为一个关键的消息传递协议。在本文中,我们将讨论如何在 Spring Boot 应用程序中集成 MQTT,从而为你的应用带来更好的扩展性和响应性。
1. 什么是 MQTT?
MQTT(Message Queuing Telemetry Transport)是一个轻量级的发布/订阅协议,专为低带宽、高延迟或不可靠的网络设计。它广泛应用于 IoT 场景中,连接传感器、设备和应用。
2. 为什么选择 MQTT?
- 轻量级:MQTT 的报文头部很小,非常适合于受到带宽限制的网络。
- 消息等级:它支持不同级别的消息传递保证。
- 持久会话:支持持久会话,即客户端和服务器之间的会话状态可以被保留。
- 最后遗言:如果一个客户端断开连接,它可以预先设定一个“最后遗言”消息。
3. Spring Boot 中的 MQTT
要在 Spring Boot 中使用 MQTT,我们首先需要添加相关的依赖。我们将使用 Eclipse Paho Java 客户端作为 MQTT 的客户端库。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.9</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
4. 配置 MQTT
包含接收消息的配置和发送消息的配置
package com.demo.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.util.UUID;
/**
* mqtt连接配置
*/
@Configuration
public class MqttConfig {
/**
* 创建连接
*
* @return
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
// mqtt用户名&密码
String userName = "";
String pwd = "";
// mqtt服务地址,可以是多个
options.setServerURIs(new String[]{"tcp://server:1883"});
options.setUserName(userName);
options.setPassword(pwd.toCharArray());
factory.setConnectionOptions(options);
return factory;
}
/**
* 2、接收消息的通道
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* 接收消息
*
* @return
*/
@Bean
public MessageProducer inbound() {
// 订阅主题,保证唯一性
String inClientId = UUID.randomUUID().toString().replaceAll("-", "");
// 最后的#相当于通配符的概念
String[] topic = {"topic_prefix/topic/#"};
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
inClientId,
mqttClientFactory(),
topic);
adapter.setCompletionTimeout(5000);
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// 按字节接收消息
// defaultPahoMessageConverter.setPayloadAsBytes(true);
adapter.setConverter(defaultPahoMessageConverter);
// 设置QoS
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* 3、消息处理
* ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel
*/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String payload = message.getPayload().toString();
// byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字节格式
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// 可以根据topic进行处理不同的业务类型
System.out.println("主题[" + topic + "],负载:" + payload);
};
}
/**
* 发送消息的通道
*
* @return
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* 发送消息
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler outbound() {
// 连接clientId保证唯一
String outClientId = UUID.randomUUID().toString().replaceAll("-", "");
// 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(outClientId, mqttClientFactory());
// 如果设置成true,即异步,发送消息时将不会阻塞。
// messageHandler.setAsync(true);
// 设置默认的topic
// messageHandler.setDefaultTopic("defaultTopic");
// 设置默认QoS
messageHandler.setDefaultQos(1);
// Paho消息转换器
DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// 发送默认按字节类型发送消息
// defaultPahoMessageConverter.setPayloadAsBytes(true);
messageHandler.setConverter(defaultPahoMessageConverter);
return messageHandler;
}
}
5. 消息发送
1. 定义消息发送的接口
package com.demo.config;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
* 定义消息发送的接口
*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateWay {
/**
* 发送消息
*
* @param payload 发送的消息
*/
void sendToMqtt(String payload);
/**
* 指定topic消息发送
*
* @param topic 指定topic
* @param payload 消息
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}
2. 定义消息发送的controller
package com.demo.business;
import com.sonli.config.MqttGateWay;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 对外暴露发送消息的controller
*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttGateWay mqttGateWay;
@PostMapping("/sendMessage")
public String sendMessage(String topic, String message) {
// 发送消息到指定topic
mqttGateWay.sendToMqtt(topic, 1, message);
return "send topic: " + topic + ", message : " + message;
}
}
测试
发送消息
消息的监听,收到的消息
总结
Spring Boot 与 MQTT 的集成为开发者提供了一个简单但功能强大的方式,用于创建 IoT 和消息驱动的应用程序。这种集成不仅确保了消息传递的可靠性和效率,还使得应用程序更具扩展性和响应性。
希望这篇文章能为你提供在 Spring Boot 中集成 MQTT 的指导。如果你有任何问题或需要进一步的指导,请随时留言或联系我们。
您必须登录才能发表评论。