-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
72 lines (57 loc) · 1.75 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
var Level = require('level')
var mkdirp = require('mkdirp')
var Obv = require('obv')
var peek = require('level-peek')
var Double = require('varstruct/types/numbers').DoubleBE
var pull = require('pull-stream/pull')
var Map = require('pull-stream/throughs/map')
var Read = require('pull-level/read')
var timestamp = require('monotonic-timestamp')
//var createAppend = require('./append')
var Append = require('append-batch')
module.exports = (dir) => {
var since = Obv()
var db = Level(dir, {keyEncoding: Double, valueEncoding: 'json'})
peek.last(db, {}, function (_, upto) {
since.set(upto || -1)
})
var append = Append(function (batch, cb) {
batch = batch.map(function (value) {
return {key: last = timestamp(), value: value, type: 'put'}
})
db.batch(batch, function (err) {
since.set(batch[batch.length - 1].key); cb(null, last)
})
})
return {
dir: dir,
since: since,
get: function (seq, cb) {
db.get(seq, cb)
},
stream: function (opts) {
opts.keys = null //don't accidentially support level api.
opts.keys = opts.seqs //alias flume api to level api.
var seqs = opts.seqs !== false, values = opts.values !== false
function format(seq, value) {
if(seqs && values) return {seq: seq, value: value}
else if(seqs) return seq
else return value
}
//since we are using timestamps inside BE
if(opts.gt < 0) opts.gt = 0
if(opts.gte < 0) opts.gte = 0
opts.keys = true
opts.values = true
opts.sync = false
return pull(
Read(db, opts),
Map(function (data) {
console.log('flumelog-level', data)
return format(data.key, data.value)
})
)
},
append: append
}
}