From b599a41ad16f42e885cfcb37a3a93db20f6ead86 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Fri, 22 Feb 2019 11:23:00 -0800 Subject: [PATCH 1/2] refactor: update fs datastore to use async/await --- package.json | 9 +-- src/index.js | 178 +++++++++++++++++++++------------------------ test/index.spec.js | 97 ++++++++++-------------- 3 files changed, 123 insertions(+), 161 deletions(-) diff --git a/package.json b/package.json index ae8bb05..34f7bfd 100644 --- a/package.json +++ b/package.json @@ -35,16 +35,15 @@ "homepage": "https://github.com/ipfs/js-datastore-fs#readme", "dependencies": { "async": "^2.6.1", - "datastore-core": "~0.6.0", + "datastore-core": "git://github.com:zcstarr/js-datastore-core.git", "fast-write-atomic": "~0.2.0", "glob": "^7.1.3", "graceful-fs": "^4.1.11", - "interface-datastore": "~0.6.0", - "mkdirp": "~0.5.1", - "pull-stream": "^3.6.9" + "interface-datastore": "git://github.com/ipfs/interface-datastore.git#refactor/async-iterators", + "mkdirp": "~0.5.1" }, "devDependencies": { - "aegir": "^15.3.1", + "aegir": "^18.2.0", "chai": "^4.2.0", "cids": "~0.5.5", "dirty-chai": "^2.0.1", diff --git a/src/index.js b/src/index.js index d5b1826..ce7b98e 100644 --- a/src/index.js +++ b/src/index.js @@ -4,18 +4,23 @@ /* :: import type {Batch, Query, QueryResult, Callback} from 'interface-datastore' */ const fs = require('graceful-fs') -const pull = require('pull-stream') const glob = require('glob') -const setImmediate = require('async/setImmediate') -const waterfall = require('async/series') -const each = require('async/each') const mkdirp = require('mkdirp') -const writeFile = require('fast-write-atomic') +const promisify = require('util').promisify +const writeFile = promisify(require('fast-write-atomic')) const path = require('path') -const asyncFilter = require('interface-datastore').utils.asyncFilter -const asyncSort = require('interface-datastore').utils.asyncSort +const filter = require('interface-datastore').utils.filter +const take = require('interface-datastore').utils.take +const map = require('interface-datastore').utils.map +const sortAll = require('interface-datastore').utils.sortAll const IDatastore = require('interface-datastore') + +const asyncMkdirp = promisify(require('mkdirp')) +const fsAccess = promisify(fs.access) +const fsReadFile = promisify(fs.readFile) +const fsUnlink = promisify(fs.unlink) + const Key = IDatastore.Key const Errors = IDatastore.Errors @@ -57,9 +62,8 @@ class FsDatastore { } } - open (callback /* : Callback */) /* : void */ { + open () /* : void */ { this._openOrCreate() - setImmediate(callback) } /** @@ -150,16 +154,13 @@ class FsDatastore { * * @param {Key} key * @param {Buffer} val - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - putRaw (key /* : Key */, val /* : Buffer */, callback /* : Callback */) /* : void */ { + async putRaw (key /* : Key */, val /* : Buffer */) /* : void */ { const parts = this._encode(key) const file = parts.file.slice(0, -this.opts.extension.length) - waterfall([ - (cb) => mkdirp(parts.dir, { fs: fs }, cb), - (cb) => writeFile(file, val, cb) - ], (err) => callback(err)) + await asyncMkdirp(parts.dir, { fs: fs }) + await writeFile(file, val) } /** @@ -167,87 +168,83 @@ class FsDatastore { * * @param {Key} key * @param {Buffer} val - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - put (key /* : Key */, val /* : Buffer */, callback /* : Callback */) /* : void */ { + async put (key /* : Key */, val /* : Buffer */) /* : void */ { const parts = this._encode(key) - waterfall([ - (cb) => mkdirp(parts.dir, { fs: fs }, cb), - (cb) => writeFile(parts.file, val, cb) - ], (err) => { - if (err) { - return callback(Errors.dbWriteFailedError(err)) - } - callback() - }) + try { + await asyncMkdirp(parts.dir, { fs: fs }) + await writeFile(parts.file, val) + } catch (err) { + throw Errors.dbWriteFailedError(err) + } } /** * Read from the file system without extension. * * @param {Key} key - * @param {function(Error, Buffer)} callback - * @returns {void} + * @returns {Promise} */ - getRaw (key /* : Key */, callback /* : Callback */) /* : void */ { + async getRaw (key /* : Key */) /* : void */ { const parts = this._encode(key) let file = parts.file file = file.slice(0, -this.opts.extension.length) - fs.readFile(file, (err, data) => { - if (err) { - return callback(Errors.notFoundError(err)) - } - callback(null, data) - }) + let data + try { + data = await fsReadFile(file) + } catch (err) { + throw Errors.notFoundError(err) + } + return data } /** * Read from the file system. * * @param {Key} key - * @param {function(Error, Buffer)} callback - * @returns {void} + * @returns {Promise} */ - get (key /* : Key */, callback /* : Callback */) /* : void */ { + async get (key /* : Key */) /* : void */ { const parts = this._encode(key) - fs.readFile(parts.file, (err, data) => { - if (err) { - return callback(Errors.notFoundError(err)) - } - callback(null, data) - }) + let data + try { + data = await fsReadFile(parts.file) + } catch (err) { + throw Errors.notFoundError(err) + } + return data } /** * Check for the existence of the given key. * * @param {Key} key - * @param {function(Error, bool)} callback - * @returns {void} + * @returns {Promise} */ - has (key /* : Key */, callback /* : Callback */) /* : void */ { + async has (key /* : Key */) /* : void */ { const parts = this._encode(key) - fs.access(parts.file, err => { - callback(null, !err) - }) + try { + await fsAccess(parts.file) + } catch (err) { + return false + } + return true } /** * Delete the record under the given key. * * @param {Key} key - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - delete (key /* : Key */, callback /* : Callback */) /* : void */ { + async delete (key /* : Key */) /* : void */ { const parts = this._encode(key) - fs.unlink(parts.file, (err) => { - if (err) { - return callback(Errors.dbDeleteFailedError(err)) - } - callback() - }) + try { + await fsUnlink(parts.file) + } catch (err) { + throw Errors.dbDeleteFailedError(err) + } } /** @@ -265,15 +262,9 @@ class FsDatastore { delete (key /* : Key */) /* : void */ { deletes.push(key) }, - commit: (callback /* : (err: ?Error) => void */) => { - waterfall([ - (cb) => each(puts, (p, cb) => { - this.put(p.key, p.value, cb) - }, cb), - (cb) => each(deletes, (k, cb) => { - this.delete(k, cb) - }, cb) - ], (err) => callback(err)) + commit: async () /* : Promise */ => { + await Promise.all((puts.map((put) => this.put(put.key, put.value)))) + await Promise.all((deletes.map((del) => this.delete(del)))) } } } @@ -282,7 +273,7 @@ class FsDatastore { * Query the store. * * @param {Object} q - * @returns {PullStream} + * @returns {Iterable} */ query (q /* : Query */) /* : QueryResult */ { // glob expects a POSIX path @@ -291,53 +282,46 @@ class FsDatastore { .join(this.path, prefix, '*' + this.opts.extension) .split(path.sep) .join('/') - let tasks = [pull.values(glob.sync(pattern))] - + let files = glob.sync(pattern) + let it if (!q.keysOnly) { - tasks.push(pull.asyncMap((f, cb) => { - fs.readFile(f, (err, buf) => { - if (err) { - return cb(err) - } - cb(null, { - key: this._decode(f), - value: buf - }) - }) - })) + it = map(files, async (f) => { + const buf = await fsReadFile(f) + return { + key: this._decode(f), + value: buf + } + }) } else { - tasks.push(pull.map(f => ({ key: this._decode(f) }))) + it = map(files, f => ({ key: this._decode(f) })) } - if (q.filters != null) { - tasks = tasks.concat(q.filters.map(asyncFilter)) + if (Array.isArray(q.filters)) { + it = q.filters.reduce((it, f) => filter(it, f), it) } - if (q.orders != null) { - tasks = tasks.concat(q.orders.map(asyncSort)) + if (Array.isArray(q.orders)) { + it = q.orders.reduce((it, f) => sortAll(it, f), it) } if (q.offset != null) { let i = 0 - tasks.push(pull.filter(() => i++ >= q.offset)) + it = filter(it, () => i++ >= q.offset) } if (q.limit != null) { - tasks.push(pull.take(q.limit)) + it = take(it, q.limit) } - return pull.apply(null, tasks) + return it } /** * Close the store. * - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - close (callback /* : (err: ?Error) => void */) /* : void */ { - setImmediate(callback) - } + async close () /* : Promise */ { } } module.exports = FsDatastore diff --git a/test/index.spec.js b/test/index.spec.js index 9e7e275..2f1076f 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -5,14 +5,12 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const pull = require('pull-stream') const path = require('path') +const promisify = require('util').promisify const mkdirp = require('mkdirp') -const rimraf = require('rimraf') -const waterfall = require('async/waterfall') -const parallel = require('async/parallel') +const rimraf = promisify(require('rimraf')) const fs = require('fs') - +const fsReadFile = promisify(require('fs').readFile) const Key = require('interface-datastore').Key const utils = require('interface-datastore').utils const ShardingStore = require('datastore-core').ShardingDatastore @@ -20,8 +18,8 @@ const sh = require('datastore-core').shard const FsStore = require('../src') -describe('FsDatastore', () => { - describe('construction', () => { +describe('FsDatastore', async () => { + describe('construction', async () => { it('defaults - folder missing', () => { const dir = utils.tmpdir() expect( @@ -40,7 +38,7 @@ describe('FsDatastore', () => { it('createIfMissing: false - folder missing', () => { const dir = utils.tmpdir() expect( - () => new FsStore(dir, {createIfMissing: false}) + () => new FsStore(dir, { createIfMissing: false }) ).to.throw() }) @@ -48,7 +46,7 @@ describe('FsDatastore', () => { const dir = utils.tmpdir() mkdirp.sync(dir) expect( - () => new FsStore(dir, {errorIfExists: true}) + () => new FsStore(dir, { errorIfExists: true }) ).to.throw() }) }) @@ -71,71 +69,52 @@ describe('FsDatastore', () => { ) }) - it('sharding files', (done) => { + it('sharding files', async () => { const dir = utils.tmpdir() const fstore = new FsStore(dir) const shard = new sh.NextToLast(2) - waterfall([ - (cb) => ShardingStore.create(fstore, shard, cb), - (cb) => fs.readFile(path.join(dir, sh.SHARDING_FN), cb), - (file, cb) => { - expect(file.toString()).to.be.eql('/repo/flatfs/shard/v1/next-to-last/2\n') - fs.readFile(path.join(dir, sh.README_FN), cb) - }, - (readme, cb) => { - expect(readme.toString()).to.be.eql(sh.readme) - cb() - }, - (cb) => rimraf(dir, cb) - ], done) + await ShardingStore.create(fstore, shard) + + const file = await fsReadFile(path.join(dir, sh.SHARDING_FN)) + expect(file.toString()).to.be.eql('/repo/flatfs/shard/v1/next-to-last/2\n') + + const readme = await fsReadFile(path.join(dir, sh.README_FN)) + expect(readme.toString()).to.be.eql(sh.readme) + await rimraf(dir) }) - it('query', (done) => { + it('query', async () => { const fs = new FsStore(path.join(__dirname, 'test-repo', 'blocks')) - - pull( - fs.query({}), - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(res).to.have.length(23) - done() - }) - ) + let res = [] + for await (const q of fs.query({})) { + res.push(q) + } + expect(res).to.have.length(23) }) - it('interop with go', (done) => { + it('interop with go', async () => { const repodir = path.join(__dirname, '/test-repo/blocks') const fstore = new FsStore(repodir) const key = new Key('CIQGFTQ7FSI2COUXWWLOQ45VUM2GUZCGAXLWCTOKKPGTUWPXHBNIVOY') const expected = fs.readFileSync(path.join(repodir, 'VO', key.toString() + '.data')) - - waterfall([ - (cb) => ShardingStore.open(fstore, cb), - (flatfs, cb) => parallel([ - (cb) => pull( - flatfs.query({}), - pull.collect(cb) - ), - (cb) => flatfs.get(key, cb) - ], (err, res) => { - expect(err).to.not.exist() - expect(res[0]).to.have.length(23) - expect(res[1]).to.be.eql(expected) - - cb() - }) - ], done) + const flatfs = await ShardingStore.open(fstore) + let res = await flatfs.get(key) + let queryResult = flatfs.query({}) + let results = [] + for await (const result of queryResult) results.push(result) + expect(results).to.have.length(23) + expect(res).to.be.eql(expected) }) describe('interface-datastore', () => { const dir = utils.tmpdir() require('interface-datastore/src/tests')({ - setup (callback) { - callback(null, new FsStore(dir)) + setup: () => { + return new FsStore(dir) }, - teardown (callback) { - rimraf(dir, callback) + teardown: () => { + return rimraf(dir) } }) }) @@ -144,12 +123,12 @@ describe('FsDatastore', () => { const dir = utils.tmpdir() require('interface-datastore/src/tests')({ - setup (callback) { + setup: () => { const shard = new sh.NextToLast(2) - ShardingStore.createOrOpen(new FsStore(dir), shard, callback) + return ShardingStore.createOrOpen(new FsStore(dir), shard) }, - teardown (callback) { - rimraf(dir, callback) + teardown: () => { + return rimraf(dir) } }) }) From 4ca128c0d4259a4ebbcd84fa1ea76e6babc10f17 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Mon, 11 Mar 2019 08:53:20 -0700 Subject: [PATCH 2/2] ci: add travis ci support --- .travis.yml | 42 ++++++++++++++++++++++++++++++++++++++++++ ci/Jenkinsfile | 2 -- 2 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 .travis.yml delete mode 100644 ci/Jenkinsfile diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..d1036e2 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,42 @@ +language: node_js +cache: npm +stages: + - check + - test + - cov + +node_js: + - '10' + +os: + - linux + - osx + +script: npx nyc -s npm run test:node -- --bail +after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov + +jobs: + include: + - os: windows + cache: false + + - stage: check + script: + - npx aegir commitlint --travis + - npx aegir dep-check + - npm run lint + + - stage: test + name: chrome + addons: + chrome: stable + script: npx aegir test -t browser -t webworker + + - stage: test + name: firefox + addons: + firefox: latest + script: npx aegir test -t browser -t webworker -- --browsers FirefoxHeadless + +notifications: + email: false \ No newline at end of file diff --git a/ci/Jenkinsfile b/ci/Jenkinsfile deleted file mode 100644 index a7da2e5..0000000 --- a/ci/Jenkinsfile +++ /dev/null @@ -1,2 +0,0 @@ -// Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. -javascript()