Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
refactor: async await (#148)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Switch to using async/await and async iterators.

Co-Authored-By: Jacob Heun <jacobheun@gmail.com>
  • Loading branch information
vasco-santos and jacobheun authored Nov 19, 2019
1 parent 088534b commit c49fa92
Show file tree
Hide file tree
Showing 56 changed files with 3,111 additions and 3,855 deletions.
42 changes: 25 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,42 +32,46 @@
"url": "https://github.com/libp2p/js-libp2p-kad-dht/issues"
},
"engines": {
"node": ">=6.0.0",
"npm": ">=3.0.0"
"node": ">=10.0.0",
"npm": ">=6.0.0"
},
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
"dependencies": {
"abort-controller": "^3.0.0",
"async": "^2.6.2",
"base32.js": "~0.1.0",
"chai-checkmark": "^1.0.1",
"cids": "~0.7.0",
"cids": "~0.7.1",
"debug": "^4.1.1",
"err-code": "^1.1.2",
"err-code": "^2.0.0",
"hashlru": "^2.3.0",
"heap": "~0.2.6",
"interface-datastore": "~0.7.0",
"interface-datastore": "~0.8.0",
"k-bucket": "^5.0.0",
"libp2p-crypto": "~0.16.1",
"libp2p-record": "~0.6.2",
"multihashes": "~0.4.14",
"multihashing-async": "~0.5.2",
"p-queue": "^6.0.0",
"libp2p-crypto": "~0.17.1",
"libp2p-record": "~0.7.0",
"multihashes": "~0.4.15",
"multihashing-async": "~0.8.0",
"p-filter": "^2.1.0",
"p-map": "^3.0.0",
"p-queue": "^6.2.1",
"p-timeout": "^3.2.0",
"p-times": "^2.1.0",
"peer-id": "~0.12.2",
"peer-info": "~0.15.1",
"peer-id": "~0.13.5",
"peer-info": "~0.17.0",
"promise-to-callback": "^1.0.0",
"promisify-es6": "^1.0.3",
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.2",
"pull-stream": "^3.6.9",
"pull-length-prefixed": "^1.3.3",
"pull-stream": "^3.6.14",
"varint": "^5.0.0",
"xor-distance": "^2.0.0"
},
"devDependencies": {
"aegir": "^20.0.0",
"aegir": "^20.4.1",
"chai": "^4.2.0",
"datastore-level": "~0.12.1",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"interface-connection": "~0.3.3",
"libp2p-mplex": "~0.8.5",
Expand All @@ -76,8 +80,12 @@
"lodash": "^4.17.11",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"peer-book": "~0.9.1",
"sinon": "^7.3.1"
"p-defer": "^3.0.0",
"p-each-series": "^2.1.0",
"p-map-series": "^2.1.0",
"p-retry": "^4.2.0",
"peer-book": "~0.9.2",
"sinon": "^7.5.0"
},
"contributors": [
"Alan Shaw <alan.shaw@protocol.ai>",
Expand Down
278 changes: 278 additions & 0 deletions src/content-fetching/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
'use strict'

const errcode = require('err-code')

const pFilter = require('p-filter')
const pTimeout = require('p-timeout')

const libp2pRecord = require('libp2p-record')

const c = require('../constants')
const Query = require('../query')

const utils = require('../utils')

const Record = libp2pRecord.Record

module.exports = (dht) => {
const putLocal = async (key, rec) => { // eslint-disable-line require-await
return dht.datastore.put(utils.bufferToKey(key), rec)
}

/**
* Attempt to retrieve the value for the given key from
* the local datastore.
*
* @param {Buffer} key
* @returns {Promise<Record>}
*
* @private
*/
const getLocal = async (key) => {
dht._log('getLocal %b', key)

const raw = await dht.datastore.get(utils.bufferToKey(key))
dht._log('found %b in local datastore', key)
const rec = Record.deserialize(raw)

await dht._verifyRecordLocally(rec)
return rec
}

/**
* Send the best record found to any peers that have an out of date record.
*
* @param {Buffer} key
* @param {Array<Object>} vals - values retrieved from the DHT
* @param {Object} best - the best record that was found
* @returns {Promise}
*
* @private
*/
const sendCorrectionRecord = async (key, vals, best) => {
const fixupRec = await utils.createPutRecord(key, best)

return Promise.all(vals.map(async (v) => {
// no need to do anything
if (v.val.equals(best)) {
return
}

// correct ourself
if (dht._isSelf(v.from)) {
try {
await dht._putLocal(key, fixupRec)
} catch (err) {
dht._log.error('Failed error correcting self', err)
}
return
}

// send correction
try {
await dht._putValueToPeer(key, fixupRec, v.from)
} catch (err) {
dht._log.error('Failed error correcting entry', err)
}
}))
}

return {
/**
* Store the given key/value pair locally, in the datastore.
* @param {Buffer} key
* @param {Buffer} rec - encoded record
* @returns {Promise<void>}
* @private
*/
async _putLocal (key, rec) { // eslint-disable-line require-await
return putLocal(key, rec)
},

/**
* Store the given key/value pair in the DHT.
*
* @param {Buffer} key
* @param {Buffer} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
* @returns {Promise<void>}
*/
async put (key, value, options = {}) {
dht._log('PutValue %b', key)

// create record in the dht format
const record = await utils.createPutRecord(key, value)

// store the record locally
await putLocal(key, record)

// put record to the closest peers
const peers = await dht.getClosestPeers(key, { shallow: true })
const results = await pFilter(peers, async (peer) => {
try {
await dht._putValueToPeer(key, record, peer)
return true
} catch (err) {
dht._log.error('Failed to put to peer (%b): %s', peer.id, err)
return false
}
})

// verify if we were able to put to enough peers
const minPeers = options.minPeers || peers.length // Ensure we have a default `minPeers`

if (minPeers > results.length) {
const error = errcode(new Error(`Failed to put value to enough peers: ${results.length}/${minPeers}`), 'ERR_NOT_ENOUGH_PUT_PEERS')
dht._log.error(error)
throw error
}
},

/**
* Get the value to the given key.
* Times out after 1 minute by default.
*
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<{from: PeerId, val: Buffer}>}
*/
async get (key, options = {}) {
options.timeout = options.timeout || c.minute

dht._log('_get %b', key)

const vals = await dht.getMany(key, c.GET_MANY_RECORD_COUNT, options)
const recs = vals.map((v) => v.val)
let i = 0

try {
i = libp2pRecord.selection.bestRecord(dht.selectors, key, recs)
} catch (err) {
// Assume the first record if no selector available
if (err.code !== 'ERR_NO_SELECTOR_FUNCTION_FOR_RECORD_KEY') {
throw err
}
}

const best = recs[i]
dht._log('GetValue %b %s', key, best)

if (!best) {
throw errcode(new Error('best value was not found'), 'ERR_NOT_FOUND')
}

await sendCorrectionRecord(key, vals, best)

return best
},

/**
* Get the `n` values to the given key without sorting.
*
* @param {Buffer} key
* @param {number} nvals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
*/
async getMany (key, nvals, options = {}) {
options.timeout = options.timeout || c.minute

dht._log('getMany %b (%s)', key, nvals)

let vals = []
let localRec

try {
localRec = await getLocal(key)
} catch (err) {
if (nvals === 0) {
throw err
}
}

if (localRec) {
vals.push({
val: localRec.value,
from: dht.peerInfo.id
})
}

if (vals.length >= nvals) {
return vals
}

const paths = []
const id = await utils.convertBuffer(key)
const rtp = dht.routingTable.closestPeers(id, this.kBucketSize)

dht._log('peers in rt: %d', rtp.length)

if (rtp.length === 0) {
const errMsg = 'Failed to lookup key! No peers from routing table!'

dht._log.error(errMsg)
throw errcode(new Error(errMsg), 'ERR_NO_PEERS_IN_ROUTING_TABLE')
}

// we have peers, lets do the actual query to them
const query = new Query(dht, key, (pathIndex, numPaths) => {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(nvals - vals.length, numPaths)
const pathVals = []
paths.push(pathVals)

// Here we return the query function to use on this particular disjoint path
return async (peer) => {
let rec, peers, lookupErr
try {
const results = await dht._getValueOrPeers(peer, key)
rec = results.record
peers = results.peers
} catch (err) {
// If we have an invalid record we just want to continue and fetch a new one.
if (err.code !== 'ERR_INVALID_RECORD') {
throw err
}
lookupErr = err
}

const res = { closerPeers: peers }

if ((rec && rec.value) || lookupErr) {
pathVals.push({
val: rec && rec.value,
from: peer
})
}

// enough is enough
if (pathVals.length >= pathSize) {
res.pathComplete = true
}

return res
}
})

let error
try {
await pTimeout(query.run(rtp), options.timeout)
} catch (err) {
error = err
}
query.stop()

// combine vals from each path
vals = [].concat.apply(vals, paths).slice(0, nvals)

if (error && vals.length === 0) {
throw error
}

return vals
}
}
}
Loading

0 comments on commit c49fa92

Please sign in to comment.