Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
node_modules
build
dist
.docs
.coverage
node_modules
package-lock.json
yarn.lock
.vscode
24 changes: 5 additions & 19 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,6 @@
},
"type": "module",
"types": "./dist/src/index.d.ts",
"typesVersions": {
"*": {
"*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
],
"src/*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
]
}
},
"files": [
"src",
"dist",
Expand Down Expand Up @@ -162,18 +146,20 @@
"@libp2p/interface-pubsub": "^3.0.0",
"@libp2p/interfaces": "^3.2.0",
"@libp2p/logger": "^2.0.0",
"datastore-core": "^8.0.1",
"interface-datastore": "^7.0.0",
"datastore-core": "^9.1.0",
"interface-datastore": "^8.2.0",
"interface-store": "^5.1.0",
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@libp2p/floodsub": "^6.0.0",
"@libp2p/interface-connection-manager": "^1.3.8",
"@libp2p/interface-mocks": "^9.0.0",
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/record": "^3.0.0",
"aegir": "^37.10.0",
"aegir": "^38.1.8",
"p-wait-for": "^5.0.0",
"sinon": "^15.0.1"
}
Expand Down
140 changes: 56 additions & 84 deletions src/index.js → src/index.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,34 @@
import { Key } from 'interface-datastore'
import { Datastore, Key } from 'interface-datastore'
import { BaseDatastore } from 'datastore-core'
import { encodeBase32, keyToTopic, topicToKey } from './utils.js'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { CodeError } from '@libp2p/interfaces/errors'
import { logger } from '@libp2p/logger'
import type { Message, PubSub } from '@libp2p/interface-pubsub'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { SelectFn, ValidateFn } from '@libp2p/interface-dht'
import type { AbortOptions } from 'interface-store'

const log = logger('datastore-pubsub:publisher')

export interface SubscriptionKeyFn { (key: Uint8Array): Promise<Uint8Array> | Uint8Array }

/**
* @typedef {import('@libp2p/interface-peer-id').PeerId} PeerId
* @typedef {import('./types').SubscriptionKeyFn} SubscriptionKeyFn
* @typedef {import('@libp2p/interface-pubsub').Message} PubSubMessage
* @typedef {import('@libp2p/interfaces').AbortOptions} AbortOptions
* DatastorePubsub is responsible for providing an api for pubsub to be used as a datastore with
* [TieredDatastore]{@link https://github.com/ipfs/js-datastore-core/blob/master/src/tiered.js}
*/

// DatastorePubsub is responsible for providing an api for pubsub to be used as a datastore with
// [TieredDatastore]{@link https://github.com/ipfs/js-datastore-core/blob/master/src/tiered.js}
export class PubSubDatastore extends BaseDatastore {
/**
* Creates an instance of DatastorePubsub.
*
* @param {import('@libp2p/interface-pubsub').PubSub} pubsub - pubsub implementation
* @param {import('interface-datastore').Datastore} datastore - datastore instance
* @param {PeerId} peerId - peer-id instance
* @param {import('@libp2p/interface-dht').ValidateFn} validator - validator function
* @param {import('@libp2p/interface-dht').SelectFn} selector - selector function
* @param {SubscriptionKeyFn} [subscriptionKeyFn] - function to manipulate the key topic received before processing it
* @memberof DatastorePubsub
*/
constructor (pubsub, datastore, peerId, validator, selector, subscriptionKeyFn) {
private readonly _pubsub: PubSub
private readonly _datastore: Datastore
private readonly _peerId: PeerId
private readonly _validator: ValidateFn
private readonly _selector: SelectFn
private readonly _handleSubscriptionKeyFn?: SubscriptionKeyFn

constructor (pubsub: PubSub, datastore: Datastore, peerId: PeerId, validator: ValidateFn, selector: SelectFn, subscriptionKeyFn?: SubscriptionKeyFn) {
super()

if (!validator) {
if (validator == null) {
throw new CodeError('missing validator', 'ERR_INVALID_PARAMETERS')
}

Expand All @@ -43,7 +40,7 @@ export class PubSubDatastore extends BaseDatastore {
throw new CodeError('missing select function', 'ERR_INVALID_PARAMETERS')
}

if (subscriptionKeyFn && typeof subscriptionKeyFn !== 'function') {
if ((subscriptionKeyFn != null) && typeof subscriptionKeyFn !== 'function') {
throw new CodeError('invalid subscriptionKeyFn received', 'ERR_INVALID_PARAMETERS')
}

Expand All @@ -56,7 +53,11 @@ export class PubSubDatastore extends BaseDatastore {

// Bind _onMessage function, which is called by pubsub.
this._onMessage = this._onMessage.bind(this)
this._pubsub.addEventListener('message', this._onMessage)
this._pubsub.addEventListener('message', (evt) => {
this._onMessage(evt).catch(err => {
log.error(err)
})
})
}

/**
Expand All @@ -66,8 +67,8 @@ export class PubSubDatastore extends BaseDatastore {
* @param {Uint8Array} val - value to be propagated.
* @param {AbortOptions} [options]
*/
// @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays
async put (key, val, options) {
// @ts-expect-error Datastores take keys as Keys, this one takes Uint8Arrays
async put (key: Uint8Array, val: Uint8Array, options?: AbortOptions): Promise<void> {
if (!(key instanceof Uint8Array)) {
const errMsg = 'datastore key does not have a valid format'

Expand All @@ -91,13 +92,10 @@ export class PubSubDatastore extends BaseDatastore {
}

/**
* Try to subscribe a topic with Pubsub and returns the local value if available.
*
* @param {Uint8Array} key - identifier of the value to be subscribed.
* @param {AbortOptions} [options]
* Try to subscribe a topic with Pubsub and returns the local value if available
*/
// @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays
async get (key, options) {
// @ts-expect-error Datastores take keys as Keys, this one takes Uint8Arrays
async get (key: Uint8Array, options?: AbortOptions): Promise<Uint8Array> {
if (!(key instanceof Uint8Array)) {
const errMsg = 'datastore key does not have a valid format'

Expand All @@ -106,54 +104,47 @@ export class PubSubDatastore extends BaseDatastore {
}

const stringifiedTopic = keyToTopic(key)
const subscriptions = await this._pubsub.getTopics()
const subscriptions = this._pubsub.getTopics()

// If already subscribed, just try to get it
if (subscriptions && Array.isArray(subscriptions) && subscriptions.indexOf(stringifiedTopic) > -1) {
return this._getLocal(key, options)
if (subscriptions.includes(stringifiedTopic)) {
return await this._getLocal(key, options)
}

// subscribe
try {
await this._pubsub.subscribe(stringifiedTopic)
} catch (/** @type {any} */ err) {
this._pubsub.subscribe(stringifiedTopic)
} catch (err: any) {
const errMsg = `cannot subscribe topic ${stringifiedTopic}`

log.error(errMsg)
throw new CodeError(errMsg, 'ERR_SUBSCRIBING_TOPIC')
}
log(`subscribed values for key ${stringifiedTopic}`)

return this._getLocal(key)
return await this._getLocal(key)
}

/**
* Unsubscribe topic.
*
* @param {Uint8Array} key - identifier of the value to unsubscribe.
* @returns {void}
* Unsubscribe topic
*/
unsubscribe (key) {
unsubscribe (key: Uint8Array): void {
const stringifiedTopic = keyToTopic(key)

return this._pubsub.unsubscribe(stringifiedTopic)
this._pubsub.unsubscribe(stringifiedTopic)
}

/**
* Get record from local datastore
*
* @private
* @param {Uint8Array} key
* @param {AbortOptions} [options]
*/
async _getLocal (key, options) {
async _getLocal (key: Uint8Array, options?: AbortOptions): Promise<Uint8Array> {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)
let dsVal

try {
dsVal = await this._datastore.get(routingKey, options)
} catch (/** @type {any} */ err) {
} catch (err: any) {
if (err.code !== 'ERR_NOT_FOUND') {
const errMsg = `unexpected error getting the ipns record for ${routingKey.toString()}`

Expand All @@ -178,10 +169,8 @@ export class PubSubDatastore extends BaseDatastore {

/**
* handles pubsub subscription messages
*
* @param {CustomEvent<PubSubMessage>} evt
*/
async _onMessage (evt) {
async _onMessage (evt: CustomEvent<Message>): Promise<void> {
const msg = evt.detail

if (msg.type !== 'signed') {
Expand All @@ -193,7 +182,7 @@ export class PubSubDatastore extends BaseDatastore {
let key
try {
key = topicToKey(topic)
} catch (/** @type {any} */ err) {
} catch (err: any) {
log.error(err)
return
}
Expand All @@ -206,12 +195,12 @@ export class PubSubDatastore extends BaseDatastore {
return
}

if (this._handleSubscriptionKeyFn) {
if (this._handleSubscriptionKeyFn != null) {
let res

try {
res = await this._handleSubscriptionKeyFn(key)
} catch (/** @type {any} */ err) {
} catch (err: any) {
log.error('message discarded by the subscriptionKeyFn')
return
}
Expand All @@ -221,24 +210,20 @@ export class PubSubDatastore extends BaseDatastore {

try {
await this._storeIfSubscriptionIsBetter(key, data)
} catch (/** @type {any} */ err) {
} catch (err: any) {
log.error(err)
}
}

/**
* Store the received record if it is better than the current stored
*
* @param {Uint8Array} key
* @param {Uint8Array} data
* @param {AbortOptions} [options]
*/
async _storeIfSubscriptionIsBetter (key, data, options) {
async _storeIfSubscriptionIsBetter (key: Uint8Array, data: Uint8Array, options?: AbortOptions): Promise<void> {
let isBetter = false

try {
isBetter = await this._isBetter(key, data)
} catch (/** @type {any} */ err) {
} catch (err: any) {
if (err.code !== 'ERR_NOT_VALID_RECORD') {
throw err
}
Expand All @@ -251,37 +236,28 @@ export class PubSubDatastore extends BaseDatastore {

/**
* Validate record according to the received validation function
*
* @param {Uint8Array} key
* @param {Uint8Array} value
*/
async _validateRecord (key, value) { // eslint-disable-line require-await
return this._validator(key, value)
async _validateRecord (key: Uint8Array, value: Uint8Array): Promise<void> { // eslint-disable-line require-await
await this._validator(key, value)
}

/**
* Select the best record according to the received select function
*
* @param {Uint8Array} key
* @param {Uint8Array[]} records
*/
async _selectRecord (key, records) {
const res = await this._selector(key, records)
async _selectRecord (key: Uint8Array, records: Uint8Array[]): Promise<boolean> {
const res = this._selector(key, records)

// If the selected was the first (0), it should be stored (true)
return res === 0
}

/**
* Verify if the record received through pubsub is valid and better than the one currently stored
*
* @param {Uint8Array} key
* @param {Uint8Array} val
*/
async _isBetter (key, val) {
async _isBetter (key: Uint8Array, val: Uint8Array): Promise<boolean> {
try {
await this._validateRecord(key, val)
} catch (/** @type {any} */ err) {
} catch (err: any) {
// If not valid, it is not better than the one currently available
const errMsg = 'record received through pubsub is not valid'

Expand All @@ -295,7 +271,7 @@ export class PubSubDatastore extends BaseDatastore {

try {
currentRecord = await this._getLocal(dsKey.uint8Array())
} catch (/** @type {any} */ err) {
} catch (err: any) {
// if the old one is invalid, the new one is *always* better
return true
}
Expand All @@ -306,17 +282,13 @@ export class PubSubDatastore extends BaseDatastore {
}

// verify if the received record should replace the current one
return this._selectRecord(key, [currentRecord, val])
return await this._selectRecord(key, [currentRecord, val])
}

/**
* add record to datastore
*
* @param {Uint8Array} key
* @param {Uint8Array} data
* @param {AbortOptions} [options]
*/
async _storeRecord (key, data, options) {
async _storeRecord (key: Uint8Array, data: Uint8Array, options?: AbortOptions): Promise<void> {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)

Expand Down
1 change: 0 additions & 1 deletion src/types.ts

This file was deleted.

Loading