-
Notifications
You must be signed in to change notification settings - Fork 18
/
index.js
94 lines (81 loc) · 2.63 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
var redis = require('redis');
var PubSub = require('sharedb').PubSub;
// Redis pubsub driver for ShareDB.
//
// The redis driver requires two redis clients (a single redis client can't do
// both pubsub and normal messaging). These clients will be created
// automatically if you don't provide them.
function RedisPubSub(options) {
if (!(this instanceof RedisPubSub)) return new RedisPubSub(options);
PubSub.call(this, options);
options || (options = {});
this.client = options.client || redis.createClient(options);
this._clientConnection = null;
// Redis doesn't allow the same connection to both listen to channels and do
// operations. Make an extra redis connection for subscribing with the same
// options if not provided
this.observer = options.observer || redis.createClient(this.client.options);
this._observerConnection = null;
this._connect();
}
module.exports = RedisPubSub;
RedisPubSub.prototype = Object.create(PubSub.prototype);
RedisPubSub.prototype.close = function(callback) {
if (!callback) {
callback = function(err) {
if (err) throw err;
};
}
var pubsub = this;
PubSub.prototype.close.call(this, function(err) {
if (err) return callback(err);
pubsub._close().then(function() {
callback();
}, callback);
});
};
RedisPubSub.prototype._close = function() {
return this._closing = this._closing || this._connect().then(Promise.all([
this.client.quit(),
this.observer.quit()
]));
};
RedisPubSub.prototype._subscribe = function(channel, callback) {
var pubsub = this;
pubsub.observer
.subscribe(channel, function(message) {
var data = JSON.parse(message);
pubsub._emit(channel, data);
})
.then(function() {
callback();
}, callback);
};
RedisPubSub.prototype._unsubscribe = function(channel, callback) {
this.observer.unsubscribe(channel)
.then(function() {
callback();
}, callback);
};
RedisPubSub.prototype._publish = function(channels, data, callback) {
var message = JSON.stringify(data);
var args = [message].concat(channels);
this.client.eval(PUBLISH_SCRIPT, {arguments: args}).then(function() {
callback();
}, callback);
};
RedisPubSub.prototype._connect = function() {
this._clientConnection = this._clientConnection || connect(this.client);
this._observerConnection = this._observerConnection || connect(this.observer);
return Promise.all([
this._clientConnection,
this._observerConnection
]);
};
function connect(client) {
return client.isOpen ? Promise.resolve() : client.connect();
}
var PUBLISH_SCRIPT =
'for i = 2, #ARGV do ' +
'redis.call("publish", ARGV[i], ARGV[1]) ' +
'end';