Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats improvements #158

Merged
merged 15 commits into from
Nov 27, 2017
7 changes: 7 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@
"peer-book": "~0.5.1",
"peer-id": "~0.10.2",
"peer-info": "~0.11.1",
"pre-commit": "^1.2.2",
"rimraf": "^2.6.2",
"safe-buffer": "^5.1.1"
},
"dependencies": {
"async": "^2.6.0",
"big.js": "^5.0.3",
"cids": "~0.5.2",
"debug": "^3.1.0",
"ipfs-block": "~0.6.1",
Expand All @@ -70,6 +72,7 @@
"lodash.sortby": "^4.7.0",
"lodash.uniqwith": "^4.5.0",
"lodash.values": "^4.3.0",
"moving-average": "^1.0.0",
"multicodec": "~0.2.5",
"multihashing-async": "~0.4.7",
"protons": "^1.0.0",
Expand All @@ -80,6 +83,10 @@
"safe-buffer": "^5.1.1",
"varint-decoder": "^0.1.1"
},
"pre-commit": [
"lint",
"test"
],
"contributors": [
"David Dias <daviddias.p@gmail.com>",
"Dmitriy Ryajov <dryajov@gmail.com>",
Expand Down
6 changes: 5 additions & 1 deletion src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ const logger = require('../utils').logger
const MAX_MESSAGE_SIZE = 512 * 1024

class DecisionEngine {
constructor (peerId, blockstore, network) {
constructor (peerId, blockstore, network, stats) {
this._log = logger(peerId, 'engine')
this.blockstore = blockstore
this.network = network
this._stats = stats

// A list of of ledgers by their partner id
this.ledgerMap = new Map()
Expand Down Expand Up @@ -267,6 +268,9 @@ class DecisionEngine {
const l = new Ledger(peerId)

this.ledgerMap.set(peerIdStr, l)
if (this._stats) {
this._stats.push('peerCount', 1)
}

return l
}
Expand Down
73 changes: 54 additions & 19 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,24 @@ const Network = require('./network')
const DecisionEngine = require('./decision-engine')
const Notifications = require('./notifications')
const logger = require('./utils').logger
const Stats = require('./stats')

const defaultOptions = {
statsEnabled: false,
statsComputeThrottleTimeout: 1000,
statsComputeThrottleMaxQueueSize: 1000
}
const statsKeys = [
'blocksReceived',
'dataReceived',
'dupBlksReceived',
'dupDataReceived',
'blocksSent',
'dataSent',
'providesBufferLength',
'wantListLength',
'peerCount'
]

/**
* JavaScript implementation of the Bitswap 'data exchange' protocol
Expand All @@ -21,24 +39,29 @@ const logger = require('./utils').logger
* @param {Blockstore} blockstore
*/
class Bitswap {
constructor (libp2p, blockstore) {
constructor (libp2p, blockstore, options) {
this._libp2p = libp2p
this._log = logger(this.peerInfo.id)

this._options = Object.assign({}, defaultOptions, options)

// stats
this._stats = new Stats(statsKeys, {
enabled: this._options.statsEnabled,
computeThrottleTimeout: this._options.statsComputeThrottleTimeout,
computeThrottleMaxQueueSize: this._options.statsComputeThrottleMaxQueueSize
})

// the network delivers messages
this.network = new Network(libp2p, this)
this.network = new Network(libp2p, this, {}, this._stats)

// local database
this.blockstore = blockstore

this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network)
this.engine = new DecisionEngine(this.peerInfo.id, blockstore, this.network, this._stats)

// handle message sending
this.wm = new WantManager(this.peerInfo.id, this.network)

this.blocksRecvd = 0
this.dupBlocksRecvd = 0
this.dupDataRecvd = 0
this.wm = new WantManager(this.peerInfo.id, this.network, this._stats)

this.notifications = new Notifications(this.peerInfo.id)
}
Expand Down Expand Up @@ -95,11 +118,12 @@ class Bitswap {
}

_updateReceiveCounters (block, exists) {
this.blocksRecvd++
this._stats.push('blocksReceived', 1)
this._stats.push('dataReceived', block.data.length)

if (exists) {
this.dupBlocksRecvd++
this.dupDataRecvd += block.data.length
this._stats.push('dupBlksReceived', 1)
this._stats.push('dupDataReceived', block.data.length)
}
}

Expand Down Expand Up @@ -137,6 +161,14 @@ class Bitswap {
})
}

enableStats () {
this._stats.enable()
}

disableStats () {
this._stats.disable()
}

/**
* Return the current wantlist for a given `peerId`
*
Expand Down Expand Up @@ -329,25 +361,28 @@ class Bitswap {
/**
* Get the current list of wants.
*
* @returns {Array<WantlistEntry>}
* @returns {Iterator<WantlistEntry>}
*/
getWantlist () {
return this.wm.wantlist.entries()
}

/**
* Get the current list of partners.
*
* @returns {Array<PeerId>}
*/
peers () {
return this.engine.peers()
}

/**
* Get stats about the bitswap node.
*
* @returns {Object}
*/
stat () {
return {
wantlist: this.getWantlist(),
blocksReceived: this.blocksRecvd,
dupBlksReceived: this.dupBlocksRecvd,
dupDataReceived: this.dupDataRecvd,
peers: this.engine.peers()
}
return this._stats
}

/**
Expand Down
11 changes: 10 additions & 1 deletion src/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ const BITSWAP100 = '/ipfs/bitswap/1.0.0'
const BITSWAP110 = '/ipfs/bitswap/1.1.0'

class Network {
constructor (libp2p, bitswap, options) {
constructor (libp2p, bitswap, options, stats) {
this._log = logger(libp2p.peerInfo.id, 'network')
options = options || {}
this.libp2p = libp2p
this.bitswap = bitswap
this.b100Only = options.b100Only || false

this._stats = stats
this._running = false
}

Expand Down Expand Up @@ -149,6 +150,7 @@ class Network {
}
})
callback()
this._updateSentStats(msg.blocks)
})
}

Expand Down Expand Up @@ -176,6 +178,13 @@ class Network {
callback(null, conn, BITSWAP110)
})
}

_updateSentStats (blocks) {
if (this._stats) {
blocks.forEach((block) => this._stats.push('dataSent', block.data.length))
this._stats.push('blocksSent', blocks.size)
}
}
}

function writeMessage (conn, msg, callback) {
Expand Down
154 changes: 154 additions & 0 deletions src/stats.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
'use strict'

const EventEmitter = require('events')
const Big = require('big.js')
const MovingAverage = require('moving-average')

const defaultOptions = {
movingAverageIntervals: [
60 * 1000, // 1 minute
5 * 60 * 1000, // 5 minutes
15 * 60 * 1000 // 15 minutes
]
}

class Stats extends EventEmitter {
constructor (initialCounters, _options) {
super()

const options = Object.assign({}, defaultOptions, _options)

if (typeof options.computeThrottleTimeout !== 'number') {
throw new Error('need computeThrottleTimeout')
}

if (typeof options.computeThrottleMaxQueueSize !== 'number') {
throw new Error('need computeThrottleMaxQueueSize')
}

this._options = options
this._queue = []
this._stats = {}

this._frequencyLastTime = Date.now()
this._frequencyAccumulators = {}
this._movingAverages = {}

this._update = this._update.bind(this)

initialCounters.forEach((key) => {
this._stats[key] = Big(0)
this._movingAverages[key] = {}
this._options.movingAverageIntervals.forEach((interval) => {
const ma = this._movingAverages[key][interval] = MovingAverage(interval)
ma.push(this._frequencyLastTime, 0)
})
})

this._enabled = this._options.enabled
}

enable () {
this._enabled = true
}

disable () {
this._disabled = true
}

get snapshot () {
return Object.assign({}, this._stats)
}

get movingAverages () {
return Object.assign({}, this._movingAverages)
}

push (counter, inc) {
if (this._enabled) {
this._queue.push([counter, inc, Date.now()])
this._resetComputeTimeout()
}
}

_resetComputeTimeout () {
if (this._timeout) {
clearTimeout(this._timeout)
}
this._timeout = setTimeout(this._update, this._nextTimeout())
}

_nextTimeout () {
// calculate the need for an update, depending on the queue length
const urgency = this._queue.length / this._options.computeThrottleMaxQueueSize
return Math.max(this._options.computeThrottleTimeout * (1 - urgency), 0)
}

_update () {
this._timeout = null
if (this._queue.length) {
let last
while (this._queue.length) {
const op = last = this._queue.shift()
this._applyOp(op)
}

this._updateFrequency(last[2]) // contains timestamp of last op

this.emit('update', this._stats)
}
}

_updateFrequency (latestTime) {
const timeDiff = latestTime - this._frequencyLastTime

Object.keys(this._stats).forEach((key) => {
this._updateFrequencyFor(key, timeDiff, latestTime)
})

this._frequencyLastTime = latestTime
}

_updateFrequencyFor (key, timeDiffMS, latestTime) {
const count = this._frequencyAccumulators[key] || 0
this._frequencyAccumulators[key] = 0
const hz = (count / timeDiffMS) * 1000

let movingAverages = this._movingAverages[key]
if (!movingAverages) {
movingAverages = this._movingAverages[key] = {}
}
this._options.movingAverageIntervals.forEach((movingAverageInterval) => {
let movingAverage = movingAverages[movingAverageInterval]
if (!movingAverage) {
movingAverage = movingAverages[movingAverageInterval] = MovingAverage(movingAverageInterval)
}
movingAverage.push(latestTime, hz)
})
}

_applyOp (op) {
const key = op[0]
const inc = op[1]

if (typeof inc !== 'number') {
throw new Error('invalid increment number:', inc)
}

let n

if (!this._stats.hasOwnProperty(key)) {
n = this._stats[key] = Big(0)
} else {
n = this._stats[key]
}
this._stats[key] = n.plus(inc)

if (!this._frequencyAccumulators[key]) {
this._frequencyAccumulators[key] = 0
}
this._frequencyAccumulators[key] += inc
}
}

module.exports = Stats
Loading