Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(notifier)!: Stop retaining durable publish kit values in RAM #7459

Merged
merged 6 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 87 additions & 66 deletions packages/notifier/src/publish-kit.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ const makeQuietRejection = reason => {
void E.when(rejection, sink, sink);
return rejection;
};
const tooFarRejection = makeQuietRejection(
harden(new Error('Cannot read past end of iteration.')),
);

export const PublisherI = M.interface('Publisher', {
publish: M.call(M.any()).returns(),
Expand Down Expand Up @@ -97,9 +100,7 @@ export const makePublishKit = () => {
const resolveCurrent = tailR;

if (done) {
tailP = makeQuietRejection(
new Error('Cannot read past end of iteration.'),
);
tailP = tooFarRejection;
tailR = undefined;
} else {
({ promise: tailP, resolve: tailR } = makePromiseKit());
Expand Down Expand Up @@ -216,75 +217,90 @@ const initDurablePublishKitState = (options = {}) => {
*/
const getEphemeralKey = facets => facets.publisher;

/** @type {WeakMap<DurablePublishKitEphemeralKey, {currentP, tailP, tailR}>} */
/**
* @typedef DurablePublishKitEphemeralData
* @property {Promise<*> | undefined} currentP The current-result promise
* (undefined unless resolved with unrecoverable ephemeral data)
* @property {Promise<*>} tailP The next-result promise
* @property {((value: any) => void) | undefined} tailR The next-result resolver
* (undefined when the publish kit has terminated)
*/

/** @type {WeakMap<DurablePublishKitEphemeralKey, DurablePublishKitEphemeralData>} */
const durablePublishKitEphemeralData = new WeakMap();

/**
* Returns the current/next-result promises and next-result resolver
* associated with a given durable publish kit.
* They are lost on upgrade, but recreated on-demand.
* Such recreation preserves the value in (but not the identity of) the
* current { value, done } result when possible, which is always the
* case when that value is terminal (i.e., from `finish` or `fail`) or
* when the durable publish kit is configured with
* `valueDurability: 'mandatory'`.
* Returns the current-result promise associated with a given durable
* publish kit, recreated unless we already have one with retained
* ephemeral data.
*
* @param {DurablePublishKitState} state
* @param {PublishKit<*>} facets
* @param {Promise<*>} tail
* @returns {Promise<*>}
*/
const provideCurrentP = (state, facets, tail) => {
const ephemeralKey = getEphemeralKey(facets);
const foundData = durablePublishKitEphemeralData.get(ephemeralKey);
const currentP = foundData && foundData.currentP;
if (currentP) {
return currentP;
}

const { publishCount, status, hasValue, value } = state;
if (!hasValue) {
assert(status === 'live');
return tail;
}
if (status === 'live' || status === 'finished') {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use switch (status) here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considered, but rejected because I want provideCurrentP and provideDurablePublishKitEphemeralData to remain compact.

const cell = harden({
head: { value, done: status !== 'live' },
publishCount,
tail,
});
return E.resolve(cell);
} else if (status === 'failed') {
return makeQuietRejection(value);
} else {
throw Fail`Invalid durable promise kit status: ${q(status)}`;
}
};

/**
* Returns the next-result promise and resolver associated with a given
* durable publish kit.
* These are lost on upgrade but recreated on-demand, preserving the
* value in (but not the identity of) the current { value, done } result
* when possible, which is always the case when that value is terminal
* (i.e., from `finish` or `fail`) or when the durable publish kit is
* configured with `valueDurability: 'mandatory'`.
*
* @param {DurablePublishKitState} state
* @param {PublishKit<*>} facets
* @returns {DurablePublishKitEphemeralData}
*/
const provideDurablePublishKitEphemeralData = (state, facets) => {
const ephemeralKey = getEphemeralKey(facets);
const foundData = durablePublishKitEphemeralData.get(ephemeralKey);
if (foundData) {
return foundData;
}
const { status, publishCount } = state;
/** @type {object} */
let newData;
if (status === 'failed') {
newData = {
currentP: makeQuietRejection(state.value),
tailP: makeQuietRejection(
new Error('Cannot read past end of iteration.'),
),
tailR: undefined,
};
} else if (status === 'finished') {
const tailP = makeQuietRejection(
new Error('Cannot read past end of iteration.'),
);
newData = {
currentP: E.resolve(
harden({
head: { value: state.value, done: true },
publishCount,
tail: tailP,
}),
),
tailP,
tailR: undefined,
};
} else if (status === 'live') {
const { promise: tailP, resolve: tailR } = makePromiseKit();

const { status } = state;
let tailP;
let tailR;
if (status === 'live') {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again?

({ promise: tailP, resolve: tailR } = makePromiseKit());
void E.when(tailP, sink, sink);
newData = {
currentP: state.hasValue
? E.resolve(
harden({
head: { value: state.value, done: false },
publishCount,
tail: tailP,
}),
)
: tailP,
tailP,
tailR,
};
} else if (status === 'finished' || status === 'failed') {
tailP = tooFarRejection;
} else {
Fail`Invalid durable promise kit status: ${q(status)}`;
throw Fail`Invalid durable promise kit status: ${q(status)}`;
}
durablePublishKitEphemeralData.set(ephemeralKey, harden(newData));
return newData;
// currentP is not ephemeral when restoring from persisted state.
const obj = harden({ currentP: undefined, tailP, tailR });
durablePublishKitEphemeralData.set(ephemeralKey, obj);
return obj;
};

/**
Expand All @@ -304,32 +320,30 @@ const advanceDurablePublishKit = (context, value, targetStatus = 'live') => {
if (done || valueDurability === 'mandatory') {
canBeDurable(value) || Fail`Cannot accept non-durable value: ${value}`;
}
const { tailP: currentP, tailR: resolveCurrent } =
const { tailP: oldTailP, tailR: resolveOldTail } =
provideDurablePublishKitEphemeralData(state, facets);
assert.typeof(resolveOldTail, 'function');

const publishCount = state.publishCount + 1n;
state.publishCount = publishCount;

let tailP;
let tailR;

if (done) {
state.status = targetStatus;
tailP = makeQuietRejection(new Error('Cannot read past end of iteration.'));
tailP = tooFarRejection;
tailR = undefined;
} else {
({ promise: tailP, resolve: tailR } = makePromiseKit());
void E.when(tailP, sink, sink);
}
durablePublishKitEphemeralData.set(
getEphemeralKey(facets),
harden({ currentP, tailP, tailR }),
);

let currentP;
if (targetStatus === 'failed') {
state.hasValue = true;
state.value = value;
const rejection = makeQuietRejection(value);
resolveCurrent(rejection);
resolveOldTail(rejection);
} else {
// Persist a terminal value, or a non-terminal value
// if configured as 'mandatory' or 'opportunistic'.
Expand All @@ -339,16 +353,23 @@ const advanceDurablePublishKit = (context, value, targetStatus = 'live') => {
} else {
state.hasValue = false;
state.value = undefined;
// Retain any promise with non-durable resolution.
currentP = oldTailP;
}

resolveCurrent(
resolveOldTail(
harden({
head: { value, done },
publishCount,
tail: tailP,
}),
);
}

durablePublishKitEphemeralData.set(
getEphemeralKey(facets),
harden({ currentP, tailP, tailR }),
);
};

/**
Expand Down Expand Up @@ -396,7 +417,7 @@ export const prepareDurablePublishKit = (baggage, kindName) => {
if (publishCount === currentPublishCount) {
return tailP;
} else if (publishCount < currentPublishCount) {
return currentP;
return currentP || provideCurrentP(state, facets, tailP);
} else {
throw new Error(
'subscribeAfter argument must be a previously-issued publishCount.',
Expand Down
22 changes: 16 additions & 6 deletions packages/notifier/test/test-publish-kit.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,23 @@ const assertCells = (t, label, cells, publishCount, expected, options = {}) => {
t.is(firstCell.publishCount, publishCount, `${label} cell publishCount`);

if (strict) {
t.deepEqual(
new Set(cells),
new Set([firstCell]),
`all ${label} cells must referentially match`,
);
const { head, ...otherProps } = firstCell;
for (const [headKey, headValue] of Object.entries(head)) {
t.deepEqual(
new Set(cells.map(cell => cell.head[headKey])),
new Set([headValue]),
`the head ${q(headKey)} of each ${label} cell must referentially match`,
);
}
for (const [key, value] of Object.entries(otherProps)) {
t.deepEqual(
new Set(cells.map(cell => cell[key])),
new Set([value]),
`the ${q(key)} of each ${label} cell must referentially match`,
);
}
} else {
const { tail: _tail, ...props } = cells[0];
const { tail: _tail, ...props } = firstCell;
cells.slice(1).forEach((cell, i) => {
t.like(cell, props, `${label} cell ${i + 1} must match cell 0`);
});
Expand Down