From 9d6f025415749fe82cededfe4aa4d3e0f65fcdb3 Mon Sep 17 00:00:00 2001 From: Richard Gibson Date: Thu, 7 Jul 2022 20:40:33 -0400 Subject: [PATCH] feat(notifier): Add makeNotifierFromSubscriber Fixes #5413 --- packages/notifier/src/index.js | 1 + packages/notifier/src/notifier.js | 102 ++++++++- .../notifier/test/test-notifier-adaptor.js | 212 ++++++++++++++++++ 3 files changed, 314 insertions(+), 1 deletion(-) diff --git a/packages/notifier/src/index.js b/packages/notifier/src/index.js index dae1ada62e1c..24a23cc42857 100644 --- a/packages/notifier/src/index.js +++ b/packages/notifier/src/index.js @@ -10,6 +10,7 @@ export { makeNotifier, makeNotifierKit, makeNotifierFromAsyncIterable, + makeNotifierFromSubscriber, } from './notifier.js'; export { makeSubscription, makeSubscriptionKit } from './subscriber.js'; export { diff --git a/packages/notifier/src/notifier.js b/packages/notifier/src/notifier.js index 3689f020989f..b4e5e0348356 100644 --- a/packages/notifier/src/notifier.js +++ b/packages/notifier/src/notifier.js @@ -6,6 +6,7 @@ import { makePromiseKit } from '@endo/promise-kit'; import { E } from '@endo/eventual-send'; import { Far } from '@endo/marshal'; import { makeAsyncIterableFromNotifier } from './asyncIterableAdaptor.js'; +import { subscribeLatest } from './publish-kit.js'; import './types.js'; @@ -152,7 +153,106 @@ export const makeNotifierKit = (...initialStateArr) => { }; /** - * Adaptor from async iterable to notifier. + * @template T + * @param {ERef>} subscriberP + * @returns {Promise>} + */ +export const makeNotifierFromSubscriber = async subscriberP => { + const iterable = subscribeLatest(subscriberP); + const iterator = iterable[Symbol.asyncIterator](); + + /** @type {undefined | Promise<{value, updateCount}>} */ + let latestResultOutP; + /** @type {undefined | Promise<{value, updateCount}>} */ + let nextResultOutP; + /** @type {UpdateCount & bigint} */ + let latestUpdateCount = 0n; + let finished = false; + + /** + * @param {Promise>} resultInP + * @returns {Promise<{value, updateCount}>} + */ + function translateInboundResult(resultInP) { + return resultInP.then( + ({ value, done }) => { + // Shift the result-out promises. + latestResultOutP = nextResultOutP; + nextResultOutP = undefined; + + if (done) { + finished = true; + + // Final results have undefined updateCount. + return harden({ value, updateCount: undefined }); + } + + latestUpdateCount += 1n; + return harden({ value, updateCount: latestUpdateCount }); + }, + rejection => { + // Shift the result-out promises. + latestResultOutP = nextResultOutP; + nextResultOutP = undefined; + + finished = true; + throw rejection; + }, + ); + } + + /** + * @template T + * @type {BaseNotifier} + */ + const baseNotifier = Far('baseNotifier', { + getUpdateSince(updateCount = -1n) { + assert( + updateCount <= latestUpdateCount, + 'argument must be a previously-issued updateCount.', + ); + + // If we don't yet have a promise for the latest outbound result, create it. + if (!latestResultOutP) { + nextResultOutP = translateInboundResult(iterator.next()); + latestResultOutP = nextResultOutP; + } + + let resultP = + updateCount < latestUpdateCount || finished + ? latestResultOutP + : nextResultOutP; + + if (!resultP) { + nextResultOutP = translateInboundResult(iterator.next()); + resultP = nextResultOutP; + } + + // Each returned promise is unique. + return resultP.then(); + }, + }); + + /** @type {Notifier} */ + const notifier = Far('notifier', { + ...makeAsyncIterableFromNotifier(baseNotifier), + ...baseNotifier, + + /** + * Use this to distribute a Notifier efficiently over the network, + * by obtaining this from the Notifier to be replicated, and applying + * `makeNotifier` to it at the new site to get an equivalent local + * Notifier at that site. + */ + getSharableNotifierInternals: () => baseNotifier, + getStoreKey: () => harden({ notifier }), + }); + return notifier; +}; +harden(makeNotifierFromSubscriber); + +/** + * Deprecated adaptor from async iterable to notifier. * * @template T * @param {ERef>} asyncIterableP diff --git a/packages/notifier/test/test-notifier-adaptor.js b/packages/notifier/test/test-notifier-adaptor.js index 59d38e5fdc5d..14202a908750 100644 --- a/packages/notifier/test/test-notifier-adaptor.js +++ b/packages/notifier/test/test-notifier-adaptor.js @@ -2,9 +2,13 @@ import { test } from './prepare-test-env-ava.js'; +// eslint-disable-next-line import/order +import { E } from '@endo/eventual-send'; import { makeAsyncIterableFromNotifier, + makeEmptyPublishKit, makeNotifierFromAsyncIterable, + makeNotifierFromSubscriber, observeIteration, observeNotifier, } from '../src/index.js'; @@ -136,3 +140,211 @@ test('notifier adaptor - update from notifier fails', t => { const n = makeNotifierFromAsyncIterable(explodingStream); return observeNotifier(n, u); }); + +// /////////////////////////////// makeNotifierFromSubscriber ///////////////////////////////// + +/** @param {{conclusionMethod: 'finish' | 'fail', conclusionValue: any}} config */ +const makeGeometricPublishKit = ({ conclusionMethod, conclusionValue }) => { + const { publisher, subscriber } = makeEmptyPublishKit(); + + // Publish in geometric batches: [1], [2], [3, 4], [5, 6, 7, 8], ... + let nextValue = 1; + let nextBatchSize = 1; + let done = false; + const initialize = () => { + publisher.publish(nextValue); + nextValue += 1; + }; + const publishNextBatch = () => { + if (done) { + return; + } + for (let i = 0; i < nextBatchSize; i += 1) { + publisher.publish(nextValue); + nextValue += 1; + } + nextBatchSize *= 2; + if (nextValue > 64) { + done = true; + if (conclusionMethod === 'fail') { + publisher.fail(conclusionValue); + } else { + publisher.finish(conclusionValue); + } + } + }; + + return { initialize, publishNextBatch, subscriber }; +}; + +test('makeNotifierFromSubscriber(finishes) - for-await-of iteration', async t => { + const { initialize, publishNextBatch, subscriber } = makeGeometricPublishKit({ + conclusionMethod: 'finish', + conclusionValue: 'done', + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + initialize(); + + const results = []; + for await (const result of notifier) { + results.push(result); + publishNextBatch(); + } + + t.deepEqual( + results, + [1, 2, 4, 8, 16, 32], + 'only the last of values published between iteration steps should be observed', + ); +}); + +test('makeNotifierFromSubscriber(finishes) - getUpdateSince', async t => { + const { initialize, publishNextBatch, subscriber } = makeGeometricPublishKit({ + conclusionMethod: 'finish', + conclusionValue: 'done', + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + initialize(); + + const results = []; + let updateCount; + // eslint-disable-next-line no-constant-condition + while (true) { + // eslint-disable-next-line no-await-in-loop + const result = await notifier.getUpdateSince(updateCount); + ({ updateCount } = result); + results.push(result.value); + // eslint-disable-next-line no-await-in-loop + t.deepEqual(await notifier.getUpdateSince(), result); + if (updateCount === undefined) { + break; + } + // eslint-disable-next-line no-await-in-loop + await publishNextBatch(); + // eslint-disable-next-line no-await-in-loop + t.deepEqual(await notifier.getUpdateSince(), result); + } + + t.deepEqual( + results, + [1, 2, 4, 8, 16, 32, 'done'], + 'only the last of values published between iteration steps should be observed', + ); + t.deepEqual(await notifier.getUpdateSince(), { + value: 'done', + updateCount: undefined, + }); +}); + +// TODO: makeNotifierFromSubscriber(fails) + +test('makeNotifierFromSubscriber - getUpdateSince timing', async t => { + const { initialize, publishNextBatch, subscriber } = makeGeometricPublishKit({ + conclusionMethod: 'finish', + conclusionValue: 'done', + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + + const sequence = []; + const firstP = notifier.getUpdateSince(); + firstP.then(_ => sequence.push('resolve firstP')); + const firstP2 = notifier.getUpdateSince(); + firstP2.then(_ => sequence.push('resolve firstP2')); + + await E(Promise).resolve(); + t.deepEqual( + sequence, + [], + 'getUpdateSince() should not settle before a value is published', + ); + + initialize(); + await E(Promise).resolve(); + await E(Promise).resolve(); + t.deepEqual( + sequence, + ['resolve firstP', 'resolve firstP2'], + 'getUpdateSince() should settle after a value is published', + ); + + publishNextBatch(); + t.deepEqual( + [await firstP, await firstP2, await notifier.getUpdateSince()].map( + result => result.value, + ), + [1, 1, 1], + 'early getUpdateSince() should resolve to the first value', + ); + + publishNextBatch(); + const lateResult = await notifier.getUpdateSince(); + t.deepEqual( + lateResult.value, + 1, + 'late getUpdateSince() should still fulfill with the latest value', + ); + t.deepEqual( + [ + await notifier.getUpdateSince(lateResult.updateCount), + await notifier.getUpdateSince(), + await notifier.getUpdateSince(), + ].map(result => result.value), + [4, 4, 4], + 'getUpdateSince(updateCount) should advance to the latest value', + ); + + publishNextBatch(); + await E(Promise).resolve(); + await E(Promise).resolve(); + t.deepEqual( + (await notifier.getUpdateSince(lateResult.updateCount)).value, + 4, + 'getUpdateSince(oldUpdateCount) should fulfill with the latest value', + ); +}); + +test('makeNotifierFromSubscriber - updateCount validation', async t => { + const { subscriber } = makeGeometricPublishKit({ + conclusionMethod: 'finish', + conclusionValue: 'done', + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + t.throws(() => notifier.getUpdateSince(1n)); +}); + +test('makeNotifierFromSubscriber - getUpdateSince() promise uniqueness', async t => { + const { initialize, publishNextBatch, subscriber } = makeGeometricPublishKit({ + conclusionMethod: 'finish', + conclusionValue: 'done', + }); + const notifier = await makeNotifierFromSubscriber(subscriber); + const firstP = notifier.getUpdateSince(); + t.not( + notifier.getUpdateSince(), + firstP, + 'early getUpdateSince() promises should be distinct', + ); + + initialize(); + await E(Promise).resolve(); + await E(Promise).resolve(); + const { updateCount } = await firstP; + t.not( + notifier.getUpdateSince(updateCount), + notifier.getUpdateSince(updateCount), + 'getUpdateSince(updateCount) promises should be distinct', + ); + + publishNextBatch(); + await notifier.getUpdateSince(updateCount); + t.not( + notifier.getUpdateSince(), + notifier.getUpdateSince(), + 'late getUpdateSince() promises should be distinct', + ); + t.not( + notifier.getUpdateSince(), + notifier.getUpdateSince(updateCount), + 'getUpdateSince() promises should be distinct from getUpdateSince(updateCount) promises', + ); +});