fusesource/mqtt-client: A Java MQTT Client (github.com)
概述
MQTT是一种机器对机器(M2M)/“物联网”连接协议。它被设计为一种极轻量级的发布/订阅消息传输协议。它适用于需要小型代码占用空间和/或网络带宽有限的远程位置连接。
mqtt-client提供了一个ASL 2.0许可的MQTT API。它会自动重新连接到您的MQTT服务器,并在发生任何网络故障时恢复客户端会话。应用程序可以使用阻塞式API风格、基于futures的API或回调/继续传递API风格。
使用Maven
将以下内容添加到您的maven pom.xml文件中。
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.12</version>
</dependency>
使用Gradle
将以下内容添加到您的gradle文件中。
compile 'org.fusesource.mqtt-client:mqtt-client:1.12'
使用其他构建系统
下载uber JAR文件并将其添加到您的构建中。uber包含了mqtt-client依赖的其他项目中的所有精简依赖项。
在Java 1.4上使用
我们还提供了与Java 1.4 JVM兼容的java 1.4 uber JAR文件。此版本的JAR不支持SSL连接,因为用于在NIO上实现SSL的SSLEngine类是在Java 1.5之前引入的。
配置MQTT连接
阻塞、future和回调API都共享相同的连接设置。您可以创建一个MQTT类的新实例,并配置它以进行连接和套接字相关选项。至少在尝试连接之前必须调用setHost方法。
MQTT mqtt = new MQTT();
mqtt.setHost("localhost", 1883);
// 或者
mqtt.setHost("tcp://localhost:1883");
控制MQTT选项
- setClientId:用于设置会话的客户端ID。这是MQTT服务器用于识别在使用setCleanSession(false);时使用的会话的ID。ID必须不超过23个字符。默认为自动生成的ID(基于您的套接字地址、端口和时间戳)。
- setCleanSession:如果要使MQTT服务器在客户端会话之间保留主题订阅和确认位置,设置为false。默认为true。
- setKeepAlive:以秒为单位配置保持活动定时器。定义从客户端接收的消息之间的最大时间间隔。它使服务器能够检测到与客户端的网络连接已断开,而不必等待长时间的TCP/IP超时。
- setUserName:设置用于对服务器进行身份验证的用户名。
- setPassword:设置用于对服务器进行身份验证的密码。
- setWillTopic:如果设置,服务器将发布客户端的Will消息到指定的主题,如果客户端意外断开连接。
- setWillMessage:要发送的Will消息。默认为零长度消息。
- setWillQos:设置将消息的服务质量使用。默认为QoS.AT_MOST_ONCE。
- setWillRetain:如果要发布Will并保留选项,则设置为true。
- setVersion:设置为“3.1.1”以使用MQTT版本3.1.1。否则默认为3.1协议版本。
控制连接重新连接
如果发生任何网络错误,连接将自动重新连接并恢复消息会话。您可以使用以下方法来控制重新连接的尝试频率并定义重新连接尝试的最大次数:
- setConnectAttemptsMax:在客户端第一次尝试连接到服务器时,最大的重新连接尝试次数之前会向客户端报告错误。设置为-1以使用无限次尝试。默认为-1。
- setReconnectAttemptsMax:在以前曾建立过服务器连接后,每次客户端尝试重新连接后,最大的重新连接尝试次数之前会向客户端报告错误。设置为-1以使用无限次尝试。默认为-1。
- setReconnectDelay:在第一次重新连接尝试之前等待的时间(以毫秒为单位)。默认为10。
- setReconnectDelayMax:重新连接尝试之间等待的最大时间(以毫秒为单位)。默认为30,000。
- setReconnectBackOffMultiplier:重新连接尝试之间使用指数退避。设置为1以禁用指数退避。默认为2。
配置套接字选项
您可以使用以下方法调整一些套接字选项:
- setReceiveBufferSize:设置内部套接字接收缓冲区的大小。默认为65536(64k)。
- setSendBufferSize:设置内部套接字发送缓冲区的大小。默认为65536(64k)。
- setTrafficClass:设置IP头中的流量类或服务类型八位字节,用于从传输发送的数据包。默认为8,表示流量应优化以提高吞吐量。
限制连接速率
如果要减慢连接的读取或写入速率,可以使用以下方法:
- setMaxReadRate:设置此传输将以每秒的最大字节数接收数据。此设置可限制读取,以便不超过速率。默认为0,表示禁用限制。
- setMaxWriteRate:设置此传输将以每秒的最大字节数发送数据。此设置可限制写入,以便不超过速率。默认为0,表示禁用限制。
使用SSL连接
如果要使用SSL/TLS而不是TCP进行连接,请在主机字段中使用“ssl://”或“tls://” URI前缀,而不是“tcp://”。要更精细地控制使用的算法。支持的协议值包括:
- ssl:// - 使用JVM默认版本的SSL算法。
- sslv*://
- 使用特定的SSL版本,其中*是您的JVM支持的版本。示例:sslv3
- tls:// - 使用JVM默认版本的TLS算法。
- tlsv:// - 使用特定的TLS版本,其中是您的JVM支持的版本。示例:tlsv1.1
客户端将使用默认的JVM SSLContext,该SSLContext通过JVM系统属性配置,除非您使用setSslContext方法配置MQTT实例。
SSL连接对内部线程池执行阻塞操作,除非您调用setBlockingExecutor方法来配置它们将使用的执行器。
选择分发队列
HawtDispatch分发队列用于同步对连接的访问。如果未通过setDispatchQueue方法配置明确队列,则将为连接创建新队列。如果要使多个连接共享同一个队列以进行同步,设置明确的队列可能会很方便。
使用阻塞式API
MQTT.connectBlocking方法建立连接并为您提供具有阻塞API的连接。
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
使用publish方法将消息发布到主题:
connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);
您可以使用subscribe方法订阅多个主题:
Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
byte[] qoses = connection.subscribe(topics);
然后使用receive和ack方法接收和确认消息的消费:
Message message = connection.receive();
System.out.println(message.getTopic());
byte[] payload = message.getPayload();
// 处理消息后执行:
message.ack();
最后要断开连接:
connection.disconnect();
使用基于Future的API
MQTT.connectFuture方法建立连接并为您提供具有基于futures的API的连接。所有针对连接的操作都是非阻塞的,并通过Future返回结果。
FutureConnection connection = mqtt.futureConnection();
Future<Void> f1 = connection.connect();
f1.await();
Future<byte[]> f2 = connection.subscribe(new Topic[]{new Topic(utf8("foo"), QoS.AT_LEAST_ONCE)});
byte[] qoses = f2.await();
// 我们可以开始future接收...
Future<Message> receive = connection.receive();
// 发送消息...
Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);
// 然后接收将获得消息。
Message message = receive.await();
message.ack();
Future<Void> f4 = connection.disconnect();
f4.await();
使用回调/继续传递API
MQTT.connectCallback方法建立连接并为您提供具有回调式API的连接。这是最复杂的API风格,但可以提供最佳性能。未来和阻塞API在底层使用回调API。连接上的所有操作都是非阻塞的,并且操作的结果通过您实现的回调接口传递。
final CallbackConnection connection = mqtt.callbackConnection();
connection.listener(new Listener() {
public void onDisconnected() {
}
public void onConnected() {
}
public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
// 您现在可以处理从主题接收的消息。
// 处理后执行ack runnable。
ack.run();
}
public void onFailure(Throwable value) {
connection.close(null); // 发生了连接故障。
}
})
connection.connect(new Callback<Void>() {
public void onFailure(Throwable value) {
result.failure(value); // 如果无法连接到服务器。
}
// 一旦我们连接上...
public void onSuccess(Void v) {
// 订阅主题
Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
connection.subscribe(topics, new Callback<byte[]>() {
public void onSuccess(byte[] qoses) {
// 订阅请求的结果。
}
public void onFailure(Throwable value) {
connection.close(null); // 订阅失败。
}
});
// 发送消息到主题
connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
public void onSuccess(Void v) {
// 发布操作成功完成。
}
public void onFailure(Throwable value) {
connection.close(null); // 发布失败。
}
});
// 断开连接...
connection.disconnect(new Callback<Void>() {
public void onSuccess(Void v) {
// 连接断开后调用。
}
public void onFailure(Throwable value) {
// 断开连接永远不会失败。
}
});
}
});
每个连接都有一个HawtDispatch分发队列,用于处理套接字的IO事件。分发队列是一个Executor,提供IO和处理事件的串行执行,并用于确保连接的同步访问。
回调将在与连接关联的分发队列中执行,因此可以从回调中安全使用连接,但在回调内绝不能执行任何阻塞操作。如果需要执行可能会阻塞的某些处理,必须将其发送到另一个线程池进行处理。此外,如果其他线程需要与连接交互,只能通过将Runnable提交到连接的分发队列来完成。