Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove node global #587

Merged
merged 7 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const after = async () => {
}

module.exports = {
bundlesize: { maxSize: '220kB' },
bundlesize: { maxSize: '179kB' },
hooks: {
pre: before,
post: after
Expand Down
5 changes: 3 additions & 2 deletions examples/pnet/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint no-console: ["off"] */
'use strict'

const { Buffer } = require('buffer')
const { generate } = require('libp2p/src/pnet')
const privateLibp2pNode = require('./libp2p-node')

Expand All @@ -17,7 +18,7 @@ generate(otherSwarmKey)
;(async () => {
const node1 = await privateLibp2pNode(swarmKey)

// TASK: switch the commented out line below so we're using a different key, to see the nodes fail to connect
// TASK: switch the commented out line below so we're using a different key, to see the nodes fail to connect
const node2 = await privateLibp2pNode(swarmKey)
// const node2 = await privateLibp2pNode(otherSwarmKey)

Expand Down Expand Up @@ -47,4 +48,4 @@ generate(otherSwarmKey)
['This message is sent on a private network'],
stream
)
})();
})()
5 changes: 3 additions & 2 deletions examples/pubsub/1.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint-disable no-console */
'use strict'

const { Buffer } = require('buffer')
const Libp2p = require('../../')
const TCP = require('libp2p-tcp')
const Mplex = require('libp2p-mplex')
Expand Down Expand Up @@ -31,7 +32,7 @@ const createNode = async () => {

const [node1, node2] = await Promise.all([
createNode(),
createNode(),
createNode()
])

await node1.dial(node2.peerInfo)
Expand All @@ -48,4 +49,4 @@ const createNode = async () => {
setInterval(() => {
node2.pubsub.publish(topic, Buffer.from('Bird bird bird, bird is the word!'))
}, 1000)
})();
})()
35 changes: 18 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,39 @@
"class-is": "^1.1.0",
"debug": "^4.1.1",
"err-code": "^2.0.0",
"events": "^3.1.0",
"hashlru": "^2.3.0",
"ipfs-utils": "^2.2.0",
"it-all": "^1.0.1",
"it-buffer": "^0.1.1",
"it-buffer": "^0.1.2",
"it-handshake": "^1.0.1",
"it-length-prefixed": "^3.0.0",
"it-length-prefixed": "^3.0.1",
"it-pipe": "^1.1.0",
"it-protocol-buffers": "^0.2.0",
"latency-monitor": "~0.2.1",
"libp2p-crypto": "^0.17.1",
"libp2p-interfaces": "^0.2.3",
"libp2p-crypto": "^0.17.6",
"libp2p-interfaces": "^0.2.8",
"libp2p-utils": "^0.1.2",
"mafmt": "^7.0.0",
"merge-options": "^2.0.0",
"moving-average": "^1.0.0",
"multiaddr": "^7.2.1",
"multiaddr": "^7.4.3",
"multistream-select": "^0.15.0",
"mutable-proxy": "^1.0.0",
"p-any": "^3.0.0",
"p-fifo": "^1.0.0",
"p-settle": "^4.0.0",
"peer-id": "^0.13.4",
"p-settle": "^4.0.1",
"peer-id": "^0.13.11",
"peer-info": "^0.17.0",
"protons": "^1.0.1",
"retimer": "^2.0.0",
"streaming-iterables": "^4.1.0",
"timeout-abort-controller": "^1.0.0",
"xsalsa20": "^1.0.2"
},
"devDependencies": {
"@nodeutils/defaults-deep": "^1.1.0",
"abortable-iterator": "^3.0.0",
"aegir": "^21.3.0",
"aegir": "^21.9.0",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"cids": "^0.8.0",
Expand All @@ -92,20 +94,19 @@
"libp2p-delegated-content-routing": "^0.4.5",
"libp2p-delegated-peer-routing": "^0.4.3",
"libp2p-floodsub": "^0.20.0",
"libp2p-gossipsub": "^0.2.0",
"libp2p-kad-dht": "^0.18.2",
"libp2p-gossipsub": "^0.2.6",
"libp2p-kad-dht": "^0.18.6",
"libp2p-mdns": "^0.13.0",
"libp2p-mplex": "^0.9.1",
"libp2p-secio": "^0.12.1",
"libp2p-mplex": "^0.9.5",
"libp2p-secio": "^0.12.4",
"libp2p-tcp": "^0.14.1",
"libp2p-webrtc-star": "^0.17.0",
"libp2p-webrtc-star": "^0.17.9",
"libp2p-websockets": "^0.13.1",
"nock": "^12.0.0",
"nock": "^12.0.3",
"p-defer": "^3.0.0",
"p-times": "^2.1.0",
"p-wait-for": "^3.1.0",
"sinon": "^9.0.0",
"streaming-iterables": "^4.1.0",
"sinon": "^9.0.2",
"wrtc": "^0.4.1"
},
"contributors": [
Expand Down
2 changes: 1 addition & 1 deletion src/connection-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const errcode = require('err-code')
const mergeOptions = require('merge-options')
const LatencyMonitor = require('latency-monitor').default
const LatencyMonitor = require('./latency-monitor')
const debug = require('debug')('libp2p:connection-manager')
const retimer = require('retimer')

Expand Down
246 changes: 246 additions & 0 deletions src/connection-manager/latency-monitor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
'use strict'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latency monitor was pulled from the npm package right? Can we get attribution added, I'm not seeing it anywhere.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks better now ?


/**
* This code is based on `latency-monitor` (https://github.com/mlucool/latency-monitor) by `mlucool` (https://github.com/mlucool), available under Apache License 2.0 (https://github.com/mlucool/latency-monitor/blob/master/LICENSE)
*/

/* global window */
const globalThis = require('ipfs-utils/src/globalthis')
const EventEmitter = require('events')
const VisibilityChangeEmitter = require('./visibility-change-emitter')
const debug = require('debug')('latency-monitor:LatencyMonitor')

/**
* @typedef {Object} SummaryObject
* @property {Number} events How many events were called
* @property {Number} minMS What was the min time for a cb to be called
* @property {Number} maxMS What was the max time for a cb to be called
* @property {Number} avgMs What was the average time for a cb to be called
* @property {Number} lengthMs How long this interval was in ms
*/

/**
* A class to monitor latency of any async function which works in a browser or node. This works by periodically calling
* the asyncTestFn and timing how long it takes the callback to be called. It can also periodically emit stats about this.
* This can be disabled and stats can be pulled via setting dataEmitIntervalMs = 0.
*
* The default implementation is an event loop latency monitor. This works by firing periodic events into the event loop
* and timing how long it takes to get back.
*
* @example
* const monitor = new LatencyMonitor();
* monitor.on('data', (summary) => console.log('Event Loop Latency: %O', summary));
*
* @example
* const monitor = new LatencyMonitor({latencyCheckIntervalMs: 1000, dataEmitIntervalMs: 60000, asyncTestFn:ping});
* monitor.on('data', (summary) => console.log('Ping Pong Latency: %O', summary));
*/
class LatencyMonitor extends EventEmitter {
/**
* @param {Number} [latencyCheckIntervalMs=500] How often to add a latency check event (ms)
* @param {Number} [dataEmitIntervalMs=5000] How often to summarize latency check events. null or 0 disables event firing
* @param {function} [asyncTestFn] What cb-style async function to use
* @param {Number} [latencyRandomPercentage=5] What percent (+/-) of latencyCheckIntervalMs should we randomly use? This helps avoid alignment to other events.
*/
constructor ({ latencyCheckIntervalMs, dataEmitIntervalMs, asyncTestFn, latencyRandomPercentage } = {}) {
super()
const that = this

// 0 isn't valid here, so its ok to use ||
that.latencyCheckIntervalMs = latencyCheckIntervalMs || 500 // 0.5s
that.latencyRandomPercentage = latencyRandomPercentage || 10
that._latecyCheckMultiply = 2 * (that.latencyRandomPercentage / 100.0) * that.latencyCheckIntervalMs
that._latecyCheckSubtract = that._latecyCheckMultiply / 2

that.dataEmitIntervalMs = (dataEmitIntervalMs === null || dataEmitIntervalMs === 0) ? undefined
: dataEmitIntervalMs || 5 * 1000 // 5s
debug('latencyCheckIntervalMs: %s dataEmitIntervalMs: %s',
that.latencyCheckIntervalMs, that.dataEmitIntervalMs)
if (that.dataEmitIntervalMs) {
debug('Expecting ~%s events per summary', that.latencyCheckIntervalMs / that.dataEmitIntervalMs)
} else {
debug('Not emitting summaries')
}

that.asyncTestFn = asyncTestFn // If there is no asyncFn, we measure latency

// If process: use high resolution timer
if (globalThis.process && globalThis.process.hrtime) {
debug('Using process.hrtime for timing')
that.now = globalThis.process.hrtime
that.getDeltaMS = (startTime) => {
const hrtime = that.now(startTime)
return (hrtime[0] * 1000) + (hrtime[1] / 1000000)
}
// Let's try for a timer that only monotonically increases
} else if (typeof window !== 'undefined' && window.performance && window.performance.now) {
debug('Using performance.now for timing')
that.now = window.performance.now.bind(window.performance)
that.getDeltaMS = (startTime) => Math.round(that.now() - startTime)
} else {
debug('Using Date.now for timing')
that.now = Date.now
that.getDeltaMS = (startTime) => that.now() - startTime
}

that._latencyData = that._initLatencyData()

// We check for isBrowser because of browsers set max rates of timeouts when a page is hidden,
// so we fall back to another library
// See: http://stackoverflow.com/questions/6032429/chrome-timeouts-interval-suspended-in-background-tabs
if (isBrowser()) {
that._visibilityChangeEmitter = new VisibilityChangeEmitter()
that._visibilityChangeEmitter.on('visibilityChange', (pageInFocus) => {
if (pageInFocus) {
that._startTimers()
} else {
that._emitSummary()
that._stopTimers()
}
})
}

if (!that._visibilityChangeEmitter || that._visibilityChangeEmitter.isVisible()) {
that._startTimers()
}
}

/**
* Start internal timers
* @private
*/
_startTimers () {
// Timer already started, ignore this
if (this._checkLatencyID) {
return
}
this._checkLatency()
if (this.dataEmitIntervalMs) {
this._emitIntervalID = setInterval(() => this._emitSummary(), this.dataEmitIntervalMs)
if (typeof this._emitIntervalID.unref === 'function') {
this._emitIntervalID.unref() // Doesn't block exit
}
}
}

/**
* Stop internal timers
* @private
*/
_stopTimers () {
if (this._checkLatencyID) {
clearTimeout(this._checkLatencyID)
this._checkLatencyID = undefined
}
if (this._emitIntervalID) {
clearInterval(this._emitIntervalID)
this._emitIntervalID = undefined
}
}

/**
* Emit summary only if there were events. It might not have any events if it was forced via a page hidden/show
* @private
*/
_emitSummary () {
const summary = this.getSummary()
if (summary.events > 0) {
this.emit('data', summary)
}
}

/**
* Calling this function will end the collection period. If a timing event was already fired and somewhere in the queue,
* it will not count for this time period
* @returns {SummaryObject}
*/
getSummary () {
// We might want to adjust for the number of expected events
// Example: first 1 event it comes back, then such a long blocker that the next emit check comes
// Then this fires - looks like no latency!!
const latency = {
events: this._latencyData.events,
minMs: this._latencyData.minMs,
maxMs: this._latencyData.maxMs,
avgMs: this._latencyData.events ? this._latencyData.totalMs / this._latencyData.events
: Number.POSITIVE_INFINITY,
lengthMs: this.getDeltaMS(this._latencyData.startTime)
}
this._latencyData = this._initLatencyData() // Clear

debug('Summary: %O', latency)
return latency
}

/**
* Randomly calls an async fn every roughly latencyCheckIntervalMs (plus some randomness). If no async fn is found,
* it will simply report on event loop latency.
*
* @private
*/
_checkLatency () {
const that = this
// Randomness is needed to avoid alignment by accident to regular things in the event loop
const randomness = (Math.random() * that._latecyCheckMultiply) - that._latecyCheckSubtract

// We use this to ensure that in case some overlap somehow, we don't take the wrong startTime/offset
const localData = {
deltaOffset: Math.ceil(that.latencyCheckIntervalMs + randomness),
startTime: that.now()
}

const cb = () => {
// We are already stopped, ignore this datapoint
if (!this._checkLatencyID) {
return
}
const deltaMS = that.getDeltaMS(localData.startTime) - localData.deltaOffset
that._checkLatency() // Start again ASAP

// Add the data point. If this gets complex, refactor it
that._latencyData.events++
that._latencyData.minMs = Math.min(that._latencyData.minMs, deltaMS)
that._latencyData.maxMs = Math.max(that._latencyData.maxMs, deltaMS)
that._latencyData.totalMs += deltaMS
debug('MS: %s Data: %O', deltaMS, that._latencyData)
}
debug('localData: %O', localData)

this._checkLatencyID = setTimeout(() => {
// This gets rid of including event loop
if (that.asyncTestFn) {
// Clear timing related things
localData.deltaOffset = 0
localData.startTime = that.now()
that.asyncTestFn(cb)
} else {
// setTimeout is not more accurate than 1ms, so this will ensure positive numbers. Add 1 to emitted data to remove.
// This is not the best, but for now it'll be just fine. This isn't meant to be sub ms accurate.
localData.deltaOffset -= 1
// If there is no function to test, we mean check latency which is a special case that is really cb => cb()
// We avoid that for the few extra function all overheads. Also, we want to keep the timers different
cb()
}
}, localData.deltaOffset)

if (typeof this._checkLatencyID.unref === 'function') {
this._checkLatencyID.unref() // Doesn't block exit
}
}

_initLatencyData () {
return {
startTime: this.now(),
minMs: Number.POSITIVE_INFINITY,
maxMs: Number.NEGATIVE_INFINITY,
events: 0,
totalMs: 0
}
}
}

function isBrowser () {
return typeof window !== 'undefined'
}

module.exports = LatencyMonitor
Loading