Skip to content

Commit

Permalink
feat: libp2p async peerstore (#413)
Browse files Browse the repository at this point in the history
Refactors interfaces and classes used by `libp2p-interfaces` to use the async peer store from libp2p/js-libp2p#1058

BREAKING CHANGE: peerstore methods are now all async
  • Loading branch information
achingbrain authored Jan 18, 2022
1 parent 773032c commit 9f5fda9
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 65 deletions.
9 changes: 3 additions & 6 deletions .github/workflows/typecheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,12 @@ name: Typecheck
jobs:
check:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [14.x]
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
uses: actions/setup-node@v2
with:
node-version: ${{ matrix.node-version }}
node-version: 16
- name: Install dependencies
run: npm install
- name: Typecheck
Expand Down
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,14 @@
"aegir": "^36.0.2",
"assert": "^2.0.0",
"benchmark": "^2.1.4",
"datastore-core": "^6.0.7",
"delay": "^5.0.0",
"interface-datastore": "^6.0.2",
"iso-random-stream": "^2.0.0",
"it-all": "^1.0.5",
"it-drain": "^1.0.4",
"libp2p": "^0.34.0",
"libp2p-kad-dht": "^0.27.1",
"libp2p": "libp2p/js-libp2p#feat/async-peerstore",
"libp2p-kad-dht": "^0.28.4",
"libp2p-mplex": "^0.10.2",
"libp2p-tcp": "^0.17.1",
"lodash.difference": "^4.5.0",
Expand Down Expand Up @@ -104,7 +105,7 @@
"it-length-prefixed": "^5.0.2",
"it-pipe": "^1.1.0",
"just-debounce-it": "^1.1.0",
"libp2p-interfaces": "^2.0.1",
"libp2p-interfaces": "^4.0.0",
"multiaddr": "^10.0.0",
"multiformats": "^9.0.4",
"native-abort-controller": "^1.0.3",
Expand Down
6 changes: 3 additions & 3 deletions src/bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class Bitswap extends BaseBlockstore {
this._options = Object.assign({}, defaultOptions, options)

// stats
this._stats = new Stats(statsKeys, {
this._stats = new Stats(libp2p, statsKeys, {
enabled: this._options.statsEnabled,
computeThrottleTimeout: this._options.statsComputeThrottleTimeout,
computeThrottleMaxQueueSize: this._options.statsComputeThrottleMaxQueueSize
Expand All @@ -75,10 +75,10 @@ export class Bitswap extends BaseBlockstore {
// local database
this.blockstore = blockstore

this.engine = new DecisionEngine(this.peerId, blockstore, this.network, this._stats)
this.engine = new DecisionEngine(this.peerId, blockstore, this.network, this._stats, libp2p)

// handle message sending
this.wm = new WantManager(this.peerId, this.network, this._stats)
this.wm = new WantManager(this.peerId, this.network, this._stats, libp2p)

this.notifications = new Notifications(this.peerId)

Expand Down
23 changes: 12 additions & 11 deletions src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Ledger } from './ledger.js'
import { RequestQueue } from './req-queue.js'
import { TaskMerger } from './task-merger.js'
import { logger } from '../utils/index.js'
import trackedMap from 'libp2p/src/metrics/tracked-map.js'

/**
* @typedef {import('../message/entry').BitswapMessageEntry} BitswapMessageEntry
Expand Down Expand Up @@ -35,11 +36,12 @@ export class DecisionEngine {
* @param {import('interface-blockstore').Blockstore} blockstore
* @param {import('../network').Network} network
* @param {import('../stats').Stats} stats
* @param {import('libp2p')} libp2p
* @param {Object} [opts]
* @param {number} [opts.targetMessageSize]
* @param {number} [opts.maxSizeReplaceHasWithBlock]
*/
constructor (peerId, blockstore, network, stats, opts = {}) {
constructor (peerId, blockstore, network, stats, libp2p, opts = {}) {
this._log = logger(peerId, 'engine')
this.blockstore = blockstore
this.network = network
Expand All @@ -48,7 +50,12 @@ export class DecisionEngine {

// A list of of ledgers by their partner id
/** @type {Map<string, Ledger>} */
this.ledgerMap = new Map()
this.ledgerMap = trackedMap({
system: 'ipfs',
component: 'bitswap',
metric: 'ledger-map',
metrics: libp2p.metrics
})
this._running = false

// Queue of want-have / want-block per peer
Expand Down Expand Up @@ -455,16 +462,10 @@ export class DecisionEngine {

/**
*
* @param {PeerId} _peerId
* @returns {void}
* @param {PeerId} peerId
*/
peerDisconnected (_peerId) {
// if (this.ledgerMap.has(peerId.toB58String())) {
// this.ledgerMap.delete(peerId.toB58String())
// }
//
// TODO: figure out how to remove all other references
// in the peer request queue
peerDisconnected (peerId) {
this.ledgerMap.delete(peerId.toB58String())
}

/**
Expand Down
12 changes: 6 additions & 6 deletions src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ export class Network {
this._hashLoader = options.hashLoader
}

start () {
async start () {
this._running = true
this._libp2p.handle(this._protocols, this._onConnection)
await this._libp2p.handle(this._protocols, this._onConnection)

// register protocol with topology
const topology = new MulticodecTopology({
Expand All @@ -69,21 +69,21 @@ export class Network {
onDisconnect: this._onPeerDisconnect
}
})
this._registrarId = this._libp2p.registrar.register(topology)
this._registrarId = await this._libp2p.registrar.register(topology)

// All existing connections are like new ones for us
for (const peer of this._libp2p.peerStore.peers.values()) {
for await (const peer of this._libp2p.peerStore.getPeers()) {
const conn = this._libp2p.connectionManager.get(peer.id)

conn && this._onPeerConnect(conn.remotePeer)
}
}

stop () {
async stop () {
this._running = false

// Unhandle both, libp2p doesn't care if it's not already handled
this._libp2p.unhandle(this._protocols)
await this._libp2p.unhandle(this._protocols)

// unregister protocol and handlers
if (this._registrarId != null) {
Expand Down
11 changes: 9 additions & 2 deletions src/stats/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { EventEmitter } from 'events'
import { Stat } from './stat.js'
import trackedMap from 'libp2p/src/metrics/tracked-map.js'

/**
* @typedef {import('multiformats').CID} CID
Expand All @@ -22,13 +23,14 @@ const defaultOptions = {

export class Stats extends EventEmitter {
/**
* @param {import('libp2p')} libp2p
* @param {string[]} [initialCounters]
* @param {Object} _options
* @param {boolean} _options.enabled
* @param {number} _options.computeThrottleTimeout
* @param {number} _options.computeThrottleMaxQueueSize
*/
constructor (initialCounters = [], _options = defaultOptions) {
constructor (libp2p, initialCounters = [], _options = defaultOptions) {
super()

const options = Object.assign({}, defaultOptions, _options)
Expand All @@ -49,7 +51,12 @@ export class Stats extends EventEmitter {
this._global.on('update', (stats) => this.emit('update', stats))

/** @type {Map<string, Stat>} */
this._peers = new Map()
this._peers = trackedMap({
system: 'ipfs',
component: 'bitswap',
metric: 'stats-peers',
metrics: libp2p.metrics
})
}

enable () {
Expand Down
13 changes: 10 additions & 3 deletions src/want-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as CONSTANTS from '../constants.js'
import { MsgQueue } from './msg-queue.js'
import { logger } from '../utils/index.js'
import { base58btc } from 'multiformats/bases/base58'
import trackedMap from 'libp2p/src/metrics/tracked-map.js'

/**
* @typedef {import('peer-id')} PeerId
Expand All @@ -16,11 +17,17 @@ export class WantManager {
* @param {PeerId} peerId
* @param {import('../network').Network} network
* @param {import('../stats').Stats} stats
* @param {import('libp2p')} libp2p
*/
constructor (peerId, network, stats) {
constructor (peerId, network, stats, libp2p) {
/** @type {Map<string, MsgQueue>} */
this.peers = new Map()
this.wantlist = new Wantlist(stats)
this.peers = trackedMap({
system: 'ipfs',
component: 'bitswap',
metric: 'want-manager-peers',
metrics: libp2p.metrics
})
this.wantlist = new Wantlist(stats, libp2p)

this.network = network
this._stats = stats
Expand Down
14 changes: 11 additions & 3 deletions src/wantlist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import { WantListEntry as Entry } from './entry.js'
import { base58btc } from 'multiformats/bases/base58'
import { Message } from '../message/message.js'
import trackedMap from 'libp2p/src/metrics/tracked-map.js'

const WantType = {
Block: Message.Wantlist.WantType.Block,
Expand All @@ -28,12 +29,19 @@ const sortBy = (fn, list) => {

export class Wantlist {
/**
*
* @param {import('../stats').Stats} [stats]
* @param {import('libp2p')} [libp2p]
*/
constructor (stats) {
constructor (stats, libp2p) {
/** @type {Map<string, Entry>} */
this.set = new Map()
this.set = libp2p
? trackedMap({
system: 'ipfs',
component: 'bitswap',
metric: 'wantlist',
metrics: libp2p.metrics
})
: new Map()
this._stats = stats
}

Expand Down
18 changes: 12 additions & 6 deletions test/decision-engine/decision-engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ function stringifyMessages (messages) {
*/
async function newEngine (network) {
const peerId = await PeerId.create({ bits: 512 })
const engine = new DecisionEngine(peerId, new MemoryBlockstore(), network, new Stats())
// @ts-expect-error {} is not a real libp2p
const engine = new DecisionEngine(peerId, new MemoryBlockstore(), network, new Stats({}), {})
engine.start()
return { peer: peerId, engine: engine }
}
Expand Down Expand Up @@ -194,7 +195,8 @@ describe('Engine', () => {
})
const id = await PeerId.create({ bits: 512 })
const blockstore = new MemoryBlockstore()
const dEngine = new DecisionEngine(id, blockstore, network, new Stats())
// @ts-expect-error {} is not a real libp2p
const dEngine = new DecisionEngine(id, blockstore, network, new Stats({}), {})
dEngine.start()

// Send wants then cancels for some of the wants
Expand Down Expand Up @@ -289,7 +291,8 @@ describe('Engine', () => {
}
})

const dEngine = new DecisionEngine(id, blockstore, network, new Stats())
// @ts-expect-error {} is not a real libp2p
const dEngine = new DecisionEngine(id, blockstore, network, new Stats({}), {})
dEngine.start()

// Each peer requests all blocks
Expand All @@ -313,7 +316,8 @@ describe('Engine', () => {
const deferred = defer()
const network = mockNetwork(blocks.length, undefined, (peer, msg) => deferred.resolve([peer, msg]))
const blockstore = new MemoryBlockstore()
const dEngine = new DecisionEngine(id, blockstore, network, new Stats(), { maxSizeReplaceHasWithBlock: 0 })
// @ts-expect-error {} is not a real libp2p
const dEngine = new DecisionEngine(id, blockstore, network, new Stats({}), {}, { maxSizeReplaceHasWithBlock: 0 })
dEngine.start()

const message = new Message(false)
Expand Down Expand Up @@ -356,7 +360,8 @@ describe('Engine', () => {
onMsg([peerId, message])
})
const blockstore = new MemoryBlockstore()
const dEngine = new DecisionEngine(id, blockstore, network, new Stats(), { maxSizeReplaceHasWithBlock: 0 })
// @ts-expect-error {} is not a real libp2p
const dEngine = new DecisionEngine(id, blockstore, network, new Stats({}), {}, { maxSizeReplaceHasWithBlock: 0 })
dEngine.start()

const message = new Message(false)
Expand Down Expand Up @@ -702,7 +707,8 @@ describe('Engine', () => {

const blockstore = new MemoryBlockstore()
await drain(blockstore.putMany(blocks.map(({ cid, data }) => ({ key: cid, value: data }))))
const dEngine = new DecisionEngine(id, blockstore, network, new Stats(), { maxSizeReplaceHasWithBlock: 0 })
// @ts-expect-error {} is not a real libp2p
const dEngine = new DecisionEngine(id, blockstore, network, new Stats({}), {}, { maxSizeReplaceHasWithBlock: 0 })
dEngine._scheduleProcessTasks = () => {}
dEngine.start()

Expand Down
Loading

0 comments on commit 9f5fda9

Please sign in to comment.