Skip to content

Commit

Permalink
Remove fast-future (Level/leveldown#638)
Browse files Browse the repository at this point in the history
  • Loading branch information
vweevers committed Aug 11, 2019
1 parent 02076e3 commit 1dcc6c5
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 14 deletions.
5 changes: 5 additions & 0 deletions binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,8 @@ struct Iterator {

bool IteratorNext (std::vector<std::pair<std::string, std::string> >& result) {
size_t size = 0;
uint32_t cacheSize = 0;

while (true) {
std::string key, value;
bool ok = Read(key, value);
Expand All @@ -717,6 +719,9 @@ struct Iterator {
size = size + key.size() + value.size();
if (size > highWaterMark_) return true;

// Limit the size of the cache to prevent starving the event loop
// in JS-land while we're recursively calling process.nextTick().
if (++cacheSize >= 1000) return true;
} else {
return false;
}
Expand Down
15 changes: 2 additions & 13 deletions iterator.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const util = require('util')
const AbstractIterator = require('abstract-leveldown').AbstractIterator
const fastFuture = require('fast-future')
const binding = require('./binding')

function Iterator (db, options) {
Expand All @@ -9,7 +8,6 @@ function Iterator (db, options) {
this.context = binding.iterator_init(db.context, options)
this.cache = null
this.finished = false
this.fastFuture = fastFuture()
}

util.inherits(Iterator, AbstractIterator)
Expand All @@ -26,20 +24,11 @@ Iterator.prototype._seek = function (target) {

Iterator.prototype._next = function (callback) {
var that = this
var key
var value

if (this.cache && this.cache.length) {
key = this.cache.pop()
value = this.cache.pop()

this.fastFuture(function () {
callback(null, key, value)
})
process.nextTick(callback, null, this.cache.pop(), this.cache.pop())
} else if (this.finished) {
this.fastFuture(function () {
callback()
})
process.nextTick(callback)
} else {
binding.iterator_next(this.context, function (err, array, finished) {
if (err) return callback(err)
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
},
"dependencies": {
"abstract-leveldown": "~6.0.3",
"fast-future": "~1.0.2",
"napi-macros": "~1.8.2",
"node-gyp-build": "~4.1.0"
},
Expand Down
124 changes: 124 additions & 0 deletions test/iterator-starvation-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
'use strict'

const test = require('tape')
const testCommon = require('./common')
const sourceData = []

// For this test the number of records in the db must be a multiple of
// the hardcoded fast-future limit (1000) or a cache size limit in C++.
for (let i = 0; i < 1e4; i++) {
sourceData.push({
type: 'put',
key: i.toString(),
value: ''
})
}

test('setUp', testCommon.setUp)

test('iterator does not starve event loop', function (t) {
t.plan(6)

const db = testCommon.factory()

db.open(function (err) {
t.ifError(err, 'no open error')

// Insert test data
db.batch(sourceData.slice(), function (err) {
t.ifError(err, 'no batch error')

// Set a high highWaterMark to fill up the cache entirely
const it = db.iterator({ highWaterMark: Math.pow(1024, 3) })

let breaths = 0
let entries = 0
let scheduled = false

// Iterate continuously while also scheduling work with setImmediate(),
// which should be given a chance to run because we limit the tick depth.
const next = function () {
it.next(function (err, key, value) {
if (err || (key === undefined && value === undefined)) {
t.ifError(err, 'no next error')
t.is(entries, sourceData.length, 'got all data')
t.is(breaths, sourceData.length / 1000, 'breathed while iterating')

return db.close(function (err) {
t.ifError(err, 'no close error')
})
}

entries++

if (!scheduled) {
scheduled = true
setImmediate(function () {
breaths++
scheduled = false
})
}

next()
})
}

next()
})
})
})

test('iterator with seeks does not starve event loop', function (t) {
t.plan(6)

const db = testCommon.factory()

db.open(function (err) {
t.ifError(err, 'no open error')

db.batch(sourceData.slice(), function (err) {
t.ifError(err, 'no batch error')

const it = db.iterator({ highWaterMark: Math.pow(1024, 3), limit: sourceData.length })

let breaths = 0
let entries = 0
let scheduled = false

const next = function () {
it.next(function (err, key, value) {
if (err || (key === undefined && value === undefined)) {
t.ifError(err, 'no next error')
t.is(entries, sourceData.length, 'got all data')
t.is(breaths, sourceData.length, 'breathed while iterating')

return db.close(function (err) {
t.ifError(err, 'no close error')
})
}

entries++

if (!scheduled) {
// Seeking clears the cache, which should only have a positive
// effect because it means the cache must be refilled, which
// again gives us time to breathe. This is a smoke test, really.
it.seek(sourceData[0].key)

scheduled = true
setImmediate(function () {
breaths++
scheduled = false
})
}

next()
})
}

next()
})
})
})

test('tearDown', testCommon.tearDown)

0 comments on commit 1dcc6c5

Please sign in to comment.