Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add large blob storage to Cache #7198

Merged
merged 24 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f99e407
Add large blob methods to cache
lettertwo Oct 26, 2021
f6c22d7
Cache request graph as large blob
lettertwo Oct 26, 2021
8f0a103
Fix LMDBCache serialization
lettertwo Oct 27, 2021
aec63a8
Prepare LMDBCache fs for serialization
lettertwo Oct 27, 2021
2189909
Merge branch 'v2' into lettertwo/cache-large-blob
lettertwo Oct 29, 2021
6681eef
Cache graph request results as large blobs
lettertwo Nov 2, 2021
9fa5e09
Merge branch 'v2' into lettertwo/cache-large-blob
lettertwo Nov 2, 2021
6c88b5d
Revert configurable LMDBCache FS
lettertwo Nov 2, 2021
74b4dd4
Fall back to FS for large blobs in LMDBCache
lettertwo Nov 3, 2021
28c5b66
Merge branch 'v2' into lettertwo/cache-large-blob
lettertwo Nov 3, 2021
5734312
Remove stale value when auto storing a large blob
lettertwo Nov 3, 2021
da0a359
Stream large blobs out of fs cache
lettertwo Nov 3, 2021
847f8be
Merge branch 'v2' into lettertwo/cache-large-blob
lettertwo Nov 15, 2021
e13e8d0
Merge branch 'v2' into lettertwo/cache-large-blob
devongovett Nov 16, 2021
a53b39b
Revert fall back to FS for large blobs in LMDBCache
lettertwo Nov 16, 2021
ce8fe8e
Prevent large blobs from being stored in cache
lettertwo Nov 16, 2021
6d9f276
Set/get streams to/from FS in LMDBCache
lettertwo Nov 16, 2021
63f6f2c
Only stream large assets from cache
lettertwo Nov 17, 2021
50dfd3b
Merge branch 'v2' into lettertwo/cache-large-blob
lettertwo Nov 18, 2021
0776a9e
Remove default empty stream from `getStream`
lettertwo Nov 18, 2021
d9fc18f
Differentiate large blobs in FSCache
lettertwo Nov 18, 2021
ed1b602
Fix type error
lettertwo Nov 18, 2021
88b1e22
Merge branch 'v2' into lettertwo/cache-large-blob
lettertwo Nov 22, 2021
3f3c831
Remove vestigial isLargeBlob check
lettertwo Nov 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions packages/core/cache/src/FSCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ export class FSCache implements Cache {
}

getStream(key: string): Readable {
return this.fs.createReadStream(this._getCachePath(key));
return this.fs.createReadStream(this._getCachePath(`${key}-large`));
}

setStream(key: string, stream: Readable): Promise<void> {
return new Promise((resolve, reject) => {
stream
.pipe(this.fs.createWriteStream(this._getCachePath(key)))
.pipe(this.fs.createWriteStream(this._getCachePath(`${key}-large`)))
.on('error', reject)
.on('finish', resolve);
});
Expand Down Expand Up @@ -77,6 +77,18 @@ export class FSCache implements Cache {
}
}

hasLargeBlob(key: string): Promise<boolean> {
return this.fs.exists(this._getCachePath(`${key}-large`));
}

getLargeBlob(key: string): Promise<Buffer> {
return this.fs.readFile(this._getCachePath(`${key}-large`));
}

async setLargeBlob(key: string, contents: Buffer | string): Promise<void> {
await this.fs.writeFile(this._getCachePath(`${key}-large`), contents);
}

async get<T>(key: string): Promise<?T> {
try {
let data = await this.fs.readFile(this._getCachePath(key));
Expand Down
33 changes: 26 additions & 7 deletions packages/core/cache/src/LMDBCache.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
// @flow strict-local
import type {Readable} from 'stream';
import type {FilePath} from '@parcel/types';
import type {Cache} from './types';

import {Readable} from 'stream';
import path from 'path';
import {serialize, deserialize, registerSerializableClass} from '@parcel/core';
import {blobToStream, bufferStream} from '@parcel/utils';
import {NodeFS} from '@parcel/fs';
// flowlint-next-line untyped-import:off
import packageJson from '../package.json';
// $FlowFixMe
import lmdb from 'lmdb-store';

export class LMDBCache implements Cache {
fs: NodeFS;
dir: FilePath;
// $FlowFixMe
store: any;

constructor(cacheDir: FilePath) {
this.fs = new NodeFS();
this.dir = cacheDir;

this.store = lmdb.open(cacheDir, {
Expand Down Expand Up @@ -53,16 +56,20 @@ export class LMDBCache implements Cache {
}

async set(key: string, value: mixed): Promise<void> {
await this.store.put(key, serialize(value));
await this.setBlob(key, serialize(value));
}

getStream(key: string): Readable {
return blobToStream(this.store.get(key));
return this.fs.createReadStream(path.join(this.dir, key));
}

async setStream(key: string, stream: Readable): Promise<void> {
let buf = await bufferStream(stream);
await this.store.put(key, buf);
setStream(key: string, stream: Readable): Promise<void> {
return new Promise((resolve, reject) => {
stream
.pipe(this.fs.createWriteStream(path.join(this.dir, key)))
.on('error', reject)
.on('finish', resolve);
});
}

getBlob(key: string): Promise<Buffer> {
Expand All @@ -79,6 +86,18 @@ export class LMDBCache implements Cache {
getBuffer(key: string): Promise<?Buffer> {
return Promise.resolve(this.store.get(key));
}

hasLargeBlob(key: string): Promise<boolean> {
return this.fs.exists(path.join(this.dir, key));
}

getLargeBlob(key: string): Promise<Buffer> {
return this.fs.readFile(path.join(this.dir, key));
}

async setLargeBlob(key: string, contents: Buffer | string): Promise<void> {
await this.fs.writeFile(path.join(this.dir, key), contents);
}
}

registerSerializableClass(`${packageJson.version}:LMDBCache`, LMDBCache);
3 changes: 3 additions & 0 deletions packages/core/cache/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ export interface Cache {
setStream(key: string, stream: Readable): Promise<void>;
getBlob(key: string): Promise<Buffer>;
setBlob(key: string, contents: Buffer | string): Promise<void>;
hasLargeBlob(key: string): Promise<boolean>;
getLargeBlob(key: string): Promise<Buffer>;
setLargeBlob(key: string, contents: Buffer | string): Promise<void>;
getBuffer(key: string): Promise<?Buffer>;
}
6 changes: 5 additions & 1 deletion packages/core/core/src/CommittedAsset.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ export default class CommittedAsset {
getContent(): Blob | Promise<Buffer | string> {
if (this.content == null) {
if (this.value.contentKey != null) {
return this.options.cache.getStream(this.value.contentKey);
if (this.value.isLargeBlob) {
return this.options.cache.getStream(this.value.contentKey);
} else {
return this.options.cache.getBlob(this.value.contentKey);
}
} else if (this.value.astKey != null) {
return streamFromPromise(
generateFromAST(this).then(({content}) => {
Expand Down
16 changes: 13 additions & 3 deletions packages/core/core/src/PackagerRunner.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export type BundleInfo = {|
+hashReferences: Array<string>,
+time?: number,
+cacheKeys: CacheKeyMap,
+isLargeBlob: boolean,
|};

type CacheKeyMap = {|
Expand Down Expand Up @@ -596,16 +597,22 @@ export default class PackagerRunner {
let contentKey = PackagerRunner.getContentKey(cacheKey);
let mapKey = PackagerRunner.getMapKey(cacheKey);

let contentExists = await this.options.cache.has(contentKey);
let isLargeBlob = await this.options.cache.hasLargeBlob(contentKey);
let contentExists =
isLargeBlob || (await this.options.cache.has(contentKey));
if (!contentExists) {
return null;
}

let mapExists = await this.options.cache.has(mapKey);

return {
contents: this.options.cache.getStream(contentKey),
map: mapExists ? this.options.cache.getStream(mapKey) : null,
contents: isLargeBlob
? this.options.cache.getStream(contentKey)
: blobToStream(await this.options.cache.getBlob(contentKey)),
map: mapExists
? blobToStream(await this.options.cache.getBlob(mapKey))
: null,
};
}

Expand All @@ -618,9 +625,11 @@ export default class PackagerRunner {
let size = 0;
let hash;
let hashReferences = [];
let isLargeBlob = false;

// TODO: don't replace hash references in binary files??
if (contents instanceof Readable) {
isLargeBlob = true;
let boundaryStr = '';
let h = new Hash();
await this.options.cache.setStream(
Expand Down Expand Up @@ -659,6 +668,7 @@ export default class PackagerRunner {
hash,
hashReferences,
cacheKeys,
isLargeBlob,
};
await this.options.cache.set(cacheKeys.info, info);
return info;
Expand Down
26 changes: 17 additions & 9 deletions packages/core/core/src/RequestTracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
} from '@parcel/utils';
import {hashString} from '@parcel/hash';
import {ContentGraph} from '@parcel/graph';
import {deserialize, serialize} from './serializer';
import {assertSignalNotAborted, hashFromOption} from './utils';
import {
type ProjectPath,
Expand Down Expand Up @@ -856,10 +857,11 @@ export default class RequestTracker {
let result: T = (node.value.result: any);
return result;
} else if (node.value.resultCacheKey != null && ifMatch == null) {
let cachedResult: T = (nullthrows(
await this.options.cache.get(node.value.resultCacheKey),
// $FlowFixMe
): any);
let key = node.value.resultCacheKey;
invariant(this.options.cache.hasLargeBlob(key));
let cachedResult: T = deserialize(
await this.options.cache.getLargeBlob(key),
);
node.value.result = cachedResult;
return cachedResult;
}
Expand Down Expand Up @@ -1050,13 +1052,18 @@ export default class RequestTracker {
let resultCacheKey = node.value.resultCacheKey;
if (resultCacheKey != null && node.value.result != null) {
promises.push(
this.options.cache.set(resultCacheKey, node.value.result),
this.options.cache.setLargeBlob(
resultCacheKey,
serialize(node.value.result),
),
);
delete node.value.result;
}
}

promises.push(this.options.cache.set(requestGraphKey, this.graph));
promises.push(
this.options.cache.setLargeBlob(requestGraphKey, serialize(this.graph)),
);

let opts = getWatcherOptions(this.options);
let snapshotPath = path.join(this.options.cacheDir, snapshotKey + '.txt');
Expand Down Expand Up @@ -1100,9 +1107,10 @@ async function loadRequestGraph(options): Async<RequestGraph> {

let cacheKey = getCacheKey(options);
let requestGraphKey = hashString(`${cacheKey}:requestGraph`);
let requestGraph = await options.cache.get<RequestGraph>(requestGraphKey);

if (requestGraph) {
if (await options.cache.hasLargeBlob(requestGraphKey)) {
let requestGraph: RequestGraph = deserialize(
await options.cache.getLargeBlob(requestGraphKey),
);
let opts = getWatcherOptions(options);
let snapshotKey = hashString(`${cacheKey}:snapshot`);
let snapshotPath = path.join(options.cacheDir, snapshotKey + '.txt');
Expand Down
4 changes: 3 additions & 1 deletion packages/core/core/src/Transformation.js
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,9 @@ export default class Transformation {
cachedAssets.map(async (value: AssetValue) => {
let content =
value.contentKey != null
? this.options.cache.getStream(value.contentKey)
? value.isLargeBlob
? this.options.cache.getStream(value.contentKey)
: await this.options.cache.getBlob(value.contentKey)
: null;
let mapBuffer =
value.astKey != null
Expand Down
1 change: 1 addition & 0 deletions packages/core/core/src/UncommittedAsset.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ export default class UncommittedAsset {
this.value.stats.size = size;
}

this.value.isLargeBlob = this.content instanceof Readable;
this.value.committed = true;
}

Expand Down
13 changes: 10 additions & 3 deletions packages/core/core/src/requests/WriteBundleRequest.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {HASH_REF_PREFIX, HASH_REF_REGEX} from '../constants';
import nullthrows from 'nullthrows';
import path from 'path';
import {NamedBundle} from '../public/Bundle';
import {TapStream} from '@parcel/utils';
import {blobToStream, TapStream} from '@parcel/utils';
import {Readable, Transform, pipeline} from 'stream';
import {
fromProjectPath,
Expand Down Expand Up @@ -123,7 +123,14 @@ async function run({input, options, api}: RunInput) {
: {
mode: (await inputFS.stat(mainEntry.filePath)).mode,
};
let contentStream = options.cache.getStream(cacheKeys.content);
let contentStream: Readable;
if (info.isLargeBlob) {
contentStream = options.cache.getStream(cacheKeys.content);
} else {
contentStream = blobToStream(
await options.cache.getBlob(cacheKeys.content),
);
}
let size = 0;
contentStream = contentStream.pipe(
new TapStream(buf => {
Expand Down Expand Up @@ -159,7 +166,7 @@ async function run({input, options, api}: RunInput) {
(await options.cache.has(mapKey))
) {
await writeFiles(
options.cache.getStream(mapKey),
blobToStream(await options.cache.getBlob(mapKey)),
info,
hashRefToNameHash,
options,
Expand Down
1 change: 1 addition & 0 deletions packages/core/core/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ export type Asset = {|
configPath?: ProjectPath,
plugin: ?PackageName,
configKeyPath?: string,
isLargeBlob?: boolean,
|};

export type InternalGlob = ProjectPath;
Expand Down