Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add types #60

Merged
merged 7 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
language: node_js
cache: npm
dist: bionic

stages:
- check
- test
- cov
branches:
only:
- master
- /^release\/.*$/

node_js:
- '10'
- 'lts/*'
- 'node'

stages:
- check

os:
- linux
Expand Down
8 changes: 7 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
"description": "Responsible for providing an interface-datastore compliant api to pubsub",
"leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>",
"main": "src/index.js",
"types": "dist/src/index.d.ts",
"scripts": {
"build": "aegir build",
"prepare": "aegir build --no-bundle",
"prepublishOnly": "aegir build",
"lint": "aegir lint",
"release": "aegir release --target node",
"release-minor": "aegir release --target node --type minor",
Expand Down Expand Up @@ -38,8 +40,12 @@
"uint8arrays": "^2.0.5"
},
"devDependencies": {
"@types/detect-node": "^2.0.0",
"@types/mocha": "^8.2.1",
"@types/sinon": "^9.0.10",
"aegir": "^31.0.3",
"detect-node": "^2.0.4",
"ipfs-core-types": "^0.3.0",
"it-pair": "^1.0.0",
"libp2p": "^0.30.9",
"libp2p-gossipsub": "^0.8.0",
Expand Down
120 changes: 59 additions & 61 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@

const { Key, Adapter } = require('interface-datastore')
const { encodeBase32, keyToTopic, topicToKey } = require('./utils')
const uint8ArrayEquals = require('uint8arrays/equals')

const errcode = require('err-code')
const debug = require('debug')
const log = debug('datastore-pubsub:publisher')
log.error = debug('datastore-pubsub:publisher:error')
const log = Object.assign(debug('datastore-pubsub:publisher'), {
error: debug('datastore-pubsub:publisher:error')
})

/**
* @typedef {import('peer-id')} PeerId
* @typedef {import('./types').Validator} Validator
* @typedef {import('./types').SubscriptionKeyFn} SubscriptionKeyFn
* @typedef {import('libp2p-interfaces/src/pubsub/message').Message} PubSubMessage
*/

// DatastorePubsub is responsible for providing an api for pubsub to be used as a datastore with
// [TieredDatastore]{@link https://github.com/ipfs/js-datastore-core/blob/master/src/tiered.js}
Expand All @@ -16,11 +25,9 @@ class DatastorePubsub extends Adapter {
*
* @param {*} pubsub - pubsub implementation.
* @param {*} datastore - datastore instance.
* @param {*} peerId - peer-id instance.
* @param {Object} validator - validator functions.
* @param {(record: uint8Array, peerId: PeerId) => boolean} validator.validate - function to validate a record.
* @param {(received: uint8Array, current: uint8Array) => boolean} validator.select - function to select the newest between two records.
* @param {function(key, callback)} subscriptionKeyFn - optional function to manipulate the key topic received before processing it.
* @param {PeerId} peerId - peer-id instance.
* @param {Validator} validator - validator functions.
* @param {SubscriptionKeyFn} subscriptionKeyFn - optional function to manipulate the key topic received before processing it.
* @memberof DatastorePubsub
*/
constructor (pubsub, datastore, peerId, validator, subscriptionKeyFn) {
Expand Down Expand Up @@ -55,9 +62,9 @@ class DatastorePubsub extends Adapter {
/**
* Publishes a value through pubsub.
*
* @param {Uint8Array} key - identifier of the value to be published.
* @param {Key} key - identifier of the value to be published.
* @param {Uint8Array} val - value to be propagated.
* @returns {Promise}
* @returns {Promise<void>}
*/
async put (key, val) { // eslint-disable-line require-await
if (!(key instanceof Uint8Array)) {
Expand Down Expand Up @@ -85,8 +92,7 @@ class DatastorePubsub extends Adapter {
/**
* Try to subscribe a topic with Pubsub and returns the local value if available.
*
* @param {Uint8Array} key - identifier of the value to be subscribed.
* @returns {Promise<Uint8Array>}
* @param {Key} key - identifier of the value to be subscribed.
*/
async get (key) {
if (!(key instanceof Uint8Array)) {
Expand Down Expand Up @@ -130,7 +136,12 @@ class DatastorePubsub extends Adapter {
return this._pubsub.unsubscribe(stringifiedTopic, this._onMessage)
}

// Get record from local datastore
/**
* Get record from local datastore
*
* @private
* @param {Uint8Array} key
*/
async _getLocal (key) {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)
Expand Down Expand Up @@ -161,7 +172,11 @@ class DatastorePubsub extends Adapter {
return dsVal
}

// handles pubsub subscription messages
/**
* handles pubsub subscription messages
*
* @param {PubSubMessage} msg
*/
async _onMessage (msg) {
const { data, from, topicIDs } = msg
let key
Expand Down Expand Up @@ -200,7 +215,12 @@ class DatastorePubsub extends Adapter {
}
}

// Store the received record if it is better than the current stored
/**
* Store the received record if it is better than the current stored
*
* @param {Uint8Array} key
* @param {Uint8Array} data
*/
async _storeIfSubscriptionIsBetter (key, data) {
let isBetter = false

Expand All @@ -217,20 +237,35 @@ class DatastorePubsub extends Adapter {
}
}

// Validate record according to the received validation function
/**
* Validate record according to the received validation function
*
* @param {Uint8Array} value
* @param {Uint8Array} peerId
*/
async _validateRecord (value, peerId) { // eslint-disable-line require-await
return this._validator.validate(value, peerId)
}

// Select the best record according to the received select function.
/**
* Select the best record according to the received select function
*
* @param {Uint8Array} receivedRecord
* @param {Uint8Array} currentRecord
*/
async _selectRecord (receivedRecord, currentRecord) {
const res = await this._validator.select(receivedRecord, currentRecord)

// If the selected was the first (0), it should be stored (true)
return res === 0
}

// Verify if the record received through pubsub is valid and better than the one currently stored
/**
* Verify if the record received through pubsub is valid and better than the one currently stored
*
* @param {Uint8Array} key
* @param {Uint8Array} val
*/
async _isBetter (key, val) {
// validate received record
let error, valid
Expand Down Expand Up @@ -261,64 +296,27 @@ class DatastorePubsub extends Adapter {
}

// if the same record, do not need to store
if (currentRecord.equals(val)) {
if (uint8ArrayEquals(currentRecord, val)) {
return false
}

// verify if the received record should replace the current one
return this._selectRecord(val, currentRecord)
}

// add record to datastore
/**
* add record to datastore
*
* @param {Uint8Array} key
* @param {Uint8Array} data
*/
async _storeRecord (key, data) {
// encode key - base32(/ipns/{cid})
const routingKey = new Key('/' + encodeBase32(key), false)

await this._datastore.put(routingKey, data)
log(`record for ${keyToTopic(key)} was stored in the datastore`)
}

open () {
const errMsg = 'open function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}

has (key) {
const errMsg = 'has function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}

delete (key) {
const errMsg = 'delete function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}

close () {
const errMsg = 'close function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}

batch () {
const errMsg = 'batch function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}

query () {
const errMsg = 'query function was not implemented yet'

log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NOT_IMPLEMENTED_YET')
}
}

exports = module.exports = DatastorePubsub
11 changes: 11 additions & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Key } from 'interface-datastore'
import PeerId from 'peer-id'

type ValidateFn = (record: Uint8Array, peerId: Uint8Array) => boolean
type CompareFn = (received: Uint8Array, current: Uint8Array) => number
export type SubscriptionKeyFn = (key: Uint8Array) => Promise<Uint8Array>

export interface Validator {
validate: ValidateFn,
select: CompareFn
}
33 changes: 27 additions & 6 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,42 @@ const errcode = require('err-code')
const uint8ArrayToString = require('uint8arrays/to-string')
const uint8ArrayFromString = require('uint8arrays/from-string')

/**
* @typedef {import('interface-datastore').Key} Key
*/

const namespace = '/record/'

module.exports.encodeBase32 = (buf) => {
/**
* @param {Uint8Array} buf
*/
function encodeBase32 (buf) {
return uint8ArrayToString(buf, 'base32')
}

// converts a binary record key to a pubsub topic key.
module.exports.keyToTopic = (key) => {
/**
* converts a binary record key to a pubsub topic key
*
* @param {Uint8Array | string} key
*/
function keyToTopic (key) {
// Record-store keys are arbitrary binary. However, pubsub requires UTF-8 string topic IDs
// Encodes to "/record/base64url(key)"
if (typeof key === 'string' || key instanceof String) {
key = uint8ArrayFromString(key)
key = uint8ArrayFromString(key.toString())
}

const b64url = uint8ArrayToString(key, 'base64url')

return `${namespace}${b64url}`
}

// converts a pubsub topic key to a binary record key.
module.exports.topicToKey = (topic) => {
/**
* converts a pubsub topic key to a binary record key
*
* @param {string} topic
*/
function topicToKey (topic) {
if (topic.substring(0, namespace.length) !== namespace) {
throw errcode(new Error('topic received is not from a record'), 'ERR_TOPIC_IS_NOT_FROM_RECORD_NAMESPACE')
}
Expand All @@ -33,3 +48,9 @@ module.exports.topicToKey = (topic) => {

return uint8ArrayFromString(key, 'base64url')
}

module.exports = {
encodeBase32,
keyToTopic,
topicToKey
}
9 changes: 9 additions & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"extends": "./node_modules/aegir/src/config/tsconfig.aegir.json",
"compilerOptions": {
"outDir": "dist"
},
"include": [
"src"
]
}