Skip to content

Commit

Permalink
fix: cache integrity and size events so late listeners still get them (
Browse files Browse the repository at this point in the history
  • Loading branch information
nlf authored May 19, 2022
1 parent c5b8c79 commit 8c78584
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 4 deletions.
4 changes: 2 additions & 2 deletions lib/cache/entry.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
const { Request, Response } = require('minipass-fetch')
const Minipass = require('minipass')
const MinipassFlush = require('minipass-flush')
const MinipassPipeline = require('minipass-pipeline')
const cacache = require('cacache')
const url = require('url')

const CachingMinipassPipeline = require('../pipeline.js')
const CachePolicy = require('./policy.js')
const cacheKey = require('./key.js')
const remote = require('../remote.js')
Expand Down Expand Up @@ -269,7 +269,7 @@ class CacheEntry {
cacheWriteReject = reject
})

body = new MinipassPipeline(new MinipassFlush({
body = new CachingMinipassPipeline({ events: ['integrity', 'size'] }, new MinipassFlush({
flush () {
return cacheWritePromise
},
Expand Down
41 changes: 41 additions & 0 deletions lib/pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict'

const MinipassPipeline = require('minipass-pipeline')

class CachingMinipassPipeline extends MinipassPipeline {
#events = []
#data = new Map()

constructor (opts, ...streams) {
// CRITICAL: do NOT pass the streams to the call to super(), this will start
// the flow of data and potentially cause the events we need to catch to emit
// before we've finished our own setup. instead we call super() with no args,
// finish our setup, and then push the streams into ourselves to start the
// data flow
super()
this.#events = opts.events

/* istanbul ignore next - coverage disabled because this is pointless to test here */
if (streams.length) {
this.push(...streams)
}
}

on (event, handler) {
if (this.#events.includes(event) && this.#data.has(event)) {
return handler(...this.#data.get(event))
}

return super.on(event, handler)
}

emit (event, ...data) {
if (this.#events.includes(event)) {
this.#data.set(event, data)
}

return super.emit(event, ...data)
}
}

module.exports = CachingMinipassPipeline
6 changes: 4 additions & 2 deletions lib/remote.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const Minipass = require('minipass')
const MinipassPipeline = require('minipass-pipeline')
const fetch = require('minipass-fetch')
const promiseRetry = require('promise-retry')
const ssri = require('ssri')

const CachingMinipassPipeline = require('./pipeline.js')
const getAgent = require('./agent.js')
const pkg = require('../package.json')

Expand Down Expand Up @@ -53,7 +53,9 @@ const remoteFetch = (request, options) => {
// we got a 200 response and the user has specified an expected
// integrity value, so wrap the response in an ssri stream to verify it
const integrityStream = ssri.integrityStream({ integrity: _opts.integrity })
const pipeline = new MinipassPipeline(res.body, integrityStream)
const pipeline = new CachingMinipassPipeline({
events: ['integrity', 'size'],
}, res.body, integrityStream)
// we also propagate the integrity and size events out to the pipeline so we can use
// this new response body as an integrityEmitter for cacache
integrityStream.on('integrity', i => pipeline.emit('integrity', i))
Expand Down
40 changes: 40 additions & 0 deletions test/pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict'

const events = require('events')
const ssri = require('ssri')
const t = require('tap')

const CachingMinipassPipeline = require('../lib/pipeline.js')

t.test('caches events and emits them again for new listeners', async (t) => {
const INTEGRITY = ssri.fromData('foobarbazbuzz')
const integrityStream = ssri.integrityStream()
const pipeline = new CachingMinipassPipeline({ events: ['integrity', 'size'] }, integrityStream)
integrityStream.on('size', s => pipeline.emit('size', s))
integrityStream.on('integrity', i => pipeline.emit('integrity', i))

pipeline.write('foobarbazbuzz')
pipeline.resume()
// delay ending the stream so the early listeners will get the first events
setImmediate(() => pipeline.end())

const [earlySize, earlyIntegrity] = await Promise.all([
events.once(pipeline, 'size').then(res => res[0]),
events.once(pipeline, 'integrity').then(res => res[0]),
])

// now wait for the stream itself to have ended
await pipeline.promise()

// and add new listeners
const [lateSize, lateIntegrity] = await Promise.all([
events.once(pipeline, 'size').then(res => res[0]),
events.once(pipeline, 'integrity').then(res => res[0]),
])

// and make sure we got the same results
t.equal(earlySize, 13, 'got the right size')
t.same(earlyIntegrity, INTEGRITY, 'got the right integrity')
t.same(earlySize, lateSize, 'got the same size early and late')
t.same(earlyIntegrity, lateIntegrity, 'got the same integrity early and late')
})

0 comments on commit 8c78584

Please sign in to comment.