Skip to content

Commit

Permalink
Refactor batching logic
Browse files Browse the repository at this point in the history
  • Loading branch information
leebyron committed Nov 14, 2019
1 parent 29811db commit aa52a25
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 69 deletions.
18 changes: 18 additions & 0 deletions src/__tests__/dataloader.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,24 @@ describe('Primary API', () => {
expect(loadCalls).toEqual([ [ 1 ] ]);
});

it('coalesces identical requests across sized batches', async () => {
  const [ identityLoader, loadCalls ] = idLoader<number>({ maxBatchSize: 2 });

  // Issue four loads synchronously; the duplicate key 1 must coalesce into
  // the first batch rather than consuming a slot in a later one.
  const values = await Promise.all([
    identityLoader.load(1),
    identityLoader.load(2),
    identityLoader.load(1),
    identityLoader.load(3),
  ]);
  expect(values).toEqual([ 1, 2, 1, 3 ]);

  // The duplicate load of 1 did not count against maxBatchSize, so only key 3
  // spilled into a second batch.
  expect(loadCalls).toEqual([ [ 1, 2 ], [ 3 ] ]);
});

it('caches repeated requests', async () => {
const [ identityLoader, loadCalls ] = idLoader<string>();

Expand Down
136 changes: 67 additions & 69 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ class DataLoader<K, V, C = K> {
this._batchLoadFn = batchLoadFn;
this._options = options;
this._promiseCache = getValidCacheMap(options);
this._queue = [];
this._batch = null;
}

// Private
_batchLoadFn: BatchLoadFn<K, V>;
_options: ?Options<K, V, C>;
_promiseCache: ?CacheMap<C, Promise<V>>;
_queue: LoaderQueue<K, V>;
_batch: Batch<K, V> | null;

/**
* Loads a key, returning a `Promise` for the value represented by that key.
Expand All @@ -76,7 +76,7 @@ class DataLoader<K, V, C = K> {

// Determine options
var options = this._options;
var shouldBatch = !options || options.batch !== false;
var batch = getCurrentBatch(this);
var cache = this._promiseCache;
var cacheKey = getCacheKey(options, key);

Expand All @@ -88,23 +88,11 @@ class DataLoader<K, V, C = K> {
}
}

// Otherwise, produce a new Promise for this value.
// Otherwise, produce a new Promise for this key, and enqueue it to be
// dispatched along with the current batch.
batch.keys.push(key);
var promise = new Promise((resolve, reject) => {
// Enqueue this Promise to be dispatched.
this._queue.push({ key, resolve, reject });

// Determine if a dispatch of this queue should be scheduled.
// A single dispatch should be scheduled per queue at the time when the
// queue changes from "empty" to "full".
if (this._queue.length === 1) {
if (shouldBatch) {
// If batching, schedule a task to dispatch the queue.
enqueuePostPromiseJob(() => dispatchQueue(this));
} else {
// Otherwise dispatch the (queue of one) immediately.
dispatchQueue(this);
}
}
batch.callbacks.push({ resolve, reject });
});

// If caching, cache this promise.
Expand Down Expand Up @@ -239,43 +227,61 @@ var enqueuePostPromiseJob =
// Private: cached resolved Promise instance
var resolvedPromise;

// Private: given the current state of a Loader instance, perform a batch load
// from its current queue.
function dispatchQueue<K, V>(loader: DataLoader<K, V, any>) {
// Take the current loader queue, replacing it with an empty queue.
var queue = loader._queue;
loader._queue = [];

// If a maxBatchSize was provided and the queue is longer, then segment the
// queue into multiple batches, otherwise treat the queue as a single batch.
var maxBatchSize = loader._options && loader._options.maxBatchSize;
if (maxBatchSize && maxBatchSize > 0 && maxBatchSize < queue.length) {
for (var i = 0; i < queue.length / maxBatchSize; i++) {
dispatchQueueBatch(
loader,
queue.slice(i * maxBatchSize, (i + 1) * maxBatchSize)
);
}
} else {
dispatchQueueBatch(loader, queue);
// Private: Describes a batch of requests: the keys collected so far and, at
// the same index for each key, the resolve/reject callbacks of the Promise
// returned to that key's caller.
type Batch<K, V> = {
  // Set true when the batch is dispatched, so no further keys join it.
  hasDispatched: boolean,
  keys: Array<K>,
  callbacks: Array<{
    resolve: (value: V) => void;
    reject: (error: Error) => void;
  }>
}

// Private: Either returns the current batch, or creates and schedules a
// dispatch of a new batch for the given loader.
function getCurrentBatch<K, V>(loader: DataLoader<K, V, any>): Batch<K, V> {
var options = loader._options;
var maxBatchSize =
(options && options.maxBatchSize) ||
(options && options.batch === false ? 1 : 0);

// If there is an existing batch which has not yet dispatched and is within
// the limit of the batch size, then return it.
var existingBatch = loader._batch;
if (
existingBatch !== null &&
!existingBatch.hasDispatched &&
(maxBatchSize === 0 || existingBatch.keys.length < maxBatchSize)
) {
return existingBatch;
}

// Otherwise, create a new batch for this loader.
var newBatch = { hasDispatched: false, keys: [], callbacks: [] };

// Store it on the loader so it may be reused.
loader._batch = newBatch;

// Then schedule a task to dispatch this batch of requests.
enqueuePostPromiseJob(() => dispatchBatch(loader, newBatch));

return newBatch;
}

function dispatchQueueBatch<K, V>(
function dispatchBatch<K, V>(
loader: DataLoader<K, V, any>,
queue: LoaderQueue<K, V>
batch: Batch<K, V>
) {
// Collect all keys to be loaded in this dispatch
var keys = queue.map(({ key }) => key);
// Mark this batch as having been dispatched.
batch.hasDispatched = true;

// Call the provided batchLoadFn for this loader with the loader queue's keys.
var batchLoadFn = loader._batchLoadFn;
// Call with the loader as the `this` context.
var batchPromise = batchLoadFn.call(loader, keys);
// Call the provided batchLoadFn for this loader with the batch's keys and
// with the loader as the `this` context.
var batchPromise = loader._batchLoadFn(batch.keys);

// Assert the expected response from batchLoadFn
if (!batchPromise || typeof batchPromise.then !== 'function') {
return failedDispatch(loader, queue, new TypeError(
return failedDispatch(loader, batch, new TypeError(
'DataLoader must be constructed with a function which accepts ' +
'Array<key> and returns Promise<Array<value>>, but the function did ' +
`not return a Promise: ${String(batchPromise)}.`
Expand All @@ -293,41 +299,40 @@ function dispatchQueueBatch<K, V>(
`not return a Promise of an Array: ${String(values)}.`
);
}
if (values.length !== keys.length) {
if (values.length !== batch.keys.length) {
throw new TypeError(
'DataLoader must be constructed with a function which accepts ' +
'Array<key> and returns Promise<Array<value>>, but the function did ' +
'not return a Promise of an Array of the same length as the Array ' +
'of keys.' +
`\n\nKeys:\n${String(keys)}` +
`\n\nKeys:\n${String(batch.keys)}` +
`\n\nValues:\n${String(values)}`
);
}

// Step through the values, resolving or rejecting each Promise in the
// loaded queue.
queue.forEach(({ resolve, reject }, index) => {
var value = values[index];
// Step through values, resolving or rejecting each Promise in the batch.
for (var i = 0; i < batch.callbacks.length; i++) {
var value = values[i];
if (value instanceof Error) {
reject(value);
batch.callbacks[i].reject(value);
} else {
resolve(value);
batch.callbacks[i].resolve(value);
}
});
}).catch(error => failedDispatch(loader, queue, error));
}
}).catch(error => failedDispatch(loader, batch, error));
}

// Private: do not cache individual loads if the entire batch dispatch fails,
// but still reject each request so they do not hang.
function failedDispatch<K, V>(
loader: DataLoader<K, V, any>,
queue: LoaderQueue<K, V>,
batch: Batch<K, V>,
error: Error
) {
queue.forEach(({ key, reject }) => {
loader.clear(key);
reject(error);
});
for (var i = 0; i < batch.keys.length; i++) {
loader.clear(batch.keys[i]);
batch.callbacks[i].reject(error);
}
}

// Private: produce a cache key for a given key (and options)
Expand Down Expand Up @@ -362,13 +367,6 @@ function getValidCacheMap<K, V, C>(
return cacheMap;
}

// Private: the pending loads awaiting dispatch; each entry pairs a requested
// key with the resolve/reject callbacks of the Promise returned to its caller.
type LoaderQueue<K, V> = Array<{
  key: K;
  resolve: (value: V) => void;
  reject: (error: Error) => void;
}>;

// Private
function isArrayLike(x: mixed): boolean {
return (
Expand Down

0 comments on commit aa52a25

Please sign in to comment.