Skip to content

Commit

Permalink
Manual iterators to work around unstoppable async generator
Browse files Browse the repository at this point in the history
  • Loading branch information
benjie committed Oct 8, 2021
1 parent 384d495 commit b216efc
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 37 deletions.
76 changes: 45 additions & 31 deletions packages/dataplan-pg/src/PgSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,41 +29,55 @@ export class PgSubscriber<
// eslint-disable-next-line @typescript-eslint/no-this-alias
const that = this;
const { eventEmitter, topics } = this;
const asyncIterableIterator = (async function* () {
let waiting: Deferred<any> | null = null;
const stack: any[] = [];
function recv(payload: any) {
if (waiting) {
waiting.resolve(payload);
waiting = null;
} else {
stack.push(payload);
let waiting: Deferred<any> | null = null;
const stack: any[] = [];

function doFinally() {
eventEmitter.removeListener(topic as string, recv);
// Every code path above this has to go through a `yield` and thus
// `asyncIterableIterator` will definitely be defined.
const idx = topics[topic]?.indexOf(asyncIterableIterator);
if (idx != null && idx >= 0) {
topics[topic]!.splice(idx, 1);
if (topics[topic]!.length === 0) {
delete topics[topic];
that.unlisten(topic);
}
}
eventEmitter.addListener(topic as string, recv);
try {
while (true) {
if (stack.length) {
yield stack.shift();
} else {
waiting = defer();
yield await waiting;
}
}
} finally {
eventEmitter.removeListener(topic as string, recv);
// Every code path above this has to go through a `yield` and thus
// `asyncIterableIterator` will definitely be defined.
const idx = topics[topic]?.indexOf(asyncIterableIterator!);
if (idx != null && idx >= 0) {
topics[topic]!.splice(idx, 1);
if (topics[topic]!.length === 0) {
delete topics[topic];
that.unlisten(topic);
}
}

const asyncIterableIterator: AsyncIterableIterator<any> = {
[Symbol.asyncIterator]() {
return this;
},
async next() {
if (stack.length) {
return { done: false, value: stack.shift() };
} else {
waiting = defer();
return { done: false, value: waiting };
}
},
async return(value) {
doFinally();
return { done: true, value: value };
},
async throw() {
doFinally();
return { done: true, value: undefined };
},
};

function recv(payload: any) {
if (waiting) {
waiting.resolve(payload);
waiting = null;
} else {
stack.push(payload);
}
})();
}
eventEmitter.addListener(topic as string, recv);

if (!topics[topic]) {
topics[topic] = [asyncIterableIterator];
this.listen(topic);
Expand Down
2 changes: 2 additions & 0 deletions packages/dataplan-pg/src/plans/pgSelect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,8 @@ export class PgSelectPlan<TDataSource extends PgSource<any, any, any, any>>
// Munge the initialCount records into the streams

return streams.map((stream, idx) => {
// TODO: Merge the initial results and the stream together manually to
// avoid unstoppable async generator problem.
return (async function* () {
const l = initialFetchResult[idx].length;
try {
Expand Down
33 changes: 27 additions & 6 deletions packages/graphile-crystal/src/aether.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2332,15 +2332,20 @@ export class Aether<
// TODO: we should be optimise this to call executeLayers once, rather than once per crystalLayerObject.
return executeLayers(rest, newCLOs, mapResult);
} else if (isAsyncIterable(listResult)) {
return (async function* () {
for await (const result of listResult) {
const listResultIterator = listResult[Symbol.asyncIterator]();
const asyncIterator: AsyncIterableIterator<any> = {
[Symbol.asyncIterator]() {
return this;
},
async next() {
const nextPromise = listResultIterator.next();
const copy = new PlanResults(
planResultsByCommonAncestorPathIdentity,
);
copy.set(
layerPlan.commonAncestorPathIdentity,
layerPlan.id,
result,
await nextPromise, // TODO: Make this abortable?
);
const newCLO = newCrystalLayerObject(
parentCrystalObject,
Expand All @@ -2352,9 +2357,25 @@ export class Aether<
[newCLO],
mapResult,
);
yield value;
}
})();
return { done: false, value };
},
return(value) {
return (
listResultIterator.return?.(value) ||
Promise.resolve({
done: true,
value: undefined,
})
);
},
throw(e) {
return (
listResultIterator.throw?.(e) ||
Promise.resolve({ done: true, value: undefined })
);
},
};
return asyncIterator;
} else {
if (listResult != null) {
console.error(
Expand Down

0 comments on commit b216efc

Please sign in to comment.