Skip to content

Commit

Permalink
In BlockedSource retry blocks if they were
Browse files Browse the repository at this point in the history
aborted via a different signal
  • Loading branch information
constantinius committed Feb 22, 2021
1 parent 07ed101 commit dd3b36f
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 25 deletions.
104 changes: 79 additions & 25 deletions src/source/blockedsource.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import LRUCache from 'lru-cache';
import { BaseSource } from './basesource';
import { wait } from '../utils';
import { AbortError, AggregateError, wait, zip } from '../utils';

class Block {
/**
Expand Down Expand Up @@ -100,7 +100,77 @@ export class BlockedSource extends BaseSource {

// allow additional block requests to accumulate
await wait();
this.fetchBlocks(signal);

for (const blockId of missingBlockIds) {
const block = this.blockRequests.get(blockId);
if (!block) {
throw new Error(`Block ${blockId} is not in the block requests`);
}
blockRequests.set(blockId, block);
}

// actually await all pending requests
let results = await Promise.allSettled(blockRequests.values());

// perform retries if a block was interrupted by a previous signal
if (results.some((result) => result.status === 'rejected')) {
const retriedBlockRequests = new Set();
for (const [blockId, result] of zip(blockRequests.keys(), results)) {
const { rejected, reason } = result;
if (rejected) {
// push some blocks back to the to-fetch list if they were
// aborted, but only when a different signal was used
if (reason.name === 'AbortError' && reason.signal !== signal) {
this.blockIdsToFetch.add(blockId);
retriedBlockRequests.add(blockId);
}
}
}

// start the retry of some blocks if required
if (this.blockIdsToFetch.length > 0) {
this.fetchBlocks(signal);
for (const blockId of retriedBlockRequests) {
const block = this.blockRequests.get(blockId);
if (!block) {
throw new Error(`Block ${blockId} is not in the block requests`);
}
blockRequests.set(blockId, block);
}
results = await Promise.allSettled(Array.from(blockRequests.values()));
}
}

// throw an error (either abort error or AggregateError if no abort was done)
if (results.some((result) => result.status === 'rejected')) {
if (signal && signal.aborted) {
throw new AbortError('Request was aborted');
}
throw new AggregateError(
results.filter((result) => result.status === 'rejected').map((result) => result.reason),
'Request failed',
);
}

// extract the actual block responses
const values = results.map((result) => result.value);

// create a final Map, with all required blocks for this request to satisfy
const requiredBlocks = new Map(zip(Array.from(blockRequests.keys()), values));
for (const [blockId, block] of cachedBlocks) {
requiredBlocks.set(blockId, block);
}

// TODO: satisfy each slice
return this.readSliceData(slices, requiredBlocks);
}

/**
*
* @param {AbortSignal} signal
*/
fetchBlocks(signal) {
// check if we still need to
if (this.blockIdsToFetch.size > 0) {
const groups = this.groupBlocks(this.blockIdsToFetch);
Expand All @@ -115,7 +185,7 @@ export class BlockedSource extends BaseSource {
// make an async IIFE for each block
const blockRequest = (async () => {
try {
const response = (await groupRequests)[groupIndex];
const response = (await groupRequests)[groupIndex];
const blockOffset = blockId * this.blockSize;
const o = blockOffset - response.offset;
const t = Math.min(o + this.blockSize, response.data.byteLength);
Expand All @@ -127,6 +197,13 @@ export class BlockedSource extends BaseSource {
);
this.blockCache.set(blockId, block);
return block;
} catch (err) {
if (err.name === 'AbortError') {
// store the signal here, we need it to determine later if an
// error was caused by this signal
err.signal = signal;
}
throw err;
} finally {
this.blockRequests.delete(blockId);
}
Expand All @@ -136,29 +213,6 @@ export class BlockedSource extends BaseSource {
}
this.blockIdsToFetch.clear();
}

for (const blockId of missingBlockIds) {
const block = this.blockRequests.get(blockId);
if (!block) {
throw new Error(`Block ${blockId} is not in the block requests`);
}
blockRequests.set(blockId, block);
}

// TODO: extract
const zip = (a, b) => a.map((k, i) => [k, b[i]]);

// actually await all pending requests
const values = await Promise.all(Array.from(blockRequests.values()));

// create a final Map, with all required blocks for this request to satisfy
const requiredBlocks = new Map(zip(Array.from(blockRequests.keys()), values));
for (const [blockId, block] of cachedBlocks) {
requiredBlocks.set(blockId, block);
}

// TODO: satisfy each slice
return this.readSliceData(slices, requiredBlocks);
}

/**
Expand Down
16 changes: 16 additions & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ export async function wait(milliseconds) {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

export function zip(a, b) {
const A = Array.isArray(a) ? a : Array.from(a);
const B = Array.isArray(b) ? b : Array.from(b);
return A.map((k, i) => [k, B[i]]);
}


// Based on https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error
export class AbortError extends Error {
Expand All @@ -141,3 +147,13 @@ export class AbortError extends Error {
this.name = 'AbortError';
}
}

export class _AggregateError extends Error {
constructor(errors, message) {
this.errors = errors;
this.message = message;
this.name = 'AggregateError';
}
}

export const AggregateError = (typeof AggregateError === 'undefined') ? _AggregateError : AggregateError;

0 comments on commit dd3b36f

Please sign in to comment.