From 5e703b2f867d90a0e6af948da06de482f6f202df Mon Sep 17 00:00:00 2001 From: Lee Byron Date: Thu, 14 Nov 2019 10:05:18 -0800 Subject: [PATCH] Refactor batching logic --- src/__tests__/dataloader.test.js | 17 ++++ src/index.js | 136 +++++++++++++++---------------- 2 files changed, 84 insertions(+), 69 deletions(-) diff --git a/src/__tests__/dataloader.test.js b/src/__tests__/dataloader.test.js index aebf5f7..b7a0ac9 100644 --- a/src/__tests__/dataloader.test.js +++ b/src/__tests__/dataloader.test.js @@ -106,6 +106,23 @@ describe('Primary API', () => { expect(loadCalls).toEqual([ [ 1 ] ]); }); + it('coalesces identical requests across sized batches', async () => { + const [ identityLoader, loadCalls ] = idLoader({ maxBatchSize: 2 }); + + const promise1a = identityLoader.load(1); + const promise2 = identityLoader.load(2); + const promise1b = identityLoader.load(1); + const promise3 = identityLoader.load(3); + + const [ value1a, value2, value1b, value3 ] = await Promise.all([ promise1a, promise2, promise1b, promise3 ]); + expect(value1a).toBe(1); + expect(value2).toBe(2); + expect(value1b).toBe(1); + expect(value3).toBe(3); + + expect(loadCalls).toEqual([ [ 1, 2 ], [ 3 ] ]); + }); + it('caches repeated requests', async () => { const [ identityLoader, loadCalls ] = idLoader(); diff --git a/src/index.js b/src/index.js index f109d48..694fa7b 100644 --- a/src/index.js +++ b/src/index.js @@ -54,14 +54,14 @@ class DataLoader { this._batchLoadFn = batchLoadFn; this._options = options; this._promiseCache = getValidCacheMap(options); - this._queue = []; + this._batch = null; } // Private _batchLoadFn: BatchLoadFn; _options: ?Options; _promiseCache: ?CacheMap>; - _queue: LoaderQueue; + _batch: Batch | null; /** * Loads a key, returning a `Promise` for the value represented by that key. @@ -76,7 +76,7 @@ class DataLoader { // Determine options var options = this._options; - var shouldBatch = !options || options.batch !== false; + var batch = getCurrentBatch(this); var cache = this._promiseCache; var cacheKey = getCacheKey(options, key); @@ -88,23 +88,11 @@ class DataLoader { } } - // Otherwise, produce a new Promise for this value. + // Otherwise, produce a new Promise for this key, and enqueue it to be + // dispatched along with the current batch. + batch.keys.push(key); var promise = new Promise((resolve, reject) => { - // Enqueue this Promise to be dispatched. - this._queue.push({ key, resolve, reject }); - - // Determine if a dispatch of this queue should be scheduled. - // A single dispatch should be scheduled per queue at the time when the - // queue changes from "empty" to "full". - if (this._queue.length === 1) { - if (shouldBatch) { - // If batching, schedule a task to dispatch the queue. - enqueuePostPromiseJob(() => dispatchQueue(this)); - } else { - // Otherwise dispatch the (queue of one) immediately. - dispatchQueue(this); - } - } + batch.callbacks.push({ resolve, reject }); }); // If caching, cache this promise. @@ -234,43 +222,61 @@ var enqueuePostPromiseJob = // Private: cached resolved Promise instance var resolvedPromise; -// Private: given the current state of a Loader instance, perform a batch load -// from its current queue. -function dispatchQueue(loader: DataLoader) { - // Take the current loader queue, replacing it with an empty queue. - var queue = loader._queue; - loader._queue = []; - - // If a maxBatchSize was provided and the queue is longer, then segment the - // queue into multiple batches, otherwise treat the queue as a single batch. - var maxBatchSize = loader._options && loader._options.maxBatchSize; - if (maxBatchSize && maxBatchSize > 0 && maxBatchSize < queue.length) { - for (var i = 0; i < queue.length / maxBatchSize; i++) { - dispatchQueueBatch( - loader, - queue.slice(i * maxBatchSize, (i + 1) * maxBatchSize) - ); - } - } else { - dispatchQueueBatch(loader, queue); +// Private: Describes a batch of requests +type Batch = { + hasDispatched: boolean, + keys: Array, + callbacks: Array<{ + resolve: (value: V) => void; + reject: (error: Error) => void; + }> +} + +// Private: Either returns the current batch, or creates and schedules a +// dispatch of a new batch for the given loader. +function getCurrentBatch(loader: DataLoader): Batch { + var options = loader._options; + var maxBatchSize = + (options && options.maxBatchSize) || + (options && options.batch === false ? 1 : 0); + + // If there is an existing batch which has not yet dispatched and is within + // the limit of the batch size, then return it. + var existingBatch = loader._batch; + if ( + existingBatch !== null && + !existingBatch.hasDispatched && + (maxBatchSize === 0 || existingBatch.keys.length < maxBatchSize) + ) { + return existingBatch; } + + // Otherwise, create a new batch for this loader. + var newBatch = { hasDispatched: false, keys: [], callbacks: [] }; + + // Store it on the loader so it may be reused. + loader._batch = newBatch; + + // Then schedule a task to dispatch this batch of requests. + enqueuePostPromiseJob(() => dispatchBatch(loader, newBatch)); + + return newBatch; } -function dispatchQueueBatch( +function dispatchBatch( loader: DataLoader, - queue: LoaderQueue + batch: Batch ) { - // Collect all keys to be loaded in this dispatch - var keys = queue.map(({ key }) => key); + // Mark this batch as having been dispatched. + batch.hasDispatched = true; - // Call the provided batchLoadFn for this loader with the loader queue's keys. - var batchLoadFn = loader._batchLoadFn; - // Call with the loader as the `this` context. - var batchPromise = batchLoadFn.call(loader, keys); + // Call the provided batchLoadFn for this loader with the batch's keys and + // with the loader as the `this` context. + var batchPromise = loader._batchLoadFn(batch.keys); // Assert the expected response from batchLoadFn if (!batchPromise || typeof batchPromise.then !== 'function') { - return failedDispatch(loader, queue, new TypeError( + return failedDispatch(loader, batch, new TypeError( 'DataLoader must be constructed with a function which accepts ' + 'Array and returns Promise>, but the function did ' + `not return a Promise: ${String(batchPromise)}.` @@ -288,41 +294,40 @@ function dispatchQueueBatch( `not return a Promise of an Array: ${String(values)}.` ); } - if (values.length !== keys.length) { + if (values.length !== batch.keys.length) { throw new TypeError( 'DataLoader must be constructed with a function which accepts ' + 'Array and returns Promise>, but the function did ' + 'not return a Promise of an Array of the same length as the Array ' + 'of keys.' + - `\n\nKeys:\n${String(keys)}` + + `\n\nKeys:\n${String(batch.keys)}` + `\n\nValues:\n${String(values)}` ); } - // Step through the values, resolving or rejecting each Promise in the - // loaded queue. - queue.forEach(({ resolve, reject }, index) => { - var value = values[index]; + // Step through values, resolving or rejecting each Promise in the batch. + for (var i = 0; i < batch.callbacks.length; i++) { + var value = values[i]; if (value instanceof Error) { - reject(value); + batch.callbacks[i].reject(value); } else { - resolve(value); + batch.callbacks[i].resolve(value); } - }); - }).catch(error => failedDispatch(loader, queue, error)); + } + }).catch(error => failedDispatch(loader, batch, error)); } // Private: do not cache individual loads if the entire batch dispatch fails, // but still reject each request so they do not hang. function failedDispatch( loader: DataLoader, - queue: LoaderQueue, + batch: Batch, error: Error ) { - queue.forEach(({ key, reject }) => { - loader.clear(key); - reject(error); - }); + for (var i = 0; i < batch.keys.length; i++) { + loader.clear(batch.keys[i]); + batch.callbacks[i].reject(error); + } } // Private: produce a cache key for a given key (and options) @@ -357,13 +362,6 @@ function getValidCacheMap( return cacheMap; } -// Private -type LoaderQueue = Array<{ - key: K; - resolve: (value: V) => void; - reject: (error: Error) => void; -}>; - // Private function isArrayLike(x: mixed): boolean { return (