"Ascoltatori" 是一个简单的发布/订阅库,支持以下代理/协议:
- Redis,由 @antirez 创建的键/值存储。
- MongoDB,可扩展、高性能的文档型数据库。
- Mosquitto 和所有 MQTT 协议的实现。
- RabbitMQ 和所有 AMQP 协议的实现。
- ZeroMQ,用于在 P2P 模式下使用 Ascoltatori。
- QlobberFSQ,共享文件系统队列。
- Apache Kafka,高吞吐量的分布式消息系统。
安装:
通过 npm 安装库:
$ npm install ascoltatori --save
通过 Git 安装:
$ git clone git://github.com/mcollina/ascoltatori.git
$ cd ascoltatori
$ npm install
入门示例(使用 Redis):
Ascoltatori 专注于为所有支持的代理提供简单且统一的抽象。以下是一个使用 Redis 的简单示例:
var ascoltatori = require('ascoltatori');
ascoltatori.build(function (err, ascoltatore) {
// 订阅主题
ascoltatore.subscribe('hello', function() {
console.log(arguments);
// { '0': 'hello', '1': 'a message' }
});
// 发布消息到主题 'hello'
ascoltatore.publish('hello', 'a message', function() {
console.log('message published');
});
});
通配符支持:
所有 ascoltatori 都支持通配符的使用,因此应该在每个代理上都能正常工作。您可能会发现一些差异,如果是这种情况,请提交错误报告以便修复。
通配符 + 匹配精确一个单词:
var ascoltatori = require('ascoltatori');
ascoltatori.build(function (err, ascoltatore) {
ascoltatore.subscribe("hello/+/world", function() {
// 这将打印 { '0': "hello/there/world", '1': "a message" }
console.log(arguments);
});
ascoltatore.subscribe("hello/+", function() {
// 这不会被调用
console.log(arguments);
});
ascoltatore.publish("hello/there/world", "a message", function() {
console.log("message published");
});
});
通配符 * 匹配零个或多个单词:
var ascoltatori = require('ascoltatori');
ascoltatori.build(function (err, ascoltatore) {
ascoltatore.subscribe("hello/*", function() {
// 这将打印 { '0': "hello/there/world", '1': "a message" }
console.log(arguments);
});
ascoltatore.subscribe("*", function() {
// 这将打印 { '0': "hello/there/world", '1': "a message" }
console.log(arguments);
});
ascoltatore.subscribe("hello/there/world/*", function() {
// 这将打印 { '0': "hello/there/world", '1': "a message" }
console.log(arguments);
});
ascoltatore.publish("hello/there/world", "a message", function() {
console.log("message published");
});
});
当然,您可以在同一订阅中混合使用 * 和 +:
var ascoltatori = require('ascoltatori');
ascoltatori.build(function (err, ascoltatore) {
ascoltatore.subscribe("hello/+/world/*", function() {
// 这将打印 { '0': "hello/foo/world/bar/42", '1': "a message" }
console.log(arguments);
});
ascoltatore.publish("hello/foo/world/bar/42", "a message", function() {
console.log("message published");
});
});
各代理示例:
Ascoltatori 支持不同的代理。以下是如何使用每个代理的示例。
Redis:
var ascoltatori = require('ascoltatori');
var settings = {
type: 'redis',
redis: require('redis'),
db: 12,
port: 6379,
return_buffers: true, // 用于处理二进制数据
host: 'localhost'
};
ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
MongoDB:
MongoDB 使用 Capped Collections 来实现发布/订阅模式。
var ascoltatori = require('ascoltatori');
var settings = {
type: 'mongo',
url: 'mongodb://127.0.0.1/ascoltatori',
pubsubCollection: 'ascoltatori',
mongo: {} // MongoDB 特定选项
};
ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
您还可以重用现有的 MongoDB 连接:
var ascoltatori = require('ascoltatori');
var MongoClient = require('mongodb').MongoClient;
MongoClient.connect('mongodb://127.0.0.1/ascoltatori', {}, function (err, db) {
var settings = {
type: 'mongo',
db: db,
pubsubCollection: 'ascoltatori'
};
ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
})
MQTT(Mosquitto):
var ascoltatori = require('ascoltatori');
var settings = {
type: 'mqtt',
json: false,
mqtt: require('mqtt'),
url: 'mqtt://127.0.0.1:1883'
};
ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
AMQP(RabbitMQ):
var ascoltatori = require('ascoltatori');
var settings = {
type: 'amqp',
json: false,
amqp: require('amqp'),
exchange: 'ascoltatore5672'
};
ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
使用 amqplib:
var ascoltatori = require('ascoltatori');
var settings = {
type: 'amqplib',
json: false,
amqp: require('amqplib/callback_api'),
exchange: 'ascoltatore5672',
queue: 'queueName',
durableQueue: true
};
ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
ZeroMQ:
var ascoltatori = require('ascoltatori');
var settings = {
type: 'zmq',
json: false,
zmq: require("zeromq"),
port: "tcp://127.0.0.1:33333",
controlPort: "tcp://127.0.0.1:33334",
delay: 10
};
ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
QlobberFSQ:
您可以使用任何 QlobberFSQ 构造函数选项,例如:
var ascoltatori = require('ascoltatori');
var settings = {
type: 'filesystem',
json: false,
qlobber_fsq: require("qlobber-fsq"),
fsq_dir: "/shared/fsq"
};
ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
如果不指定 fsq_dir,则消息将写入 qlobber-fsq 模块目录中名为 fsq 的目录中。
Memory:
var ascoltatori = require('ascoltatori');
ascoltatori.build(function (err, ascoltatore) {
// ...
});
JSON:
默认情况下,由 ascoltatori.build 创建的每个 ascoltatore 将每个发布的消息包装在 JSON 格式中。可以通过传递 { json: false } 选项来禁用此行为。
require('ascoltatori').build({ json: false }, function(err, a) {
// ...
});
Apache Kafka:
var ascoltatori = require('ascoltatori');
var settings = {
type: 'kafka',
json: false,
kafka: require("kafka-node"),
connectionString: "localhost:2181",
clientId: "ascoltatori",
groupId: "ascoltatori",
defaultEncoding: "utf8",
encodings: {
image: "buffer"
}
};
ascoltatori.build(settings, function (err, ascoltatore) {
// ...
});
如果发布到不存在的 Kafka 主题,则将使用默认设置创建该主题。如果订阅不存在的 Kafka 主题,则仅在通过 Ascoltatori 发布到该主题时才会生效。
调试:
Ascoltatori 支持调试包,并根据外部环境变量触发日志记录。
$ DEBUG=ascoltatori:mqtt node examples/mqtt_topic_bridge.js
支持的调试标志包括:
- ascoltatori:amqp
- ascoltatori:trie
- ascoltatori:mqtt
- ascoltatori:prefix
- ascoltatori:redis
- ascoltatori:zmq
- ascoltatori:ee2
- ascoltatori:filesystem
- ascoltatori:kafka
可靠性:
由于 Ascoltatori 使用各种传输方式,因此无法保证在所有传输方式上具有各种可靠性属性。但是,MQTT 和 AMQP Ascoltatori 提供至少一次语义,这意味着消息可能会被接收多次,但至少一次。