Skip to content

Commit

Permalink
feat: peer store (#470)
Browse files Browse the repository at this point in the history
* feat: peer-store v0

* chore: apply suggestions from code review

Co-Authored-By: Jacob Heun <jacobheun@gmail.com>
  • Loading branch information
vasco-santos and jacobheun committed Dec 12, 2019
1 parent 138bb0b commit f3e276e
Show file tree
Hide file tree
Showing 8 changed files with 519 additions and 16 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
"once": "^1.4.0",
"p-queue": "^6.1.1",
"p-settle": "^3.1.0",
"peer-book": "^0.9.1",
"peer-id": "^0.13.3",
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
Expand Down
34 changes: 19 additions & 15 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const promisify = require('promisify-es6')
const each = require('async/each')
const nextTick = require('async/nextTick')

const PeerBook = require('peer-book')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const Switch = require('./switch')
Expand All @@ -29,6 +28,7 @@ const { codes } = require('./errors')
const Dialer = require('./dialer')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const PeerStore = require('./peer-store')

const notStarted = (action, state) => {
return errCode(
Expand All @@ -54,21 +54,23 @@ class Libp2p extends EventEmitter {

this.datastore = this._options.datastore
this.peerInfo = this._options.peerInfo
this.peerBook = this._options.peerBook || new PeerBook()
this.peerStore = new PeerStore()

this._modules = this._options.modules
this._config = this._options.config
this._transport = [] // Transport instances/references
this._discovery = [] // Discovery service instances/references

// create the switch, and listen for errors
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)

// Setup the Upgrader
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
onConnection: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)

this.peerStore.put(peerInfo)
this.emit('peer:connect', peerInfo)
},
onConnectionEnd: (connection) => {
Expand Down Expand Up @@ -179,10 +181,10 @@ class Libp2p extends EventEmitter {

// Once we start, emit and dial any peers we may have already discovered
this.state.on('STARTED', () => {
this.peerBook.getAllArray().forEach((peerInfo) => {
for (const peerInfo of this.peerStore.peers) {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
})
}
})

this._peerDiscovered = this._peerDiscovered.bind(this)
Expand Down Expand Up @@ -245,7 +247,7 @@ class Libp2p extends EventEmitter {

/**
* Dials to the provided peer. If successful, the `PeerInfo` of the
* peer will be added to the nodes `PeerBook`
* peer will be added to the nodes `peerStore`
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
* @param {object} options
Expand All @@ -258,7 +260,7 @@ class Libp2p extends EventEmitter {

/**
* Dials to the provided peer and handshakes with the given protocol.
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`,
* If successful, the `PeerInfo` of the peer will be added to the nodes `peerStore`,
* and the `Connection` will be sent in the callback
*
* @async
Expand All @@ -277,11 +279,19 @@ class Libp2p extends EventEmitter {
connection = await this.dialer.connectToPeer(peer, options)
}

const peerInfo = getPeerInfo(connection.remotePeer)

// If a protocol was provided, create a new stream
if (protocols) {
return connection.newStream(protocols)
const stream = await connection.newStream(protocols)

peerInfo.protocols.add(stream.protocol)
this.peerStore.put(peerInfo)

return stream
}

this.peerStore.put(peerInfo)
return connection
}

Expand Down Expand Up @@ -369,12 +379,6 @@ class Libp2p extends EventEmitter {
* the `peer:discovery` event. If auto dial is enabled for libp2p
* and the current connection count is under the low watermark, the
* peer will be dialed.
*
* TODO: If `peerBook.put` becomes centralized, https://github.com/libp2p/js-libp2p/issues/345,
* it would be ideal if only new peers were emitted. Currently, with
* other modules adding peers to the `PeerBook` we have no way of knowing
* if a peer is new or not, so it has to be emitted.
*
* @private
* @param {PeerInfo} peerInfo
*/
Expand All @@ -383,7 +387,7 @@ class Libp2p extends EventEmitter {
log.error(new Error(codes.ERR_DISCOVERED_SELF))
return
}
peerInfo = this.peerBook.put(peerInfo)
peerInfo = this.peerStore.put(peerInfo)

if (!this.isStarted()) return

Expand Down
3 changes: 3 additions & 0 deletions src/peer-store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Peerstore

WIP
190 changes: 190 additions & 0 deletions src/peer-store/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
'use strict'

const assert = require('assert')
const debug = require('debug')
const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error')

const { EventEmitter } = require('events')

const PeerInfo = require('peer-info')

/**
* Responsible for managing known peers, as well as their addresses and metadata
* @fires PeerStore#peer Emitted when a peer is connected to this node
* @fires PeerStore#change:protocols
* @fires PeerStore#change:multiaddrs
*/
class PeerStore extends EventEmitter {
constructor () {
super()

/**
* Map of peers
*
* @type {Map<string, PeerInfo>}
*/
this.peers = new Map()

// TODO: Track ourselves. We should split `peerInfo` up into its pieces so we get better
// control and observability. This will be the initial step for removing PeerInfo
// https://github.com/libp2p/go-libp2p-core/blob/master/peerstore/peerstore.go
// this.addressBook = new Map()
// this.protoBook = new Map()
}

/**
* Stores the peerInfo of a new peer.
* If already exist, its info is updated.
* @param {PeerInfo} peerInfo
*/
put (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

// Already know the peer?
if (this.peers.has(peerInfo.id.toB58String())) {
this.update(peerInfo)
} else {
this.add(peerInfo)

// Emit the new peer found
this.emit('peer', peerInfo)
}
}

/**
* Add a new peer to the store.
* @param {PeerInfo} peerInfo
*/
add (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

// Create new instance and add values to it
const newPeerInfo = new PeerInfo(peerInfo.id)

peerInfo.multiaddrs.forEach((ma) => newPeerInfo.multiaddrs.add(ma))
peerInfo.protocols.forEach((p) => newPeerInfo.protocols.add(p))

const connectedMa = peerInfo.isConnected()
connectedMa && newPeerInfo.connect(connectedMa)

const peerProxy = new Proxy(newPeerInfo, {
set: (obj, prop, value) => {
if (prop === 'multiaddrs') {
this.emit('change:multiaddrs', {
peerInfo: obj,
multiaddrs: value.toArray()
})
} else if (prop === 'protocols') {
this.emit('change:protocols', {
peerInfo: obj,
protocols: Array.from(value)
})
}
return Reflect.set(...arguments)
}
})

this.peers.set(peerInfo.id.toB58String(), peerProxy)
}

/**
* Updates an already known peer.
* @param {PeerInfo} peerInfo
*/
update (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
const id = peerInfo.id.toB58String()
const recorded = this.peers.get(id)

// pass active connection state
const ma = peerInfo.isConnected()
if (ma) {
recorded.connect(ma)
}

// Verify new multiaddrs
// TODO: better track added and removed multiaddrs
const multiaddrsIntersection = [
...recorded.multiaddrs.toArray()
].filter((m) => peerInfo.multiaddrs.has(m))

if (multiaddrsIntersection.length !== peerInfo.multiaddrs.size ||
multiaddrsIntersection.length !== recorded.multiaddrs.size) {
// recorded.multiaddrs = peerInfo.multiaddrs
recorded.multiaddrs.clear()

for (const ma of peerInfo.multiaddrs.toArray()) {
recorded.multiaddrs.add(ma)
}

this.emit('change:multiaddrs', {
peerInfo: peerInfo,
multiaddrs: recorded.multiaddrs.toArray()
})
}

// Update protocols
// TODO: better track added and removed protocols
const protocolsIntersection = new Set(
[...recorded.protocols].filter((p) => peerInfo.protocols.has(p))
)

if (protocolsIntersection.size !== peerInfo.protocols.size ||
protocolsIntersection.size !== recorded.protocols.size) {
recorded.protocols.clear()

for (const protocol of peerInfo.protocols) {
recorded.protocols.add(protocol)
}

this.emit('change:protocols', {
peerInfo: peerInfo,
protocols: Array.from(recorded.protocols)
})
}

// Add the public key if missing
if (!recorded.id.pubKey && peerInfo.id.pubKey) {
recorded.id.pubKey = peerInfo.id.pubKey
}
}

/**
* Get the info to the given id.
* @param {string} peerId b58str id
* @returns {PeerInfo}
*/
get (peerId) {
const peerInfo = this.peers.get(peerId)

if (peerInfo) {
return peerInfo
}

return undefined
}

/**
* Removes the Peer with the matching `peerId` from the PeerStore
* @param {string} peerId b58str id
* @returns {boolean} true if found and removed
*/
remove (peerId) {
return this.peers.delete(peerId)
}

/**
* Completely replaces the existing peers metadata with the given `peerInfo`
* @param {PeerInfo} peerInfo
* @returns {void}
*/
replace (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

this.remove(peerInfo.id.toB58String())
this.add(peerInfo)
}
}

module.exports = PeerStore
Loading

0 comments on commit f3e276e

Please sign in to comment.