Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
refactor(async): providers refactor from #82
Browse files Browse the repository at this point in the history
  • Loading branch information
kumavis committed Jun 3, 2019
1 parent d508452 commit 013fc62
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 308 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.2",
"pull-stream": "^3.6.9",
"pull-stream-to-async-iterator": "^1.0.1",
"varint": "^5.0.0",
"xor-distance": "^2.0.0"
},
Expand Down
2 changes: 1 addition & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ class KadDHT extends EventEmitter {

const errors = []
waterfall([
(cb) => this.providers.addProvider(key, this.peerInfo.id, cb),
(cb) => promiseToCallback(this.providers.addProvider(key, this.peerInfo.id))(err => cb(err)),
(cb) => this.getClosestPeers(key.buffer, cb),
(peers, cb) => {
const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
Expand Down
2 changes: 1 addition & 1 deletion src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ module.exports = (dht) => ({
async _findNProvidersAsync (key, providerTimeout, n) {
const out = new LimitedPeerList(n)

const provs = await promisify(cb => dht.providers.getProviders(key, cb))()
const provs = await dht.providers.getProviders(key)

provs.forEach((id) => {
let info
Expand Down
250 changes: 71 additions & 179 deletions src/providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

const cache = require('hashlru')
const varint = require('varint')
const each = require('async/each')
const pull = require('pull-stream')
const CID = require('cids')
const PeerId = require('peer-id')
const Key = require('interface-datastore').Key
const Queue = require('p-queue')
const promisify = require('promisify-es6')
const toIterator = require('pull-stream-to-async-iterator')

const c = require('./constants')
const utils = require('./utils')
Expand Down Expand Up @@ -74,48 +73,12 @@ class Providers {
}

/**
* Check all providers if they are still valid, and if not
* delete them.
* Check all providers if they are still valid, and if not delete them.
*
* @returns {undefined}
* @returns {Promise}
*
* @private
*/
_cleanup () {
this._getProviderCids((err, cids) => {
if (err) {
return this._log.error('Failed to get cids', err)
}

each(cids, (cid, cb) => {
this._getProvidersMap(cid, (err, provs) => {
if (err) {
return cb(err)
}

provs.forEach((time, provider) => {
this._log('comparing: %s - %s > %s', Date.now(), time, this.provideValidity)
if (Date.now() - time > this.provideValidity) {
provs.delete(provider)
}
})

if (provs.size === 0) {
return this._deleteProvidersMap(cid, cb)
}

cb()
})
}, (err) => {
if (err) {
return this._log.error('Failed to cleanup', err)
}

this._log('Cleanup successfull')
})
})
}

_cleanup () {
return this.syncQueue.add(async () => {
this._log('start cleanup')
Expand All @@ -127,8 +90,8 @@ class Providers {
const batch = this.datastore.batch()

// Get all provider entries from the datastore
const it = this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX })
for await (const entry of it) {
const query = this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX })
for await (const entry of toIterator(query)) {
try {
// Add a delete to the batch for each expired entry
const { cid, peerId } = parseProviderKey(entry.key)
Expand All @@ -154,7 +117,7 @@ class Providers {

// Commit the deletes to the datastore
if (deleted.size) {
await batch.commit()
await promisify(cb => batch.commit(cb))()
}

// Clear expired entries from the cache
Expand All @@ -178,91 +141,21 @@ class Providers {
}

/**
* Get a list of all cids that providers are known for.
*
* @param {function(Error, Array<CID>)} callback
* @returns {undefined}
*
* @private
*/
_getProviderCids (callback) {
pull(
this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX }),
pull.map((entry) => {
const parts = entry.key.toString().split('/')
if (parts.length !== 4) {
this._log.error('incorrectly formatted provider entry in datastore: %s', entry.key)
return
}

let decoded
try {
decoded = utils.decodeBase32(parts[2])
} catch (err) {
this._log.error('error decoding base32 provider key: %s', parts[2])
return
}

let cid
try {
cid = new CID(decoded)
} catch (err) {
this._log.error('error converting key to cid from datastore: %s', err.message)
}

return cid
}),
pull.filter(Boolean),
pull.collect(callback)
)
}

/**
* Get the currently known provider maps for a given CID.
* Get the currently known provider peer ids for a given CID.
*
* @param {CID} cid
* @param {function(Error, Map<PeerId, Date>)} callback
* @returns {undefined}
* @returns {Promise<Map<String, Date>>}
*
* @private
*/
_getProvidersMap (cid, callback) {
const provs = this.providers.get(makeProviderKey(cid))

async _getProvidersMap (cid) {
const cacheKey = makeProviderKey(cid)
let provs = this.providers.get(cacheKey)
if (!provs) {
return loadProviders(this.datastore, cid, callback)
provs = await loadProviders(this.datastore, cid)
this.providers.set(cacheKey, provs)
}

callback(null, provs)
}

/**
* Completely remove a providers map entry for a given CID.
*
* @param {CID} cid
* @param {function(Error)} callback
* @returns {undefined}
*
* @private
*/
_deleteProvidersMap (cid, callback) {
const dsKey = makeProviderKey(cid)
this.providers.set(dsKey, null)
const batch = this.datastore.batch()

pull(
this.datastore.query({
keysOnly: true,
prefix: dsKey
}),
pull.through((entry) => batch.delete(entry.key)),
pull.onEnd((err) => {
if (err) {
return callback(err)
}
batch.commit(callback)
})
)
return provs
}

get cleanupInterval () {
Expand All @@ -283,67 +176,55 @@ class Providers {
}

/**
* Add a new provider.
* Add a new provider for the given CID.
*
* @param {CID} cid
* @param {PeerId} provider
* @param {function(Error)} callback
* @returns {undefined}
* @returns {Promise}
*/
addProvider (cid, provider, callback) {
this._log('addProvider %s', cid.toBaseEncodedString())
const dsKey = makeProviderKey(cid)
const provs = this.providers.get(dsKey)

const next = (err, provs) => {
if (err) {
return callback(err)
}
async addProvider (cid, provider) {
return this.syncQueue.add(async () => {
this._log('addProvider %s', cid.toBaseEncodedString())
const provs = await this._getProvidersMap(cid)

this._log('loaded %s provs', provs.size)
const now = Date.now()
provs.set(provider, now)
provs.set(utils.encodeBase32(provider.id), now)

const dsKey = makeProviderKey(cid)
this.providers.set(dsKey, provs)
writeProviderEntry(this.datastore, cid, provider, now, callback)
}

if (!provs) {
loadProviders(this.datastore, cid, next)
} else {
next(null, provs)
}
return writeProviderEntry(this.datastore, cid, provider, now)
})
}

/**
* Get a list of providers for the given CID.
*
* @param {CID} cid
* @param {function(Error, Array<PeerId>)} callback
* @returns {undefined}
* @returns {Promise<Array<PeerId>>}
*/
getProviders (cid, callback) {
this._log('getProviders %s', cid.toBaseEncodedString())
this._getProvidersMap(cid, (err, provs) => {
if (err) {
return callback(err)
}

callback(null, Array.from(provs.keys()))
async getProviders (cid) {
return this.syncQueue.add(async () => {
this._log('getProviders %s', cid.toBaseEncodedString())
const provs = await this._getProvidersMap(cid)
return [...provs.keys()].map((base32PeerId) => {
return new PeerId(utils.decodeBase32(base32PeerId))
})
})
}
}

/**
* Encode the given key its matching datastore key.
*
* @param {CID} cid
* @param {CID|string} cid - cid or base32 encoded string
* @returns {string}
*
* @private
*/
function makeProviderKey (cid) {
return c.PROVIDERS_KEY_PREFIX + utils.encodeBase32(cid.buffer)
cid = typeof cid === 'string' ? cid : utils.encodeBase32(cid.buffer)
return c.PROVIDERS_KEY_PREFIX + cid
}

/**
Expand All @@ -353,48 +234,59 @@ function makeProviderKey (cid) {
* @param {CID} cid
* @param {PeerId} peer
* @param {number} time
* @param {function(Error)} callback
* @returns {undefined}
* @returns {Promise}
*
* @private
*/
function writeProviderEntry (store, cid, peer, time, callback) {
async function writeProviderEntry (store, cid, peer, time) {
const dsKey = [
makeProviderKey(cid),
'/',
utils.encodeBase32(peer.id)
].join('')

store.put(new Key(dsKey), Buffer.from(varint.encode(time)), callback)
const key = new Key(dsKey)
const buffer = Buffer.from(varint.encode(time))
return promisify(cb => store.put(key, buffer, cb))()
}

/**
* Load providers from the store.
* Parse the CID and provider peer id from the key
*
* @param {DKey} key
* @returns {Object} object with peer id and cid
*
* @private
*/
function parseProviderKey (key) {
const parts = key.toString().split('/')
if (parts.length !== 4) {
throw new Error('incorrectly formatted provider entry key in datastore: ' + key)
}

return {
cid: parts[2],
peerId: parts[3]
}
}

/**
* Load providers for the given CID from the store.
*
* @param {Datastore} store
* @param {CID} cid
* @param {function(Error, Map<PeerId, Date>)} callback
* @returns {undefined}
* @returns {Promise<Map<PeerId, Date>>}
*
* @private
*/
function loadProviders (store, cid, callback) {
pull(
store.query({ prefix: makeProviderKey(cid) }),
pull.map((entry) => {
const parts = entry.key.toString().split('/')
const lastPart = parts[parts.length - 1]
const rawPeerId = utils.decodeBase32(lastPart)
return [new PeerId(rawPeerId), readTime(entry.value)]
}),
pull.collect((err, res) => {
if (err) {
return callback(err)
}

return callback(null, new Map(res))
})
)
async function loadProviders (store, cid) {
const providers = new Map()
const query = store.query({ prefix: makeProviderKey(cid) })
for await (const entry of toIterator(query)) {
const { peerId } = parseProviderKey(entry.key)
providers.set(peerId, readTime(entry.value))
}
return providers
}

function readTime (buf) {
Expand Down
Loading

0 comments on commit 013fc62

Please sign in to comment.