fix(core): undo prior manual event detection (#2287)
* undo the prior approach of catching errors manually

* possible solution: define the incremental fetch helper within a closure

* rename
JoviDeCroock authored Feb 17, 2022
1 parent 4792f34 commit abed646
Showing 2 changed files with 113 additions and 103 deletions.
6 changes: 6 additions & 0 deletions .changeset/four-pens-deliver.md
@@ -0,0 +1,6 @@
---
'@urql/core': patch
---

Undo the logic that caught errors from incremental fetching by forking the response stream, and instead introduce logic to detect whether results have been delivered
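
For illustration, here is a minimal sketch of the pattern this patch introduces; runFetch and emitNetworkError are hypothetical stand-ins for urql's internal plumbing, while hasResults mirrors the flag added in the diff below:

let hasResults = false;

function runFetch(
  start: () => Promise<void>,
  emitNetworkError: (error: Error) => void
): Promise<void> {
  return start().catch((error: Error) => {
    // Once results have reached the consumer, rethrow instead of
    // misreporting the failure as a network error
    if (hasResults) throw error;
    if (error.name !== 'AbortError') emitNetworkError(error);
  });
}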
210 changes: 107 additions & 103 deletions packages/core/src/internal/fetchSource.ts
@@ -17,103 +17,6 @@ const toString = (input: Buffer | ArrayBuffer): string =>
? (input as Buffer).toString()
: decoder!.decode(input as ArrayBuffer);

// DERIVATIVE: Copyright (c) 2021 Marais Rossouw <hi@marais.io>
// See: https://github.com/maraisr/meros/blob/219fe95/src/browser.ts
const executeIncrementalFetch = (
onResult: (result: OperationResult) => void,
operation: Operation,
response: Response
): Promise<void> => {
// NOTE: Guarding against fetch polyfills here
const contentType =
(response.headers && response.headers.get('Content-Type')) || '';
if (!/multipart\/mixed/i.test(contentType)) {
return response.json().then(payload => {
onResult(makeResult(operation, payload, response));
});
}

let boundary = '---';
const boundaryHeader = contentType.match(boundaryHeaderRe);
if (boundaryHeader) boundary = '--' + boundaryHeader[1];

let read: () => Promise<ChunkData>;
let cancel = () => {
/*noop*/
};
if (asyncIterator && response[asyncIterator]) {
const iterator = response[asyncIterator]();
read = iterator.next.bind(iterator);
} else if ('body' in response && response.body) {
const reader = response.body.getReader();
cancel = reader.cancel.bind(reader);
read = reader.read.bind(reader);
} else {
throw new TypeError('Streaming requests unsupported');
}

let buffer = '';
let isPreamble = true;
let nextResult: OperationResult | null = null;
let prevResult: OperationResult | null = null;

function next(data: ChunkData): Promise<void> | void {
if (!data.done) {
const chunk = toString(data.value);
let boundaryIndex = chunk.indexOf(boundary);
if (boundaryIndex > -1) {
boundaryIndex += buffer.length;
} else {
boundaryIndex = buffer.indexOf(boundary);
}

buffer += chunk;
while (boundaryIndex > -1) {
const current = buffer.slice(0, boundaryIndex);
const next = buffer.slice(boundaryIndex + boundary.length);

if (isPreamble) {
isPreamble = false;
} else {
const headersEnd = current.indexOf('\r\n\r\n') + 4;
const headers = current.slice(0, headersEnd);
const body = current.slice(headersEnd, current.lastIndexOf('\r\n'));

let payload: any;
if (jsonHeaderRe.test(headers)) {
try {
payload = JSON.parse(body);
nextResult = prevResult = prevResult
? mergeResultPatch(prevResult, payload, response)
: makeResult(operation, payload, response);
} catch (_error) {}
}

if (next.slice(0, 2) === '--' || (payload && !payload.hasNext)) {
if (!prevResult)
return onResult(makeResult(operation, {}, response));
break;
}
}

buffer = next;
boundaryIndex = buffer.indexOf(boundary);
}
}

if (nextResult) {
onResult(nextResult);
nextResult = null;
}

if (!data.done && (!prevResult || prevResult.hasNext)) {
return read().then(next);
}
}

return read().then(next).finally(cancel);
};

export const makeFetchSource = (
operation: Operation,
url: string,
@@ -129,6 +32,111 @@ export const makeFetchSource = (
fetchOptions.signal = abortController.signal;
}

  // Set once a result has been delivered or the stream has completed;
  // errors thrown afterwards are rethrown as-is instead of being turned
  // into a network error
  let hasResults = false;
// DERIVATIVE: Copyright (c) 2021 Marais Rossouw <hi@marais.io>
  // See: https://github.com/maraisr/meros/blob/219fe95/src/browser.ts
const executeIncrementalFetch = (
onResult: (result: OperationResult) => void,
operation: Operation,
response: Response
): Promise<void> => {
// NOTE: Guarding against fetch polyfills here
const contentType =
(response.headers && response.headers.get('Content-Type')) || '';
if (!/multipart\/mixed/i.test(contentType)) {
return response.json().then(payload => {
const result = makeResult(operation, payload, response);
hasResults = true;
onResult(result);
});
}

let boundary = '---';
const boundaryHeader = contentType.match(boundaryHeaderRe);
if (boundaryHeader) boundary = '--' + boundaryHeader[1];

let read: () => Promise<ChunkData>;
let cancel = () => {
/*noop*/
};
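    // Prefer the Response's async-iterator protocol when it's available;
    // otherwise fall back to a ReadableStream reader, keeping its cancel
    // for cleanup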
if (asyncIterator && response[asyncIterator]) {
const iterator = response[asyncIterator]();
read = iterator.next.bind(iterator);
} else if ('body' in response && response.body) {
const reader = response.body.getReader();
cancel = reader.cancel.bind(reader);
read = reader.read.bind(reader);
} else {
throw new TypeError('Streaming requests unsupported');
}

let buffer = '';
let isPreamble = true;
let nextResult: OperationResult | null = null;
let prevResult: OperationResult | null = null;

function next(data: ChunkData): Promise<void> | void {
if (!data.done) {
const chunk = toString(data.value);
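        // Search the incoming chunk first, offsetting by the buffered
        // length; otherwise re-scan the buffer, which picks up a boundary
        // that was split across chunk joins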
let boundaryIndex = chunk.indexOf(boundary);
if (boundaryIndex > -1) {
boundaryIndex += buffer.length;
} else {
boundaryIndex = buffer.indexOf(boundary);
}

buffer += chunk;
while (boundaryIndex > -1) {
const current = buffer.slice(0, boundaryIndex);
const next = buffer.slice(boundaryIndex + boundary.length);

if (isPreamble) {
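            // Everything before the first boundary is multipart preamble
            // and carries no payload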
isPreamble = false;
} else {
const headersEnd = current.indexOf('\r\n\r\n') + 4;
const headers = current.slice(0, headersEnd);
const body = current.slice(
headersEnd,
current.lastIndexOf('\r\n')
);

let payload: any;
if (jsonHeaderRe.test(headers)) {
try {
payload = JSON.parse(body);
nextResult = prevResult = prevResult
? mergeResultPatch(prevResult, payload, response)
: makeResult(operation, payload, response);
} catch (_error) {}
}

          // A part followed by '--' is the closing boundary; a payload with
          // hasNext: false likewise marks the final result
          if (next.slice(0, 2) === '--' || (payload && !payload.hasNext)) {
if (!prevResult)
return onResult(makeResult(operation, {}, response));
break;
}
}

buffer = next;
boundaryIndex = buffer.indexOf(boundary);
}
} else {
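        // The stream has ended; mark that results were delivered so a
        // late error isn't converted into a network error below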
hasResults = true;
}

if (nextResult) {
onResult(nextResult);
nextResult = null;
}

if (!data.done && (!prevResult || prevResult.hasNext)) {
return read().then(next);
}
}

return read().then(next).finally(cancel);
};

let ended = false;
let statusNotOk = false;
let response: Response;
@@ -146,12 +154,8 @@
     })
     .then(complete)
     .catch((error: Error) => {
-      if (error.name === 'SyntaxError' || error.name === 'TypeError') {
-        const e = new Error(error.message);
-        e.stack =
-          e.stack!.split('\n').slice(0, 2).join('\n') + '\n' + error.stack;
-        e.constructor = error.constructor;
-        throw e;
+      if (hasResults) {
+        throw error;
       }
 
       if (error.name !== 'AbortError') {
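
For context, a multipart/mixed response of the kind executeIncrementalFetch parses looks roughly like this (the payload shapes are illustrative; at the time of this commit, incremental results were path-based patches merged via mergeResultPatch):

Content-Type: multipart/mixed; boundary="-"

---
Content-Type: application/json

{"data":{"song":{"id":"1"}},"hasNext":true}
---
Content-Type: application/json

{"data":{"title":"Hello"},"path":["song"],"hasNext":false}
-----

And a sketch of consuming the resulting source with wonka; makeFetchSource is an internal API, and operation, url, and fetchOptions are assumed to be prepared by urql's fetch exchange:

import { pipe, subscribe } from 'wonka';
import type { Operation } from '@urql/core';
import { makeFetchSource } from './fetchSource';

declare const operation: Operation; // assumed to be built elsewhere
declare const url: string;
declare const fetchOptions: RequestInit;

const { unsubscribe } = pipe(
  makeFetchSource(operation, url, fetchOptions),
  subscribe(result => {
    // Each boundary-delimited patch arrives as a merged OperationResult
    console.log(result.data, result.hasNext);
  })
);
// unsubscribe() tears the source down and aborts the in-flight request.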
