Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add monitor #25

Merged
merged 6 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const mutexify = require('mutexify')
const b4a = require('b4a')

const { BlobReadStream, BlobWriteStream } = require('./lib/streams')
const Monitor = require('./lib/monitor')

const DEFAULT_BLOCK_SIZE = 2 ** 16

Expand All @@ -12,6 +13,11 @@ module.exports = class Hyperblobs {

this._lock = mutexify()
this._core = core
this._monitors = new Set()

this._boundUpdatePeers = this._updatePeers.bind(this)
this._boundOnUpload = this._onUpload.bind(this)
this._boundOnDownload = this._onDownload.bind(this)
}

get feed () {
Expand Down Expand Up @@ -67,4 +73,42 @@ module.exports = class Hyperblobs {
const core = (opts && opts.core) ? opts.core : this._core
return new BlobWriteStream(core, this._lock, opts)
}

monitor (id) {
const monitor = new Monitor(this, id)
if (this._monitors.size === 0) this._startListening()
this._monitors.add(monitor)
return monitor
}

_removeMonitor (mon) {
this._monitors.delete(mon)
if (this._monitors.size === 0) this._stopListening()
}

_updatePeers () {
for (const m of this._monitors) m._updatePeers()
}

_onUpload (index, bytes, from) {
for (const m of this._monitors) m._onUpload(index, bytes, from)
}

_onDownload (index, bytes, from) {
for (const m of this._monitors) m._onDownload(index, bytes, from)
}

_startListening () {
this.core.on('peer-add', this._boundUpdatePeers)
this.core.on('peer-remove', this._boundUpdatePeers)
this.core.on('upload', this._boundOnUpload)
this.core.on('download', this._boundOnDownload)
}

_stopListening () {
this.core.off('peer-add', this._boundUpdatePeers)
this.core.off('peer-remove', this._boundUpdatePeers)
this.core.off('upload', this._boundOnUpload)
this.core.off('download', this._boundOnDownload)
}
}
92 changes: 92 additions & 0 deletions lib/monitor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
const EventEmitter = require('events')
const speedometer = require('speedometer')

module.exports = class Monitor extends EventEmitter {
constructor (blobs, id) {
super()

if (!id) throw new Error('id is required')

this.blobs = blobs
this.id = id
this.peers = 0
this.uploadSpeedometer = null
this.downloadSpeedometer = null

const stats = {
startTime: 0,
percentage: 0,
peers: 0,
speed: 0,
blocks: 0,
totalBytes: 0, // local + bytes loaded during monitoring
monitoringBytes: 0, // bytes loaded during monitoring
targetBytes: 0,
targetBlocks: 0
}

this.uploadStats = { ...stats }
this.downloadStats = { ...stats }
this.uploadStats.targetBytes = this.downloadStats.targetBytes = this.id.byteLength
this.uploadStats.targetBlocks = this.downloadStats.targetBlocks = this.id.blockLength
this.uploadStats.peers = this.downloadStats.peers = this.peers = this.blobs.core.peers.length

this.uploadSpeedometer = speedometer()
this.downloadSpeedometer = speedometer()

// Handlers
}

// just an alias
destroy () {
return this.close()
}

close () {
this.blobs._removeMonitor(this)
}

_onUpload (index, bytes, from) {
this._updateStats(this.uploadSpeedometer, this.uploadStats, index, bytes, from)
}

_onDownload (index, bytes, from) {
this._updateStats(this.downloadSpeedometer, this.downloadStats, index, bytes, from)
}

_updatePeers () {
this.uploadStats.peers = this.downloadStats.peers = this.peers = this.blobs.core.peers.length
this.emit('update')
}

_updateStats (speed, stats, index, bytes) {
if (this.closing) return
if (!isWithinRange(index, this.id)) return

if (!stats.startTime) stats.startTime = Date.now()

stats.speed = speed(bytes)
stats.blocks++
stats.totalBytes += bytes
stats.monitoringBytes += bytes
stats.percentage = toFixed(stats.blocks / stats.targetBlocks * 100)

this.emit('update')
}

downloadSpeed () {
return this.downloadSpeedometer ? this.downloadSpeedometer() : 0
}

uploadSpeed () {
return this.uploadSpeedometer ? this.uploadSpeedometer() : 0
}
}

function isWithinRange (index, { blockOffset, blockLength }) {
return index >= blockOffset && index < blockOffset + blockLength
}

function toFixed (n) {
return Math.round(n * 100) / 100
}
8 changes: 8 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,18 @@
"index.js",
"lib/**.js"
],
"imports": {
"events": {
"bare": "bare-events",
"default": "events"
}
},
"dependencies": {
"b4a": "^1.6.1",
"bare-events": "^2.5.0",
"hypercore-errors": "^1.1.1",
"mutexify": "^1.4.0",
"speedometer": "^1.1.0",
"streamx": "^2.13.2"
},
"devDependencies": {
Expand Down
71 changes: 71 additions & 0 deletions test/all.js
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,77 @@ test('clear with diff option', async function (t) {
t.is(cleared3.blocks, 0)
})

test('upload/download can be monitored', async (t) => {
t.plan(30)

const [a, b] = await createPair()
const blobsA = new Hyperblobs(a)
const blobsB = new Hyperblobs(b)

const bytes = 1024 * 100 // big enough to trigger more than one update event
const buf = Buffer.alloc(bytes, '0')
const id = await blobsA.put(buf)

// add another blob which should not be monitored
const controlId = await blobsA.put(buf)

{
const expectedBlocks = [2, 1]
const expectedBytes = [bytes, 65536]
const expectedPercentage = [100, 50]

// Start monitoring upload
const monitor = blobsA.monitor(id)
monitor.on('update', () => {
t.is(monitor.uploadStats.blocks, expectedBlocks.pop())
t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop())
t.is(monitor.uploadStats.targetBlocks, 2)
t.is(monitor.uploadStats.targetBytes, bytes)
t.is(monitor.uploadSpeed(), monitor.uploadStats.speed)
t.is(monitor.uploadStats.percentage, expectedPercentage.pop())
t.absent(monitor.downloadStats.blocks)
})
}

{
// Start monitoring download
const expectedBlocks = [2, 1]
const expectedBytes = [bytes, 65536]
const expectedPercentage = [100, 50]

const monitor = blobsB.monitor(id)
monitor.on('update', () => {
t.is(monitor.downloadStats.blocks, expectedBlocks.pop())
t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop())
t.is(monitor.downloadStats.targetBlocks, 2)
t.is(monitor.downloadStats.targetBytes, bytes)
t.is(monitor.downloadSpeed(), monitor.downloadStats.speed)
t.is(monitor.downloadStats.percentage, expectedPercentage.pop())
t.absent(monitor.uploadStats.blocks)
})
}

const res = await blobsB.get(id)
t.alike(res, buf)

// should not generate events
const controRes = await blobsB.get(controlId)
t.alike(controRes, buf)
})

test('monitor is removed from the Set on close', async (t) => {
const core = new Hypercore(RAM)
const blobs = new Hyperblobs(core)

const bytes = 1024 * 100 // big enough to trigger more than one update event
const buf = Buffer.alloc(bytes, '0')
const id = await blobs.put(buf)
const monitor = blobs.monitor(id)
t.is(blobs._monitors.size, 1)
monitor.close()
t.is(blobs._monitors.size, 0)
})

async function createPair () {
const a = new Hypercore(RAM)
await a.ready()
Expand Down
Loading