Skip to content

Commit

Permalink
feat: stats improvements (#158)
Browse files Browse the repository at this point in the history
* bitswap stats: async updater

* linter happy

* pre-commit hook for linting and tests

* stats: support dataReceived

* stats: data received

* stats: blocks sent and data sent

* stats: using bignum

* stats: compute throttle with threshold

* stats: update timeout is now dynamic based on queue size

* stats: moving averages

* stats: providesBufferLength

* stats: support for providesBufferLength and wantListLength

* stats: support for peerCount

* stats: enable / disable

* increased test timeout
  • Loading branch information
pgte authored and daviddias committed Nov 27, 2017
1 parent 6886a59 commit 17e15d0
Show file tree
Hide file tree
Showing 11 changed files with 493 additions and 45 deletions.
7 changes: 7 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@
"peer-book": "~0.5.1",
"peer-id": "~0.10.2",
"peer-info": "~0.11.1",
"pre-commit": "^1.2.2",
"rimraf": "^2.6.2",
"safe-buffer": "^5.1.1"
},
"dependencies": {
"async": "^2.6.0",
"big.js": "^5.0.3",
"cids": "~0.5.2",
"debug": "^3.1.0",
"ipfs-block": "~0.6.1",
Expand All @@ -70,6 +72,7 @@
"lodash.sortby": "^4.7.0",
"lodash.uniqwith": "^4.5.0",
"lodash.values": "^4.3.0",
"moving-average": "^1.0.0",
"multicodec": "~0.2.5",
"multihashing-async": "~0.4.7",
"protons": "^1.0.0",
Expand All @@ -80,6 +83,10 @@
"safe-buffer": "^5.1.1",
"varint-decoder": "^0.1.1"
},
"pre-commit": [
"lint",
"test"
],
"contributors": [
"David Dias <daviddias.p@gmail.com>",
"Dmitriy Ryajov <dryajov@gmail.com>",
Expand Down
6 changes: 5 additions & 1 deletion src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ const logger = require('../utils').logger
const MAX_MESSAGE_SIZE = 512 * 1024

class DecisionEngine {
constructor (peerId, blockstore, network) {
constructor (peerId, blockstore, network, stats) {
this._log = logger(peerId, 'engine')
this.blockstore = blockstore
this.network = network
this._stats = stats

// A list of of ledgers by their partner id
this.ledgerMap = new Map()
Expand Down Expand Up @@ -267,6 +268,9 @@ class DecisionEngine {
const l = new Ledger(peerId)

this.ledgerMap.set(peerIdStr, l)
if (this._stats) {
this._stats.push('peerCount', 1)
}

return l
}
Expand Down
73 changes: 54 additions & 19 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,24 @@ const Network = require('./network')
const DecisionEngine = require('./decision-engine')
const Notifications = require('./notifications')
const logger = require('./utils').logger
const Stats = require('./stats')

const defaultOptions = {
statsEnabled: false,
statsComputeThrottleTimeout: 1000,
statsComputeThrottleMaxQueueSize: 1000
}
const statsKeys = [
'blocksReceived',
'dataReceived',
'dupBlksReceived',
'dupDataReceived',
'blocksSent',
'dataSent',
'providesBufferLength',
'wantListLength',
'peerCount'
]

/**
* JavaScript implementation of the Bitswap 'data exchange' protocol
Expand All @@ -21,24 +39,29 @@ const logger = require('./utils').logger
* @param {Blockstore} blockstore
*/
class Bitswap {
constructor (libp2p, blockstore) {
constructor (libp2p, blockstore, options) {
this._libp2p = libp2p
this._log = logger(this.peerInfo.id)

this._options = Object.assign({}, defaultOptions, options)

// stats
this._stats = new Stats(statsKeys, {
enabled: this._options.statsEnabled,
computeThrottleTimeout: this._options.statsComputeThrottleTimeout,
computeThrottleMaxQueueSize: this._options.statsComputeThrottleMaxQueueSize
})

// the network delivers messages
this.network = new Network(libp2p, this)
this.network = new Network(libp2p, this, {}, this._stats)

// local database
this.blockstore = blockstore

this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network)
this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, this._stats)

// handle message sending
this.wm = new WantManager(this.peerInfo.id, this.network)

this.blocksRecvd = 0
this.dupBlocksRecvd = 0
this.dupDataRecvd = 0
this.wm = new WantManager(this.peerInfo.id, this.network, this._stats)

this.notifications = new Notifications(this.peerInfo.id)
}
Expand Down Expand Up @@ -95,11 +118,12 @@ class Bitswap {
}

_updateReceiveCounters (block, exists) {
this.blocksRecvd++
this._stats.push('blocksReceived', 1)
this._stats.push('dataReceived', block.data.length)

if (exists) {
this.dupBlocksRecvd++
this.dupDataRecvd += block.data.length
this._stats.push('dupBlksReceived', 1)
this._stats.push('dupDataReceived', block.data.length)
}
}

Expand Down Expand Up @@ -137,6 +161,14 @@ class Bitswap {
})
}

enableStats () {
this._stats.enable()
}

disableStats () {
this._stats.disable()
}

/**
* Return the current wantlist for a given `peerId`
*
Expand Down Expand Up @@ -329,25 +361,28 @@ class Bitswap {
/**
* Get the current list of wants.
*
* @returns {Array<WantlistEntry>}
* @returns {Iterator<WantlistEntry>}
*/
getWantlist () {
return this.wm.wantlist.entries()
}

/**
* Get the current list of partners.
*
* @returns {Array<PeerId>}
*/
peers () {
return this.engine.peers()
}

/**
* Get stats about the bitswap node.
*
* @returns {Object}
*/
stat () {
return {
wantlist: this.getWantlist(),
blocksReceived: this.blocksRecvd,
dupBlksReceived: this.dupBlocksRecvd,
dupDataReceived: this.dupDataRecvd,
peers: this.engine.peers()
}
return this._stats
}

/**
Expand Down
11 changes: 10 additions & 1 deletion src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ const BITSWAP100 = '/ipfs/bitswap/1.0.0'
const BITSWAP110 = '/ipfs/bitswap/1.1.0'

class Network {
constructor (libp2p, bitswap, options) {
constructor (libp2p, bitswap, options, stats) {
this._log = logger(libp2p.peerInfo.id, 'network')
options = options || {}
this.libp2p = libp2p
this.bitswap = bitswap
this.b100Only = options.b100Only || false

this._stats = stats
this._running = false
}

Expand Down Expand Up @@ -149,6 +150,7 @@ class Network {
}
})
callback()
this._updateSentStats(msg.blocks)
})
}

Expand Down Expand Up @@ -176,6 +178,13 @@ class Network {
callback(null, conn, BITSWAP110)
})
}

_updateSentStats (blocks) {
if (this._stats) {
blocks.forEach((block) => this._stats.push('dataSent', block.data.length))
this._stats.push('blocksSent', blocks.size)
}
}
}

function writeMessage (conn, msg, callback) {
Expand Down
154 changes: 154 additions & 0 deletions src/stats.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
'use strict'

const EventEmitter = require('events')
const Big = require('big.js')
const MovingAverage = require('moving-average')

const defaultOptions = {
movingAverageIntervals: [
60 * 1000, // 1 minute
5 * 60 * 1000, // 5 minutes
15 * 60 * 1000 // 15 minutes
]
}

class Stats extends EventEmitter {
constructor (initialCounters, _options) {
super()

const options = Object.assign({}, defaultOptions, _options)

if (typeof options.computeThrottleTimeout !== 'number') {
throw new Error('need computeThrottleTimeout')
}

if (typeof options.computeThrottleMaxQueueSize !== 'number') {
throw new Error('need computeThrottleMaxQueueSize')
}

this._options = options
this._queue = []
this._stats = {}

this._frequencyLastTime = Date.now()
this._frequencyAccumulators = {}
this._movingAverages = {}

this._update = this._update.bind(this)

initialCounters.forEach((key) => {
this._stats[key] = Big(0)
this._movingAverages[key] = {}
this._options.movingAverageIntervals.forEach((interval) => {
const ma = this._movingAverages[key][interval] = MovingAverage(interval)
ma.push(this._frequencyLastTime, 0)
})
})

this._enabled = this._options.enabled
}

enable () {
this._enabled = true
}

disable () {
this._disabled = true
}

get snapshot () {
return Object.assign({}, this._stats)
}

get movingAverages () {
return Object.assign({}, this._movingAverages)
}

push (counter, inc) {
if (this._enabled) {
this._queue.push([counter, inc, Date.now()])
this._resetComputeTimeout()
}
}

_resetComputeTimeout () {
if (this._timeout) {
clearTimeout(this._timeout)
}
this._timeout = setTimeout(this._update, this._nextTimeout())
}

_nextTimeout () {
// calculate the need for an update, depending on the queue length
const urgency = this._queue.length / this._options.computeThrottleMaxQueueSize
return Math.max(this._options.computeThrottleTimeout * (1 - urgency), 0)
}

_update () {
this._timeout = null
if (this._queue.length) {
let last
while (this._queue.length) {
const op = last = this._queue.shift()
this._applyOp(op)
}

this._updateFrequency(last[2]) // contains timestamp of last op

this.emit('update', this._stats)
}
}

_updateFrequency (latestTime) {
const timeDiff = latestTime - this._frequencyLastTime

Object.keys(this._stats).forEach((key) => {
this._updateFrequencyFor(key, timeDiff, latestTime)
})

this._frequencyLastTime = latestTime
}

_updateFrequencyFor (key, timeDiffMS, latestTime) {
const count = this._frequencyAccumulators[key] || 0
this._frequencyAccumulators[key] = 0
const hz = (count / timeDiffMS) * 1000

let movingAverages = this._movingAverages[key]
if (!movingAverages) {
movingAverages = this._movingAverages[key] = {}
}
this._options.movingAverageIntervals.forEach((movingAverageInterval) => {
let movingAverage = movingAverages[movingAverageInterval]
if (!movingAverage) {
movingAverage = movingAverages[movingAverageInterval] = MovingAverage(movingAverageInterval)
}
movingAverage.push(latestTime, hz)
})
}

_applyOp (op) {
const key = op[0]
const inc = op[1]

if (typeof inc !== 'number') {
throw new Error('invalid increment number:', inc)
}

let n

if (!this._stats.hasOwnProperty(key)) {
n = this._stats[key] = Big(0)
} else {
n = this._stats[key]
}
this._stats[key] = n.plus(inc)

if (!this._frequencyAccumulators[key]) {
this._frequencyAccumulators[key] = 0
}
this._frequencyAccumulators[key] += inc
}
}

module.exports = Stats
Loading

0 comments on commit 17e15d0

Please sign in to comment.