diff --git a/package.json b/package.json index fc9c6c3..959eedd 100644 --- a/package.json +++ b/package.json @@ -154,6 +154,7 @@ "dependencies": { "@libp2p/interface-dht": "^1.0.1", "@libp2p/interface-pubsub": "^1.0.4", + "@libp2p/interfaces": "^3.0.3", "@libp2p/logger": "^2.0.0", "datastore-core": "^7.0.0", "debug": "^4.2.0", @@ -166,7 +167,6 @@ "@libp2p/floodsub": "^3.0.0", "@libp2p/interface-compliance-tests": "^3.0.2", "@libp2p/interface-mocks": "^3.0.1", - "@libp2p/interfaces": "^3.0.3", "@libp2p/peer-id-factory": "^1.0.9", "@libp2p/record": "^2.0.0", "@types/detect-node": "^2.0.0", diff --git a/src/index.js b/src/index.js index 57ee896..6fa40e6 100644 --- a/src/index.js +++ b/src/index.js @@ -11,6 +11,7 @@ const log = logger('datastore-pubsub:publisher') * @typedef {import('@libp2p/interface-peer-id').PeerId} PeerId * @typedef {import('./types').SubscriptionKeyFn} SubscriptionKeyFn * @typedef {import('@libp2p/interface-pubsub').Message} PubSubMessage + * @typedef {import('@libp2p/interfaces').AbortOptions} AbortOptions */ // DatastorePubsub is responsible for providing an api for pubsub to be used as a datastore with @@ -63,9 +64,10 @@ export class PubSubDatastore extends BaseDatastore { * * @param {Uint8Array} key - identifier of the value to be published. * @param {Uint8Array} val - value to be propagated. + * @param {AbortOptions} [options] */ // @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays - async put (key, val) { + async put (key, val, options) { if (!(key instanceof Uint8Array)) { const errMsg = 'datastore key does not have a valid format' @@ -92,9 +94,10 @@ export class PubSubDatastore extends BaseDatastore { * Try to subscribe a topic with Pubsub and returns the local value if available. * * @param {Uint8Array} key - identifier of the value to be subscribed. + * @param {AbortOptions} [options] */ // @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays - async get (key) { + async get (key, options) { if (!(key instanceof Uint8Array)) { const errMsg = 'datastore key does not have a valid format' @@ -107,7 +110,7 @@ export class PubSubDatastore extends BaseDatastore { // If already subscribed, just try to get it if (subscriptions && Array.isArray(subscriptions) && subscriptions.indexOf(stringifiedTopic) > -1) { - return this._getLocal(key) + return this._getLocal(key, options) } // subscribe @@ -141,14 +144,15 @@ export class PubSubDatastore extends BaseDatastore { * * @private * @param {Uint8Array} key + * @param {AbortOptions} [options] */ - async _getLocal (key) { + async _getLocal (key, options) { // encode key - base32(/ipns/{cid}) const routingKey = new Key('/' + encodeBase32(key), false) let dsVal try { - dsVal = await this._datastore.get(routingKey) + dsVal = await this._datastore.get(routingKey, options) } catch (/** @type {any} */ err) { if (err.code !== 'ERR_NOT_FOUND') { const errMsg = `unexpected error getting the ipns record for ${routingKey.toString()}` @@ -221,8 +225,9 @@ export class PubSubDatastore extends BaseDatastore { * * @param {Uint8Array} key * @param {Uint8Array} data + * @param {AbortOptions} [options] */ - async _storeIfSubscriptionIsBetter (key, data) { + async _storeIfSubscriptionIsBetter (key, data, options) { let isBetter = false try { @@ -234,7 +239,7 @@ export class PubSubDatastore extends BaseDatastore { } if (isBetter) { - await this._storeRecord(key, data) + await this._storeRecord(key, data, options) } } @@ -303,12 +308,13 @@ export class PubSubDatastore extends BaseDatastore { * * @param {Uint8Array} key * @param {Uint8Array} data + * @param {AbortOptions} [options] */ - async _storeRecord (key, data) { + async _storeRecord (key, data, options) { // encode key - base32(/ipns/{cid}) const routingKey = new Key('/' + encodeBase32(key), false) - await this._datastore.put(routingKey, data) + await this._datastore.put(routingKey, data, options) log(`record for ${keyToTopic(key)} was stored in the datastore`) } }