-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.js
149 lines (131 loc) · 4.63 KB
/
listener.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
//Original inspiration:
//https://github.com/vitaly-t/pg-promise/wiki/Robust-Listeners
//Interval (ms) between self-pings used to check the connection: one per second.
const PING_INTERVAL = 1000;

//Reserved payloads. A notification carrying PING_MESSAGE is thrown away, and
//one carrying SELF_CHECK_MESSAGE only answers a pending self-check.
//GUIDs are used so that they won't clash with an actual message.
const PING_MESSAGE = '0a7735a0-93b6-4830-835b-72d0f552381c';
const SELF_CHECK_MESSAGE = 'f75976d0-dbb6-441f-b62a-264dc689d933';

//Reconnect defaults: 10 attempts, 5 seconds apart.
const DEFAULT_RETRY_COUNT = 10;
const DEFAULT_RETRY_INTERVAL = 5 * 1000;

//Default time (ms) a self-check waits for its own notification: 20 seconds.
const DEFAULT_SELF_CHECK_TIMEOUT = 20 * 1000;
/**
 * Listener for PostgreSQL NOTIFY messages on a single channel, with automatic
 * reconnection. Intended to be used as a constructor (`new DatabaseListener({...})`).
 *
 * On construction it immediately tries to open a direct connection through
 * `dbConnection.connect`, issues LISTEN on `channel`, and, once the initial
 * connection succeeds, starts sending itself a NOTIFY ping every PING_INTERVAL ms.
 * When the connection is reported lost it retries up to `maxRetryCount` times,
 * `retryInterval` ms apart, and exits the process if every attempt fails.
 * A failed *initial* connection is only logged; it is not retried.
 *
 * @param {object} options
 * @param {object} options.dbConnection - object exposing `connect({direct, onLost})` (pg-promise database).
 * @param {function} [options.onDatabaseNotification] - called with each message; when omitted, messages are logged.
 * @param {string} options.channel - channel name used for LISTEN / NOTIFY.
 * @param {function} [options.logger=null] - logging function; falls back to `console.log`.
 * @param {boolean} [options.parseJson=false] - try to JSON.parse each payload before delivering it.
 * @param {number} [options.maxRetryCount] - reconnect attempts after a lost connection.
 * @param {number} [options.retryInterval] - delay in ms before each reconnect attempt.
 * @param {number} [options.selfCheckTimeout] - ms `selfCheck()` waits for its own notification.
 * @throws {Error} when `dbConnection` or `channel` is missing.
 */
function DatabaseListener({
  dbConnection, //pg-promise-connection to your database
  onDatabaseNotification, //callback for message handling
  channel, //name of your channel, i.e. the channel name with NOTIFY in your database
  logger = null, //If you don't like console.log, insert your own logger
  parseJson = false, //Can your notify-messages be parsed from json?
  maxRetryCount = DEFAULT_RETRY_COUNT,
  retryInterval = DEFAULT_RETRY_INTERVAL,
  selfCheckTimeout = DEFAULT_SELF_CHECK_TIMEOUT,
}) {
  if (!dbConnection) throw new Error('DatabaseListener: Missing dbConnection');
  if (!channel) throw new Error('DatabaseListener: Missing channel name');

  this.logger = logger || console.log;
  this.db = dbConnection;
  this.channel = channel;
  this.onDatabaseNotification = onDatabaseNotification;
  this.parseJson = parseJson;
  this.maxRetryCount = maxRetryCount;
  this.retryInterval = retryInterval;
  this.selfCheckTimeout = selfCheckTimeout;

  // global connection for permanent event listeners
  this.connection = null;

  /**
   * Sends a NOTIFY carrying SELF_CHECK_MESSAGE on the channel and waits for it
   * to come back through the listener.
   *
   * @returns {Promise<boolean>} `true` when the notification arrives within
   *   `selfCheckTimeout` ms; `false` on timeout, when there is currently no
   *   connection, or when the NOTIFY query fails.
   */
  this.selfCheck = () => {
    return new Promise(resolve => {
      // No usable connection (e.g. while reconnecting): report unhealthy
      // instead of failing on a null dereference.
      if (!this.connection) {
        resolve(false);
        return;
      }

      let timer = null;
      const settle = result => {
        // Release the timer so a finished check leaves nothing pending.
        clearTimeout(timer);
        // Only clear the hook if a newer selfCheck() has not replaced it.
        if (this.selfCheckCallback === onSelfCheckMessage) {
          this.selfCheckCallback = null;
        }
        resolve(result);
      };
      const onSelfCheckMessage = () => settle(true);

      this.selfCheckCallback = onSelfCheckMessage;
      timer = setTimeout(() => settle(false), this.selfCheckTimeout);

      this.connection
        .none('NOTIFY $1~, $2', [this.channel, SELF_CHECK_MESSAGE])
        .catch(error => {
          // A failed NOTIFY can never be answered; fail fast and avoid an
          // unhandled promise rejection.
          this.logger('Failed to Notify:', error);
          settle(false);
        });
    });
  };

  // Handles every notification received on the client: drops pings, routes
  // self-check replies, and delivers everything else to the consumer.
  const onNotification = data => {
    if (data.payload === PING_MESSAGE) return;
    if (data.payload === SELF_CHECK_MESSAGE) {
      return this.selfCheckCallback && this.selfCheckCallback();
    }

    let message = data.payload;
    if (this.parseJson) {
      try {
        message = JSON.parse(data.payload);
      } catch (e) {
        // Parsing failed: log it and still deliver the raw payload below.
        this.logger(e);
        this.logger(data);
      }
    }

    if (this.onDatabaseNotification) {
      this.onDatabaseNotification(message);
    } else {
      this.logger(message);
    }
  };

  // Attaches the notification handler to the client and starts LISTENing.
  const setListeners = client => {
    client.on('notification', onNotification);
    return this.connection.none('LISTEN $1~', this.channel).catch(error => {
      this.logger(error); // unlikely to happen
    });
  };

  // Detaches the notification handler from a client that is going away.
  const removeListeners = client => {
    client.removeListener('notification', onNotification);
  };

  // Passed to connect() as `onLost`: drops the dead connection and retries.
  const onConnectionLost = (err, e) => {
    this.logger('Connectivity Problem:', err);
    this.connection = null; // prevent use of the connection
    removeListeners(e.client);
    reconnect(this.retryInterval, this.maxRetryCount)
      .then(() => {
        this.logger('Successfully Reconnected');
      })
      .catch(() => {
        this.logger('Connection Lost Permanently');
        process.exit(); // exiting the process
      });
  };

  /**
   * Opens a new direct connection after `delay` ms, retrying on failure.
   *
   * @param {number} [delay] - ms to wait before each attempt; non-positive or missing means 0.
   * @param {number} [maxAttempts] - attempts to make; non-positive or missing means 1.
   * @returns {Promise<object>} resolves with the connection object, or rejects
   *   with the last connection error once the attempts are used up.
   */
  const reconnect = (delay, maxAttempts) => {
    // Explicit radix so string input is always read as decimal.
    const waitMs = delay > 0 ? Number.parseInt(delay, 10) : 0;
    const attempts = maxAttempts > 0 ? Number.parseInt(maxAttempts, 10) : 1;

    return new Promise((resolve, reject) => {
      setTimeout(() => {
        this.db
          .connect({direct: true, onLost: onConnectionLost})
          .then(obj => {
            // global connection is now available
            this.connection = obj;
            resolve(obj);
            return setListeners(obj.client);
          })
          .catch(error => {
            this.logger('Error Connecting:', error);
            const attemptsLeft = attempts - 1;
            if (attemptsLeft) {
              reconnect(waitMs, attemptsLeft)
                .then(resolve)
                .catch(reject);
            } else {
              reject(error);
            }
          });
      }, waitMs);
    });
  };

  const sendNotifications = () => {
    //Initiate a "notify" from the database to check that we are connected
    setInterval(() => {
      if (this.connection) {
        this.connection.none('NOTIFY $1~, $2', [this.channel, PING_MESSAGE]).catch(error => {
          this.logger('Failed to Notify:', error); // unlikely to happen
        });
      }
    }, PING_INTERVAL);
  };

  // Makes the single initial connection attempt and, on success, starts pinging.
  const init = () => {
    return reconnect() // same as reconnect(0, 1)
      .then(() => {
        this.logger('Successful Initial database Connection');
        sendNotifications();
      })
      .catch(error => {
        this.logger('Failed Initial database Connection:', error);
      });
  };

  init();
}

module.exports = DatabaseListener;