-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathflow_data_provider.js
47 lines (40 loc) · 1.41 KB
/
flow_data_provider.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
var fs = require('fs')
, inherits = require('util').inherits
, EventEmitter = require('events').EventEmitter;
var FlowDataProvider = function FlowDataProvider(db) {
EventEmitter.call(this);
// Save the db
this.db = db;
// Get the collection for the flow
this.collection = this.db.collection("flow");
}
inherits(FlowDataProvider, EventEmitter);
FlowDataProvider.prototype.start = function start(callback) {
var self = this;
// Retry function, it's here for the situation where the collection does not exist
// or if the cursor times out or dies for some reason (this is a new query)
var tailCursorWithRetry = function() {
// Send from current point in time
var currentPointInTime = new Date().getTime();
// Let's connect to the cursor
var cursor = self.collection.find({at: {$gte: currentPointInTime}}, {tailable:true, tailableRetryInterval:1000, numberOfRetries:1000});
var stream = cursor.stream();
// Pass on the data
stream.on("data", function(data) {
// console.log("=========== cursor items:: " + cursor.items.length)
// console.dir(data)
process.nextTick(function() {
self.emit("data", data);
})
})
// Signal end of the cursor
stream.on("end", function() {
setTimeout(tailCursorWithRetry, 1000);
})
};
// Start tailing
tailCursorWithRetry();
// Return
callback(null, null)
}
exports.FlowDataProvider = FlowDataProvider;