Skip to content

Commit

Permalink
fix: remove wants from wantlist when multiple block retrievers are us…
Browse files Browse the repository at this point in the history
…ed (#491)

Fixes a bug where sometimes a wanted CID was not removed from the wantlist when it was retrieved by an alternative block retriever.

Also switches to using events for block/presence arrival to remove shared promises.
  • Loading branch information
achingbrain authored Apr 9, 2024
1 parent 395cd9e commit b1c761d
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 227 deletions.
5 changes: 4 additions & 1 deletion packages/bitswap/src/bitswap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ export class Bitswap implements BitswapInterface {
* Sends notifications about the arrival of a block
*/
async notify (cid: CID, block: Uint8Array, options: ProgressOptions<BitswapNotifyProgressEvents> & AbortOptions = {}): Promise<void> {
await this.peerWantLists.receivedBlock(cid, options)
await Promise.all([
this.peerWantLists.receivedBlock(cid, options),
this.wantList.receivedBlock(cid, options)
])
}

getWantlist (): WantListEntry[] {
Expand Down
56 changes: 23 additions & 33 deletions packages/bitswap/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import { CID } from 'multiformats/cid'
import { CustomProgressEvent } from 'progress-events'
import { raceEvent } from 'race-event'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_TIMEOUT, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.js'
import { BITSWAP_120, DEFAULT_MAX_INBOUND_STREAMS, DEFAULT_MAX_OUTBOUND_STREAMS, DEFAULT_MAX_PROVIDERS_PER_REQUEST, DEFAULT_MESSAGE_RECEIVE_TIMEOUT, DEFAULT_MESSAGE_SEND_CONCURRENCY, DEFAULT_MESSAGE_SEND_TIMEOUT, DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS } from './constants.js'
import { BitswapMessage } from './pb/message.js'
import type { WantOptions } from './bitswap.js'
import type { MultihashHasherLoader } from './index.js'
import type { Block, BlockPresence, WantlistEntry } from './pb/message.js'
import type { Provider, Routing } from '@helia/interface/routing'
import type { Libp2p, AbortOptions, Connection, PeerId, IncomingStreamData, Topology, MetricGroup, ComponentLogger, Metrics, IdentifyResult } from '@libp2p/interface'
import type { Libp2p, AbortOptions, Connection, PeerId, IncomingStreamData, Topology, ComponentLogger, IdentifyResult, Counter } from '@libp2p/interface'
import type { Logger } from '@libp2p/logger'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

Expand Down Expand Up @@ -79,7 +79,6 @@ export interface NetworkComponents {
routing: Routing
logger: ComponentLogger
libp2p: Libp2p
metrics?: Metrics
}

export interface BitswapMessageEventDetail {
Expand Down Expand Up @@ -107,7 +106,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
private readonly maxOutboundStreams: number
private readonly messageReceiveTimeout: number
private registrarIds: string[]
private readonly metrics?: { blocksSent: MetricGroup, dataSent: MetricGroup }
private readonly metrics: { blocksSent?: Counter, dataSent?: Counter }
private readonly sendQueue: PeerQueue<void, SendMessageJobOptions>
private readonly messageSendTimeout: number
private readonly runOnTransientConnections: boolean
Expand All @@ -129,17 +128,14 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
this.messageReceiveTimeout = init.messageReceiveTimeout ?? DEFAULT_MESSAGE_RECEIVE_TIMEOUT
this.messageSendTimeout = init.messageSendTimeout ?? DEFAULT_MESSAGE_SEND_TIMEOUT
this.runOnTransientConnections = init.runOnTransientConnections ?? DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS

if (components.metrics != null) {
this.metrics = {
blocksSent: components.metrics?.registerMetricGroup('ipfs_bitswap_sent_blocks'),
dataSent: components.metrics?.registerMetricGroup('ipfs_bitswap_sent_data_bytes')
}
this.metrics = {
blocksSent: components.libp2p.metrics?.registerCounter('ipfs_bitswap_sent_blocks_total'),
dataSent: components.libp2p.metrics?.registerCounter('ipfs_bitswap_sent_data_bytes_total')
}

this.sendQueue = new PeerQueue({
concurrency: init.messageSendConcurrency,
metrics: components.metrics,
concurrency: init.messageSendConcurrency ?? DEFAULT_MESSAGE_SEND_CONCURRENCY,
metrics: components.libp2p.metrics,
metricName: 'ipfs_bitswap_message_send_queue'
})
this.sendQueue.addEventListener('error', (evt) => {
Expand Down Expand Up @@ -302,9 +298,9 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
*/
async findAndConnect (cid: CID, options?: WantOptions): Promise<void> {
await drain(
take(
map(this.findProviders(cid, options), async provider => this.connectTo(provider.id, options)),
options?.maxProviders ?? DEFAULT_MAX_PROVIDERS_PER_REQUEST
map(
take(this.findProviders(cid, options), options?.maxProviders ?? DEFAULT_MAX_PROVIDERS_PER_REQUEST),
async provider => this.connectTo(provider.id, options)
)
)
.catch(err => {
Expand Down Expand Up @@ -335,9 +331,11 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
setMaxListeners(Infinity, signal)

try {
const existingJob = this.sendQueue.find(peerId)
const existingJob = this.sendQueue.queue.find(job => {
return peerId.equals(job.options.peerId) && job.status === 'queued'
})

if (existingJob?.status === 'queued') {
if (existingJob != null) {
// merge messages instead of adding new job
existingJob.options.message = mergeMessages(existingJob.options.message, message)

Expand Down Expand Up @@ -371,7 +369,7 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
stream.abort(err)
}

this._updateSentStats(peerId, message.blocks)
this._updateSentStats(message.blocks)
}, {
peerId,
signal,
Expand Down Expand Up @@ -417,23 +415,15 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
return connection
}

_updateSentStats (peerId: PeerId, blocks: Block[] = []): void {
if (this.metrics != null) {
let bytes = 0

for (const block of blocks.values()) {
bytes += block.data.byteLength
}
_updateSentStats (blocks: Block[] = []): void {
let bytes = 0

this.metrics.dataSent.increment({
global: bytes,
[peerId.toString()]: bytes
})
this.metrics.blocksSent.increment({
global: blocks.length,
[peerId.toString()]: blocks.length
})
for (const block of blocks.values()) {
bytes += block.data.byteLength
}

this.metrics.dataSent?.increment(bytes)
this.metrics.blocksSent?.increment(blocks.length)
}
}

Expand Down
6 changes: 3 additions & 3 deletions packages/bitswap/src/peer-want-lists/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Ledger } from './ledger.js'
import type { BitswapNotifyProgressEvents, WantListEntry } from '../index.js'
import type { Network } from '../network.js'
import type { BitswapMessage } from '../pb/message.js'
import type { ComponentLogger, Logger, Metrics, PeerId } from '@libp2p/interface'
import type { ComponentLogger, Libp2p, Logger, PeerId } from '@libp2p/interface'
import type { PeerMap } from '@libp2p/peer-collections'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'it-length-prefixed-stream'
Expand All @@ -19,7 +19,7 @@ export interface PeerWantListsInit {
export interface PeerWantListsComponents {
blockstore: Blockstore
network: Network
metrics?: Metrics
libp2p: Libp2p
logger: ComponentLogger
}

Expand All @@ -46,7 +46,7 @@ export class PeerWantLists {

this.ledgerMap = trackedPeerMap({
name: 'ipfs_bitswap_ledger_map',
metrics: components.metrics
metrics: components.libp2p.metrics
})

this.network.addEventListener('bitswap:message', (evt) => {
Expand Down
5 changes: 5 additions & 0 deletions packages/bitswap/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ class BitswapSession implements BitswapSessionInterface {
void Promise.resolve()
.then(async () => {
for await (const peerId of source) {
if (found === this.maxProviders) {
this.queue.clear()
break
}

// eslint-disable-next-line no-loop-func
await this.queue.add(async () => {
try {
Expand Down
12 changes: 6 additions & 6 deletions packages/bitswap/src/stats.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { MetricGroup, Metrics, PeerId } from '@libp2p/interface'
import type { Libp2p, MetricGroup, PeerId } from '@libp2p/interface'

export interface StatsComponents {
metrics?: Metrics
libp2p: Libp2p
}

export class Stats {
Expand All @@ -11,10 +11,10 @@ export class Stats {
private readonly duplicateDataReceived?: MetricGroup

constructor (components: StatsComponents) {
this.blocksReceived = components.metrics?.registerMetricGroup('ipfs_bitswap_received_blocks')
this.duplicateBlocksReceived = components.metrics?.registerMetricGroup('ipfs_bitswap_duplicate_received_blocks')
this.dataReceived = components.metrics?.registerMetricGroup('ipfs_bitswap_data_received_bytes')
this.duplicateDataReceived = components.metrics?.registerMetricGroup('ipfs_bitswap_duplicate_data_received_bytes')
this.blocksReceived = components.libp2p.metrics?.registerMetricGroup('ipfs_bitswap_received_blocks')
this.duplicateBlocksReceived = components.libp2p.metrics?.registerMetricGroup('ipfs_bitswap_duplicate_received_blocks')
this.dataReceived = components.libp2p.metrics?.registerMetricGroup('ipfs_bitswap_data_received_bytes')
this.duplicateDataReceived = components.libp2p.metrics?.registerMetricGroup('ipfs_bitswap_duplicate_data_received_bytes')
}

updateBlocksReceived (count: number = 1, peerId?: PeerId): void {
Expand Down
Loading

0 comments on commit b1c761d

Please sign in to comment.