From 2a41c93378ae14a348f43eaad46336cda1cb3627 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Wed, 19 Apr 2023 16:11:28 -0400 Subject: [PATCH 1/5] fix(notifier)!: Stop retaining durable publish kit values in RAM Fixes #7298 This change is breaking only because the identity of promises and `head` objects is no longer preserved. --- packages/notifier/src/publish-kit.js | 152 ++++++++++++--------- packages/notifier/test/test-publish-kit.js | 22 ++- 2 files changed, 102 insertions(+), 72 deletions(-) diff --git a/packages/notifier/src/publish-kit.js b/packages/notifier/src/publish-kit.js index d99afb8b8e0..876e350006c 100644 --- a/packages/notifier/src/publish-kit.js +++ b/packages/notifier/src/publish-kit.js @@ -15,6 +15,8 @@ const makeQuietRejection = reason => { void E.when(rejection, sink, sink); return rejection; }; +const makeTooFarRejection = () => + makeQuietRejection(new Error('Cannot read past end of iteration.')); export const PublisherI = M.interface('Publisher', { publish: M.call(M.any()).returns(), @@ -97,9 +99,7 @@ export const makePublishKit = () => { const resolveCurrent = tailR; if (done) { - tailP = makeQuietRejection( - new Error('Cannot read past end of iteration.'), - ); + tailP = makeTooFarRejection(); tailR = undefined; } else { ({ promise: tailP, resolve: tailR } = makePromiseKit()); @@ -216,21 +216,67 @@ const initDurablePublishKitState = (options = {}) => { */ const getEphemeralKey = facets => facets.publisher; -/** @type {WeakMap} */ +/** + * @typedef DurablePublishKitEphemeralData + * @property {Promise<*> | undefined} currentP The current-result promise + * (undefined unless resolved with unrecoverable ephemeral data) + * @property {Promise<*>} tailP The next-result promise + * @property {import('@endo/promise-kit').PromiseKit<*>['resolve'] | undefined} tailR The next-result resolver + * (undefined when the publish kit has terminated) + */ + +/** @type {WeakMap} */ const durablePublishKitEphemeralData = new WeakMap(); /** - * Returns the current/next-result promises and next-result resolver - * associated with a given durable publish kit. - * They are lost on upgrade, but recreated on-demand. - * Such recreation preserves the value in (but not the identity of) the - * current { value, done } result when possible, which is always the - * case when that value is terminal (i.e., from `finish` or `fail`) or - * when the durable publish kit is configured with - * `valueDurability: 'mandatory'`. + * Returns the current-result promise associated with a given durable + * publish kit, recreated unless we already have one with retained + * ephemeral data. * * @param {DurablePublishKitState} state * @param {PublishKit<*>} facets + * @param {Promise<*>} tail + * @returns {Promise<*>} + */ +const provideCurrentP = (state, facets, tail) => { + const ephemeralKey = getEphemeralKey(facets); + const foundData = durablePublishKitEphemeralData.get(ephemeralKey); + const currentP = foundData?.currentP; + if (currentP) { + return currentP; + } + + const { publishCount, status, hasValue, value } = state; + if (!hasValue) { + assert(status === 'live'); + return tail; + } + if (status === 'live' || status === 'finished') { + const cell = harden({ + head: { value, done: status !== 'live' }, + publishCount, + tail, + }); + return E.resolve(cell); + } else if (status === 'failed') { + return makeQuietRejection(value); + } else { + throw Fail`Invalid durable promise kit status: ${q(status)}`; + } +}; + +/** + * Returns the next-result promise and resolver associated with a given + * durable publish kit. + * These are lost on upgrade but recreated on-demand, preserving the + * value in (but not the identity of) the current { value, done } result + * when possible, which is always the case when that value is terminal + * (i.e., from `finish` or `fail`) or when the durable publish kit is + * configured with `valueDurability: 'mandatory'`. + * + * @param {DurablePublishKitState} state + * @param {PublishKit<*>} facets + * @returns {DurablePublishKitEphemeralData} */ const provideDurablePublishKitEphemeralData = (state, facets) => { const ephemeralKey = getEphemeralKey(facets); @@ -238,53 +284,22 @@ const provideDurablePublishKitEphemeralData = (state, facets) => { if (foundData) { return foundData; } - const { status, publishCount } = state; - /** @type {object} */ - let newData; - if (status === 'failed') { - newData = { - currentP: makeQuietRejection(state.value), - tailP: makeQuietRejection( - new Error('Cannot read past end of iteration.'), - ), - tailR: undefined, - }; - } else if (status === 'finished') { - const tailP = makeQuietRejection( - new Error('Cannot read past end of iteration.'), - ); - newData = { - currentP: E.resolve( - harden({ - head: { value: state.value, done: true }, - publishCount, - tail: tailP, - }), - ), - tailP, - tailR: undefined, - }; - } else if (status === 'live') { - const { promise: tailP, resolve: tailR } = makePromiseKit(); + + const { status } = state; + let tailP; + let tailR; + if (status === 'live') { + ({ promise: tailP, resolve: tailR } = makePromiseKit()); void E.when(tailP, sink, sink); - newData = { - currentP: state.hasValue - ? E.resolve( - harden({ - head: { value: state.value, done: false }, - publishCount, - tail: tailP, - }), - ) - : tailP, - tailP, - tailR, - }; + } else if (status === 'finished' || status === 'failed') { + tailP = makeTooFarRejection(); } else { - Fail`Invalid durable promise kit status: ${q(status)}`; + throw Fail`Invalid durable promise kit status: ${q(status)}`; } - durablePublishKitEphemeralData.set(ephemeralKey, harden(newData)); - return newData; + // currentP is not ephemeral when restoring from persisted state. + const obj = harden({ currentP: undefined, tailP, tailR }); + durablePublishKitEphemeralData.set(ephemeralKey, obj); + return obj; }; /** @@ -304,32 +319,30 @@ const advanceDurablePublishKit = (context, value, targetStatus = 'live') => { if (done || valueDurability === 'mandatory') { canBeDurable(value) || Fail`Cannot accept non-durable value: ${value}`; } - const { tailP: currentP, tailR: resolveCurrent } = + const { tailP: oldTailP, tailR: resolveOldTail } = provideDurablePublishKitEphemeralData(state, facets); + assert.typeof(resolveOldTail, 'function'); const publishCount = state.publishCount + 1n; state.publishCount = publishCount; + let tailP; let tailR; - if (done) { state.status = targetStatus; - tailP = makeQuietRejection(new Error('Cannot read past end of iteration.')); + tailP = makeTooFarRejection(); tailR = undefined; } else { ({ promise: tailP, resolve: tailR } = makePromiseKit()); void E.when(tailP, sink, sink); } - durablePublishKitEphemeralData.set( - getEphemeralKey(facets), - harden({ currentP, tailP, tailR }), - ); + let currentP; if (targetStatus === 'failed') { state.hasValue = true; state.value = value; const rejection = makeQuietRejection(value); - resolveCurrent(rejection); + resolveOldTail(rejection); } else { // Persist a terminal value, or a non-terminal value // if configured as 'mandatory' or 'opportunistic'. @@ -339,9 +352,11 @@ const advanceDurablePublishKit = (context, value, targetStatus = 'live') => { } else { state.hasValue = false; state.value = undefined; + // Retain any promise with non-durable resolution. + currentP = oldTailP; } - resolveCurrent( + resolveOldTail( harden({ head: { value, done }, publishCount, @@ -349,6 +364,11 @@ const advanceDurablePublishKit = (context, value, targetStatus = 'live') => { }), ); } + + durablePublishKitEphemeralData.set( + getEphemeralKey(facets), + harden({ currentP, tailP, tailR }), + ); }; /** @@ -396,7 +416,7 @@ export const prepareDurablePublishKit = (baggage, kindName) => { if (publishCount === currentPublishCount) { return tailP; } else if (publishCount < currentPublishCount) { - return currentP; + return currentP || provideCurrentP(state, facets, tailP); } else { throw new Error( 'subscribeAfter argument must be a previously-issued publishCount.', diff --git a/packages/notifier/test/test-publish-kit.js b/packages/notifier/test/test-publish-kit.js index 02820c6cc5c..4c66ae4463f 100644 --- a/packages/notifier/test/test-publish-kit.js +++ b/packages/notifier/test/test-publish-kit.js @@ -63,13 +63,23 @@ const assertCells = (t, label, cells, publishCount, expected, options = {}) => { t.is(firstCell.publishCount, publishCount, `${label} cell publishCount`); if (strict) { - t.deepEqual( - new Set(cells), - new Set([firstCell]), - `all ${label} cells must referentially match`, - ); + const { head, ...otherProps } = firstCell; + for (const [headKey, headValue] of Object.entries(head)) { + t.deepEqual( + new Set(cells.map(cell => cell.head[headKey])), + new Set([headValue]), + `the head ${q(headKey)} of each ${label} cell must referentially match`, + ); + } + for (const [key, value] of Object.entries(otherProps)) { + t.deepEqual( + new Set(cells.map(cell => cell[key])), + new Set([value]), + `the ${q(key)} of each ${label} cell must referentially match`, + ); + } } else { - const { tail: _tail, ...props } = cells[0]; + const { tail: _tail, ...props } = firstCell; cells.slice(1).forEach((cell, i) => { t.like(cell, props, `${label} cell ${i + 1} must match cell 0`); }); From da954ebb9b37a637fd6815eaea9fa33f6faad31a Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Mon, 24 Apr 2023 12:46:25 -0400 Subject: [PATCH 2/5] chore(notifier): Harden constructed errors good hygiene and avoids leaking some memory --- packages/notifier/src/publish-kit.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/notifier/src/publish-kit.js b/packages/notifier/src/publish-kit.js index 876e350006c..7e37eeed498 100644 --- a/packages/notifier/src/publish-kit.js +++ b/packages/notifier/src/publish-kit.js @@ -15,8 +15,10 @@ const makeQuietRejection = reason => { void E.when(rejection, sink, sink); return rejection; }; -const makeTooFarRejection = () => - makeQuietRejection(new Error('Cannot read past end of iteration.')); +const makeTooFarRejection = () => { + const err = harden(new Error('Cannot read past end of iteration.')); + return makeQuietRejection(err); +}; export const PublisherI = M.interface('Publisher', { publish: M.call(M.any()).returns(), From e4f067d8b63779cb26d73071811e3f49fc7850b2 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Mon, 24 Apr 2023 12:47:56 -0400 Subject: [PATCH 3/5] chore(notifier): Simplify a parameter type --- packages/notifier/src/publish-kit.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/notifier/src/publish-kit.js b/packages/notifier/src/publish-kit.js index 7e37eeed498..921efdd45f5 100644 --- a/packages/notifier/src/publish-kit.js +++ b/packages/notifier/src/publish-kit.js @@ -223,7 +223,7 @@ const getEphemeralKey = facets => facets.publisher; * @property {Promise<*> | undefined} currentP The current-result promise * (undefined unless resolved with unrecoverable ephemeral data) * @property {Promise<*>} tailP The next-result promise - * @property {import('@endo/promise-kit').PromiseKit<*>['resolve'] | undefined} tailR The next-result resolver + * @property {((value: any) => void) | undefined} tailR The next-result resolver * (undefined when the publish kit has terminated) */ From 7764e365cd147f8189871c9c753273fa40a44ea2 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Mon, 24 Apr 2023 13:24:30 -0400 Subject: [PATCH 4/5] refactor(notifier): Make the "read too far" rejection a singleton Reduces memory leakage, cf. https://github.com/Agoric/agoric-sdk/pull/7459#discussion_r1174436174 --- packages/notifier/src/publish-kit.js | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/notifier/src/publish-kit.js b/packages/notifier/src/publish-kit.js index 921efdd45f5..f146f8cc0db 100644 --- a/packages/notifier/src/publish-kit.js +++ b/packages/notifier/src/publish-kit.js @@ -15,10 +15,9 @@ const makeQuietRejection = reason => { void E.when(rejection, sink, sink); return rejection; }; -const makeTooFarRejection = () => { - const err = harden(new Error('Cannot read past end of iteration.')); - return makeQuietRejection(err); -}; +const tooFarRejection = makeQuietRejection( + harden(new Error('Cannot read past end of iteration.')), +); export const PublisherI = M.interface('Publisher', { publish: M.call(M.any()).returns(), @@ -101,7 +100,7 @@ export const makePublishKit = () => { const resolveCurrent = tailR; if (done) { - tailP = makeTooFarRejection(); + tailP = tooFarRejection; tailR = undefined; } else { ({ promise: tailP, resolve: tailR } = makePromiseKit()); @@ -294,7 +293,7 @@ const provideDurablePublishKitEphemeralData = (state, facets) => { ({ promise: tailP, resolve: tailR } = makePromiseKit()); void E.when(tailP, sink, sink); } else if (status === 'finished' || status === 'failed') { - tailP = makeTooFarRejection(); + tailP = tooFarRejection; } else { throw Fail`Invalid durable promise kit status: ${q(status)}`; } @@ -332,7 +331,7 @@ const advanceDurablePublishKit = (context, value, targetStatus = 'live') => { let tailR; if (done) { state.status = targetStatus; - tailP = makeTooFarRejection(); + tailP = tooFarRejection; tailR = undefined; } else { ({ promise: tailP, resolve: tailR } = makePromiseKit()); From 5bcb2cf10442aa967e9d20b6fa5efcb1b7bf56f6 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Wed, 26 Apr 2023 10:40:59 -0400 Subject: [PATCH 5/5] chore(notifier): Replace optional chaining deploy-script-support can't have nice things --- packages/notifier/src/publish-kit.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/notifier/src/publish-kit.js b/packages/notifier/src/publish-kit.js index f146f8cc0db..a4938370a9c 100644 --- a/packages/notifier/src/publish-kit.js +++ b/packages/notifier/src/publish-kit.js @@ -242,7 +242,7 @@ const durablePublishKitEphemeralData = new WeakMap(); const provideCurrentP = (state, facets, tail) => { const ephemeralKey = getEphemeralKey(facets); const foundData = durablePublishKitEphemeralData.get(ephemeralKey); - const currentP = foundData?.currentP; + const currentP = foundData && foundData.currentP; if (currentP) { return currentP; }