Skip to content
This repository has been archived by the owner on Mar 23, 2023. It is now read-only.

refactor: async iterators #12

Merged
merged 6 commits into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ dist
test/test-repo/datastore
init-default
datastore-test
.vscode
18 changes: 8 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,20 @@
},
"homepage": "https://github.com/ipfs/js-datastore-level#readme",
"dependencies": {
"datastore-core": "~0.6.0",
"datastore-core": "~0.7.0",
"encoding-down": "^6.0.2",
"interface-datastore": "~0.6.0",
"level-js": "github:timkuijsten/level.js#idbunwrapper",
"interface-datastore": "~0.7.0",
"leveldown": "^5.0.0",
"levelup": "^4.0.1",
"pull-stream": "^3.6.9"
"levelup": "^4.0.1"
},
"devDependencies": {
"aegir": "^15.3.1",
"async": "^2.6.1",
"aegir": "^19.0.3",
"chai": "^4.2.0",
"cids": "~0.5.5",
"cids": "~0.7.1",
"dirty-chai": "^2.0.1",
"flow-bin": "~0.81.0",
"memdown": "^1.4.1",
"flow-bin": "~0.99.0",
"level-js": "^4.0.1",
"memdown": "^4.0.0",
"rimraf": "^2.6.2"
},
"contributors": [
Expand Down
182 changes: 84 additions & 98 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@

/* :: import type {Callback, Batch, Query, QueryResult, QueryEntry} from 'interface-datastore' */

const pull = require('pull-stream')
const levelup = require('levelup')

const asyncFilter = require('interface-datastore').utils.asyncFilter
const asyncSort = require('interface-datastore').utils.asyncSort
const Key = require('interface-datastore').Key
const Errors = require('interface-datastore').Errors
const { Key, Errors, utils } = require('interface-datastore')
const encode = require('encoding-down')

const { filter, map, take, sortAll } = utils

/**
* A datastore backed by leveldb.
*/
Expand Down Expand Up @@ -50,59 +47,53 @@ class LevelDatastore {
)
}

open (callback /* : Callback<void> */) /* : void */ {
this.db.open((err) => {
if (err) {
return callback(Errors.dbOpenFailedError(err))
}
callback()
})
async open () /* : Promise */ {
try {
await this.db.open()
} catch (err) {
throw Errors.dbOpenFailedError(err)
}
}

put (key /* : Key */, value /* : Buffer */, callback /* : Callback<void> */) /* : void */ {
this.db.put(key.toString(), value, (err) => {
if (err) {
return callback(Errors.dbWriteFailedError(err))
}
callback()
})
async put (key /* : Key */, value /* : Buffer */) /* : Promise */ {
try {
await this.db.put(key.toString(), value)
} catch (err) {
throw Errors.dbWriteFailedError(err)
}
}

get (key /* : Key */, callback /* : Callback<Buffer> */) /* : void */ {
this.db.get(key.toString(), (err, data) => {
if (err) {
return callback(Errors.notFoundError(err))
}
callback(null, data)
})
async get (key /* : Key */) /* : Promise */ {
let data
try {
data = await this.db.get(key.toString())
} catch (err) {
if (err.notFound) throw Errors.notFoundError(err)
throw Errors.dbWriteFailedError(err)
}
return data
}

has (key /* : Key */, callback /* : Callback<bool> */) /* : void */ {
this.db.get(key.toString(), (err, res) => {
if (err) {
if (err.notFound) {
callback(null, false)
return
}
callback(err)
return
}

callback(null, true)
})
async has (key /* : Key */) /* : Promise<Boolean> */ {
try {
await this.db.get(key.toString())
} catch (err) {
if (err.notFound) return false
throw err
}
return true
}

delete (key /* : Key */, callback /* : Callback<void> */) /* : void */ {
this.db.del(key.toString(), (err) => {
if (err) {
return callback(Errors.dbDeleteFailedError(err))
}
callback()
})
async delete (key /* : Key */) /* : Promise */ {
try {
await this.db.del(key.toString())
} catch (err) {
throw Errors.dbDeleteFailedError(err)
}
}

close (callback /* : Callback<void> */) /* : void */ {
this.db.close(callback)
close () /* : Promise */ {
return this.db.close()
}

batch () /* : Batch<Buffer> */ {
Expand All @@ -121,8 +112,8 @@ class LevelDatastore {
key: key.toString()
})
},
commit: (callback /* : Callback<void> */) /* : void */ => {
this.db.batch(ops, callback)
commit: () /* : Promise */ => {
return this.db.batch(ops)
}
}
}
Expand All @@ -133,70 +124,65 @@ class LevelDatastore {
values = !q.keysOnly
}

const iter = this.db.db.iterator({
keys: true,
values: values,
keyAsBuffer: true
})

const rawStream = (end, cb) => {
if (end) {
return iter.end((err) => {
cb(err || end)
})
}

iter.next((err, key, value) => {
if (err) {
return cb(err)
}

if (err == null && key == null && value == null) {
return iter.end((err) => {
cb(err || true)
})
}

const res /* : QueryEntry<Buffer> */ = {
key: new Key(key, false)
}

if (values) {
res.value = Buffer.from(value)
}

cb(null, res)
let it = levelIteratorToIterator(
this.db.db.iterator({
keys: true,
values: values,
keyAsBuffer: true
})
}
)

let tasks = [rawStream]
let filters = []
it = map(it, ({ key, value }) => {
const res /* : QueryEntry<Buffer> */ = { key: new Key(key, false) }
if (values) {
res.value = Buffer.from(value)
}
return res
})

if (q.prefix != null) {
const prefix = q.prefix
filters.push((e, cb) => cb(null, e.key.toString().startsWith(prefix)))
it = filter(it, e => e.key.toString().startsWith(q.prefix))
}

if (q.filters != null) {
filters = filters.concat(q.filters)
if (Array.isArray(q.filters)) {
it = q.filters.reduce((it, f) => filter(it, f), it)
}

tasks = tasks.concat(filters.map(f => asyncFilter(f)))

if (q.orders != null) {
tasks = tasks.concat(q.orders.map(o => asyncSort(o)))
if (Array.isArray(q.orders)) {
it = q.orders.reduce((it, f) => sortAll(it, f), it)
}

if (q.offset != null) {
let i = 0
tasks.push(pull.filter(() => i++ >= q.offset))
it = filter(it, () => i++ >= q.offset)
}

if (q.limit != null) {
tasks.push(pull.take(q.limit))
it = take(it, q.limit)
}

return pull.apply(null, tasks)
return it
}
}

function levelIteratorToIterator (li) {
return {
next: () => new Promise((resolve, reject) => {
li.next((err, key, value) => {
if (err) return reject(err)
if (key == null) return resolve({ done: true })
resolve({ done: false, value: { key, value } })
})
}),
return: () => new Promise((resolve, reject) => {
li.end(err => {
if (err) return reject(err)
resolve({ done: true })
})
}),
[Symbol.asyncIterator] () {
return this
}
}
}

Expand Down
41 changes: 24 additions & 17 deletions test/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
/* eslint-env mocha */
'use strict'

const each = require('async/each')
const MountStore = require('datastore-core').MountDatastore
const Key = require('interface-datastore').Key
const { MountDatastore } = require('datastore-core')
const { Key } = require('interface-datastore')

// leveldown will be swapped for level-js
const leveljs = require('leveldown')
Expand All @@ -14,31 +13,39 @@ const LevelStore = require('../src')
describe('LevelDatastore', () => {
describe('interface-datastore (leveljs)', () => {
require('interface-datastore/src/tests')({
setup (callback) {
callback(null, new LevelStore('hello', {db: leveljs}))
},
teardown (callback) {
leveljs.destroy('hello', callback)
}
setup: () => new LevelStore('hello', { db: leveljs }),
teardown: () => new Promise((resolve, reject) => {
leveljs.destroy('hello', err => {
if (err) return reject(err)
resolve()
})
})
})
})

describe('interface-datastore (mount(leveljs, leveljs, leveljs))', () => {
require('interface-datastore/src/tests')({
setup (callback) {
callback(null, new MountStore([{
setup () {
return new MountDatastore([{
prefix: new Key('/a'),
datastore: new LevelStore('one', {db: leveljs})
datastore: new LevelStore('one', { db: leveljs })
}, {
prefix: new Key('/q'),
datastore: new LevelStore('two', {db: leveljs})
datastore: new LevelStore('two', { db: leveljs })
}, {
prefix: new Key('/z'),
datastore: new LevelStore('three', {db: leveljs})
}]))
datastore: new LevelStore('three', { db: leveljs })
}])
},
teardown (callback) {
each(['one', 'two', 'three'], leveljs.destroy.bind(leveljs), callback)
teardown () {
return Promise.all(['one', 'two', 'three'].map(dir => {
return new Promise((resolve, reject) => {
leveljs.destroy(dir, err => {
if (err) return reject(err)
resolve()
})
})
}))
}
})
})
Expand Down
Loading