Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Awesome DHT #86

Merged
merged 8 commits into from
Apr 6, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const WS = require('libp2p-websockets')
const spdy = require('libp2p-spdy')
const secio = require('libp2p-secio')
const MulticastDNS = require('libp2p-mdns')
const DHT = require('libp2p-dht')

class Node extends libp2p {
constructor (peerInfo, peerBook, options) {
Expand All @@ -95,7 +96,9 @@ class Node extends libp2p {
},
discovery: [
new MulticastDNS(peerInfo, 'your-identifier')
]
],
// DHT is passed as its own enabling PeerRouting, ContentRouting and DHT itself components
dht: new DHT()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing arguments

}

super(modules, peerInfo, peerBook, options)
Expand Down Expand Up @@ -144,6 +147,36 @@ class Node extends libp2p {

`callback` is a function with the following `function (err) {}` signature, where `err` is an Error in case stopping the node fails.

#### `libp2p.peerRouting.findPeer(id, callback)`

> Looks up for multiaddrs of a peer in the DHT

- `id`: instance of [PeerId][]

#### `libp2p.contentRouting.findProviders(key, timeout, callback)`

- `key`:
- `timeout`: Number miliseconds

#### `libp2p.contentRouting.provide(key, timeout, callback)`

- `key`:
- `timeout`: Number miliseconds

#### `libp2p.dht.put(key, value, callback)`

- `key`:
- `value`:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dignifiedquire can you clarify here what type can be key and value? If it is "String" and "Buffer" cause the implementation lets you to, let's instead direct the users to the things we want them to actually use the DHT.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BUFFER

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Update docs to say Buffer


#### `libp2p.dht.get(key, callback)`

- `key`:

#### `libp2p.dht.getMany(key, nVals, callback)`

- `key`:
- `nVals`: Number

#### `libp2p.handle(protocol, handlerFunc [, matchFunc])`

> Handle new protocol
Expand Down
159 changes: 114 additions & 45 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
'use strict'

const EventEmitter = require('events').EventEmitter
const assert = require('assert')

const setImmediate = require('async/setImmediate')
const each = require('async/each')
const series = require('async/series')

const Ping = require('libp2p-ping')
const Swarm = require('libp2p-swarm')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const mafmt = require('mafmt')
const PeerBook = require('peer-book')
const mafmt = require('mafmt')
const multiaddr = require('multiaddr')
const EventEmitter = require('events').EventEmitter
const assert = require('assert')
const Ping = require('libp2p-ping')
const setImmediate = require('async/setImmediate')

exports = module.exports

Expand Down Expand Up @@ -73,9 +77,49 @@ class Node extends EventEmitter {
// Mount default protocols
Ping.mount(this.swarm)

// Not fully implemented in js-libp2p yet
this.routing = undefined
this.records = undefined
// dht provided components (peerRouting, contentRouting, dht)
if (_modules.DHT) {
this._dht = new this.modules.DHT(this, 20, _options.DHT && _options.DHT.datastore)
}

this.peerRouting = {
findPeer: (id, callback) => {
assert(this._dht, 'DHT is not available')

this._dht.findPeer(id, callback)
}
}

this.contentRouting = {
findProviders: (key, timeout, callback) => {
assert(this._dht, 'DHT is not available')

this._dht.findProviders(key, timeout, callback)
},
provide: (key, callback) => {
assert(this._dht, 'DHT is not available')

this._dht.provide(key, callback)
}
}

this.dht = {
put: (key, value, callback) => {
assert(this._dht, 'DHT is not available')

this._dht.put(key, value, callback)
},
get: (key, callback) => {
assert(this._dht, 'DHT is not available')

this._dht.get(key, callback)
},
getMany (key, nVals, callback) {
assert(this._dht, 'DHT is not available')

this._dht.getMany(key, nVals, callback)
}
}
}

/*
Expand Down Expand Up @@ -117,24 +161,30 @@ class Node extends EventEmitter {
}
})

this.swarm.listen((err) => {
if (err) {
return callback(err)
series([
(cb) => this.swarm.listen(cb),
(cb) => {
// listeners on, libp2p is on
this.isOnline = true

if (ws) {
// always add dialing on websockets
this.swarm.transport.add(ws.tag || ws.constructor.name, ws)
}

// all transports need to be setup before discover starts
if (this.modules.discovery) {
return each(this.modules.discovery, (d, cb) => d.start(cb), cb)
}
cb()
},
(cb) => {
if (this._dht) {
return this._dht.start(cb)
}
cb()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transports (swarm) prep needs to happen first than discovery as discovery will trigger dials that use swarm.

if (ws) {
this.swarm.transport.add(ws.tag || ws.constructor.name, ws)
}

this.isOnline = true

if (this.modules.discovery) {
this.modules.discovery.forEach((discovery) => {
setImmediate(() => discovery.start(() => {}))
})
}

callback()
})
], callback)
}

/*
Expand All @@ -149,7 +199,15 @@ class Node extends EventEmitter {
})
}

this.swarm.close(callback)
series([
(cb) => {
if (this._dht) {
return this._dht.stop(cb)
}
cb()
},
(cb) => this.swarm.close(cb)
], callback)
}

isOn () {
Expand All @@ -158,8 +216,13 @@ class Node extends EventEmitter {

ping (peer, callback) {
assert(this.isOn(), OFFLINE_ERROR_MESSAGE)
const peerInfo = this._getPeerInfo(peer)
callback(null, new Ping(this.swarm, peerInfo))
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) {
return callback(err)
}

callback(null, new Ping(this.swarm, peerInfo))
})
}

dial (peer, protocol, callback) {
Expand All @@ -170,27 +233,31 @@ class Node extends EventEmitter {
protocol = undefined
}

let peerInfo
try {
peerInfo = this._getPeerInfo(peer)
} catch (err) {
return callback(err)
}

this.swarm.dial(peerInfo, protocol, (err, conn) => {
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) {
return callback(err)
}
this.peerBook.put(peerInfo)
callback(null, conn)

this.swarm.dial(peerInfo, protocol, (err, conn) => {
if (err) {
return callback(err)
}
this.peerBook.put(peerInfo)
callback(null, conn)
})
})
}

hangUp (peer, callback) {
assert(this.isOn(), OFFLINE_ERROR_MESSAGE)
const peerInfo = this._getPeerInfo(peer)

this.swarm.hangUp(peerInfo, callback)
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) {
return callback(err)
}

this.swarm.hangUp(peerInfo, callback)
})
}

handle (protocol, handlerFunc, matchFunc) {
Expand All @@ -204,10 +271,12 @@ class Node extends EventEmitter {
/*
* Helper method to check the data type of peer and convert it to PeerInfo
*/
_getPeerInfo (peer) {
_getPeerInfo (peer, callback) {
let p
// PeerInfo
if (PeerInfo.isPeerInfo(peer)) {
p = peer
// Multiaddr instance (not string)
} else if (multiaddr.isMultiaddr(peer)) {
const peerIdB58Str = peer.getPeerId()
try {
Expand All @@ -216,19 +285,19 @@ class Node extends EventEmitter {
p = new PeerInfo(PeerId.createFromB58String(peerIdB58Str))
}
p.multiaddrs.add(peer)
// PeerId
} else if (PeerId.isPeerId(peer)) {
const peerIdB58Str = peer.toB58String()
try {
p = this.peerBook.get(peerIdB58Str)
} catch (err) {
// TODO this is where PeerRouting comes into place
throw new Error('No knowledge about: ' + peerIdB58Str)
return this.peerRouting.findPeer(peer, callback)
}
} else {
throw new Error('peer type not recognized')
return setImmediate(() => callback(new Error('peer type not recognized')))
}

return p
setImmediate(() => callback(null, p))
}
}

Expand Down