Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
"dependencies": {
"@libp2p/interface-dht": "^1.0.1",
"@libp2p/interface-pubsub": "^1.0.4",
"@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.0",
"datastore-core": "^7.0.0",
"debug": "^4.2.0",
Expand All @@ -166,7 +167,6 @@
"@libp2p/floodsub": "^3.0.0",
"@libp2p/interface-compliance-tests": "^3.0.2",
"@libp2p/interface-mocks": "^3.0.1",
"@libp2p/interfaces": "^3.0.3",
"@libp2p/peer-id-factory": "^1.0.9",
"@libp2p/record": "^2.0.0",
"@types/detect-node": "^2.0.0",
Expand Down
24 changes: 15 additions & 9 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const log = logger('datastore-pubsub:publisher')
* @typedef {import('@libp2p/interface-peer-id').PeerId} PeerId
* @typedef {import('./types').SubscriptionKeyFn} SubscriptionKeyFn
* @typedef {import('@libp2p/interface-pubsub').Message} PubSubMessage
* @typedef {import('@libp2p/interfaces').AbortOptions} AbortOptions
*/

// DatastorePubsub is responsible for providing an api for pubsub to be used as a datastore with
Expand Down Expand Up @@ -63,9 +64,10 @@ export class PubSubDatastore extends BaseDatastore {
*
* @param {Uint8Array} key - identifier of the value to be published.
* @param {Uint8Array} val - value to be propagated.
* @param {AbortOptions} [options]
*/
// @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays
async put (key, val) {
async put (key, val, options) {
if (!(key instanceof Uint8Array)) {
const errMsg = 'datastore key does not have a valid format'

Expand All @@ -92,9 +94,10 @@ export class PubSubDatastore extends BaseDatastore {
* Try to subscribe a topic with Pubsub and returns the local value if available.
*
* @param {Uint8Array} key - identifier of the value to be subscribed.
* @param {AbortOptions} [options]
*/
// @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays
async get (key) {
async get (key, options) {
if (!(key instanceof Uint8Array)) {
const errMsg = 'datastore key does not have a valid format'

Expand All @@ -107,7 +110,7 @@ export class PubSubDatastore extends BaseDatastore {

// If already subscribed, just try to get it
if (subscriptions && Array.isArray(subscriptions) && subscriptions.indexOf(stringifiedTopic) > -1) {
return this._getLocal(key)
return this._getLocal(key, options)
}

// subscribe
Expand Down Expand Up @@ -141,14 +144,15 @@ export class PubSubDatastore extends BaseDatastore {
*
* @private
* @param {Uint8Array} key
* @param {AbortOptions} [options]
*/
async _getLocal (key) {
async _getLocal (key, options) {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)
let dsVal

try {
dsVal = await this._datastore.get(routingKey)
dsVal = await this._datastore.get(routingKey, options)
} catch (/** @type {any} */ err) {
if (err.code !== 'ERR_NOT_FOUND') {
const errMsg = `unexpected error getting the ipns record for ${routingKey.toString()}`
Expand Down Expand Up @@ -221,8 +225,9 @@ export class PubSubDatastore extends BaseDatastore {
*
* @param {Uint8Array} key
* @param {Uint8Array} data
* @param {AbortOptions} [options]
*/
async _storeIfSubscriptionIsBetter (key, data) {
async _storeIfSubscriptionIsBetter (key, data, options) {
let isBetter = false

try {
Expand All @@ -234,7 +239,7 @@ export class PubSubDatastore extends BaseDatastore {
}

if (isBetter) {
await this._storeRecord(key, data)
await this._storeRecord(key, data, options)
}
}

Expand Down Expand Up @@ -303,12 +308,13 @@ export class PubSubDatastore extends BaseDatastore {
*
* @param {Uint8Array} key
* @param {Uint8Array} data
* @param {AbortOptions} [options]
*/
async _storeRecord (key, data) {
async _storeRecord (key, data, options) {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)

await this._datastore.put(routingKey, data)
await this._datastore.put(routingKey, data, options)
log(`record for ${keyToTopic(key)} was stored in the datastore`)
}
}