-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
pg-kinesis-bridge.js
75 lines (65 loc) · 2.29 KB
/
pg-kinesis-bridge.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
const pg = require("pg");
const AWS = require("aws-sdk");
/**
 * Bridge that forwards PostgreSQL notifications to Kinesis streams.
 *
 * Creates (but does not connect) a pg client and a Kinesis client, and
 * initialises empty registries for streams and channels.
 *
 * @param {*} pg_config - passed unchanged to `new pg.Client(...)`.
 * @param {*} kinesis_config - passed unchanged to `new AWS.Kinesis(...)`.
 */
const PgKinesisBridge = exports.PgKinesisBridge = function(pg_config, kinesis_config) {
  const pgclient = new pg.Client(pg_config);
  const kinesis = new AWS.Kinesis(kinesis_config);
  Object.assign(this, {
    pgclient,
    kinesis,
    /* map from streamName to SequenceNumber */
    streams: {},
    /* registered channel names, stored as keys */
    channels: {},
    /* becomes true once connect() has established the DB connection */
    connected: false,
  });
};
/**
 * Handler invoked when a Kinesis putRecord call fails.
 * Writes the error and its stack trace to stderr.
 *
 * @param {Error} err - the error reported by putRecord.
 */
PgKinesisBridge.prototype._onputfail = function(err) {
  const { stack } = err;
  console.error(err, stack);
};
/**
 * Notification handler: puts the notification payload onto every registered
 * stream, using the notification channel as the partition key.
 *
 * On success the stream's remembered sequence number is replaced with the one
 * returned by Kinesis; on failure the error is handed to `_onputfail`.
 *
 * @param {{channel: string, payload: string}} msg - notification from pg.
 */
PgKinesisBridge.prototype._onnotify = function(msg) {
  for (const name in this.streams) {
    const record = {
      StreamName: name,
      PartitionKey: msg.channel,
      Data: msg.payload,
      SequenceNumberForOrdering: this.streams[name]
    };
    /* arrow function keeps `this` bound to the bridge instance */
    this.kinesis.putRecord(record, (err, data) => {
      if (err) {
        this._onputfail(err);
        return;
      }
      this.streams[name] = data.SequenceNumber;
    });
  }
};
/**
 * Produce a PostgreSQL "quoted identifier".
 * https://www.postgresql.org/docs/current/static/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
 *
 * Quoted identifiers can contain any character, except the character with
 * code zero. To include a double quote, write two double quotes.
 *
 * @param {string} s - raw identifier; must be a non-empty string without NUL.
 * @returns {string} the identifier wrapped in double quotes, with every
 *   embedded double quote doubled.
 * @throws {TypeError} if `s` is not a string, is empty, or contains "\0".
 */
const escape_sql_identifier = function(s) {
  if (typeof s !== "string" || s.length < 1 || s.indexOf("\0") !== -1) {
    throw new TypeError("invalid identifier");
  }
  /* A string pattern would only replace the first match; the global regex
     doubles EVERY embedded quote so the identifier cannot be terminated early. */
  return '"' + s.replace(/"/g, '""') + '"';
};
/**
 * Issue a LISTEN command for one channel on the pg client.
 *
 * @param {string} channel - channel name; quoted via escape_sql_identifier.
 * @returns {*} whatever `pgclient.query` returns for the LISTEN statement.
 */
PgKinesisBridge.prototype._listen = function(channel) {
  const client = this.pgclient;
  return client.query(`LISTEN ${escape_sql_identifier(channel)}`);
};
/**
 * Register a channel to be bridged.
 *
 * The channel is always recorded. If the bridge is already connected a LISTEN
 * is issued right away; otherwise nothing is sent here and connect() picks
 * the channel up from the registry.
 *
 * @param {string} channel - channel name.
 * @returns {Promise<undefined>} resolves once the LISTEN (if any) completed.
 */
PgKinesisBridge.prototype.addChannel = function(channel) {
  this.channels[channel] = true;
  if (!this.connected) {
    /* nothing to send yet: hand back an already resolved promise */
    return Promise.resolve();
  }
  /* hide the query result from callers */
  return this._listen(channel).then(() => undefined);
};
/**
 * Register a stream that notifications are put onto.
 *
 * @param {string} streamName - key under which the stream is stored.
 * @param {string} [SequenceNumber] - initial sequence number; any falsy value
 *   is stored as null.
 */
PgKinesisBridge.prototype.addStream = function(streamName, SequenceNumber) {
  const initial = SequenceNumber ? SequenceNumber : null;
  this.streams[streamName] = initial;
};
/**
 * Connect the pg client, start forwarding notifications, and LISTEN on every
 * channel registered so far.
 *
 * The LISTEN statements are sent one after another inside a single
 * transaction. If any step of that transaction fails, a ROLLBACK is attempted
 * so the connection is not left inside an open (or aborted) transaction, and
 * the original error is re-raised to the caller.
 *
 * @returns {Promise} resolves with the result of the "commit" query; rejects
 *   with the connection error or the first error raised while subscribing.
 */
PgKinesisBridge.prototype.connect = function() {
  /* Set up DB connection */
  return this.pgclient.connect()
    .then(() => {
      this.pgclient.on("notification", this._onnotify.bind(this));
      this.connected = true;
      /* Subscribe to channels, strictly in sequence */
      let promise = this.pgclient.query("begin");
      for (let channel in this.channels) {
        promise = promise.then(() => this._listen(channel));
      }
      return promise
        .then(() => this.pgclient.query("commit"))
        .catch((err) => {
          /* Best-effort cleanup: whether or not the rollback itself succeeds,
             report the error that caused the failure. */
          const rethrow = () => { throw err; };
          return this.pgclient.query("rollback").then(rethrow, rethrow);
        });
    });
};