From 05604990d7531e5bd348abbc435b3dc5d49ad7fd Mon Sep 17 00:00:00 2001 From: Jake Lane Date: Fri, 16 Feb 2024 14:01:46 +1100 Subject: [PATCH] Break up request graph cache serialisation and run after build completion (#9384) * Break up request graph cache serialisation and run after build completion * Fix test to abort before writing to cache * Write cache to disk when build fails * Move cache aborting to build queue * Fix cache node shallow copy * Resolve windows cache path issue * Simplify cache chunking by moving implementation to FSCache * Use promise queue to manage cache writes * Resolve unhandled errors with promise queue * Only serialise and write cache chunks to disk when changed * Add getRequestGraphNodeKey function and refactor for loop * Move to set to track cached requests to disk * Bring back catch on queue add * Update unit test for RequestTracker * Update progress for cache write and handle node invalidation * Update unit test to use new graph set * Invalidate written cache on disk on request completion --- flow-libs/cli-progress.js.flow | 377 ------------------ packages/core/cache/src/FSCache.js | 57 ++- packages/core/cache/src/LMDBCache.js | 48 +-- packages/core/cache/src/types.js | 7 +- packages/core/core/src/Parcel.js | 29 +- packages/core/core/src/RequestTracker.js | 225 ++++++++--- .../core/core/test/RequestTracker.test.js | 30 ++ packages/core/utils/src/PromiseQueue.js | 13 + packages/core/utils/test/PromiseQueue.test.js | 28 ++ .../dev/repl/SimplePackageInstaller/index.js | 2 +- .../examples/react-refresh/src/index.html | 2 +- packages/reporters/cli/package.json | 1 - packages/reporters/cli/src/CLIReporter.js | 22 +- yarn.lock | 7 - 14 files changed, 336 insertions(+), 512 deletions(-) delete mode 100644 flow-libs/cli-progress.js.flow diff --git a/flow-libs/cli-progress.js.flow b/flow-libs/cli-progress.js.flow deleted file mode 100644 index 8ebc6c7f9ea..00000000000 --- a/flow-libs/cli-progress.js.flow +++ /dev/null @@ -1,377 +0,0 @@ -/** - * Flowtype definitions for index - * Generated by Flowgen from a Typescript Definition - * Flowgen v1.21.0 - * @flow - */ -declare module 'cli-progress' { - declare export interface Params { - progress: number; - eta: number; - startTime: number; - stopTime: number | null; - total: number; - value: number; - maxWidth: number; - } - declare export interface Options { - /** - * progress bar output format. - * The progressbar can be customized by using the following build-in placeholders. They can be combined in any order. - * {bar} - the progress bar, customizable by the options barsize, barCompleteString and barIncompleteString - * {percentage} - the current progress in percent (0-100) - * {total} - the end value - * {value} - the current value set by last update() call - * {eta} - expected time of accomplishment in seconds - * {duration} - elapsed time in seconds - * {eta_formatted} - expected time of accomplishment formatted into appropriate units - * {duration_formatted} - elapsed time formatted into appropriate units - * - * Example: - * progress [{bar}] {percentage}% | ETA: {eta}s | {value}/{total} - * is rendered as - * progress [========================================] 100% | ETA: 0s | 200/200 - */ - format?: string | GenericFormatter | void; - - /** - * a custom bar formatter function which renders the bar-element (default: format-bar.js) - */ - formatBar?: BarFormatter | void; - - /** - * a custom timer formatter function which renders the formatted time elements like eta_formatted and duration-formatted (default: format-time.js) - */ - formatTime?: TimeFormatter | void; - - /** - * a custom value formatter function which renders all other values (default: format-value.js) - */ - formatValue?: ValueFormatter | void; - - /** - * the maximum update rate (default: 10) - */ - fps?: number | void; - - /** - * output stream to use (default: process.stderr) - */ - stream?: WritableStream | void; - - /** - * automatically call stop() when the value reaches the total (default: false) - */ - stopOnComplete?: boolean | void; - - /** - * clear the progress bar on complete / stop() call (default: false) - */ - clearOnComplete?: boolean | void; - - /** - * the length of the progress bar in chars (default: 40) - */ - barsize?: number | void; - - /** - * position of the progress bar - 'left' (default), 'right' or 'center - */ - align?: 'left' | 'right' | 'center' | void; - - /** - * character to use as "complete" indicator in the bar (default: "=") - */ - barCompleteString?: string | void; - - /** - * character to use as "incomplete" indicator in the bar (default: "-") - */ - barIncompleteString?: string | void; - - /** - * character to use as "complete" indicator in the bar (default: "=") - */ - barCompleteChar?: string | void; - - /** - * character to use as "incomplete" indicator in the bar (default: "-") - */ - barIncompleteChar?: string | void; - - /** - * hide the cursor during progress operation; restored on complete (default: false) - * - pass `null` to keep terminal settings - */ - hideCursor?: boolean | null | void; - - /** - * glue sequence (control chars) between bar elements (default: '') - */ - barGlue?: string | void; - - /** - * number of updates with which to calculate the eta; higher numbers give a more stable eta (default: 10) - */ - etaBuffer?: number | void; - - /** - * trigger an eta calculation update during asynchronous rendering trigger using the current value - * - should only be used for long running processes in conjunction with lof `fps` values and large `etaBuffer` - * @default false - */ - etaAsynchronousUpdate?: boolean | void; - - /** - * progress calculation relative to start value ? default start at 0 (default: false) - */ - progressCalculationRelative?: boolean | void; - - /** - * disable line wrapping (default: false) - pass null to keep terminal settings; pass true to trim the output to terminal width - */ - linewrap?: boolean | null | void; - - /** - * trigger redraw during update() in case threshold time x2 is exceeded (default: true) - limited to single bar usage - */ - synchronousUpdate?: boolean | void; - - /** - * enable scheduled output to notty streams - e.g. redirect to files (default: false) - */ - noTTYOutput?: boolean | void; - - /** - * set the output schedule/interval for notty output in ms (default: 2000ms) - */ - notTTYSchedule?: number | void; - - /** - * display progress bars with 'total' of zero(0) as empty, not full (default: false) - */ - emptyOnZero?: boolean | void; - - /** - * trigger redraw on every frame even if progress remains the same; can be useful if progress bar gets overwritten by other concurrent writes to the terminal (default: false) - */ - forceRedraw?: boolean | void; - - /** - * add padding chars to formatted time and percentage to force fixed width (default: false) - */ - autopadding?: boolean | void; - - /** - * the character sequence used for autopadding (default: " ") - */ - autopaddingChar?: string | void; - - /** - * stop bar on SIGINT/SIGTERM to restore cursor settings (default: true) - */ - gracefulExit?: boolean | void; - } - declare export interface Preset { - barCompleteChar: string; - barIncompleteChar: string; - - /** - * Example: 'progress [{bar}] {percentage}% | ETA: {eta}s | {value}/{total}' - * - * {bar} - the progress bar, customizable by the options barsize, barCompleteString and barIncompleteString - * - * {percentage} - the current progress in percent (0-100) - * - * {total} - the end value - * - * {value} - the current value set by last update() call - * - * {eta} - expected time of accomplishment in seconds (limited to 115days, otherwise INF is displayed) - * - * {duration} - elapsed time in seconds - * - * {eta_formatted} - expected time of accomplishment formatted into appropriate units - * - * {duration_formatted} - elapsed time formatted into appropriate units - */ - format: string; - } - declare export class GenericBar mixins events$EventEmitter { - /** - * Initialize a new Progress bar. An instance can be used multiple times! it's not required to re-create it! - */ - constructor(opt: Options, preset?: Preset): this; - - /** - * Internal render function - */ - render(forceRendering?: boolean): void; - - /** - * Starts the progress bar and set the total and initial value - */ - start( - total: number, - startValue: number, - payload?: {[key: string]: any}, - ): void; - - /** - * Stops the progress bar and go to next line - */ - stop(): void; - - /** - * Sets the current progress value and optionally the payload with values of custom tokens as a second parameter - */ - update(current: number, payload?: {[key: string]: any}): void; - update(payload: {[key: string]: any}): void; - - /** - * Calculate the actual progress value - */ - getProgress(): number; - - /** - * Increases the current progress value by a specified amount (default +1). Update payload optionally - */ - increment(step?: number, payload?: {[key: string]: any}): void; - increment(payload: {[key: string]: any}): void; - - /** - * Get the total (limit) value - */ - getTotal(): number; - - /** - * Sets the total progress value while progressbar is active. Especially useful handling dynamic tasks. - */ - setTotal(total: number): void; - - /** - * Force eta calculation update (long running processes) without altering the progress values. - */ - updateETA(): void; - } - declare export class SingleBar mixins GenericBar { - /** - * Initialize a new Progress bar. An instance can be used multiple times! it's not required to re-create it! - */ - constructor(opt: Options, preset?: Preset): this; - - /** - * Internal render function - */ - render(): void; - - /** - * Sets the current progress value and optionally the payload with values of custom tokens as a second parameter - */ - update(current: number, payload?: {[key: string]: any}): void; - update(payload: {[key: string]: any}): void; - - /** - * Starts the progress bar and set the total and initial value - */ - start( - total: number, - startValue: number, - payload?: {[key: string]: any}, - ): void; - - /** - * Stops the progress bar and go to next line - */ - stop(): void; - } - declare export class MultiBar mixins events$EventEmitter { - constructor(opt: Options, preset?: Preset): this; - - /** - * add a new bar to the stack - */ - create( - total: number, - startValue: number, - payload?: any, - barOptions?: Options, - ): SingleBar; - - /** - * remove a bar from the stack - */ - remove(bar: SingleBar): boolean; - - /** - * internal update routine - */ - update(): void; - stop(): void; - - /** - * log output above the progress bars; string must end with newline character! - */ - log(data: string): void; - } - declare export var Presets: { - /** - * Styles as of cli-progress v1.3.0 - */ - legacy: Preset, - - /** - * Unicode Rectangles - */ - rect: Preset, - - /** - * Unicode background shades are used for the bar - */ - shades_classic: Preset, - - /** - * Unicode background shades with grey bar - */ - shades_grey: Preset, - ... - }; - declare export interface GenericFormatter { - (options: Options, params: Params, payload: any): string; - } - declare export interface TimeFormatter { - (t: number, options: Options, roundToMultipleOf: number): string; - } - declare export interface ValueFormatter { - (v: number, options: Options, type: ValueType): string; - } - declare export interface BarFormatter { - (progress: number, options: Options): string; - } - declare export type ValueType = - | 'percentage' - | 'total' - | 'value' - | 'eta' - | 'duration'; - declare var defaultFormatter: GenericFormatter; - declare var formatBar: BarFormatter; - declare var formatValue: ValueFormatter; - declare var formatTime: TimeFormatter; - declare export var Format: { - Formatter: typeof defaultFormatter, - BarFormat: typeof formatBar, - ValueFormat: typeof formatValue, - TimeFormat: typeof formatTime, - ... - }; - declare export class Bar mixins SingleBar {} - declare export interface cliProgress { - Bar: typeof Bar; - SingleBar: typeof SingleBar; - MultiBar: typeof MultiBar; - Presets: typeof Presets; - Format: typeof Format; - } - declare export default cliProgress; -} diff --git a/packages/core/cache/src/FSCache.js b/packages/core/cache/src/FSCache.js index e963a02ff60..dd807336dc9 100644 --- a/packages/core/cache/src/FSCache.js +++ b/packages/core/cache/src/FSCache.js @@ -5,6 +5,7 @@ import type {FilePath} from '@parcel/types'; import type {FileSystem} from '@parcel/fs'; import type {Cache} from './types'; +import type {AbortSignal} from 'abortcontroller-polyfill/dist/cjs-ponyfill'; import stream from 'stream'; import path from 'path'; import {promisify} from 'util'; @@ -12,7 +13,6 @@ import logger from '@parcel/logger'; import {serialize, deserialize, registerSerializableClass} from '@parcel/core'; // flowlint-next-line untyped-import:off import packageJson from '../package.json'; - import {WRITE_LIMIT_CHUNK} from './constants'; const pipeline: (Readable, Writable) => Promise = promisify( @@ -87,6 +87,15 @@ export class FSCache implements Cache { return path.join(this.dir, `${key}-${index}`); } + async #unlinkChunks(key: string, index: number): Promise { + try { + await this.fs.unlink(this.#getFilePath(key, index)); + await this.#unlinkChunks(key, index + 1); + } catch (err) { + // If there's an error, no more chunks are left to delete + } + } + hasLargeBlob(key: string): Promise { return this.fs.exists(this.#getFilePath(key, 0)); } @@ -102,30 +111,44 @@ export class FSCache implements Cache { return Buffer.concat(await Promise.all(buffers)); } - async setLargeBlob(key: string, contents: Buffer | string): Promise { + async setLargeBlob( + key: string, + contents: Buffer | string, + options?: {|signal?: AbortSignal|}, + ): Promise { const chunks = Math.ceil(contents.length / WRITE_LIMIT_CHUNK); + const writePromises: Promise[] = []; if (chunks === 1) { // If there's one chunk, don't slice the content - await this.fs.writeFile(this.#getFilePath(key, 0), contents); - return; - } - - const writePromises: Promise[] = []; - for (let i = 0; i < chunks; i += 1) { writePromises.push( - this.fs.writeFile( - this.#getFilePath(key, i), - typeof contents === 'string' - ? contents.slice(i * WRITE_LIMIT_CHUNK, (i + 1) * WRITE_LIMIT_CHUNK) - : contents.subarray( - i * WRITE_LIMIT_CHUNK, - (i + 1) * WRITE_LIMIT_CHUNK, - ), - ), + this.fs.writeFile(this.#getFilePath(key, 0), contents, { + signal: options?.signal, + }), ); + } else { + for (let i = 0; i < chunks; i += 1) { + writePromises.push( + this.fs.writeFile( + this.#getFilePath(key, i), + typeof contents === 'string' + ? contents.slice( + i * WRITE_LIMIT_CHUNK, + (i + 1) * WRITE_LIMIT_CHUNK, + ) + : contents.subarray( + i * WRITE_LIMIT_CHUNK, + (i + 1) * WRITE_LIMIT_CHUNK, + ), + {signal: options?.signal}, + ), + ); + } } + // If there's already a files following this chunk, it's old and should be removed + writePromises.push(this.#unlinkChunks(key, chunks)); + await Promise.all(writePromises); } diff --git a/packages/core/cache/src/LMDBCache.js b/packages/core/cache/src/LMDBCache.js index eab9dd24bb3..382fa29cb95 100644 --- a/packages/core/cache/src/LMDBCache.js +++ b/packages/core/cache/src/LMDBCache.js @@ -3,6 +3,7 @@ import type {FilePath} from '@parcel/types'; import type {Cache} from './types'; import type {Readable, Writable} from 'stream'; +import type {AbortSignal} from 'abortcontroller-polyfill/dist/cjs-ponyfill'; import stream from 'stream'; import path from 'path'; import {promisify} from 'util'; @@ -12,7 +13,8 @@ import {NodeFS} from '@parcel/fs'; import packageJson from '../package.json'; // $FlowFixMe import lmdb from 'lmdb'; -import {WRITE_LIMIT_CHUNK} from './constants'; + +import {FSCache} from './FSCache'; const pipeline: (Readable, Writable) => Promise = promisify( stream.pipeline, @@ -23,10 +25,12 @@ export class LMDBCache implements Cache { dir: FilePath; // $FlowFixMe store: any; + fsCache: FSCache; constructor(cacheDir: FilePath) { this.fs = new NodeFS(); this.dir = cacheDir; + this.fsCache = new FSCache(this.fs, cacheDir); this.store = lmdb.open(cacheDir, { name: 'parcel-cache', @@ -100,42 +104,18 @@ export class LMDBCache implements Cache { return this.fs.exists(this.#getFilePath(key, 0)); } + // eslint-disable-next-line require-await async getLargeBlob(key: string): Promise { - const buffers: Promise[] = []; - for (let i = 0; await this.fs.exists(this.#getFilePath(key, i)); i += 1) { - const file: Promise = this.fs.readFile(this.#getFilePath(key, i)); - - buffers.push(file); - } - - return Buffer.concat(await Promise.all(buffers)); + return this.fsCache.getLargeBlob(key); } - async setLargeBlob(key: string, contents: Buffer | string): Promise { - const chunks = Math.ceil(contents.length / WRITE_LIMIT_CHUNK); - - if (chunks === 1) { - // If there's one chunk, don't slice the content - await this.fs.writeFile(this.#getFilePath(key, 0), contents); - return; - } - - const writePromises: Promise[] = []; - for (let i = 0; i < chunks; i += 1) { - writePromises.push( - this.fs.writeFile( - this.#getFilePath(key, i), - typeof contents === 'string' - ? contents.slice(i * WRITE_LIMIT_CHUNK, (i + 1) * WRITE_LIMIT_CHUNK) - : contents.subarray( - i * WRITE_LIMIT_CHUNK, - (i + 1) * WRITE_LIMIT_CHUNK, - ), - ), - ); - } - - await Promise.all(writePromises); + // eslint-disable-next-line require-await + async setLargeBlob( + key: string, + contents: Buffer | string, + options?: {|signal?: AbortSignal|}, + ): Promise { + return this.fsCache.setLargeBlob(key, contents, options); } refresh(): void { diff --git a/packages/core/cache/src/types.js b/packages/core/cache/src/types.js index 163b3d57644..56a545b40ff 100644 --- a/packages/core/cache/src/types.js +++ b/packages/core/cache/src/types.js @@ -1,5 +1,6 @@ // @flow import type {Readable} from 'stream'; +import type {AbortSignal} from 'abortcontroller-polyfill/dist/cjs-ponyfill'; export interface Cache { ensure(): Promise; @@ -12,7 +13,11 @@ export interface Cache { setBlob(key: string, contents: Buffer | string): Promise; hasLargeBlob(key: string): Promise; getLargeBlob(key: string): Promise; - setLargeBlob(key: string, contents: Buffer | string): Promise; + setLargeBlob( + key: string, + contents: Buffer | string, + options?: {|signal?: AbortSignal|}, + ): Promise; getBuffer(key: string): Promise; /** * In a multi-threaded environment, where there are potentially multiple Cache diff --git a/packages/core/core/src/Parcel.js b/packages/core/core/src/Parcel.js index 560afb6df09..23815f6f846 100644 --- a/packages/core/core/src/Parcel.js +++ b/packages/core/core/src/Parcel.js @@ -163,6 +163,8 @@ export default class Parcel { } let result = await this._build({startTime}); + + await this.#requestTracker.writeToCache(); await this._end(); if (result.type === 'buildFailure') { @@ -175,10 +177,31 @@ export default class Parcel { async _end(): Promise { this.#initialized = false; - await this.#requestTracker.writeToCache(); await this.#disposable.dispose(); } + async writeRequestTrackerToCache(): Promise { + if (this.#watchQueue.getNumWaiting() === 0) { + // If there's no queued events, we are safe to write the request graph to disk + const abortController = new AbortController(); + + const unsubscribe = this.#watchQueue.subscribeToAdd(() => { + abortController.abort(); + }); + + try { + await this.#requestTracker.writeToCache(abortController.signal); + } catch (err) { + if (!abortController.signal.aborted) { + // We expect abort errors if we interrupt the cache write + throw err; + } + } + + unsubscribe(); + } + } + async _startNextBuild(): Promise { this.#watchAbortController = new AbortController(); await this.#farm.callAllWorkers('clearConfigCache', []); @@ -198,6 +221,9 @@ export default class Parcel { if (!(err instanceof BuildAbortError)) { throw err; } + } finally { + // If the build passes or fails, we want to cache the request graph + await this.writeRequestTrackerToCache(); } } @@ -372,7 +398,6 @@ export default class Parcel { }; await this.#reporterRunner.report(event); - return event; } finally { if (this.isProfiling) { diff --git a/packages/core/core/src/RequestTracker.js b/packages/core/core/src/RequestTracker.js index 50098400a37..ff405cd54bf 100644 --- a/packages/core/core/src/RequestTracker.js +++ b/packages/core/core/src/RequestTracker.js @@ -51,6 +51,7 @@ import { } from './constants'; import {report} from './ReporterRunner'; +import {PromiseQueue} from '@parcel/utils'; export const requestGraphEdgeTypes = { subrequest: 2, @@ -72,6 +73,7 @@ type RequestGraphOpts = {| optionNodeIds: Set, unpredicatableNodeIds: Set, invalidateOnBuildNodeIds: Set, + cachedRequestChunks: Set, |}; type SerializedRequestGraph = {| @@ -83,6 +85,7 @@ type SerializedRequestGraph = {| optionNodeIds: Set, unpredicatableNodeIds: Set, invalidateOnBuildNodeIds: Set, + cachedRequestChunks: Set, |}; const FILE: 0 = 0; @@ -242,6 +245,7 @@ export class RequestGraph extends ContentGraph< // filesystem changes alone. They should rerun on each startup of Parcel. unpredicatableNodeIds: Set = new Set(); invalidateOnBuildNodeIds: Set = new Set(); + cachedRequestChunks: Set = new Set(); // $FlowFixMe[prop-missing] static deserialize(opts: RequestGraphOpts): RequestGraph { @@ -254,6 +258,7 @@ export class RequestGraph extends ContentGraph< deserialized.optionNodeIds = opts.optionNodeIds; deserialized.unpredicatableNodeIds = opts.unpredicatableNodeIds; deserialized.invalidateOnBuildNodeIds = opts.invalidateOnBuildNodeIds; + deserialized.cachedRequestChunks = opts.cachedRequestChunks; return deserialized; } @@ -268,6 +273,7 @@ export class RequestGraph extends ContentGraph< optionNodeIds: this.optionNodeIds, unpredicatableNodeIds: this.unpredicatableNodeIds, invalidateOnBuildNodeIds: this.invalidateOnBuildNodeIds, + cachedRequestChunks: this.cachedRequestChunks, }; } @@ -345,6 +351,9 @@ export class RequestGraph extends ContentGraph< for (let parentNode of parentNodes) { this.invalidateNode(parentNode, reason); } + + // If the node is invalidated, the cached request chunk on disk needs to be re-written + this.removeCachedRequestChunkForNode(nodeId); } invalidateUnpredictableNodes() { @@ -844,8 +853,24 @@ export class RequestGraph extends ContentGraph< return didInvalidate && this.invalidNodeIds.size > 0; } + + hasCachedRequestChunk(index: number): boolean { + return this.cachedRequestChunks.has(index); + } + + setCachedRequestChunk(index: number): void { + this.cachedRequestChunks.add(index); + } + + removeCachedRequestChunkForNode(nodeId: number): void { + this.cachedRequestChunks.delete(Math.floor(nodeId / NODES_PER_BLOB)); + } } +// This constant is chosen by local profiling the time to serialise n nodes and tuning until an average time of ~50 ms per blob. +// The goal is to free up the event loop periodically to allow interruption by the user. +const NODES_PER_BLOB = 2 ** 14; + export default class RequestTracker { graph: RequestGraph; farm: WorkerFarm; @@ -946,6 +971,7 @@ export default class RequestTracker { if (node && node.type === REQUEST) { node.invalidateReason = VALID; } + this.graph.removeCachedRequestChunkForNode(nodeId); } rejectRequest(nodeId: NodeId) { @@ -1108,75 +1134,139 @@ export default class RequestTracker { return {api, subRequestContentKeys}; } - async writeToCache() { + async writeToCache(signal?: AbortSignal) { let cacheKey = getCacheKey(this.options); - let requestGraphKey = - hashString(`${cacheKey}:requestGraph`) + '-RequestGraph'; - let snapshotKey = hashString(`${cacheKey}:snapshot`); + let hashedCacheKey = hashString(cacheKey); + let requestGraphKey = `requestGraph-${hashedCacheKey}`; + let snapshotKey = `snapshot-${hashedCacheKey}`; if (this.options.shouldDisableCache) { return; } - let total = 2; + + let serialisedGraph = this.graph.serialize(); + + let total = 0; + const serialiseAndSet = async ( + key: string, + // $FlowFixMe serialise input is any type + contents: any, + ): Promise => { + if (signal?.aborted) { + throw new Error('Serialization was aborted'); + } + + await this.options.cache.setLargeBlob( + key, + serialize(contents), + signal + ? { + signal: signal, + } + : undefined, + ); + + total += 1; + + report({ + type: 'cache', + phase: 'write', + total, + size: this.graph.nodes.length, + }); + }; + + let queue = new PromiseQueue({ + maxConcurrent: 32, + }); + report({ type: 'cache', phase: 'start', total, size: this.graph.nodes.length, }); - let promises = []; - for (let node of this.graph.nodes) { - if (!node || node.type !== REQUEST) { - continue; + + // Preallocating a sparse array is faster than pushing when N is high enough + let cacheableNodes = new Array(serialisedGraph.nodes.length); + for (let i = 0; i < serialisedGraph.nodes.length; i += 1) { + let node = serialisedGraph.nodes[i]; + + let resultCacheKey = node?.resultCacheKey; + if ( + node?.type === REQUEST && + resultCacheKey != null && + node?.result != null + ) { + queue + .add(() => serialiseAndSet(resultCacheKey, node.result)) + .catch(() => { + // Handle promise rejection + }); + + // eslint-disable-next-line no-unused-vars + let {result: _, ...newNode} = node; + cacheableNodes[i] = newNode; + } else { + cacheableNodes[i] = node; } + } - let resultCacheKey = node.resultCacheKey; - if (resultCacheKey != null && node.result != null) { - promises.push( - this.options.cache.setLargeBlob( - resultCacheKey, - serialize(node.result), - ), - ); - total++; - report({ - type: 'cache', - phase: 'write', - total, - size: this.graph.nodes.length, - }); - delete node.result; + for (let i = 0; i * NODES_PER_BLOB < cacheableNodes.length; i += 1) { + if (!this.graph.hasCachedRequestChunk(i)) { + // We assume the request graph nodes are immutable and won't change + queue + .add(() => + serialiseAndSet( + getRequestGraphNodeKey(i, hashedCacheKey), + cacheableNodes.slice( + i * NODES_PER_BLOB, + (i + 1) * NODES_PER_BLOB, + ), + ).then(() => { + // Succeeded in writing to disk, save that we have completed this chunk + this.graph.setCachedRequestChunk(i); + }), + ) + .catch(() => { + // Handle promise rejection + }); } } - promises.push( - this.options.cache.setLargeBlob(requestGraphKey, serialize(this.graph)), - ); - report({ - type: 'cache', - phase: 'write', - total, - size: this.graph.nodes.length, - }); + queue + .add(() => + serialiseAndSet(requestGraphKey, { + ...serialisedGraph, + nodes: undefined, + }), + ) + .catch(() => { + // Handle promise rejection + }); let opts = getWatcherOptions(this.options); let snapshotPath = path.join(this.options.cacheDir, snapshotKey + '.txt'); - promises.push( - this.options.inputFS.writeSnapshot( - this.options.projectRoot, - snapshotPath, - opts, - ), - ); - report({ - type: 'cache', - phase: 'write', - total, - size: this.graph.nodes.length, - }); - report({type: 'cache', phase: 'end', total, size: this.graph.nodes.length}); + queue + .add(() => + this.options.inputFS.writeSnapshot( + this.options.projectRoot, + snapshotPath, + opts, + ), + ) + .catch(() => { + // Handle promise rejection + }); + + try { + await queue.run(); + } catch (err) { + // If we have aborted, ignore the error and continue + if (!signal?.aborted) throw err; + } - await Promise.all(promises); + report({type: 'cache', phase: 'end', total, size: this.graph.nodes.length}); } static async init({ @@ -1203,26 +1293,51 @@ function getCacheKey(options) { }:${options.shouldBuildLazily ? 'lazy' : 'eager'}`; } +function getRequestGraphNodeKey(index: number, hashedCacheKey: string) { + return `requestGraph-nodes-${index}-${hashedCacheKey}`; +} + async function loadRequestGraph(options): Async { if (options.shouldDisableCache) { return new RequestGraph(); } let cacheKey = getCacheKey(options); - let requestGraphKey = - hashString(`${cacheKey}:requestGraph`) + '-RequestGraph'; + let hashedCacheKey = hashString(cacheKey); + let requestGraphKey = `requestGraph-${hashedCacheKey}`; if (await options.cache.hasLargeBlob(requestGraphKey)) { - let requestGraph: RequestGraph = deserialize( - await options.cache.getLargeBlob(requestGraphKey), - ); + const getAndDeserialize = async (key: string) => { + return deserialize(await options.cache.getLargeBlob(key)); + }; + + let i = 0; + let nodePromises = []; + while ( + await options.cache.hasLargeBlob( + getRequestGraphNodeKey(i, hashedCacheKey), + ) + ) { + nodePromises.push( + getAndDeserialize(getRequestGraphNodeKey(i, hashedCacheKey)), + ); + i += 1; + } + + let serializedRequestGraph = await getAndDeserialize(requestGraphKey); + let requestGraph = RequestGraph.deserialize({ + ...serializedRequestGraph, + nodes: (await Promise.all(nodePromises)).flatMap(nodeChunk => nodeChunk), + }); + let opts = getWatcherOptions(options); - let snapshotKey = hashString(`${cacheKey}:snapshot`); + let snapshotKey = `snapshot-${hashedCacheKey}`; let snapshotPath = path.join(options.cacheDir, snapshotKey + '.txt'); let events = await options.inputFS.getEventsSince( options.watchDir, snapshotPath, opts, ); + requestGraph.invalidateUnpredictableNodes(); requestGraph.invalidateOnBuildNodes(); requestGraph.invalidateEnvNodes(options.env); diff --git a/packages/core/core/test/RequestTracker.test.js b/packages/core/core/test/RequestTracker.test.js index b4f5c0cf68f..c729768b934 100644 --- a/packages/core/core/test/RequestTracker.test.js +++ b/packages/core/core/test/RequestTracker.test.js @@ -1,6 +1,7 @@ // @flow strict-local import assert from 'assert'; +import {AbortController} from 'abortcontroller-polyfill/dist/cjs-ponyfill'; import nullthrows from 'nullthrows'; import RequestTracker, {type RunAPI} from '../src/RequestTracker'; import WorkerFarm from '@parcel/workers'; @@ -184,6 +185,35 @@ describe('RequestTracker', () => { ); }); + it('should write cache to disk and store index', async () => { + let tracker = new RequestTracker({farm, options}); + + await tracker.runRequest({ + id: 'abc', + type: 7, + run: async ({api}: {api: RunAPI, ...}) => { + let result = await Promise.resolve(); + api.storeResult(result); + }, + input: null, + }); + + await tracker.writeToCache(); + + assert(tracker.graph.cachedRequestChunks.size > 0); + }); + + it('should not write to cache when the abort controller aborts', async () => { + let tracker = new RequestTracker({farm, options}); + + const abortController = new AbortController(); + abortController.abort(); + + await tracker.writeToCache(abortController.signal); + + assert(tracker.graph.cachedRequestChunks.size === 0); + }); + it('should not requeue requests if the previous request is still running', async () => { let tracker = new RequestTracker({farm, options}); diff --git a/packages/core/utils/src/PromiseQueue.js b/packages/core/utils/src/PromiseQueue.js index bbea044fefe..bc711ec2b97 100644 --- a/packages/core/utils/src/PromiseQueue.js +++ b/packages/core/utils/src/PromiseQueue.js @@ -13,6 +13,7 @@ export default class PromiseQueue { _error: mixed; _count: number = 0; _results: Array = []; + _addSubscriptions: Set<() => void> = new Set(); constructor(opts: PromiseQueueOpts = {maxConcurrent: Infinity}) { if (opts.maxConcurrent <= 0) { @@ -43,12 +44,24 @@ export default class PromiseQueue { this._queue.push(wrapped); + for (const addFn of this._addSubscriptions) { + addFn(); + } + if (this._numRunning > 0 && this._numRunning < this._maxConcurrent) { this._next(); } }); } + subscribeToAdd(fn: () => void): () => void { + this._addSubscriptions.add(fn); + + return () => { + this._addSubscriptions.delete(fn); + }; + } + run(): Promise> { if (this._runPromise != null) { return this._runPromise; diff --git a/packages/core/utils/test/PromiseQueue.test.js b/packages/core/utils/test/PromiseQueue.test.js index b98b0e284cb..f2a93d4716e 100644 --- a/packages/core/utils/test/PromiseQueue.test.js +++ b/packages/core/utils/test/PromiseQueue.test.js @@ -3,6 +3,7 @@ import assert from 'assert'; import randomInt from 'random-int'; import PromiseQueue from '../src/PromiseQueue'; +import sinon from 'sinon'; describe('PromiseQueue', () => { it('run() should resolve when all async functions in queue have completed', async () => { @@ -72,4 +73,31 @@ describe('PromiseQueue', () => { await queue.run(); }); + + it('.add() should notify subscribers', async () => { + const queue = new PromiseQueue(); + + const subscribedFn = sinon.spy(); + queue.subscribeToAdd(subscribedFn); + + const promise = queue.add(() => Promise.resolve()); + await queue.run(); + await promise; + + assert(subscribedFn.called); + }); + + it('.subscribeToAdd() should allow unsubscribing', async () => { + const queue = new PromiseQueue(); + + const subscribedFn = sinon.spy(); + const unsubscribe = queue.subscribeToAdd(subscribedFn); + unsubscribe(); + + const promise = queue.add(() => Promise.resolve()); + await queue.run(); + await promise; + + assert(!subscribedFn.called); + }); }); diff --git a/packages/dev/repl/SimplePackageInstaller/index.js b/packages/dev/repl/SimplePackageInstaller/index.js index f5c45ba95dd..059e8aba89c 100644 --- a/packages/dev/repl/SimplePackageInstaller/index.js +++ b/packages/dev/repl/SimplePackageInstaller/index.js @@ -116,7 +116,7 @@ export default class SimplePackageInstaller implements PackageInstaller { if (!res.arrayBuffer) { // node var bufs = []; - res.body.on('data', function(d) { + res.body.on('data', function (d) { bufs.push(d); }); diff --git a/packages/examples/react-refresh/src/index.html b/packages/examples/react-refresh/src/index.html index a7051f5da3b..0545ac42c9f 100644 --- a/packages/examples/react-refresh/src/index.html +++ b/packages/examples/react-refresh/src/index.html @@ -1,2 +1,2 @@
- + diff --git a/packages/reporters/cli/package.json b/packages/reporters/cli/package.json index 9a62d1537ea..697f8dafc7d 100644 --- a/packages/reporters/cli/package.json +++ b/packages/reporters/cli/package.json @@ -35,7 +35,6 @@ "@parcel/types": "2.11.0", "@parcel/utils": "2.11.0", "chalk": "^4.1.0", - "cli-progress": "^3.12.0", "term-size": "^2.2.1" }, "devDependencies": { diff --git a/packages/reporters/cli/src/CLIReporter.js b/packages/reporters/cli/src/CLIReporter.js index 8ec6d2835e4..627eb73cfd5 100644 --- a/packages/reporters/cli/src/CLIReporter.js +++ b/packages/reporters/cli/src/CLIReporter.js @@ -26,7 +26,6 @@ import { } from './render'; import * as emoji from './emoji'; import wrapAnsi from 'wrap-ansi'; -import cliProgress from 'cli-progress'; const THROTTLE_DELAY = 100; const seenWarnings = new Set(); @@ -39,7 +38,6 @@ let pendingIncrementalBuild = false; let statusThrottle = throttle((message: string) => { updateSpinner(message); }, THROTTLE_DELAY); -let bar; // Exported only for test export async function _report( @@ -159,22 +157,14 @@ export async function _report( if (event.size > 500000) { switch (event.phase) { case 'start': - if (!bar) { - bar = new cliProgress.SingleBar( - {}, - cliProgress.Presets.shades_classic, - ); - } - writeOut('Writing to cache...'); - bar.start(event.total, 0); - break; - case 'write': - bar.setTotal(event.total); - bar.increment(); + updateSpinner('Writing cache to disk'); break; case 'end': - bar.stop(); - writeOut('Done.'); + persistSpinner( + 'cache', + 'success', + chalk.grey.bold(`Cache written to disk`), + ); break; } } diff --git a/yarn.lock b/yarn.lock index 176aea19495..1a71f5333b9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4641,13 +4641,6 @@ cli-cursor@^3.1.0: dependencies: restore-cursor "^3.1.0" -cli-progress@^3.12.0: - version "3.12.0" - resolved "https://registry.yarnpkg.com/cli-progress/-/cli-progress-3.12.0.tgz#807ee14b66bcc086258e444ad0f19e7d42577942" - integrity sha512-tRkV3HJ1ASwm19THiiLIXLO7Im7wlTuKnvkYaTkyoAPefqjNg7W7DHKUlGRxy9vxDvbyCYQkQozvptuMkGCg8A== - dependencies: - string-width "^4.2.3" - cli-spinners@^2.5.0: version "2.5.0" resolved "https://registry.yarnpkg.com/cli-spinners/-/cli-spinners-2.5.0.tgz#12763e47251bf951cb75c201dfa58ff1bcb2d047"