diff --git a/lib/cache/entry.js b/lib/cache/entry.js
index 4f94187..7a7572b 100644
--- a/lib/cache/entry.js
+++ b/lib/cache/entry.js
@@ -1,10 +1,10 @@
 const { Request, Response } = require('minipass-fetch')
 const Minipass = require('minipass')
 const MinipassFlush = require('minipass-flush')
-const MinipassPipeline = require('minipass-pipeline')
 const cacache = require('cacache')
 const url = require('url')
 
+const CachingMinipassPipeline = require('../pipeline.js')
 const CachePolicy = require('./policy.js')
 const cacheKey = require('./key.js')
 const remote = require('../remote.js')
@@ -269,7 +269,7 @@ class CacheEntry {
         cacheWriteReject = reject
       })
 
-      body = new MinipassPipeline(new MinipassFlush({
+      body = new CachingMinipassPipeline({ events: ['integrity', 'size'] }, new MinipassFlush({
         flush () {
           return cacheWritePromise
         },
diff --git a/lib/pipeline.js b/lib/pipeline.js
new file mode 100644
index 0000000..b1d221b
--- /dev/null
+++ b/lib/pipeline.js
@@ -0,0 +1,41 @@
+'use strict'
+
+const MinipassPipeline = require('minipass-pipeline')
+
+class CachingMinipassPipeline extends MinipassPipeline {
+  #events = []
+  #data = new Map()
+
+  constructor (opts, ...streams) {
+    // CRITICAL: do NOT pass the streams to the call to super(), this will start
+    // the flow of data and potentially cause the events we need to catch to emit
+    // before we've finished our own setup. instead we call super() with no args,
+    // finish our setup, and then push the streams into ourselves to start the
+    // data flow
+    super()
+    this.#events = opts.events
+
+    /* istanbul ignore next - coverage disabled because this is pointless to test here */
+    if (streams.length) {
+      this.push(...streams)
+    }
+  }
+
+  on (event, handler) {
+    if (this.#events.includes(event) && this.#data.has(event)) {
+      return handler(...this.#data.get(event))
+    }
+
+    return super.on(event, handler)
+  }
+
+  emit (event, ...data) {
+    if (this.#events.includes(event)) {
+      this.#data.set(event, data)
+    }
+
+    return super.emit(event, ...data)
+  }
+}
+
+module.exports = CachingMinipassPipeline
diff --git a/lib/remote.js b/lib/remote.js
index e066260..763fc0d 100644
--- a/lib/remote.js
+++ b/lib/remote.js
@@ -1,9 +1,9 @@
 const Minipass = require('minipass')
-const MinipassPipeline = require('minipass-pipeline')
 const fetch = require('minipass-fetch')
 const promiseRetry = require('promise-retry')
 const ssri = require('ssri')
 
+const CachingMinipassPipeline = require('./pipeline.js')
 const getAgent = require('./agent.js')
 const pkg = require('../package.json')
 
@@ -53,7 +53,9 @@ const remoteFetch = (request, options) => {
         // we got a 200 response and the user has specified an expected
         // integrity value, so wrap the response in an ssri stream to verify it
         const integrityStream = ssri.integrityStream({ integrity: _opts.integrity })
-        const pipeline = new MinipassPipeline(res.body, integrityStream)
+        const pipeline = new CachingMinipassPipeline({
+          events: ['integrity', 'size'],
+        }, res.body, integrityStream)
         // we also propagate the integrity and size events out to the pipeline so we can use
         // this new response body as an integrityEmitter for cacache
         integrityStream.on('integrity', i => pipeline.emit('integrity', i))
diff --git a/test/pipeline.js b/test/pipeline.js
new file mode 100644
index 0000000..c595b4d
--- /dev/null
+++ b/test/pipeline.js
@@ -0,0 +1,40 @@
+'use strict'
+
+const events = require('events')
+const ssri = require('ssri')
+const t = require('tap')
+
+const CachingMinipassPipeline = require('../lib/pipeline.js')
+
+t.test('caches events and emits them again for new listeners', async (t) => {
+  const INTEGRITY = ssri.fromData('foobarbazbuzz')
+  const integrityStream = ssri.integrityStream()
+  const pipeline = new CachingMinipassPipeline({ events: ['integrity', 'size'] }, integrityStream)
+  integrityStream.on('size', s => pipeline.emit('size', s))
+  integrityStream.on('integrity', i => pipeline.emit('integrity', i))
+
+  pipeline.write('foobarbazbuzz')
+  pipeline.resume()
+  // delay ending the stream so the early listeners will get the first events
+  setImmediate(() => pipeline.end())
+
+  const [earlySize, earlyIntegrity] = await Promise.all([
+    events.once(pipeline, 'size').then(res => res[0]),
+    events.once(pipeline, 'integrity').then(res => res[0]),
+  ])
+
+  // now wait for the stream itself to have ended
+  await pipeline.promise()
+
+  // and add new listeners
+  const [lateSize, lateIntegrity] = await Promise.all([
+    events.once(pipeline, 'size').then(res => res[0]),
+    events.once(pipeline, 'integrity').then(res => res[0]),
+  ])
+
+  // and make sure we got the same results
+  t.equal(earlySize, 13, 'got the right size')
+  t.same(earlyIntegrity, INTEGRITY, 'got the right integrity')
+  t.same(earlySize, lateSize, 'got the same size early and late')
+  t.same(earlyIntegrity, lateIntegrity, 'got the same integrity early and late')
+})