This repository has been archived by the owner on Mar 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 11
Refactor fs datastore to us async/await #22
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
language: node_js | ||
cache: npm | ||
stages: | ||
- check | ||
- test | ||
- cov | ||
|
||
node_js: | ||
- '10' | ||
|
||
os: | ||
- linux | ||
- osx | ||
|
||
script: npx nyc -s npm run test:node -- --bail | ||
after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov | ||
|
||
jobs: | ||
include: | ||
- os: windows | ||
cache: false | ||
|
||
- stage: check | ||
script: | ||
- npx aegir commitlint --travis | ||
- npx aegir dep-check | ||
- npm run lint | ||
|
||
- stage: test | ||
name: chrome | ||
addons: | ||
chrome: stable | ||
script: npx aegir test -t browser -t webworker | ||
|
||
- stage: test | ||
name: firefox | ||
addons: | ||
firefox: latest | ||
script: npx aegir test -t browser -t webworker -- --browsers FirefoxHeadless | ||
|
||
notifications: | ||
email: false |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,18 +4,23 @@ | |
/* :: import type {Batch, Query, QueryResult, Callback} from 'interface-datastore' */ | ||
|
||
const fs = require('graceful-fs') | ||
const pull = require('pull-stream') | ||
const glob = require('glob') | ||
const setImmediate = require('async/setImmediate') | ||
const waterfall = require('async/series') | ||
const each = require('async/each') | ||
const mkdirp = require('mkdirp') | ||
const writeFile = require('fast-write-atomic') | ||
const promisify = require('util').promisify | ||
const writeFile = promisify(require('fast-write-atomic')) | ||
const path = require('path') | ||
|
||
const asyncFilter = require('interface-datastore').utils.asyncFilter | ||
const asyncSort = require('interface-datastore').utils.asyncSort | ||
const filter = require('interface-datastore').utils.filter | ||
const take = require('interface-datastore').utils.take | ||
const map = require('interface-datastore').utils.map | ||
const sortAll = require('interface-datastore').utils.sortAll | ||
const IDatastore = require('interface-datastore') | ||
|
||
const asyncMkdirp = promisify(require('mkdirp')) | ||
const fsAccess = promisify(fs.access) | ||
const fsReadFile = promisify(fs.readFile) | ||
const fsUnlink = promisify(fs.unlink) | ||
|
||
const Key = IDatastore.Key | ||
const Errors = IDatastore.Errors | ||
|
||
|
@@ -57,9 +62,8 @@ class FsDatastore { | |
} | ||
} | ||
|
||
open (callback /* : Callback<void> */) /* : void */ { | ||
open () /* : void */ { | ||
this._openOrCreate() | ||
setImmediate(callback) | ||
} | ||
|
||
/** | ||
|
@@ -150,104 +154,97 @@ class FsDatastore { | |
* | ||
* @param {Key} key | ||
* @param {Buffer} val | ||
* @param {function(Error)} callback | ||
* @returns {void} | ||
* @returns {Promise<void>} | ||
*/ | ||
putRaw (key /* : Key */, val /* : Buffer */, callback /* : Callback<void> */) /* : void */ { | ||
async putRaw (key /* : Key */, val /* : Buffer */) /* : void */ { | ||
const parts = this._encode(key) | ||
const file = parts.file.slice(0, -this.opts.extension.length) | ||
waterfall([ | ||
(cb) => mkdirp(parts.dir, { fs: fs }, cb), | ||
(cb) => writeFile(file, val, cb) | ||
], (err) => callback(err)) | ||
await asyncMkdirp(parts.dir, { fs: fs }) | ||
await writeFile(file, val) | ||
} | ||
|
||
/** | ||
* Store the given value under the key. | ||
* | ||
* @param {Key} key | ||
* @param {Buffer} val | ||
* @param {function(Error)} callback | ||
* @returns {void} | ||
* @returns {Promise<void>} | ||
*/ | ||
put (key /* : Key */, val /* : Buffer */, callback /* : Callback<void> */) /* : void */ { | ||
async put (key /* : Key */, val /* : Buffer */) /* : void */ { | ||
const parts = this._encode(key) | ||
waterfall([ | ||
(cb) => mkdirp(parts.dir, { fs: fs }, cb), | ||
(cb) => writeFile(parts.file, val, cb) | ||
], (err) => { | ||
if (err) { | ||
return callback(Errors.dbWriteFailedError(err)) | ||
} | ||
callback() | ||
}) | ||
try { | ||
await asyncMkdirp(parts.dir, { fs: fs }) | ||
await writeFile(parts.file, val) | ||
} catch (err) { | ||
throw Errors.dbWriteFailedError(err) | ||
} | ||
} | ||
|
||
/** | ||
* Read from the file system without extension. | ||
* | ||
* @param {Key} key | ||
* @param {function(Error, Buffer)} callback | ||
* @returns {void} | ||
* @returns {Promise<Buffer>} | ||
*/ | ||
getRaw (key /* : Key */, callback /* : Callback<Buffer> */) /* : void */ { | ||
async getRaw (key /* : Key */) /* : void */ { | ||
const parts = this._encode(key) | ||
let file = parts.file | ||
file = file.slice(0, -this.opts.extension.length) | ||
fs.readFile(file, (err, data) => { | ||
if (err) { | ||
return callback(Errors.notFoundError(err)) | ||
} | ||
callback(null, data) | ||
}) | ||
let data | ||
try { | ||
data = await fsReadFile(file) | ||
} catch (err) { | ||
throw Errors.notFoundError(err) | ||
} | ||
return data | ||
} | ||
|
||
/** | ||
* Read from the file system. | ||
* | ||
* @param {Key} key | ||
* @param {function(Error, Buffer)} callback | ||
* @returns {void} | ||
* @returns {Promise<Buffer>} | ||
*/ | ||
get (key /* : Key */, callback /* : Callback<Buffer> */) /* : void */ { | ||
async get (key /* : Key */) /* : void */ { | ||
const parts = this._encode(key) | ||
fs.readFile(parts.file, (err, data) => { | ||
if (err) { | ||
return callback(Errors.notFoundError(err)) | ||
} | ||
callback(null, data) | ||
}) | ||
let data | ||
try { | ||
data = await fsReadFile(parts.file) | ||
} catch (err) { | ||
throw Errors.notFoundError(err) | ||
} | ||
return data | ||
} | ||
|
||
/** | ||
* Check for the existence of the given key. | ||
* | ||
* @param {Key} key | ||
* @param {function(Error, bool)} callback | ||
* @returns {void} | ||
* @returns {Promise<bool>} | ||
*/ | ||
has (key /* : Key */, callback /* : Callback<bool> */) /* : void */ { | ||
async has (key /* : Key */) /* : void */ { | ||
const parts = this._encode(key) | ||
fs.access(parts.file, err => { | ||
callback(null, !err) | ||
}) | ||
try { | ||
await fsAccess(parts.file) | ||
} catch (err) { | ||
return false | ||
} | ||
return true | ||
} | ||
|
||
/** | ||
* Delete the record under the given key. | ||
* | ||
* @param {Key} key | ||
* @param {function(Error)} callback | ||
* @returns {void} | ||
* @returns {Promise<void>} | ||
*/ | ||
delete (key /* : Key */, callback /* : Callback<void> */) /* : void */ { | ||
async delete (key /* : Key */) /* : void */ { | ||
const parts = this._encode(key) | ||
fs.unlink(parts.file, (err) => { | ||
if (err) { | ||
return callback(Errors.dbDeleteFailedError(err)) | ||
} | ||
callback() | ||
}) | ||
try { | ||
await fsUnlink(parts.file) | ||
} catch (err) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we ignore the error if it indicates that the file has already been deleted? |
||
throw Errors.dbDeleteFailedError(err) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -265,15 +262,9 @@ class FsDatastore { | |
delete (key /* : Key */) /* : void */ { | ||
deletes.push(key) | ||
}, | ||
commit: (callback /* : (err: ?Error) => void */) => { | ||
waterfall([ | ||
(cb) => each(puts, (p, cb) => { | ||
this.put(p.key, p.value, cb) | ||
}, cb), | ||
(cb) => each(deletes, (k, cb) => { | ||
this.delete(k, cb) | ||
}, cb) | ||
], (err) => callback(err)) | ||
commit: async () /* : Promise<void> */ => { | ||
await Promise.all((puts.map((put) => this.put(put.key, put.value)))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may be more efficient to put these in one big |
||
await Promise.all((deletes.map((del) => this.delete(del)))) | ||
} | ||
} | ||
} | ||
|
@@ -282,7 +273,7 @@ class FsDatastore { | |
* Query the store. | ||
* | ||
* @param {Object} q | ||
* @returns {PullStream} | ||
* @returns {Iterable} | ||
*/ | ||
query (q /* : Query<Buffer> */) /* : QueryResult<Buffer> */ { | ||
// glob expects a POSIX path | ||
|
@@ -291,53 +282,46 @@ class FsDatastore { | |
.join(this.path, prefix, '*' + this.opts.extension) | ||
.split(path.sep) | ||
.join('/') | ||
let tasks = [pull.values(glob.sync(pattern))] | ||
|
||
let files = glob.sync(pattern) | ||
let it | ||
if (!q.keysOnly) { | ||
tasks.push(pull.asyncMap((f, cb) => { | ||
fs.readFile(f, (err, buf) => { | ||
if (err) { | ||
return cb(err) | ||
} | ||
cb(null, { | ||
key: this._decode(f), | ||
value: buf | ||
}) | ||
}) | ||
})) | ||
it = map(files, async (f) => { | ||
const buf = await fsReadFile(f) | ||
return { | ||
key: this._decode(f), | ||
value: buf | ||
} | ||
}) | ||
} else { | ||
tasks.push(pull.map(f => ({ key: this._decode(f) }))) | ||
it = map(files, f => ({ key: this._decode(f) })) | ||
} | ||
|
||
if (q.filters != null) { | ||
tasks = tasks.concat(q.filters.map(asyncFilter)) | ||
if (Array.isArray(q.filters)) { | ||
it = q.filters.reduce((it, f) => filter(it, f), it) | ||
} | ||
|
||
if (q.orders != null) { | ||
tasks = tasks.concat(q.orders.map(asyncSort)) | ||
if (Array.isArray(q.orders)) { | ||
it = q.orders.reduce((it, f) => sortAll(it, f), it) | ||
} | ||
|
||
if (q.offset != null) { | ||
let i = 0 | ||
tasks.push(pull.filter(() => i++ >= q.offset)) | ||
it = filter(it, () => i++ >= q.offset) | ||
} | ||
|
||
if (q.limit != null) { | ||
tasks.push(pull.take(q.limit)) | ||
it = take(it, q.limit) | ||
} | ||
|
||
return pull.apply(null, tasks) | ||
return it | ||
} | ||
|
||
/** | ||
* Close the store. | ||
* | ||
* @param {function(Error)} callback | ||
* @returns {void} | ||
* @returns {Promise<void>} | ||
*/ | ||
close (callback /* : (err: ?Error) => void */) /* : void */ { | ||
setImmediate(callback) | ||
} | ||
async close () /* : Promise<void> */ { } | ||
} | ||
|
||
module.exports = FsDatastore |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the
refactor/async-iterators
PR was merged and the branch deleted this is outdated and breaks things.