From 87c78f668696116015ec6910cf291c5be6e77098 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Thu, 7 Jul 2022 20:36:12 -0400 Subject: [PATCH] fix(notifier): Revert "Make makeAsyncIterableFromNotifier lossy" Eager consumption led to infinite loops; see https://github.com/Agoric/agoric-sdk/pull/5695 for context. --- packages/notifier/src/asyncIterableAdaptor.js | 51 ++++-- packages/notifier/src/notifier.js | 170 +++++------------- packages/notifier/src/subscriber.js | 4 +- 3 files changed, 80 insertions(+), 145 deletions(-) diff --git a/packages/notifier/src/asyncIterableAdaptor.js b/packages/notifier/src/asyncIterableAdaptor.js index a70a43fbd9b6..c003e2efc867 100644 --- a/packages/notifier/src/asyncIterableAdaptor.js +++ b/packages/notifier/src/asyncIterableAdaptor.js @@ -38,34 +38,49 @@ import './types.js'; export const makeAsyncIterableFromNotifier = notifierP => { return Far('asyncIterableFromNotifier', { [Symbol.asyncIterator]: () => { - /** @type {boolean} */ - let done = false; /** @type {UpdateCount} */ let localUpdateCount; - /** @type {{value: T, done: boolean}} */ - let myIterationResult; + /** @type {Promise<{value: T, done: boolean}> | undefined} */ + let myIterationResultP; return Far('asyncIteratorFromNotifier', { - next: async () => { - // Request another result only if the notifier has not terminated - // (in which case the final result is used for all subsequent - // `next()` calls). - if (!done) { - // This adaptor waits for each result promise to settle - // before returning its own. - // The eager consumption inherent to a notifier is not - // compatible with direct use of the async iterator protocol - // to collect multiple pending results, even though such use - // cases do exist in general as documented at + next: () => { + if (!myIterationResultP) { + // In this adaptor, once `next()` is called and returns an + // unresolved promise, `myIterationResultP`, and until + // `myIterationResultP` is fulfilled with an + // iteration result, further `next()` calls will return the same + // `myIterationResultP` promise again without asking the notifier + // for more updates. If there's already an unanswered ask in the + // air, all further asks should just reuse the result of that one. + // + // This reuse behavior is only needed for code that uses the async + // iterator protocol explicitly. When this async iterator is + // consumed by a for/await/of loop, `next()` will only be called + // after the promise for the previous iteration result has + // fulfilled. If it fulfills with `done: true`, the for/await/of + // loop will never call `next()` again. + // + // See // https://2ality.com/2016/10/asynchronous-iteration.html#queuing-next()-invocations - myIterationResult = await E(notifierP) + // for an explicit use that sends `next()` without waiting. + myIterationResultP = E(notifierP) .getUpdateSince(localUpdateCount) .then(({ value, updateCount }) => { localUpdateCount = updateCount; - done = localUpdateCount === undefined; + const done = localUpdateCount === undefined; + if (!done) { + // Once the outstanding question has been answered, stop + // using that answer, so any further `next()` questions + // cause a new `getUpdateSince` request. + // + // But only if more answers are expected. Once the notifier + // is `done`, that was the last answer so reuse it forever. + myIterationResultP = undefined; + } return harden({ value, done }); }); } - return myIterationResult; + return myIterationResultP; }, }); }, diff --git a/packages/notifier/src/notifier.js b/packages/notifier/src/notifier.js index 3b8acdcd93b7..3689f020989f 100644 --- a/packages/notifier/src/notifier.js +++ b/packages/notifier/src/notifier.js @@ -161,105 +161,13 @@ export const makeNotifierKit = (...initialStateArr) => { export const makeNotifierFromAsyncIterable = asyncIterableP => { const iteratorP = E(asyncIterableP)[Symbol.asyncIterator](); - /** @type {WeakMap, Promise<{value, updateCount}>>} */ - const resultPromiseMap = new WeakMap(); - /** @type {Promise<{value: any, done: boolean}>} */ - let latestResultInP; - /** @type {undefined | Promise<{value, updateCount}>} */ - let latestResultOutP; - /** @type {undefined | Promise<{value, updateCount}>} */ - let nextResultOutP; - /** @type {undefined | ((resolution?: any) => void)} */ - let nextResultInR; + /** @type {Promise>|undefined} */ + let optNextPromise; /** @type {UpdateCount & bigint} */ - let latestUpdateCount = 0n; - let finished = false; - let finalResultOut; - - // Consume results as soon as their predecessors settle. - (async function consumeEagerly() { - try { - let done = false; - while (!done) { - // TODO: Fix this typing friction. - // Possibly related: https://github.com/microsoft/TypeScript/issues/38479 - // @ts-expect-error Tolerate done: undefined. - latestResultInP = E(iteratorP).next(); - if (nextResultInR) { - nextResultInR(latestResultInP); - nextResultInR = undefined; - } - // eslint-disable-next-line no-await-in-loop - const latestResultIn = await latestResultInP; - ({ done } = latestResultIn); - } - } catch (err) {} // eslint-disable-line no-empty - if (nextResultInR) { - // @ts-expect-error It really is fine to use latestResultInP here. - nextResultInR(latestResultInP); - nextResultInR = undefined; - } - })(); - - // Create outbound results on-demand, but at most once. - /** - * @param {Promise<{value: any, done: boolean}>} resultInP - * @returns {Promise<{value, updateCount}>} - */ - function translateInboundResult(resultInP) { - return resultInP.then( - ({ value, done }) => { - // If this is resolving a post-finish request, preserve the final result. - if (finished) { - return finalResultOut; - } - - if (done) { - finished = true; - - // If there is a pending next-value promise, resolve it. - if (nextResultInR) { - nextResultInR(/* irrelevant becaused finished is true */); - nextResultInR = undefined; - } - - // Final results have undefined updateCount. - finalResultOut = harden({ value, updateCount: undefined }); - return finalResultOut; - } - - // Discard any pending promise. - // eslint-disable-next-line no-multi-assign - latestResultOutP = nextResultOutP = nextResultInR = undefined; - - latestUpdateCount += 1n; - return harden({ value, updateCount: latestUpdateCount }); - }, - rejection => { - if (!finished) { - finished = true; - - // If there is a pending next-value promise, resolve it. - if (nextResultInR) { - nextResultInR(resultInP); - nextResultInR = undefined; - } - } - throw rejection; - }, - ); - } - function getLatestResultOutP() { - if (!latestResultOutP) { - assert(latestResultInP !== undefined); - latestResultOutP = resultPromiseMap.get(latestResultInP); - if (!latestResultOutP) { - latestResultOutP = translateInboundResult(latestResultInP); - resultPromiseMap.set(latestResultInP, latestResultOutP); - } - } - return latestResultOutP; - } + let currentUpdateCount = 0n; + /** @type {ERef>|undefined} */ + let currentResponse; + let final = false; /** * @template T @@ -267,37 +175,49 @@ export const makeNotifierFromAsyncIterable = asyncIterableP => { */ const baseNotifier = Far('baseNotifier', { getUpdateSince(updateCount = -1n) { - assert( - updateCount <= latestUpdateCount, - 'argument must be a previously-issued updateCount.', - ); - - // If we don't yet have an inbound result or a promise for an outbound result, - // create the latter. - if (!latestResultInP && !latestResultOutP) { - const { promise, resolve } = makePromiseKit(); - nextResultInR = resolve; - nextResultOutP = translateInboundResult(promise); - latestResultOutP = nextResultOutP; + if (updateCount < currentUpdateCount) { + if (currentResponse) { + return Promise.resolve(currentResponse); + } + } else if (updateCount !== currentUpdateCount) { + throw new Error( + 'getUpdateSince argument must be a previously-issued updateCount.', + ); } - if (updateCount < latestUpdateCount) { - // Each returned promise is unique. - return getLatestResultOutP().then(); + // Return a final response if we have one, otherwise a promise for the next state. + if (final) { + assert(currentResponse !== undefined); + return Promise.resolve(currentResponse); } - - if (!nextResultOutP) { - if (finished) { - nextResultOutP = getLatestResultOutP(); - } else { - const { promise, resolve } = makePromiseKit(); - nextResultInR = resolve; - nextResultOutP = translateInboundResult(promise); - } + if (!optNextPromise) { + const nextIterResultP = E(iteratorP).next(); + optNextPromise = E.when( + nextIterResultP, + ({ done, value }) => { + assert(!final); + if (done) { + final = true; + } + currentUpdateCount += 1n; + currentResponse = harden({ + value, + updateCount: done ? undefined : currentUpdateCount, + }); + optNextPromise = undefined; + return currentResponse; + }, + _reason => { + final = true; + currentResponse = + /** @type {Promise>} */ + (nextIterResultP); + optNextPromise = undefined; + return currentResponse; + }, + ); } - - // Each returned promise is unique. - return nextResultOutP.then(); + return optNextPromise; }, }); diff --git a/packages/notifier/src/subscriber.js b/packages/notifier/src/subscriber.js index ae5edcf37760..3cc01d4a34fc 100644 --- a/packages/notifier/src/subscriber.js +++ b/packages/notifier/src/subscriber.js @@ -46,8 +46,8 @@ const makeSubscriptionIterator = tailP => { return Far('SubscriptionIterator', { subscribe: () => makeSubscription(tailP), [Symbol.asyncIterator]: () => makeSubscriptionIterator(tailP), - next: async () => { - const resultP = await E.get(tailP).head; + next: () => { + const resultP = E.get(tailP).head; tailP = E.get(tailP).tail; Promise.resolve(tailP).catch(() => {}); // suppress unhandled rejection error return resultP;