Skip to content

Commit 02a55e2

Browse files
wip: dht integration
1 parent 19a7f6d commit 02a55e2

File tree

3 files changed

+53
-24
lines changed

3 files changed

+53
-24
lines changed

src/components/network/index.js

+33-23
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
const debug = require('debug')
44
const lp = require('pull-length-prefixed')
55
const pull = require('pull-stream')
6-
const setImmediate = require('async/setImmediate')
6+
const waterfall = require('async/waterfall')
7+
const each = require('async/each')
78

89
const Message = require('../../types/message')
910
const CONSTANTS = require('../../constants')
@@ -99,24 +100,39 @@ class Network {
99100
this.bitswap._onPeerDisconnected(peerInfo.id)
100101
}
101102

102-
// Connect to the given peer
103103
connectTo (peerId, callback) {
104-
const done = (err) => setImmediate(() => callback(err))
105-
106104
if (!this._running) {
107-
return done(new Error('No running network'))
105+
return callback(new Error('No running network'))
108106
}
109107

110-
// NOTE: For now, all this does is ensure that we are
111-
// connected. Once we have Peer Routing, we will be able
112-
// to find the Peer
113-
if (this.libp2p.swarm.muxedConns[peerId.toB58String()]) {
114-
done()
115-
} else {
116-
done(new Error('Could not connect to peer with peerId:', peerId.toB58String()))
117-
}
108+
this.libp2p.dial(peerId, (err, conn) => {
109+
if (err) {
110+
return callback(err)
111+
}
112+
113+
pull(pull.empty, conn)
114+
callback()
115+
})
116+
}
117+
118+
findProviders (cid, maxProviders, callback) {
119+
this.libp2p.dht.findNProviders(cid, CONSTANTS.providerRequestTimeout, maxProviders, callback)
120+
}
121+
122+
findAndConnect (cid, maxProviders, callback) {
123+
waterfall([
124+
(cb) => this.findProviders(cid, maxProviders, cb),
125+
(provs, cb) => each(provs, (p, cb) => {
126+
this.connectTo(p, cb)
127+
})
128+
], callback)
129+
}
130+
131+
provide (cid, callback) {
132+
this.libp2p.dht.provide()
118133
}
119134

135+
// Connect to the given peer
120136
// Send the given msg (instance of Message) to the given peer
121137
sendMessage (peerId, msg, callback) {
122138
if (!this._running) {
@@ -125,14 +141,8 @@ class Network {
125141

126142
const stringId = peerId.toB58String()
127143
log('sendMessage to %s', stringId, msg)
128-
let peerInfo
129-
try {
130-
peerInfo = this.peerBook.get(stringId)
131-
} catch (err) {
132-
return callback(err)
133-
}
134144

135-
this._dialPeer(peerInfo, (err, conn, protocol) => {
145+
this._dialPeer(peerId, (err, conn, protocol) => {
136146
if (err) {
137147
return callback(err)
138148
}
@@ -157,14 +167,14 @@ class Network {
157167
})
158168
}
159169

160-
_dialPeer (peerInfo, callback) {
170+
_dialPeer (peer, callback) {
161171
// dialByPeerInfo throws if no network is there
162172
try {
163173
// Attempt Bitswap 1.1.0
164-
this.libp2p.dial(peerInfo, BITSWAP110, (err, conn) => {
174+
this.libp2p.dial(peer, BITSWAP110, (err, conn) => {
165175
if (err) {
166176
// Attempt Bitswap 1.0.0
167-
this.libp2p.dial(peerInfo, BITSWAP100, (err, conn) => {
177+
this.libp2p.dial(peer, BITSWAP100, (err, conn) => {
168178
if (err) {
169179
return callback(err)
170180
}

src/components/want-manager/msg-queue.js

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ module.exports = class MsgQueue {
5353
log.error('cant connect to peer %s: %s', this.peerId.toB58String(), err.message)
5454
return
5555
}
56+
5657
log('sending message')
5758
this.network.sendMessage(this.peerId, msg, (err) => {
5859
if (err) {

src/index.js

+19-1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ class Bitswap {
126126
`block:${block.cid.buffer.toString()}`,
127127
block
128128
)
129+
this.network.provide(block.cid, (err) => {
130+
if (err) {
131+
log.error('Failed to provide: %s', err.message)
132+
}
133+
})
134+
129135
this.engine.receivedBlocks([block.cid])
130136
callback()
131137
})
@@ -198,7 +204,14 @@ class Bitswap {
198204
}
199205

200206
addListener()
201-
this.wm.wantBlocks([cid])
207+
208+
this.network.findAndConnect(cid, CONSTANTS.maxProvidersPerRequest, (err) => {
209+
if (err) {
210+
return callback(err)
211+
}
212+
213+
this.wm.wantBlocks([cid])
214+
})
202215
})
203216
}
204217

@@ -269,6 +282,11 @@ class Bitswap {
269282
block
270283
)
271284
this.engine.receivedBlocks([block.cid])
285+
this.network.provide(block.cid, (err) => {
286+
if (err) {
287+
log.error('Failed to provide: %s', err.message)
288+
}
289+
})
272290
})
273291
cb()
274292
})

0 commit comments

Comments
 (0)