diff --git a/package.json b/package.json index 6837318936..910325e8f2 100644 --- a/package.json +++ b/package.json @@ -63,7 +63,6 @@ "once": "^1.4.0", "p-queue": "^6.1.1", "p-settle": "^3.1.0", - "peer-book": "^0.9.1", "peer-id": "^0.13.3", "peer-info": "^0.17.0", "promisify-es6": "^1.0.3", diff --git a/src/index.js b/src/index.js index 4d2f31b2c1..df830918ae 100644 --- a/src/index.js +++ b/src/index.js @@ -11,7 +11,6 @@ const promisify = require('promisify-es6') const each = require('async/each') const nextTick = require('async/nextTick') -const PeerBook = require('peer-book') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') const Switch = require('./switch') @@ -29,6 +28,7 @@ const { codes } = require('./errors') const Dialer = require('./dialer') const TransportManager = require('./transport-manager') const Upgrader = require('./upgrader') +const PeerStore = require('./peer-store') const notStarted = (action, state) => { return errCode( @@ -54,7 +54,7 @@ class Libp2p extends EventEmitter { this.datastore = this._options.datastore this.peerInfo = this._options.peerInfo - this.peerBook = this._options.peerBook || new PeerBook() + this.peerStore = new PeerStore() this._modules = this._options.modules this._config = this._options.config @@ -62,13 +62,15 @@ class Libp2p extends EventEmitter { this._discovery = [] // Discovery service instances/references // create the switch, and listen for errors - this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch) + this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch) // Setup the Upgrader this.upgrader = new Upgrader({ localPeer: this.peerInfo.id, onConnection: (connection) => { const peerInfo = getPeerInfo(connection.remotePeer) + + this.peerStore.put(peerInfo) this.emit('peer:connect', peerInfo) }, onConnectionEnd: (connection) => { @@ -179,10 +181,10 @@ class Libp2p extends EventEmitter { // Once we start, emit and dial any peers we may have already discovered this.state.on('STARTED', () => { - this.peerBook.getAllArray().forEach((peerInfo) => { + for (const peerInfo of this.peerStore.peers) { this.emit('peer:discovery', peerInfo) this._maybeConnect(peerInfo) - }) + } }) this._peerDiscovered = this._peerDiscovered.bind(this) @@ -245,7 +247,7 @@ class Libp2p extends EventEmitter { /** * Dials to the provided peer. If successful, the `PeerInfo` of the - * peer will be added to the nodes `PeerBook` + * peer will be added to the nodes `peerStore` * * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial * @param {object} options @@ -258,7 +260,7 @@ class Libp2p extends EventEmitter { /** * Dials to the provided peer and handshakes with the given protocol. - * If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`, + * If successful, the `PeerInfo` of the peer will be added to the nodes `peerStore`, * and the `Connection` will be sent in the callback * * @async @@ -277,11 +279,19 @@ class Libp2p extends EventEmitter { connection = await this.dialer.connectToPeer(peer, options) } + const peerInfo = getPeerInfo(connection.remotePeer) + // If a protocol was provided, create a new stream if (protocols) { - return connection.newStream(protocols) + const stream = await connection.newStream(protocols) + + peerInfo.protocols.add(stream.protocol) + this.peerStore.put(peerInfo) + + return stream } + this.peerStore.put(peerInfo) return connection } @@ -369,12 +379,6 @@ class Libp2p extends EventEmitter { * the `peer:discovery` event. If auto dial is enabled for libp2p * and the current connection count is under the low watermark, the * peer will be dialed. - * - * TODO: If `peerBook.put` becomes centralized, https://github.com/libp2p/js-libp2p/issues/345, - * it would be ideal if only new peers were emitted. Currently, with - * other modules adding peers to the `PeerBook` we have no way of knowing - * if a peer is new or not, so it has to be emitted. - * * @private * @param {PeerInfo} peerInfo */ @@ -383,7 +387,7 @@ class Libp2p extends EventEmitter { log.error(new Error(codes.ERR_DISCOVERED_SELF)) return } - peerInfo = this.peerBook.put(peerInfo) + peerInfo = this.peerStore.put(peerInfo) if (!this.isStarted()) return diff --git a/src/peer-store/README.md b/src/peer-store/README.md new file mode 100644 index 0000000000..d9d79fe56a --- /dev/null +++ b/src/peer-store/README.md @@ -0,0 +1,3 @@ +# Peerstore + +WIP \ No newline at end of file diff --git a/src/peer-store/index.js b/src/peer-store/index.js new file mode 100644 index 0000000000..1e91b926f0 --- /dev/null +++ b/src/peer-store/index.js @@ -0,0 +1,190 @@ +'use strict' + +const assert = require('assert') +const debug = require('debug') +const log = debug('libp2p:peer-store') +log.error = debug('libp2p:peer-store:error') + +const { EventEmitter } = require('events') + +const PeerInfo = require('peer-info') + +/** + * Responsible for managing known peers, as well as their addresses and metadata + * @fires PeerStore#peer Emitted when a peer is connected to this node + * @fires PeerStore#change:protocols + * @fires PeerStore#change:multiaddrs + */ +class PeerStore extends EventEmitter { + constructor () { + super() + + /** + * Map of peers + * + * @type {Map} + */ + this.peers = new Map() + + // TODO: Track ourselves. We should split `peerInfo` up into its pieces so we get better + // control and observability. This will be the initial step for removing PeerInfo + // https://github.com/libp2p/go-libp2p-core/blob/master/peerstore/peerstore.go + // this.addressBook = new Map() + // this.protoBook = new Map() + } + + /** + * Stores the peerInfo of a new peer. + * If already exist, its info is updated. + * @param {PeerInfo} peerInfo + */ + put (peerInfo) { + assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') + + // Already know the peer? + if (this.peers.has(peerInfo.id.toB58String())) { + this.update(peerInfo) + } else { + this.add(peerInfo) + + // Emit the new peer found + this.emit('peer', peerInfo) + } + } + + /** + * Add a new peer to the store. + * @param {PeerInfo} peerInfo + */ + add (peerInfo) { + assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') + + // Create new instance and add values to it + const newPeerInfo = new PeerInfo(peerInfo.id) + + peerInfo.multiaddrs.forEach((ma) => newPeerInfo.multiaddrs.add(ma)) + peerInfo.protocols.forEach((p) => newPeerInfo.protocols.add(p)) + + const connectedMa = peerInfo.isConnected() + connectedMa && newPeerInfo.connect(connectedMa) + + const peerProxy = new Proxy(newPeerInfo, { + set: (obj, prop, value) => { + if (prop === 'multiaddrs') { + this.emit('change:multiaddrs', { + peerInfo: obj, + multiaddrs: value.toArray() + }) + } else if (prop === 'protocols') { + this.emit('change:protocols', { + peerInfo: obj, + protocols: Array.from(value) + }) + } + return Reflect.set(...arguments) + } + }) + + this.peers.set(peerInfo.id.toB58String(), peerProxy) + } + + /** + * Updates an already known peer. + * @param {PeerInfo} peerInfo + */ + update (peerInfo) { + assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') + const id = peerInfo.id.toB58String() + const recorded = this.peers.get(id) + + // pass active connection state + const ma = peerInfo.isConnected() + if (ma) { + recorded.connect(ma) + } + + // Verify new multiaddrs + // TODO: better track added and removed multiaddrs + const multiaddrsIntersection = [ + ...recorded.multiaddrs.toArray() + ].filter((m) => peerInfo.multiaddrs.has(m)) + + if (multiaddrsIntersection.length !== peerInfo.multiaddrs.size || + multiaddrsIntersection.length !== recorded.multiaddrs.size) { + // recorded.multiaddrs = peerInfo.multiaddrs + recorded.multiaddrs.clear() + + for (const ma of peerInfo.multiaddrs.toArray()) { + recorded.multiaddrs.add(ma) + } + + this.emit('change:multiaddrs', { + peerInfo: peerInfo, + multiaddrs: recorded.multiaddrs.toArray() + }) + } + + // Update protocols + // TODO: better track added and removed protocols + const protocolsIntersection = new Set( + [...recorded.protocols].filter((p) => peerInfo.protocols.has(p)) + ) + + if (protocolsIntersection.size !== peerInfo.protocols.size || + protocolsIntersection.size !== recorded.protocols.size) { + recorded.protocols.clear() + + for (const protocol of peerInfo.protocols) { + recorded.protocols.add(protocol) + } + + this.emit('change:protocols', { + peerInfo: peerInfo, + protocols: Array.from(recorded.protocols) + }) + } + + // Add the public key if missing + if (!recorded.id.pubKey && peerInfo.id.pubKey) { + recorded.id.pubKey = peerInfo.id.pubKey + } + } + + /** + * Get the info to the given id. + * @param {string} peerId b58str id + * @returns {PeerInfo} + */ + get (peerId) { + const peerInfo = this.peers.get(peerId) + + if (peerInfo) { + return peerInfo + } + + return undefined + } + + /** + * Removes the Peer with the matching `peerId` from the PeerStore + * @param {string} peerId b58str id + * @returns {boolean} true if found and removed + */ + remove (peerId) { + return this.peers.delete(peerId) + } + + /** + * Completely replaces the existing peers metadata with the given `peerInfo` + * @param {PeerInfo} peerInfo + * @returns {void} + */ + replace (peerInfo) { + assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') + + this.remove(peerInfo.id.toB58String()) + this.add(peerInfo) + } +} + +module.exports = PeerStore diff --git a/test/peer-store/peer-store.spec.js b/test/peer-store/peer-store.spec.js new file mode 100644 index 0000000000..8dfba557d3 --- /dev/null +++ b/test/peer-store/peer-store.spec.js @@ -0,0 +1,220 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') + +const pDefer = require('p-defer') +const mergeOptions = require('merge-options') + +const Libp2p = require('../../src') +const PeerStore = require('../../src/peer-store') +const multiaddr = require('multiaddr') + +const baseOptions = require('../utils/base-options') +const peerUtils = require('../utils/creators/peer') +const mockConnection = require('../utils/mockConnection') + +const addr = multiaddr('/ip4/127.0.0.1/tcp/8000') +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('peer-store', () => { + let peerStore + + beforeEach(() => { + peerStore = new PeerStore() + }) + + it('should add a new peer and emit it when it does not exist', async () => { + const defer = pDefer() + + sinon.spy(peerStore, 'put') + sinon.spy(peerStore, 'add') + sinon.spy(peerStore, 'update') + + const [peerInfo] = await peerUtils.createPeerInfo(1) + + peerStore.on('peer', (peer) => { + expect(peer).to.exist() + defer.resolve() + }) + peerStore.put(peerInfo) + + // Wait for peerStore to emit the peer + await defer.promise + + expect(peerStore.put.callCount).to.equal(1) + expect(peerStore.add.callCount).to.equal(1) + expect(peerStore.update.callCount).to.equal(0) + }) + + it('should update peer when it is already in the store', async () => { + const [peerInfo] = await peerUtils.createPeerInfo(1) + + // Put the peer in the store + peerStore.put(peerInfo) + + sinon.spy(peerStore, 'put') + sinon.spy(peerStore, 'add') + sinon.spy(peerStore, 'update') + + // When updating, peer event must not be emitted + peerStore.on('peer', () => { + throw new Error('should not emit twice') + }) + // If no multiaddrs change, the event should not be emitted + peerStore.on('change:multiaddrs', () => { + throw new Error('should not emit change:multiaddrs') + }) + // If no protocols change, the event should not be emitted + peerStore.on('change:protocols', () => { + throw new Error('should not emit change:protocols') + }) + + peerStore.put(peerInfo) + + expect(peerStore.put.callCount).to.equal(1) + expect(peerStore.add.callCount).to.equal(0) + expect(peerStore.update.callCount).to.equal(1) + }) + + it('should emit the "change:multiaddrs" event when a peer has new multiaddrs', async () => { + const defer = pDefer() + const [createdPeerInfo] = await peerUtils.createPeerInfo(1) + + // Put the peer in the store + peerStore.put(createdPeerInfo) + + // When updating, "change:multiaddrs" event must not be emitted + peerStore.on('change:multiaddrs', ({ peerInfo, multiaddrs }) => { + expect(peerInfo).to.exist() + expect(peerInfo.id).to.eql(createdPeerInfo.id) + expect(peerInfo.protocols).to.eql(createdPeerInfo.protocols) + expect(multiaddrs).to.exist() + expect(multiaddrs).to.eql(createdPeerInfo.multiaddrs.toArray()) + defer.resolve() + }) + // If no protocols change, the event should not be emitted + peerStore.on('change:protocols', () => { + throw new Error('should not emit change:protocols') + }) + + createdPeerInfo.multiaddrs.add(addr) + peerStore.put(createdPeerInfo) + + // Wait for peerStore to emit the event + await defer.promise + }) + + it('should emit the "change:protocols" event when a peer has new protocols', async () => { + const defer = pDefer() + const [createdPeerInfo] = await peerUtils.createPeerInfo(1) + + // Put the peer in the store + peerStore.put(createdPeerInfo) + + // If no multiaddrs change, the event should not be emitted + peerStore.on('change:multiaddrs', () => { + throw new Error('should not emit change:multiaddrs') + }) + // When updating, "change:protocols" event must be emitted + peerStore.on('change:protocols', ({ peerInfo, protocols }) => { + expect(peerInfo).to.exist() + expect(peerInfo.id).to.eql(createdPeerInfo.id) + expect(peerInfo.multiaddrs).to.eql(createdPeerInfo.multiaddrs) + expect(protocols).to.exist() + expect(protocols).to.eql(Array.from(createdPeerInfo.protocols)) + defer.resolve() + }) + + createdPeerInfo.protocols.add('/new-protocol/1.0.0') + peerStore.put(createdPeerInfo) + + // Wait for peerStore to emit the event + await defer.promise + }) + + it('should be able to retrieve a peer from store through its b58str id', async () => { + const [peerInfo] = await peerUtils.createPeerInfo(1) + const id = peerInfo.id.toB58String() + + let retrievedPeer = peerStore.get(id) + expect(retrievedPeer).to.not.exist() + + // Put the peer in the store + peerStore.put(peerInfo) + + retrievedPeer = peerStore.get(id) + expect(retrievedPeer).to.exist() + expect(retrievedPeer.id).to.equal(peerInfo.id) + expect(retrievedPeer.multiaddrs).to.eql(peerInfo.multiaddrs) + expect(retrievedPeer.protocols).to.eql(peerInfo.protocols) + }) + + it('should be able to remove a peer from store through its b58str id', async () => { + const [peerInfo] = await peerUtils.createPeerInfo(1) + const id = peerInfo.id.toB58String() + + let removed = peerStore.remove(id) + expect(removed).to.eql(false) + + // Put the peer in the store + peerStore.put(peerInfo) + expect(peerStore.peers.size).to.equal(1) + + removed = peerStore.remove(id) + expect(removed).to.eql(true) + expect(peerStore.peers.size).to.equal(0) + }) +}) + +describe('peer-store on dial', () => { + let peerInfo + let remotePeerInfo + let libp2p + let remoteLibp2p + + before(async () => { + [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) + remoteLibp2p = new Libp2p(mergeOptions(baseOptions, { + peerInfo: remotePeerInfo + })) + }) + + after(async () => { + sinon.restore() + await remoteLibp2p.stop() + libp2p && await libp2p.stop() + }) + + it('should put the remote peerInfo after dial and emit event', async () => { + const remoteId = remotePeerInfo.id.toB58String() + + libp2p = new Libp2p(mergeOptions(baseOptions, { + peerInfo + })) + + sinon.spy(libp2p.peerStore, 'put') + sinon.spy(libp2p.peerStore, 'add') + sinon.spy(libp2p.peerStore, 'update') + sinon.stub(libp2p.dialer, 'connectToMultiaddr').returns(mockConnection({ + remotePeer: remotePeerInfo.id + })) + + const connection = await libp2p.dial(listenAddr) + await connection.close() + + expect(libp2p.peerStore.put.callCount).to.equal(1) + expect(libp2p.peerStore.add.callCount).to.equal(1) + expect(libp2p.peerStore.update.callCount).to.equal(0) + + const storedPeer = libp2p.peerStore.get(remoteId) + expect(storedPeer).to.exist() + }) +}) + +describe('peer-store on discovery', () => { + // TODO: implement with discovery +}) diff --git a/test/utils/base-options.js b/test/utils/base-options.js new file mode 100644 index 0000000000..038df59219 --- /dev/null +++ b/test/utils/base-options.js @@ -0,0 +1,13 @@ +'use strict' + +const Transport = require('libp2p-tcp') +const Muxer = require('libp2p-mplex') +const mockCrypto = require('../utils/mockCrypto') + +module.exports = { + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [mockCrypto] + } +} diff --git a/test/utils/creators/peer.js b/test/utils/creators/peer.js new file mode 100644 index 0000000000..e39e52e46d --- /dev/null +++ b/test/utils/creators/peer.js @@ -0,0 +1,24 @@ +'use strict' + +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') + +const Peers = require('../../fixtures/peers') + +module.exports.createPeerInfo = async (length) => { + const peers = await Promise.all( + Array.from({ length }) + .map((_, i) => PeerId.create()) + ) + + return peers.map((peer) => new PeerInfo(peer)) +} + +module.exports.createPeerInfoFromFixture = async (length) => { + const peers = await Promise.all( + Array.from({ length }) + .map((_, i) => PeerId.createFromJSON(Peers[i])) + ) + + return peers.map((peer) => new PeerInfo(peer)) +} diff --git a/test/utils/mockConnection.js b/test/utils/mockConnection.js new file mode 100644 index 0000000000..f3ce885884 --- /dev/null +++ b/test/utils/mockConnection.js @@ -0,0 +1,50 @@ +'use strict' + +const { Connection } = require('libp2p-interfaces/src/connection') +const multiaddr = require('multiaddr') + +const pair = require('it-pair') + +const peerUtils = require('./creators/peer') + +module.exports = async (properties = {}) => { + const localAddr = multiaddr('/ip4/127.0.0.1/tcp/8080') + const remoteAddr = multiaddr('/ip4/127.0.0.1/tcp/8081') + + const [localPeer, remotePeer] = await peerUtils.createPeerInfoFromFixture(2) + const openStreams = [] + let streamId = 0 + + return new Connection({ + localPeer: localPeer.id, + remotePeer: remotePeer.id, + localAddr, + remoteAddr, + stat: { + timeline: { + open: Date.now() - 10, + upgraded: Date.now() + }, + direction: 'outbound', + encryption: '/secio/1.0.0', + multiplexer: '/mplex/6.7.0' + }, + newStream: (protocols) => { + const id = streamId++ + const stream = pair() + + stream.close = () => stream.sink([]) + stream.id = id + + openStreams.push(stream) + + return { + stream, + protocol: protocols[0] + } + }, + close: () => { }, + getStreams: () => openStreams, + ...properties + }) +}