diff --git a/.travis.yml b/.travis.yml index bb9067f..41b62b8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,18 @@ language: node_js cache: npm +dist: bionic -stages: - - check - - test - - cov +branches: + only: + - master + - /^release\/.*$/ node_js: - - '10' + - 'lts/*' + - 'node' + +stages: + - check os: - linux diff --git a/package.json b/package.json index a1b8a4f..69dcce4 100644 --- a/package.json +++ b/package.json @@ -4,8 +4,9 @@ "description": "Responsible for providing an interface-datastore compliant api to pubsub", "leadMaintainer": "Vasco Santos ", "main": "src/index.js", + "types": "dist/src/index.d.ts", "scripts": { - "build": "aegir build", + "prepare": "aegir build --no-bundle", "lint": "aegir lint", "release": "aegir release --target node", "release-minor": "aegir release --target node --type minor", @@ -25,6 +26,10 @@ "datastore", "pubsub" ], + "files": [ + "dist", + "src" + ], "author": "Vasco Santos ", "license": "MIT", "bugs": { @@ -38,11 +43,16 @@ "uint8arrays": "^2.0.5" }, "devDependencies": { + "@types/detect-node": "^2.0.0", + "@types/mocha": "^8.2.1", + "@types/sinon": "^9.0.10", "aegir": "^31.0.3", "detect-node": "^2.0.4", + "ipfs-core-types": "^0.3.0", "it-pair": "^1.0.0", "libp2p": "^0.30.9", "libp2p-gossipsub": "^0.8.0", + "libp2p-interfaces": "^0.8.3", "libp2p-record": "^0.10.0", "p-wait-for": "^3.1.0", "peer-id": "^0.14.2", diff --git a/src/index.js b/src/index.js index 00f3bb2..7c8eda8 100644 --- a/src/index.js +++ b/src/index.js @@ -2,11 +2,20 @@ const { Key, Adapter } = require('interface-datastore') const { encodeBase32, keyToTopic, topicToKey } = require('./utils') +const uint8ArrayEquals = require('uint8arrays/equals') const errcode = require('err-code') const debug = require('debug') -const log = debug('datastore-pubsub:publisher') -log.error = debug('datastore-pubsub:publisher:error') +const log = Object.assign(debug('datastore-pubsub:publisher'), { + error: debug('datastore-pubsub:publisher:error') +}) + +/** + * @typedef {import('peer-id')} PeerId + * @typedef {import('./types').Validator} Validator + * @typedef {import('./types').SubscriptionKeyFn} SubscriptionKeyFn + * @typedef {import('libp2p-interfaces/src/pubsub/message').Message} PubSubMessage + */ // DatastorePubsub is responsible for providing an api for pubsub to be used as a datastore with // [TieredDatastore]{@link https://github.com/ipfs/js-datastore-core/blob/master/src/tiered.js} @@ -14,13 +23,11 @@ class DatastorePubsub extends Adapter { /** * Creates an instance of DatastorePubsub. * - * @param {*} pubsub - pubsub implementation. - * @param {*} datastore - datastore instance. - * @param {*} peerId - peer-id instance. - * @param {Object} validator - validator functions. - * @param {(record: uint8Array, peerId: PeerId) => boolean} validator.validate - function to validate a record. - * @param {(received: uint8Array, current: uint8Array) => boolean} validator.select - function to select the newest between two records. - * @param {function(key, callback)} subscriptionKeyFn - optional function to manipulate the key topic received before processing it. + * @param {import('libp2p-interfaces/src/pubsub')} pubsub - pubsub implementation + * @param {import('interface-datastore').Datastore} datastore - datastore instance + * @param {PeerId} peerId - peer-id instance + * @param {Validator} validator - validator functions + * @param {SubscriptionKeyFn} [subscriptionKeyFn] - function to manipulate the key topic received before processing it * @memberof DatastorePubsub */ constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) { @@ -57,9 +64,9 @@ class DatastorePubsub extends Adapter { * * @param {Uint8Array} key - identifier of the value to be published. * @param {Uint8Array} val - value to be propagated. - * @returns {Promise} */ - async put (key, val) { // eslint-disable-line require-await + // @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays + async put (key, val) { if (!(key instanceof Uint8Array)) { const errMsg = 'datastore key does not have a valid format' @@ -79,15 +86,15 @@ class DatastorePubsub extends Adapter { log(`publish value for topic ${stringifiedTopic}`) // Publish record to pubsub - return this._pubsub.publish(stringifiedTopic, val) + await this._pubsub.publish(stringifiedTopic, val) } /** * Try to subscribe a topic with Pubsub and returns the local value if available. * * @param {Uint8Array} key - identifier of the value to be subscribed. - * @returns {Promise} */ + // @ts-ignore Datastores take keys as Keys, this one takes Uint8Arrays async get (key) { if (!(key instanceof Uint8Array)) { const errMsg = 'datastore key does not have a valid format' @@ -106,7 +113,8 @@ class DatastorePubsub extends Adapter { // subscribe try { - await this._pubsub.subscribe(stringifiedTopic, this._onMessage) + this._pubsub.on(stringifiedTopic, this._onMessage) + await this._pubsub.subscribe(stringifiedTopic) } catch (err) { const errMsg = `cannot subscribe topic ${stringifiedTopic}` @@ -127,10 +135,16 @@ class DatastorePubsub extends Adapter { unsubscribe (key) { const stringifiedTopic = keyToTopic(key) - return this._pubsub.unsubscribe(stringifiedTopic, this._onMessage) + this._pubsub.removeListener(stringifiedTopic, this._onMessage) + return this._pubsub.unsubscribe(stringifiedTopic) } - // Get record from local datastore + /** + * Get record from local datastore + * + * @private + * @param {Uint8Array} key + */ async _getLocal (key) { // encode key - base32(/ipns/{cid}) const routingKey = new Key('/' + encodeBase32(key), false) @@ -161,7 +175,11 @@ class DatastorePubsub extends Adapter { return dsVal } - // handles pubsub subscription messages + /** + * handles pubsub subscription messages + * + * @param {PubSubMessage} msg + */ async _onMessage (msg) { const { data, from, topicIDs } = msg let key @@ -200,7 +218,12 @@ class DatastorePubsub extends Adapter { } } - // Store the received record if it is better than the current stored + /** + * Store the received record if it is better than the current stored + * + * @param {Uint8Array} key + * @param {Uint8Array} data + */ async _storeIfSubscriptionIsBetter (key, data) { let isBetter = false @@ -217,12 +240,22 @@ class DatastorePubsub extends Adapter { } } - // Validate record according to the received validation function + /** + * Validate record according to the received validation function + * + * @param {Uint8Array} value + * @param {Uint8Array} peerId + */ async _validateRecord (value, peerId) { // eslint-disable-line require-await return this._validator.validate(value, peerId) } - // Select the best record according to the received select function. + /** + * Select the best record according to the received select function + * + * @param {Uint8Array} receivedRecord + * @param {Uint8Array} currentRecord + */ async _selectRecord (receivedRecord, currentRecord) { const res = await this._validator.select(receivedRecord, currentRecord) @@ -230,7 +263,12 @@ class DatastorePubsub extends Adapter { return res === 0 } - // Verify if the record received through pubsub is valid and better than the one currently stored + /** + * Verify if the record received through pubsub is valid and better than the one currently stored + * + * @param {Uint8Array} key + * @param {Uint8Array} val + */ async _isBetter (key, val) { // validate received record let error, valid @@ -261,7 +299,7 @@ class DatastorePubsub extends Adapter { } // if the same record, do not need to store - if (currentRecord.equals(val)) { + if (uint8ArrayEquals(currentRecord, val)) { return false } @@ -269,7 +307,12 @@ class DatastorePubsub extends Adapter { return this._selectRecord(val, currentRecord) } - // add record to datastore + /** + * add record to datastore + * + * @param {Uint8Array} key + * @param {Uint8Array} data + */ async _storeRecord (key, data) { // encode key - base32(/ipns/{cid}) const routingKey = new Key('/' + encodeBase32(key), false) @@ -277,48 +320,6 @@ class DatastorePubsub extends Adapter { await this._datastore.put(routingKey, data) log(`record for ${keyToTopic(key)} was stored in the datastore`) } - - open () { - const errMsg = 'open function was not implemented yet' - - log.error(errMsg) - throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') - } - - has (key) { - const errMsg = 'has function was not implemented yet' - - log.error(errMsg) - throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') - } - - delete (key) { - const errMsg = 'delete function was not implemented yet' - - log.error(errMsg) - throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') - } - - close () { - const errMsg = 'close function was not implemented yet' - - log.error(errMsg) - throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') - } - - batch () { - const errMsg = 'batch function was not implemented yet' - - log.error(errMsg) - throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') - } - - query () { - const errMsg = 'query function was not implemented yet' - - log.error(errMsg) - throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET') - } } exports = module.exports = DatastorePubsub diff --git a/src/types.d.ts b/src/types.d.ts new file mode 100644 index 0000000..16f93e3 --- /dev/null +++ b/src/types.d.ts @@ -0,0 +1,9 @@ + +type ValidateFn = (record: Uint8Array, peerId: Uint8Array) => Promise | boolean +type CompareFn = (received: Uint8Array, current: Uint8Array) => number +export type SubscriptionKeyFn = (key: Uint8Array) => Promise | Uint8Array + +export interface Validator { + validate: ValidateFn, + select: CompareFn +} diff --git a/src/utils.js b/src/utils.js index e4f942c..8d3de9c 100644 --- a/src/utils.js +++ b/src/utils.js @@ -4,18 +4,29 @@ const errcode = require('err-code') const uint8ArrayToString = require('uint8arrays/to-string') const uint8ArrayFromString = require('uint8arrays/from-string') +/** + * @typedef {import('interface-datastore').Key} Key + */ + const namespace = '/record/' -module.exports.encodeBase32 = (buf) => { +/** + * @param {Uint8Array} buf + */ +function encodeBase32 (buf) { return uint8ArrayToString(buf, 'base32') } -// converts a binary record key to a pubsub topic key. -module.exports.keyToTopic = (key) => { +/** + * converts a binary record key to a pubsub topic key + * + * @param {Uint8Array | string} key + */ +function keyToTopic (key) { // Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs // Encodes to "/record/base64url(key)" if (typeof key === 'string' || key instanceof String) { - key = uint8ArrayFromString(key) + key = uint8ArrayFromString(key.toString()) } const b64url = uint8ArrayToString(key, 'base64url') @@ -23,8 +34,12 @@ module.exports.keyToTopic = (key) => { return `${namespace}${b64url}` } -// converts a pubsub topic key to a binary record key. -module.exports.topicToKey = (topic) => { +/** + * converts a pubsub topic key to a binary record key + * + * @param {string} topic + */ +function topicToKey (topic) { if (topic.substring(0, namespace.length) !== namespace) { throw errcode(new Error('topic received is not from a record'), 'ERR_TOPIC_IS_NOT_FROM_RECORD_NAMESPACE') } @@ -33,3 +48,9 @@ module.exports.topicToKey = (topic) => { return uint8ArrayFromString(key, 'base64url') } + +module.exports = { + encodeBase32, + keyToTopic, + topicToKey +} diff --git a/test/index.spec.js b/test/index.spec.js index 4fddd5f..8b5925a 100644 --- a/test/index.spec.js +++ b/test/index.spec.js @@ -23,6 +23,14 @@ const { const { Record } = require('libp2p-record') const { keyToTopic, topicToKey } = require('../src/utils') +/** + * @typedef {import('libp2p-interfaces/src/pubsub')} PubSub + * @typedef {import('interface-datastore').Datastore} Datastore + * @typedef {import('peer-id')} PeerId + * @typedef {import('../src/types').Validator} Validator + * @typedef {import('../src/types').SubscriptionKeyFn} SubscriptionKeyFn + */ + // Always returning the expected values // Valid record and select the new one const smoothValidator = { @@ -39,14 +47,20 @@ describe('datastore-pubsub', function () { if (!isNode) return - let pubsubA = null - let datastoreA = null - let peerIdA = null + /** @type {PubSub} */ + let pubsubA + /** @type {Datastore} */ + let datastoreA + /** @type {PeerId} */ + let peerIdA const registrarRecordA = {} - let pubsubB = null - let datastoreB = null - let peerIdB = null + /** @type {PubSub} */ + let pubsubB + /** @type {Datastore} */ + let datastoreB + /** @type {PeerId} */ + let peerIdB const registrarRecordB = {} // Mount pubsub protocol and create datastore instances @@ -74,10 +88,13 @@ describe('datastore-pubsub', function () { const value = 'value' let testCounter = 0 - let keyRef = null - let key = null - let record = null - let serializedRecord = null + let keyRef = '' + /** @type {Uint8Array} */ + let key + /** @type {import('libp2p-record').Record} */ + let record + /** @type {Uint8Array} */ + let serializedRecord // prepare Record beforeEach(() => { @@ -136,6 +153,7 @@ describe('datastore-pubsub', function () { }) it('should validate if record content is the same', async () => { + /** @type {Validator} */ const customValidator = { validate: (data) => { const receivedRecord = Record.deserialize(data) @@ -158,16 +176,13 @@ describe('datastore-pubsub', function () { } // causes pubsub b to become subscribed to the topic - await dsPubsubB.get(key) - .then(() => expect.fail('Should have failed to fetch key'), (err) => { - // not locally stored record - expect(err.code).to.equal('ERR_NOT_FOUND') - }) + await expect(dsPubsubB.get(key)).to.eventually.be.rejected().with.property('code', 'ERR_NOT_FOUND') await waitForPeerToSubscribe(subsTopic, peerIdB, pubsubA) // subscribe in order to understand when the message arrive to the node - await pubsubB.subscribe(subsTopic, messageHandler) + pubsubB.on(subsTopic, messageHandler) + await pubsubB.subscribe(subsTopic) await dsPubsubA.put(key, serializedRecord) @@ -204,7 +219,8 @@ describe('datastore-pubsub', function () { await waitForPeerToSubscribe(subsTopic, peerIdB, pubsubA) // subscribe in order to understand when the message arrive to the node - await pubsubB.subscribe(subsTopic, messageHandler) + pubsubB.on(subsTopic, messageHandler) + await pubsubB.subscribe(subsTopic) await dsPubsubA.put(key, serializedRecord) // wait until message arrives @@ -221,7 +237,8 @@ describe('datastore-pubsub', function () { it('should fail to create the DatastorePubsub if no validator is provided', () => { let dsPubsubB try { - dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB) // no validator + // @ts-expect-error no validator provided + dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB) } catch (err) { expect(err.code).to.equal('ERR_INVALID_PARAMETERS') } @@ -239,6 +256,7 @@ describe('datastore-pubsub', function () { let dsPubsubB try { + // @ts-expect-error invalid validator provided dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator) } catch (err) { expect(err.code).to.equal('ERR_INVALID_PARAMETERS') @@ -257,6 +275,7 @@ describe('datastore-pubsub', function () { let dsPubsubB try { + // @ts-expect-error invalid validator provided dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, customValidator) } catch (err) { expect(err.code).to.equal('ERR_INVALID_PARAMETERS') @@ -293,7 +312,8 @@ describe('datastore-pubsub', function () { await waitForPeerToSubscribe(subsTopic, peerIdB, pubsubA) // subscribe in order to understand when the message arrive to the node - await pubsubB.subscribe(subsTopic, messageHandler) + await pubsubB.on(subsTopic, messageHandler) + await pubsubB.subscribe(subsTopic) await dsPubsubA.put(key, serializedRecord) // wait until message arrives @@ -342,7 +362,8 @@ describe('datastore-pubsub', function () { await waitForPeerToSubscribe(subsTopic, peerIdB, pubsubA) // subscribe in order to understand when the message arrive to the node - await pubsubB.subscribe(subsTopic, messageHandler) + await pubsubB.on(subsTopic, messageHandler) + await pubsubB.subscribe(subsTopic) await dsPubsubA.put(key, serializedRecord) // wait until message arrives @@ -392,7 +413,8 @@ describe('datastore-pubsub', function () { await waitForPeerToSubscribe(subsTopic, peerIdB, pubsubA) // subscribe in order to understand when the message arrive to the node - await pubsubB.subscribe(subsTopic, messageHandler) + await pubsubB.on(subsTopic, messageHandler) + await pubsubB.subscribe(subsTopic) await dsPubsubA.put(key, serializedRecord) // wait until message arrives @@ -418,6 +440,7 @@ describe('datastore-pubsub', function () { }) it('should subscribe the topic and after a message being received, discard it using the subscriptionKeyFn', async () => { + /** @type {SubscriptionKeyFn} */ const subscriptionKeyFn = (key) => { expect(uint8ArrayToString(key)).to.equal(`/${keyRef}`) throw new Error('DISCARD MESSAGE') @@ -444,7 +467,8 @@ describe('datastore-pubsub', function () { await waitForPeerToSubscribe(subsTopic, peerIdB, pubsubA) // subscribe in order to understand when the message arrive to the node - await pubsubB.subscribe(subsTopic, messageHandler) + await pubsubB.on(subsTopic, messageHandler) + await pubsubB.subscribe(subsTopic) await dsPubsubA.put(key, serializedRecord) // wait until message arrives @@ -461,9 +485,10 @@ describe('datastore-pubsub', function () { }) it('should subscribe the topic and after a message being received, change its key using subscriptionKeyFn', async () => { + /** @type {SubscriptionKeyFn} */ const subscriptionKeyFn = (key) => { expect(uint8ArrayToString(key)).to.equal(`/${keyRef}`) - return topicToKey(`${keyToTopic(key)}new`) + return Promise.resolve(topicToKey(`${keyToTopic(key)}new`)) } const dsPubsubA = new DatastorePubsub(pubsubA, datastoreA, peerIdA, smoothValidator) const dsPubsubB = new DatastorePubsub(pubsubB, datastoreB, peerIdB, smoothValidator, subscriptionKeyFn) @@ -488,7 +513,8 @@ describe('datastore-pubsub', function () { await waitForPeerToSubscribe(subsTopic, peerIdB, pubsubA) // subscribe in order to understand when the message arrive to the node - await pubsubB.subscribe(subsTopic, messageHandler) + pubsubB.on(subsTopic, messageHandler) + await pubsubB.subscribe(subsTopic) await dsPubsubA.put(key, serializedRecord) // wait until message arrives @@ -519,6 +545,7 @@ describe('datastore-pubsub', function () { expect(err.code).to.equal('ERR_NOT_FOUND') }) + // @ts-ignore sinon added property expect(pubsubA.subscribe.calledOnce).to.equal(true) }) diff --git a/test/utils.js b/test/utils.js index 772de82..e5faf6d 100644 --- a/test/utils.js +++ b/test/utils.js @@ -1,39 +1,59 @@ 'use strict' const PeerId = require('peer-id') +// @ts-ignore const DuplexPair = require('it-pair/duplex') -const Pubsub = require('libp2p-gossipsub') -const { multicodec } = require('libp2p-gossipsub') +const Gossipsub = require('libp2p-gossipsub') +const { multicodec } = Gossipsub const pWaitFor = require('p-wait-for') -const createMockRegistrar = (registrarRecord) => ({ - handle: (multicodecs, handler) => { - const rec = registrarRecord[multicodecs[0]] || {} +/** + * @typedef {import('libp2p-interfaces/src/pubsub')} Pubsub + */ - registrarRecord[multicodecs[0]] = { - ...rec, - handler - } - }, - register: ({ multicodecs, _onConnect, _onDisconnect }) => { - const rec = registrarRecord[multicodecs[0]] || {} - - registrarRecord[multicodecs[0]] = { - ...rec, - onConnect: _onConnect, - onDisconnect: _onDisconnect - } +/** + * @param {Record} registrarRecord + */ +const createMockRegistrar = (registrarRecord) => { + return { + /** @type {import('libp2p')["handle"]} */ + handle: (multicodecs, handler) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + handler + } + }, + /** + * + * @param {import('libp2p-interfaces/src/topology') & { multicodecs: string[] }} arg + */ + register: ({ multicodecs, _onConnect, _onDisconnect }) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + onConnect: _onConnect, + onDisconnect: _onDisconnect + } - return multicodecs[0] - }, - unregister: () => {} -}) + return multicodecs[0] + }, + unregister: () => true + } +} -// as created by libp2p +/** + * As created by libp2p + * + * @param {object} registrarRecord + */ exports.createPubsubNode = async (registrarRecord) => { const peerId = await PeerId.create({ bits: 1024 }) + const libp2p = { peerId, registrar: createMockRegistrar(registrarRecord), @@ -41,31 +61,13 @@ exports.createPubsubNode = async (registrarRecord) => { getAll: () => [] } } - const pubsub = new Pubsub(libp2p) - await pubsub.start() + // @ts-ignore just enough libp2p + const pubsub = new Gossipsub(libp2p) - return { - peerId: pubsub.peerId, - subscribe: (topic, handler) => { - pubsub.subscribe(topic) - - pubsub.on(topic, handler) - }, - unsubscribe: (topic, handler) => { - if (!handler) { - pubsub.removeAllListeners(topic) - } else { - pubsub.removeListener(topic, handler) - } + await pubsub.start() - pubsub.unsubscribe(topic) - }, - publish: (topic, data) => pubsub.publish(topic, data), - getTopics: () => pubsub.getTopics(), - getSubscribers: (topic) => pubsub.getSubscribers(topic), - stop: () => pubsub.stop() - } + return pubsub } const ConnectionPair = () => { @@ -83,6 +85,14 @@ const ConnectionPair = () => { ] } +/** + * @typedef {object} Connectable + * @property {Pubsub} router + * @property {any} registrar + * + * @param {Connectable} pubsubA + * @param {Connectable} pubsubB + */ exports.connectPubsubNodes = async (pubsubA, pubsubB) => { const onConnectA = pubsubA.registrar[multicodec].onConnect const onConnectB = pubsubB.registrar[multicodec].onConnect @@ -111,10 +121,20 @@ exports.connectPubsubNodes = async (pubsubA, pubsubB) => { }) } -// Wait for a condition to become true. When its true, callback is called. +/** + * Wait for a condition to become true. When its true, callback is called. + * + * @param {() => boolean} predicate + */ exports.waitFor = predicate => pWaitFor(predicate, { interval: 1000, timeout: 10000 }) -// Wait until a peer subscribes a topic +/** + * Wait until a peer subscribes a topic + * + * @param {string} topic + * @param {PeerId} peer + * @param {Pubsub} node + */ exports.waitForPeerToSubscribe = (topic, peer, node) => { return pWaitFor(async () => { const peers = await node.getSubscribers(topic) diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..77830df --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "./node_modules/aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src", + "test" + ] +}