diff --git a/.github/workflows/typecheck.yml b/.github/workflows/typecheck.yml index bab8a60e..43e6ef83 100644 --- a/.github/workflows/typecheck.yml +++ b/.github/workflows/typecheck.yml @@ -12,15 +12,12 @@ name: Typecheck jobs: check: runs-on: ubuntu-latest - strategy: - matrix: - node-version: [14.x] steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v2 - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v1 + uses: actions/setup-node@v2 with: - node-version: ${{ matrix.node-version }} + node-version: 16 - name: Install dependencies run: npm install - name: Typecheck diff --git a/package.json b/package.json index 776bbaf2..4ca715d1 100644 --- a/package.json +++ b/package.json @@ -66,13 +66,14 @@ "aegir": "^36.0.2", "assert": "^2.0.0", "benchmark": "^2.1.4", + "datastore-core": "^6.0.7", "delay": "^5.0.0", "interface-datastore": "^6.0.2", "iso-random-stream": "^2.0.0", "it-all": "^1.0.5", "it-drain": "^1.0.4", - "libp2p": "^0.34.0", - "libp2p-kad-dht": "^0.27.1", + "libp2p": "libp2p/js-libp2p#feat/async-peerstore", + "libp2p-kad-dht": "^0.28.4", "libp2p-mplex": "^0.10.2", "libp2p-tcp": "^0.17.1", "lodash.difference": "^4.5.0", @@ -104,7 +105,7 @@ "it-length-prefixed": "^5.0.2", "it-pipe": "^1.1.0", "just-debounce-it": "^1.1.0", - "libp2p-interfaces": "^2.0.1", + "libp2p-interfaces": "^4.0.0", "multiaddr": "^10.0.0", "multiformats": "^9.0.4", "native-abort-controller": "^1.0.3", diff --git a/src/bitswap.js b/src/bitswap.js index 0527b682..a4756347 100644 --- a/src/bitswap.js +++ b/src/bitswap.js @@ -61,7 +61,7 @@ export class Bitswap extends BaseBlockstore { this._options = Object.assign({}, defaultOptions, options) // stats - this._stats = new Stats(statsKeys, { + this._stats = new Stats(libp2p, statsKeys, { enabled: this._options.statsEnabled, computeThrottleTimeout: this._options.statsComputeThrottleTimeout, computeThrottleMaxQueueSize: this._options.statsComputeThrottleMaxQueueSize @@ -75,10 +75,10 @@ export class Bitswap extends BaseBlockstore { // local database this.blockstore = blockstore - this.engine = new DecisionEngine(this.peerId, blockstore, this.network, this._stats) + this.engine = new DecisionEngine(this.peerId, blockstore, this.network, this._stats, libp2p) // handle message sending - this.wm = new WantManager(this.peerId, this.network, this._stats) + this.wm = new WantManager(this.peerId, this.network, this._stats, libp2p) this.notifications = new Notifications(this.peerId) diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index d4ff73fb..505888e0 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -6,6 +6,7 @@ import { Ledger } from './ledger.js' import { RequestQueue } from './req-queue.js' import { TaskMerger } from './task-merger.js' import { logger } from '../utils/index.js' +import trackedMap from 'libp2p/src/metrics/tracked-map.js' /** * @typedef {import('../message/entry').BitswapMessageEntry} BitswapMessageEntry @@ -35,11 +36,12 @@ export class DecisionEngine { * @param {import('interface-blockstore').Blockstore} blockstore * @param {import('../network').Network} network * @param {import('../stats').Stats} stats + * @param {import('libp2p')} libp2p * @param {Object} [opts] * @param {number} [opts.targetMessageSize] * @param {number} [opts.maxSizeReplaceHasWithBlock] */ - constructor (peerId, blockstore, network, stats, opts = {}) { + constructor (peerId, blockstore, network, stats, libp2p, opts = {}) { this._log = logger(peerId, 'engine') this.blockstore = blockstore this.network = network @@ -48,7 +50,12 @@ export class DecisionEngine { // A list of of ledgers by their partner id /** @type {Map} */ - this.ledgerMap = new Map() + this.ledgerMap = trackedMap({ + system: 'ipfs', + component: 'bitswap', + metric: 'ledger-map', + metrics: libp2p.metrics + }) this._running = false // Queue of want-have / want-block per peer @@ -455,16 +462,10 @@ export class DecisionEngine { /** * - * @param {PeerId} _peerId - * @returns {void} + * @param {PeerId} peerId */ - peerDisconnected (_peerId) { - // if (this.ledgerMap.has(peerId.toB58String())) { - // this.ledgerMap.delete(peerId.toB58String()) - // } - // - // TODO: figure out how to remove all other references - // in the peer request queue + peerDisconnected (peerId) { + this.ledgerMap.delete(peerId.toB58String()) } /** diff --git a/src/network.js b/src/network.js index 6c881df3..2072c55b 100644 --- a/src/network.js +++ b/src/network.js @@ -57,9 +57,9 @@ export class Network { this._hashLoader = options.hashLoader } - start () { + async start () { this._running = true - this._libp2p.handle(this._protocols, this._onConnection) + await this._libp2p.handle(this._protocols, this._onConnection) // register protocol with topology const topology = new MulticodecTopology({ @@ -69,21 +69,21 @@ export class Network { onDisconnect: this._onPeerDisconnect } }) - this._registrarId = this._libp2p.registrar.register(topology) + this._registrarId = await this._libp2p.registrar.register(topology) // All existing connections are like new ones for us - for (const peer of this._libp2p.peerStore.peers.values()) { + for await (const peer of this._libp2p.peerStore.getPeers()) { const conn = this._libp2p.connectionManager.get(peer.id) conn && this._onPeerConnect(conn.remotePeer) } } - stop () { + async stop () { this._running = false // Unhandle both, libp2p doesn't care if it's not already handled - this._libp2p.unhandle(this._protocols) + await this._libp2p.unhandle(this._protocols) // unregister protocol and handlers if (this._registrarId != null) { diff --git a/src/stats/index.js b/src/stats/index.js index 2f53d743..bd3d6e71 100644 --- a/src/stats/index.js +++ b/src/stats/index.js @@ -1,5 +1,6 @@ import { EventEmitter } from 'events' import { Stat } from './stat.js' +import trackedMap from 'libp2p/src/metrics/tracked-map.js' /** * @typedef {import('multiformats').CID} CID @@ -22,13 +23,14 @@ const defaultOptions = { export class Stats extends EventEmitter { /** + * @param {import('libp2p')} libp2p * @param {string[]} [initialCounters] * @param {Object} _options * @param {boolean} _options.enabled * @param {number} _options.computeThrottleTimeout * @param {number} _options.computeThrottleMaxQueueSize */ - constructor (initialCounters = [], _options = defaultOptions) { + constructor (libp2p, initialCounters = [], _options = defaultOptions) { super() const options = Object.assign({}, defaultOptions, _options) @@ -49,7 +51,12 @@ export class Stats extends EventEmitter { this._global.on('update', (stats) => this.emit('update', stats)) /** @type {Map} */ - this._peers = new Map() + this._peers = trackedMap({ + system: 'ipfs', + component: 'bitswap', + metric: 'stats-peers', + metrics: libp2p.metrics + }) } enable () { diff --git a/src/want-manager/index.js b/src/want-manager/index.js index 110b1fdd..7c0e447d 100644 --- a/src/want-manager/index.js +++ b/src/want-manager/index.js @@ -5,6 +5,7 @@ import * as CONSTANTS from '../constants.js' import { MsgQueue } from './msg-queue.js' import { logger } from '../utils/index.js' import { base58btc } from 'multiformats/bases/base58' +import trackedMap from 'libp2p/src/metrics/tracked-map.js' /** * @typedef {import('peer-id')} PeerId @@ -16,11 +17,17 @@ export class WantManager { * @param {PeerId} peerId * @param {import('../network').Network} network * @param {import('../stats').Stats} stats + * @param {import('libp2p')} libp2p */ - constructor (peerId, network, stats) { + constructor (peerId, network, stats, libp2p) { /** @type {Map} */ - this.peers = new Map() - this.wantlist = new Wantlist(stats) + this.peers = trackedMap({ + system: 'ipfs', + component: 'bitswap', + metric: 'want-manager-peers', + metrics: libp2p.metrics + }) + this.wantlist = new Wantlist(stats, libp2p) this.network = network this._stats = stats diff --git a/src/wantlist/index.js b/src/wantlist/index.js index 0daa6096..fbfcb659 100644 --- a/src/wantlist/index.js +++ b/src/wantlist/index.js @@ -2,6 +2,7 @@ import { WantListEntry as Entry } from './entry.js' import { base58btc } from 'multiformats/bases/base58' import { Message } from '../message/message.js' +import trackedMap from 'libp2p/src/metrics/tracked-map.js' const WantType = { Block: Message.Wantlist.WantType.Block, @@ -28,12 +29,19 @@ const sortBy = (fn, list) => { export class Wantlist { /** - * * @param {import('../stats').Stats} [stats] + * @param {import('libp2p')} [libp2p] */ - constructor (stats) { + constructor (stats, libp2p) { /** @type {Map} */ - this.set = new Map() + this.set = libp2p + ? trackedMap({ + system: 'ipfs', + component: 'bitswap', + metric: 'wantlist', + metrics: libp2p.metrics + }) + : new Map() this._stats = stats } diff --git a/test/decision-engine/decision-engine.js b/test/decision-engine/decision-engine.js index 110dc3e2..602e0452 100644 --- a/test/decision-engine/decision-engine.js +++ b/test/decision-engine/decision-engine.js @@ -55,7 +55,8 @@ function stringifyMessages (messages) { */ async function newEngine (network) { const peerId = await PeerId.create({ bits: 512 }) - const engine = new DecisionEngine(peerId, new MemoryBlockstore(), network, new Stats()) + // @ts-expect-error {} is not a real libp2p + const engine = new DecisionEngine(peerId, new MemoryBlockstore(), network, new Stats({}), {}) engine.start() return { peer: peerId, engine: engine } } @@ -194,7 +195,8 @@ describe('Engine', () => { }) const id = await PeerId.create({ bits: 512 }) const blockstore = new MemoryBlockstore() - const dEngine = new DecisionEngine(id, blockstore, network, new Stats()) + // @ts-expect-error {} is not a real libp2p + const dEngine = new DecisionEngine(id, blockstore, network, new Stats({}), {}) dEngine.start() // Send wants then cancels for some of the wants @@ -289,7 +291,8 @@ describe('Engine', () => { } }) - const dEngine = new DecisionEngine(id, blockstore, network, new Stats()) + // @ts-expect-error {} is not a real libp2p + const dEngine = new DecisionEngine(id, blockstore, network, new Stats({}), {}) dEngine.start() // Each peer requests all blocks @@ -313,7 +316,8 @@ describe('Engine', () => { const deferred = defer() const network = mockNetwork(blocks.length, undefined, (peer, msg) => deferred.resolve([peer, msg])) const blockstore = new MemoryBlockstore() - const dEngine = new DecisionEngine(id, blockstore, network, new Stats(), { maxSizeReplaceHasWithBlock: 0 }) + // @ts-expect-error {} is not a real libp2p + const dEngine = new DecisionEngine(id, blockstore, network, new Stats({}), {}, { maxSizeReplaceHasWithBlock: 0 }) dEngine.start() const message = new Message(false) @@ -356,7 +360,8 @@ describe('Engine', () => { onMsg([peerId, message]) }) const blockstore = new MemoryBlockstore() - const dEngine = new DecisionEngine(id, blockstore, network, new Stats(), { maxSizeReplaceHasWithBlock: 0 }) + // @ts-expect-error {} is not a real libp2p + const dEngine = new DecisionEngine(id, blockstore, network, new Stats({}), {}, { maxSizeReplaceHasWithBlock: 0 }) dEngine.start() const message = new Message(false) @@ -702,7 +707,8 @@ describe('Engine', () => { const blockstore = new MemoryBlockstore() await drain(blockstore.putMany(blocks.map(({ cid, data }) => ({ key: cid, value: data })))) - const dEngine = new DecisionEngine(id, blockstore, network, new Stats(), { maxSizeReplaceHasWithBlock: 0 }) + // @ts-expect-error {} is not a real libp2p + const dEngine = new DecisionEngine(id, blockstore, network, new Stats({}), {}, { maxSizeReplaceHasWithBlock: 0 }) dEngine._scheduleProcessTasks = () => {} dEngine.start() diff --git a/test/network/network.node.js b/test/network/network.node.js index fe9e00ec..646fb9dd 100644 --- a/test/network/network.node.js +++ b/test/network/network.node.js @@ -68,10 +68,13 @@ describe('network', () => { bitswapMockB = createBitswapMock() bitswapMockC = createBitswapMock() - networkA = new Network(p2pA, bitswapMockA, new Stats()) - networkB = new Network(p2pB, bitswapMockB, new Stats()) + // @ts-expect-error {} is not a real libp2p + networkA = new Network(p2pA, bitswapMockA, new Stats({})) + // @ts-expect-error {} is not a real libp2p + networkB = new Network(p2pB, bitswapMockB, new Stats({})) // only bitswap100 - networkC = new Network(p2pC, bitswapMockC, new Stats(), { b100Only: true }) + // @ts-expect-error {} is not a real libp2p + networkC = new Network(p2pC, bitswapMockC, new Stats({}), { b100Only: true }) networkA.start() networkB.start() @@ -213,7 +216,7 @@ describe('network', () => { // In a real network scenario, peers will be discovered and their addresses // will be added to the addressBook before bitswap kicks in - p2pA.peerStore.addressBook.set(p2pB.peerId, p2pB.multiaddrs) + await p2pA.peerStore.addressBook.set(p2pB.peerId, p2pB.multiaddrs) bitswapMockB._receiveMessage = async (peerId, msgReceived) => { // eslint-disable-line require-await // cannot do deep comparison on objects as one has Buffers and one has Uint8Arrays @@ -249,7 +252,7 @@ describe('network', () => { // In a real network scenario, peers will be discovered and their addresses // will be added to the addressBook before bitswap kicks in - p2pA.peerStore.addressBook.set(p2pC.peerId, p2pC.multiaddrs) + await p2pA.peerStore.addressBook.set(p2pC.peerId, p2pC.multiaddrs) bitswapMockC._receiveMessage = async (peerId, msgReceived) => { // eslint-disable-line require-await // cannot do deep comparison on objects as one has Buffers and one has Uint8Arrays @@ -267,10 +270,12 @@ describe('network', () => { }) it('dials to peer using Bitswap 1.2.0', async () => { - networkA = new Network(p2pA, bitswapMockA, new Stats()) + // @ts-expect-error {} is not a real libp2p + networkA = new Network(p2pA, bitswapMockA, new Stats({})) // only supports 1.2.0 - networkB = new Network(p2pB, bitswapMockB, new Stats()) + // @ts-expect-error {} is not a real libp2p + networkB = new Network(p2pB, bitswapMockB, new Stats({})) networkB._protocols = ['/ipfs/bitswap/1.2.0'] networkA.start() @@ -278,7 +283,7 @@ describe('network', () => { // In a real network scenario, peers will be discovered and their addresses // will be added to the addressBook before bitswap kicks in - p2pA.peerStore.addressBook.set(p2pB.peerId, p2pB.multiaddrs) + await p2pA.peerStore.addressBook.set(p2pB.peerId, p2pB.multiaddrs) const deferred = pDefer() @@ -297,23 +302,23 @@ describe('network', () => { /** @type {Libp2p} */ const libp2p = { - // @ts-ignore incomplete implementation + // @ts-expect-error incomplete implementation contentRouting: { findProviders: mockFindProviders }, - // @ts-ignore incomplete implementation + // @ts-expect-error incomplete implementation registrar: { register: sinon.stub() }, - // @ts-ignore incomplete implementation peerStore: { + // @ts-expect-error {} incomplete implementation peers: new Map() }, dial: mockDial, handle: sinon.stub() } - const network = new Network(libp2p, bitswapMockA, new Stats()) + const network = new Network(libp2p, bitswapMockA, new Stats(libp2p)) const cid = CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn') const provider1 = { diff --git a/test/utils/mocks.js b/test/utils/mocks.js index 6801f7b7..139cdba8 100644 --- a/test/utils/mocks.js +++ b/test/utils/mocks.js @@ -3,6 +3,7 @@ import PeerStore from 'libp2p/src/peer-store/index.js' import { Node } from './create-libp2p-node.js' import { MemoryBlockstore } from 'blockstore-core/memory' import { EventEmitter } from 'events' +import { MemoryDatastore } from 'datastore-core/memory' import { Bitswap } from '../../src/bitswap.js' import { Network } from '../../src/network.js' import { Stats } from '../../src/stats/index.js' @@ -50,7 +51,7 @@ export const mockLibp2pNode = () => { swarm: { setMaxListeners () {} }, - peerStore: new PeerStore({ peerId }) + peerStore: new PeerStore({ peerId, datastore: new MemoryDatastore() }) }) } @@ -84,7 +85,7 @@ export const mockNetwork = (calls = Infinity, done = () => {}, onMsg = () => {}) class MockNetwork extends Network { constructor () { // @ts-ignore - {} is not an instance of libp2p - super({}, new Bitswap({}, new MemoryBlockstore()), new Stats()) + super({}, new Bitswap({}, new MemoryBlockstore()), new Stats({})) this.connects = connects this.messages = messages @@ -184,15 +185,19 @@ export const genBitswapNetwork = async (n, enableDHT = false) => { ) // create PeerStore and populate peerStore - netArray.forEach((net, i) => { - const pb = net.libp2p.peerStore - netArray.forEach((net, j) => { + for (let i = 0; i < netArray.length; i++) { + const pb = netArray[i].libp2p.peerStore + + for (let j = 0; j < netArray.length; j++) { if (i === j) { - return + continue } - pb.addressBook.set(net.peerId, net.libp2p.multiaddrs) - }) - }) + + const net = netArray[j] + + await pb.addressBook.set(net.peerId, net.libp2p.multiaddrs) + } + } // create every Bitswap netArray.forEach((net) => { diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js index 4f5d2562..787c763d 100644 --- a/test/wantmanager/index.spec.js +++ b/test/wantmanager/index.spec.js @@ -54,7 +54,8 @@ describe('WantManager', () => { resolve() }) - const wantManager = new WantManager(peerIds[2], network, new Stats()) + // @ts-expect-error {} is not a real libp2p + const wantManager = new WantManager(peerIds[2], network, new Stats({}), {}) wantManager.start() wantManager.wantBlocks([cid1, cid2])