diff --git a/package.json b/package.json index ccaec9f..8c98f26 100644 --- a/package.json +++ b/package.json @@ -47,9 +47,12 @@ "dependencies": { "debug": "^4.1.1", "interface-datastore": "^5.1.1", + "it-drain": "^1.0.4", "it-filter": "^1.0.2", "it-map": "^1.0.5", "it-merge": "^1.0.1", + "it-pipe": "^1.1.0", + "it-pushable": "^1.4.2", "it-take": "^1.0.1", "uint8arrays": "^2.1.5" }, diff --git a/src/keytransform.js b/src/keytransform.js index 9ba7765..a5fb88f 100644 --- a/src/keytransform.js +++ b/src/keytransform.js @@ -2,6 +2,7 @@ const { Adapter } = require('interface-datastore') const map = require('it-map') +const { pipe } = require('it-pipe') /** * @typedef {import('interface-datastore').Datastore} Datastore @@ -10,9 +11,15 @@ const map = require('it-map') * @typedef {import('interface-datastore').Query} Query * @typedef {import('interface-datastore').KeyQuery} KeyQuery * @typedef {import('interface-datastore').Key} Key + * @typedef {import('interface-datastore').Pair} Pair * @typedef {import('./types').KeyTransform} KeyTransform */ +/** + * @template TEntry + * @typedef {import('interface-store').AwaitIterable} AwaitIterable + */ + /** * A datastore shim, that wraps around a given datastore, changing * the way keys look to the user, for example namespacing @@ -69,6 +76,78 @@ class KeyTransformDatastore extends Adapter { return this.child.delete(this.transform.convert(key), options) } + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * putMany (source, options = {}) { + const transform = this.transform + const child = this.child + + yield * pipe( + source, + async function * (source) { + yield * map(source, ({ key, value }) => ({ + key: transform.convert(key), + value + })) + }, + async function * (source) { + yield * child.putMany(source, options) + }, + async function * (source) { + yield * map(source, ({ key, value }) => ({ + key: transform.invert(key), + value + })) + } + ) + } + + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * getMany (source, options = {}) { + const transform = this.transform + const child = this.child + + yield * pipe( + source, + async function * (source) { + yield * map(source, key => transform.convert(key)) + }, + async function * (source) { + yield * child.getMany(source, options) + } + ) + } + + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * deleteMany (source, options = {}) { + const transform = this.transform + const child = this.child + + yield * pipe( + source, + async function * (source) { + yield * map(source, key => transform.convert(key)) + }, + async function * (source) { + yield * child.deleteMany(source, options) + }, + async function * (source) { + yield * map(source, key => transform.invert(key)) + } + ) + } + /** * @returns {Batch} */ diff --git a/src/mount.js b/src/mount.js index a5ff3a2..1d5972d 100644 --- a/src/mount.js +++ b/src/mount.js @@ -22,11 +22,6 @@ const Keytransform = require('./keytransform') * @typedef {import('./types').KeyTransform} KeyTransform */ -/** - * @template TEntry - * @typedef {import('./types').AwaitIterable} AwaitIterable - */ - /** * A datastore that can combine multiple stores inside various * key prefixes diff --git a/src/sharding.js b/src/sharding.js index a493556..d506394 100644 --- a/src/sharding.js +++ b/src/sharding.js @@ -22,7 +22,12 @@ const shardReadmeKey = new Key(sh.README_FN) */ /** * @template TValue - * @typedef {import('./types').Await } Await + * @typedef {import('interface-store').Await } Await + */ + +/** + * @template TEntry + * @typedef {import('interface-store').AwaitIterable} AwaitIterable */ /** @@ -162,6 +167,33 @@ class ShardingDatastore extends Adapter { return this.child.delete(key, options) } + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * putMany (source, options = {}) { + yield * this.child.putMany(source, options) + } + + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * getMany (source, options = {}) { + yield * this.child.getMany(source, options) + } + + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * deleteMany (source, options = {}) { + yield * this.child.deleteMany(source, options) + } + batch () { return this.child.batch() } diff --git a/src/tiered.js b/src/tiered.js index a988d7f..3bbb7f5 100644 --- a/src/tiered.js +++ b/src/tiered.js @@ -2,6 +2,9 @@ const { Adapter, Errors } = require('interface-datastore') const log = require('debug')('datastore:core:tiered') +const pushable = require('it-pushable') +const drain = require('it-drain') + /** * @typedef {import('interface-datastore').Datastore} Datastore * @typedef {import('interface-datastore').Options} Options @@ -9,6 +12,12 @@ const log = require('debug')('datastore:core:tiered') * @typedef {import('interface-datastore').Query} Query * @typedef {import('interface-datastore').KeyQuery} KeyQuery * @typedef {import('interface-datastore').Key} Key + * @typedef {import('interface-datastore').Pair} Pair + */ + +/** + * @template TEntry + * @typedef {import('interface-store').AwaitIterable} AwaitIterable */ /** @@ -90,6 +99,74 @@ class TieredDatastore extends Adapter { } } + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * putMany (source, options = {}) { + let error + const pushables = this.stores.map(store => { + const source = pushable() + + drain(store.putMany(source, options)) + .catch(err => { + // store threw while putting, make sure we bubble the error up + error = err + }) + + return source + }) + + try { + for await (const pair of source) { + if (error) { + throw error + } + + pushables.forEach(p => p.push(pair)) + + yield pair + } + } finally { + pushables.forEach(p => p.end()) + } + } + + /** + * @param {AwaitIterable} source + * @param {Options} [options] + * @returns {AsyncIterable} + */ + async * deleteMany (source, options = {}) { + let error + const pushables = this.stores.map(store => { + const source = pushable() + + drain(store.deleteMany(source, options)) + .catch(err => { + // store threw while deleting, make sure we bubble the error up + error = err + }) + + return source + }) + + try { + for await (const key of source) { + if (error) { + throw error + } + + pushables.forEach(p => p.push(key)) + + yield key + } + } finally { + pushables.forEach(p => p.end()) + } + } + async close () { await Promise.all(this.stores.map(store => store.close())) }