Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 5110423

Browse files
author
Alan Shaw
authored
refactor: convert dht API to async/await (#1156)
1 parent 621973c commit 5110423

12 files changed

+238
-285
lines changed

src/dht/find-peer.js

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
'use strict'
2+
3+
const PeerId = require('peer-id')
4+
const PeerInfo = require('peer-info')
5+
const multiaddr = require('multiaddr')
6+
const ndjson = require('iterable-ndjson')
7+
const configure = require('../lib/configure')
8+
const toIterable = require('../lib/stream-to-iterable')
9+
10+
module.exports = configure(({ ky }) => {
11+
return (peerId, options) => (async function * () {
12+
options = options || {}
13+
14+
const searchParams = new URLSearchParams(options.searchParams)
15+
searchParams.set('arg', `${peerId}`)
16+
if (options.verbose != null) searchParams.set('verbose', options.verbose)
17+
18+
const res = await ky.get('dht/findpeer', {
19+
timeout: options.timeout,
20+
signal: options.signal,
21+
headers: options.headers,
22+
searchParams
23+
})
24+
25+
for await (const message of ndjson(toIterable(res.body))) {
26+
// 2 = FinalPeer
27+
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18
28+
if (message.Type === 2 && message.Responses) {
29+
for (const { ID, Addrs } of message.Responses) {
30+
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
31+
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
32+
yield peerInfo
33+
}
34+
}
35+
}
36+
})()
37+
})

src/dht/find-provs.js

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict'
2+
3+
const PeerId = require('peer-id')
4+
const PeerInfo = require('peer-info')
5+
const multiaddr = require('multiaddr')
6+
const ndjson = require('iterable-ndjson')
7+
const configure = require('../lib/configure')
8+
const toIterable = require('../lib/stream-to-iterable')
9+
10+
module.exports = configure(({ ky }) => {
11+
return (cid, options) => (async function * () {
12+
options = options || {}
13+
14+
const searchParams = new URLSearchParams(options.searchParams)
15+
searchParams.set('arg', `${cid}`)
16+
if (options.numProviders) searchParams.set('num-providers', options.numProviders)
17+
if (options.verbose != null) searchParams.set('verbose', options.verbose)
18+
19+
const res = await ky.get('dht/findprovs', {
20+
timeout: options.timeout,
21+
signal: options.signal,
22+
headers: options.headers,
23+
searchParams
24+
})
25+
26+
for await (const message of ndjson(toIterable(res.body))) {
27+
// 4 = Provider
28+
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20
29+
if (message.Type === 4 && message.Responses) {
30+
for (const { ID, Addrs } of message.Responses) {
31+
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
32+
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
33+
yield peerInfo
34+
}
35+
}
36+
}
37+
})()
38+
})

src/dht/findpeer.js

-63
This file was deleted.

src/dht/findprovs.js

-63
This file was deleted.

src/dht/get.js

+22-40
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,30 @@
11
'use strict'
22

3-
const promisify = require('promisify-es6')
3+
const ndjson = require('iterable-ndjson')
4+
const configure = require('../lib/configure')
5+
const toIterable = require('../lib/stream-to-iterable')
46

5-
module.exports = (send) => {
6-
return promisify((key, opts, callback) => {
7-
if (typeof opts === 'function' && !callback) {
8-
callback = opts
9-
opts = {}
10-
}
11-
12-
// opts is the real callback --
13-
// 'callback' is being injected by promisify
14-
if (typeof opts === 'function' && typeof callback === 'function') {
15-
callback = opts
16-
opts = {}
17-
}
7+
module.exports = configure(({ ky }) => {
8+
return (key, options) => (async function * () {
9+
options = options || {}
1810

19-
function handleResult (done, err, res) {
20-
if (err) {
21-
return done(err)
22-
}
23-
if (!res) {
24-
return done(new Error('empty response'))
25-
}
26-
if (res.length === 0) {
27-
return done(new Error('no value returned for key'))
28-
}
11+
const searchParams = new URLSearchParams(options.searchParams)
12+
searchParams.set('arg', `${key}`)
13+
if (options.verbose != null) searchParams.set('verbose', options.verbose)
2914

30-
// Inconsistent return values in the browser vs node
31-
if (Array.isArray(res)) {
32-
res = res[0]
33-
}
15+
const res = await ky.get('dht/get', {
16+
timeout: options.timeout,
17+
signal: options.signal,
18+
headers: options.headers,
19+
searchParams
20+
})
3421

35-
if (res.Type === 5) {
36-
done(null, res.Extra)
37-
} else {
38-
done(new Error('key was not found (type 6)'))
22+
for await (const message of ndjson(toIterable(res.body))) {
23+
// 5 = Value
24+
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L21
25+
if (message.Type === 5) {
26+
yield message.Extra
3927
}
4028
}
41-
42-
send({
43-
path: 'dht/get',
44-
args: key,
45-
qs: opts
46-
}, handleResult.bind(null, callback))
47-
})
48-
}
29+
})()
30+
})

src/dht/index.js

+22-9
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,30 @@
11
'use strict'
22

3-
const moduleConfig = require('../utils/module-config')
3+
const callbackify = require('callbackify')
4+
const errCode = require('err-code')
5+
const { collectify } = require('../lib/converters')
46

5-
module.exports = (arg) => {
6-
const send = moduleConfig(arg)
7+
module.exports = config => {
8+
const get = require('./get')(config)
9+
const findPeer = require('./find-peer')(config)
710

811
return {
9-
get: require('./get')(send),
10-
put: require('./put')(send),
11-
findProvs: require('./findprovs')(send),
12-
findPeer: require('./findpeer')(send),
13-
provide: require('./provide')(send),
12+
get: callbackify.variadic(async (key, options) => {
13+
for await (const value of get(key, options)) {
14+
return value
15+
}
16+
throw errCode(new Error('value not found'), 'ERR_TYPE_5_NOT_FOUND')
17+
}),
18+
put: callbackify.variadic(collectify(require('./put')(config))),
19+
findProvs: callbackify.variadic(collectify(require('./find-provs')(config))),
20+
findPeer: callbackify.variadic(async (peerId, options) => {
21+
for await (const peerInfo of findPeer(peerId, options)) {
22+
return peerInfo
23+
}
24+
throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND')
25+
}),
26+
provide: callbackify.variadic(collectify(require('./provide')(config))),
1427
// find closest peerId to given peerId
15-
query: require('./query')(send)
28+
query: callbackify.variadic(collectify(require('./query')(config)))
1629
}
1730
}

src/dht/provide.js

+33-30
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,40 @@
11
'use strict'
22

3-
const promisify = require('promisify-es6')
4-
const CID = require('cids')
3+
const PeerId = require('peer-id')
4+
const PeerInfo = require('peer-info')
5+
const multiaddr = require('multiaddr')
6+
const ndjson = require('iterable-ndjson')
7+
const configure = require('../lib/configure')
8+
const toIterable = require('../lib/stream-to-iterable')
9+
const toCamel = require('../lib/object-to-camel')
510

6-
module.exports = (send) => {
7-
return promisify((cids, opts, callback) => {
8-
if (typeof opts === 'function' && !callback) {
9-
callback = opts
10-
opts = {}
11-
}
11+
module.exports = configure(({ ky }) => {
12+
return (cids, options) => (async function * () {
13+
cids = Array.isArray(cids) ? cids : [cids]
14+
options = options || {}
1215

13-
// opts is the real callback --
14-
// 'callback' is being injected by promisify
15-
if (typeof opts === 'function' && typeof callback === 'function') {
16-
callback = opts
17-
opts = {}
18-
}
16+
const searchParams = new URLSearchParams(options.searchParams)
17+
cids.forEach(cid => searchParams.append('arg', `${cid}`))
18+
if (options.recursive != null) searchParams.set('recursive', options.recursive)
19+
if (options.verbose != null) searchParams.set('verbose', options.verbose)
1920

20-
if (!Array.isArray(cids)) {
21-
cids = [cids]
22-
}
21+
const res = await ky.get('dht/provide', {
22+
timeout: options.timeout,
23+
signal: options.signal,
24+
headers: options.headers,
25+
searchParams
26+
})
2327

24-
// Validate CID(s) and serialize
25-
try {
26-
cids = cids.map(cid => new CID(cid).toBaseEncodedString('base58btc'))
27-
} catch (err) {
28-
return callback(err)
28+
for await (let message of ndjson(toIterable(res.body))) {
29+
message = toCamel(message)
30+
if (message.responses) {
31+
message.responses = message.responses.map(({ ID, Addrs }) => {
32+
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
33+
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
34+
return peerInfo
35+
})
36+
}
37+
yield message
2938
}
30-
31-
send({
32-
path: 'dht/provide',
33-
args: cids,
34-
qs: opts
35-
}, callback)
36-
})
37-
}
39+
})()
40+
})

0 commit comments

Comments
 (0)