Skip to content

Commit

Permalink
feat(notifier): Add makeNotifierFromSubscriber
Browse files Browse the repository at this point in the history
Fixes #5413
  • Loading branch information
gibson042 committed Jul 8, 2022
1 parent 87c78f6 commit 9d6f025
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 1 deletion.
1 change: 1 addition & 0 deletions packages/notifier/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export {
makeNotifier,
makeNotifierKit,
makeNotifierFromAsyncIterable,
makeNotifierFromSubscriber,
} from './notifier.js';
export { makeSubscription, makeSubscriptionKit } from './subscriber.js';
export {
Expand Down
102 changes: 101 additions & 1 deletion packages/notifier/src/notifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { makePromiseKit } from '@endo/promise-kit';
import { E } from '@endo/eventual-send';
import { Far } from '@endo/marshal';
import { makeAsyncIterableFromNotifier } from './asyncIterableAdaptor.js';
import { subscribeLatest } from './publish-kit.js';

import './types.js';

Expand Down Expand Up @@ -152,7 +153,106 @@ export const makeNotifierKit = (...initialStateArr) => {
};

/**
* Adaptor from async iterable to notifier.
* @template T
* @param {ERef<Subscriber<T>>} subscriberP
* @returns {Promise<Notifier<T>>}
*/
export const makeNotifierFromSubscriber = async subscriberP => {
const iterable = subscribeLatest(subscriberP);
const iterator = iterable[Symbol.asyncIterator]();

/** @type {undefined | Promise<{value, updateCount}>} */
let latestResultOutP;
/** @type {undefined | Promise<{value, updateCount}>} */
let nextResultOutP;
/** @type {UpdateCount & bigint} */
let latestUpdateCount = 0n;
let finished = false;

/**
* @param {Promise<IteratorResult<any, any>>} resultInP
* @returns {Promise<{value, updateCount}>}
*/
function translateInboundResult(resultInP) {
return resultInP.then(
({ value, done }) => {
// Shift the result-out promises.
latestResultOutP = nextResultOutP;
nextResultOutP = undefined;

if (done) {
finished = true;

// Final results have undefined updateCount.
return harden({ value, updateCount: undefined });
}

latestUpdateCount += 1n;
return harden({ value, updateCount: latestUpdateCount });
},
rejection => {
// Shift the result-out promises.
latestResultOutP = nextResultOutP;
nextResultOutP = undefined;

finished = true;
throw rejection;
},
);
}

/**
* @template T
* @type {BaseNotifier<T>}
*/
const baseNotifier = Far('baseNotifier', {
getUpdateSince(updateCount = -1n) {
assert(
updateCount <= latestUpdateCount,
'argument must be a previously-issued updateCount.',
);

// If we don't yet have a promise for the latest outbound result, create it.
if (!latestResultOutP) {
nextResultOutP = translateInboundResult(iterator.next());
latestResultOutP = nextResultOutP;
}

let resultP =
updateCount < latestUpdateCount || finished
? latestResultOutP
: nextResultOutP;

if (!resultP) {
nextResultOutP = translateInboundResult(iterator.next());
resultP = nextResultOutP;
}

// Each returned promise is unique.
return resultP.then();
},
});

/** @type {Notifier<T>} */
const notifier = Far('notifier', {
...makeAsyncIterableFromNotifier(baseNotifier),
...baseNotifier,

/**
* Use this to distribute a Notifier efficiently over the network,
* by obtaining this from the Notifier to be replicated, and applying
* `makeNotifier` to it at the new site to get an equivalent local
* Notifier at that site.
*/
getSharableNotifierInternals: () => baseNotifier,
getStoreKey: () => harden({ notifier }),
});
return notifier;
};
harden(makeNotifierFromSubscriber);

/**
* Deprecated adaptor from async iterable to notifier.
*
* @template T
* @param {ERef<AsyncIterable<T>>} asyncIterableP
Expand Down
212 changes: 212 additions & 0 deletions packages/notifier/test/test-notifier-adaptor.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import { test } from './prepare-test-env-ava.js';

// eslint-disable-next-line import/order
import { E } from '@endo/eventual-send';
import {
makeAsyncIterableFromNotifier,
makeEmptyPublishKit,
makeNotifierFromAsyncIterable,
makeNotifierFromSubscriber,
observeIteration,
observeNotifier,
} from '../src/index.js';
Expand Down Expand Up @@ -136,3 +140,211 @@ test('notifier adaptor - update from notifier fails', t => {
const n = makeNotifierFromAsyncIterable(explodingStream);
return observeNotifier(n, u);
});

// /////////////////////////////// makeNotifierFromSubscriber /////////////////////////////////

/** @param {{conclusionMethod: 'finish' | 'fail', conclusionValue: any}} config */
const makeGeometricPublishKit = ({ conclusionMethod, conclusionValue }) => {
const { publisher, subscriber } = makeEmptyPublishKit();

// Publish in geometric batches: [1], [2], [3, 4], [5, 6, 7, 8], ...
let nextValue = 1;
let nextBatchSize = 1;
let done = false;
const initialize = () => {
publisher.publish(nextValue);
nextValue += 1;
};
const publishNextBatch = () => {
if (done) {
return;
}
for (let i = 0; i < nextBatchSize; i += 1) {
publisher.publish(nextValue);
nextValue += 1;
}
nextBatchSize *= 2;
if (nextValue > 64) {
done = true;
if (conclusionMethod === 'fail') {
publisher.fail(conclusionValue);
} else {
publisher.finish(conclusionValue);
}
}
};

return { initialize, publishNextBatch, subscriber };
};

test('makeNotifierFromSubscriber(finishes) - for-await-of iteration', async t => {
const { initialize, publishNextBatch, subscriber } = makeGeometricPublishKit({
conclusionMethod: 'finish',
conclusionValue: 'done',
});
const notifier = await makeNotifierFromSubscriber(subscriber);
initialize();

const results = [];
for await (const result of notifier) {
results.push(result);
publishNextBatch();
}

t.deepEqual(
results,
[1, 2, 4, 8, 16, 32],
'only the last of values published between iteration steps should be observed',
);
});

test('makeNotifierFromSubscriber(finishes) - getUpdateSince', async t => {
const { initialize, publishNextBatch, subscriber } = makeGeometricPublishKit({
conclusionMethod: 'finish',
conclusionValue: 'done',
});
const notifier = await makeNotifierFromSubscriber(subscriber);
initialize();

const results = [];
let updateCount;
// eslint-disable-next-line no-constant-condition
while (true) {
// eslint-disable-next-line no-await-in-loop
const result = await notifier.getUpdateSince(updateCount);
({ updateCount } = result);
results.push(result.value);
// eslint-disable-next-line no-await-in-loop
t.deepEqual(await notifier.getUpdateSince(), result);
if (updateCount === undefined) {
break;
}
// eslint-disable-next-line no-await-in-loop
await publishNextBatch();
// eslint-disable-next-line no-await-in-loop
t.deepEqual(await notifier.getUpdateSince(), result);
}

t.deepEqual(
results,
[1, 2, 4, 8, 16, 32, 'done'],
'only the last of values published between iteration steps should be observed',
);
t.deepEqual(await notifier.getUpdateSince(), {
value: 'done',
updateCount: undefined,
});
});

// TODO: makeNotifierFromSubscriber(fails)

test('makeNotifierFromSubscriber - getUpdateSince timing', async t => {
const { initialize, publishNextBatch, subscriber } = makeGeometricPublishKit({
conclusionMethod: 'finish',
conclusionValue: 'done',
});
const notifier = await makeNotifierFromSubscriber(subscriber);

const sequence = [];
const firstP = notifier.getUpdateSince();
firstP.then(_ => sequence.push('resolve firstP'));
const firstP2 = notifier.getUpdateSince();
firstP2.then(_ => sequence.push('resolve firstP2'));

await E(Promise).resolve();
t.deepEqual(
sequence,
[],
'getUpdateSince() should not settle before a value is published',
);

initialize();
await E(Promise).resolve();
await E(Promise).resolve();
t.deepEqual(
sequence,
['resolve firstP', 'resolve firstP2'],
'getUpdateSince() should settle after a value is published',
);

publishNextBatch();
t.deepEqual(
[await firstP, await firstP2, await notifier.getUpdateSince()].map(
result => result.value,
),
[1, 1, 1],
'early getUpdateSince() should resolve to the first value',
);

publishNextBatch();
const lateResult = await notifier.getUpdateSince();
t.deepEqual(
lateResult.value,
1,
'late getUpdateSince() should still fulfill with the latest value',
);
t.deepEqual(
[
await notifier.getUpdateSince(lateResult.updateCount),
await notifier.getUpdateSince(),
await notifier.getUpdateSince(),
].map(result => result.value),
[4, 4, 4],
'getUpdateSince(updateCount) should advance to the latest value',
);

publishNextBatch();
await E(Promise).resolve();
await E(Promise).resolve();
t.deepEqual(
(await notifier.getUpdateSince(lateResult.updateCount)).value,
4,
'getUpdateSince(oldUpdateCount) should fulfill with the latest value',
);
});

test('makeNotifierFromSubscriber - updateCount validation', async t => {
const { subscriber } = makeGeometricPublishKit({
conclusionMethod: 'finish',
conclusionValue: 'done',
});
const notifier = await makeNotifierFromSubscriber(subscriber);
t.throws(() => notifier.getUpdateSince(1n));
});

test('makeNotifierFromSubscriber - getUpdateSince() promise uniqueness', async t => {
const { initialize, publishNextBatch, subscriber } = makeGeometricPublishKit({
conclusionMethod: 'finish',
conclusionValue: 'done',
});
const notifier = await makeNotifierFromSubscriber(subscriber);
const firstP = notifier.getUpdateSince();
t.not(
notifier.getUpdateSince(),
firstP,
'early getUpdateSince() promises should be distinct',
);

initialize();
await E(Promise).resolve();
await E(Promise).resolve();
const { updateCount } = await firstP;
t.not(
notifier.getUpdateSince(updateCount),
notifier.getUpdateSince(updateCount),
'getUpdateSince(updateCount) promises should be distinct',
);

publishNextBatch();
await notifier.getUpdateSince(updateCount);
t.not(
notifier.getUpdateSince(),
notifier.getUpdateSince(),
'late getUpdateSince() promises should be distinct',
);
t.not(
notifier.getUpdateSince(),
notifier.getUpdateSince(updateCount),
'getUpdateSince() promises should be distinct from getUpdateSince(updateCount) promises',
);
});

0 comments on commit 9d6f025

Please sign in to comment.