Skip to content

Commit

Permalink
bit of progress on batch execution
Browse files Browse the repository at this point in the history
looks a bit funny, though
  • Loading branch information
yaacovCR committed Apr 12, 2021
1 parent d10cfd3 commit c118992
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
1 change: 1 addition & 0 deletions packages/batch-execute/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
},
"dependencies": {
"@graphql-tools/utils": "^7.7.0",
"@graphql-tools/pubsub": "^7.0.0",
"dataloader": "2.0.0",
"is-promise": "4.0.0",
"tslib": "~2.2.0"
Expand Down
31 changes: 25 additions & 6 deletions packages/batch-execute/src/splitResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ExecutionResult, GraphQLError } from 'graphql';
import isPromise from 'is-promise';

import { AsyncExecutionResult, isAsyncIterable, relocatedError } from '@graphql-tools/utils';
import { InMemoryChannel } from '@graphql-tools/pubsub';

import { parseKey } from './prefix';

Expand All @@ -16,8 +17,8 @@ export function splitResult(
numResults: number
): Array<
| ExecutionResult
| AsyncIterableIterator<ExecutionResult>
| Promise<ExecutionResult | AsyncIterableIterator<ExecutionResult>>
| AsyncIterableIterator<AsyncExecutionResult>
| Promise<ExecutionResult | AsyncIterableIterator<AsyncExecutionResult>>
> {
if (isPromise(mergedResult)) {
const result = mergedResult.then(r => splitExecutionResultOrAsyncIterableIterator(r, numResults));
Expand All @@ -32,13 +33,31 @@ export function splitResult(
return splitExecutionResultOrAsyncIterableIterator(mergedResult, numResults);
}

async function iterate(
mergedResult: AsyncIterableIterator<AsyncExecutionResult>,
channel: InMemoryChannel<AsyncExecutionResult>
): Promise<void> {
for await (const asyncResult of mergedResult) {
channel.publish(asyncResult);
}
}

export function splitExecutionResultOrAsyncIterableIterator(
mergedResult: ExecutionResult | AsyncIterableIterator<ExecutionResult>,
mergedResult: ExecutionResult | AsyncIterableIterator<AsyncExecutionResult>,
numResults: number
): Array<ExecutionResult | AsyncIterableIterator<ExecutionResult>> {
): Array<ExecutionResult | AsyncIterableIterator<AsyncExecutionResult>> {
if (isAsyncIterable(mergedResult)) {
// TODO: add implementation
return undefined;
const channel = new InMemoryChannel();

const asyncIterables: Array<AsyncIterableIterator<AsyncExecutionResult>> = [];
for (let i = 0; i < numResults; i++) {
// TODO: add filter and map functionality
asyncIterables.push(channel.subscribe());
}

setImmediate(() => iterate(mergedResult, channel));

return asyncIterables;
}

return splitExecutionResult(mergedResult, numResults);
Expand Down

0 comments on commit c118992

Please sign in to comment.