feat(rosetta): handle tablets with a streaming JSON codec (#4034)
Tablet files can grow pretty large, and cache files even larger. In order to avoid running into the maximum string length Node.js supports when encoding or decoding those, use a streaming JSON encoder/decoder.

Backports aws/jsii-rosetta#43



---

By submitting this pull request, I confirm that my contribution is made under the terms of the [Apache 2.0 license].

[Apache 2.0 license]: https://www.apache.org/licenses/LICENSE-2.0
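
For illustration, the failure mode this change works around looks roughly like the following sketch (assumes an ESM context with top-level await; the `./lib/json` import path points at the helper added by this commit, and the output file name is made up):

```ts
import { kStringMaxLength } from 'node:buffer';
import { createWriteStream } from 'node:fs';

// The streaming helper introduced by this commit.
import { stringify } from './lib/json';

// The JSON text for this object would be longer than the longest string
// Node.js can represent (kStringMaxLength), so JSON.stringify(huge) would
// throw a RangeError instead of returning.
const huge = { key: 'X'.repeat(kStringMaxLength) };

// The streaming codec never materializes the full JSON text as a single
// string: it emits the text chunk by chunk into the writable stream.
await stringify(huge, createWriteStream('huge.json'));
```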
RomainMuller authored Mar 30, 2023
1 parent b459b1d commit d2ecb6d
Showing 5 changed files with 223 additions and 12 deletions.
47 changes: 47 additions & 0 deletions packages/jsii-rosetta/lib/json.ts
@@ -0,0 +1,47 @@
import { Readable, Writable, pipeline } from 'node:stream';
import { promisify } from 'node:util';
import { parser } from 'stream-json';
import * as Assembler from 'stream-json/Assembler';
import { disassembler } from 'stream-json/Disassembler';
import { stringer } from 'stream-json/Stringer';

// NB: In node 15+, there is a node:stream.promises object that has this built-in.
const asyncPipeline = promisify(pipeline);

/**
 * Asynchronously parses a single JSON value from the provided reader. The JSON
 * text might be longer than what could fit in a single string value, since the
 * processing is done in a streaming manner.
 *
 * Prefer using JSON.parse if you know the entire JSON text is always small
 * enough to fit in a string value, as this would have better performance.
 *
 * @param reader the reader from which to consume JSON text.
 *
 * @returns the parsed JSON value as a JavaScript value.
 */
export async function parse(reader: Readable): Promise<any> {
  const assembler = new Assembler();
  const jsonParser = parser();
  assembler.connectTo(jsonParser);
  return asyncPipeline(reader, jsonParser).then(() => assembler.current);
}

/**
 * Serializes a possibly large object into the provided writer. The object may
 * be large enough that the JSON text cannot fit in a single string value.
 *
 * Prefer using JSON.stringify if you know the object is always small enough
 * that the JSON text can fit in a single string value, as this would have
 * better performance.
 *
 * @param value the value to be serialized.
 * @param writer the writer to which the JSON text is written.
 */
export async function stringify(value: any, writer: Writable): Promise<void> {
  const reader = new Readable({ objectMode: true });
  reader.push(value);
  reader.push(null);

  return asyncPipeline(reader, disassembler(), stringer(), writer);
}
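
A minimal usage sketch for these helpers, mirroring the tablet scenario they were added for (file names illustrative; assumes an ESM context with top-level await):

```ts
import { createReadStream, createWriteStream } from 'node:fs';
import * as zlib from 'node:zlib';

import { parse, stringify } from './json';

// Decode: pipe the (possibly huge) file through gunzip and assemble the
// JSON value without ever holding the whole JSON text in one string.
const tablet = await parse(createReadStream('tablet.json.gz').pipe(zlib.createGunzip()));

// Encode: stream the JSON text through gzip straight back to disk.
const gzip = zlib.createGzip();
gzip.pipe(createWriteStream('tablet.json.gz'));
await stringify(tablet, gzip);
```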
38 changes: 26 additions & 12 deletions packages/jsii-rosetta/lib/tablets/tablets.ts
@@ -1,7 +1,9 @@
-import { existsSync, promises as fs } from 'fs';
+import { createReadStream, createWriteStream, existsSync, promises as fs } from 'fs';
 import * as path from 'path';
+import { Readable, Writable } from 'stream';
 import * as zlib from 'zlib';

+import { parse, stringify } from '../json';
 import { TargetLanguage } from '../languages';
 import * as logging from '../logging';
 import { TypeScriptSnippet, SnippetLocation, completeSource } from '../snippet';
@@ -141,15 +143,17 @@ export class LanguageTablet {
    * compressed and decompress accordingly.
    */
   public async load(filename: string) {
-    let data = await fs.readFile(filename);
-    // Gzip objects start with 1f 8b 08
-    if (data[0] === 0x1f && data[1] === 0x8b && data[2] === 0x08) {
-      // This is a gz object, so we decompress it now...
-      data = zlib.gunzipSync(data);
+    let readStream: Readable;
+    if (await isGzipped(filename)) {
+      const gunzip = zlib.createGunzip();
+      createReadStream(filename).pipe(gunzip, { end: true });
+      readStream = gunzip;
       this.compressedSource = true;
+    } else {
+      readStream = createReadStream(filename);
     }

-    const obj: TabletSchema = JSON.parse(data.toString('utf-8'));
+    const obj: TabletSchema = await parse(readStream);

     if (!obj.toolVersion || !obj.snippets) {
       throw new Error(`File '${filename}' does not seem to be a Tablet file`);
@@ -181,12 +185,11 @@
   public async save(filename: string, compress = false) {
     await fs.mkdir(path.dirname(filename), { recursive: true });

-    let schema = Buffer.from(JSON.stringify(this.toSchema(), null, 2));
-    if (compress) {
-      schema = zlib.gzipSync(schema);
-    }
+    const writeStream: Writable = createWriteStream(filename, { flags: 'w' });
+    const gzip = compress ? zlib.createGzip() : undefined;
+    gzip?.pipe(writeStream, { end: true });

-    await fs.writeFile(filename, schema);
+    return stringify(this.toSchema(), gzip ?? writeStream);
   }

   private toSchema(): TabletSchema {
@@ -316,3 +319,14 @@ export interface Translation {
   language: string;
   didCompile?: boolean;
 }
+
+async function isGzipped(filename: string) {
+  const openFile = await fs.open(filename, 'r');
+  try {
+    // Assumes that we can always read 3 bytes if there's that many in the file...
+    const { bytesRead, buffer } = await openFile.read(Buffer.alloc(4), 0, 3, 0);
+    return bytesRead >= 3 && buffer[0] === 0x1f && buffer[1] === 0x8b && buffer[2] === 0x08;
+  } finally {
+    await openFile.close();
+  }
+}
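
The `isGzipped` check above relies on the gzip member header from RFC 1952: every gzip stream starts with the magic bytes `1f 8b`, followed by the compression method byte `08` (deflate). A sketch of the same test against a buffer already in memory, roughly as the pre-change code did it:

```ts
// RFC 1952 gzip header: ID1 = 0x1f, ID2 = 0x8b, CM = 0x08 (deflate).
function looksGzipped(data: Buffer): boolean {
  return data.length >= 3 && data[0] === 0x1f && data[1] === 0x8b && data[2] === 0x08;
}
```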
2 changes: 2 additions & 0 deletions packages/jsii-rosetta/package.json
@@ -18,6 +18,7 @@
"devDependencies": {
"@types/commonmark": "^0.27.6",
"@types/mock-fs": "^4.13.1",
"@types/stream-json": "^1.7.3",
"@types/workerpool": "^6.4.0",
"@types/semver": "^7.3.13",
"jsii-build-tools": "0.0.0",
@@ -33,6 +34,7 @@
"@xmldom/xmldom": "^0.8.6",
"workerpool": "^6.4.0",
"yargs": "^16.2.0",
"stream-json": "^1.7.5",
"semver": "^7.3.8",
"semver-intersect": "^1.4.0",
"fast-glob": "^3.2.12",
121 changes: 121 additions & 0 deletions packages/jsii-rosetta/test/json.test.ts
@@ -0,0 +1,121 @@
import { kStringMaxLength } from 'node:buffer';
import { PassThrough, Readable, Writable } from 'node:stream';

import { parse, stringify } from '../lib/json';

describe(parse, () => {
  test('small value', async () => {
    const value = { foo: 'bar', baz: 123 };
    const jsonText = JSON.stringify(value);

    const readable = new PassThrough();
    readable.end(jsonText);

    expect(await parse(readable)).toEqual(value);
  });

  test('value is too large to fit in a single string', async () => {
    // We'll leverage the fact that a JSON object can define the same key multiple times...
    const expected = { foo: 'bar', baz: 123, bool: true, null: null, long: 'X'.repeat(102_400) };

    const readable = Readable.from(
      (function* () {
        const chunks = Object.entries(expected).map(
          ([key, value]) => ` ${JSON.stringify(key)}: ${JSON.stringify(value)}`,
        );

        yield '{\n';
        let counter = 2;
        let emitComma = false;
        while (counter < kStringMaxLength) {
          for (const chunk of chunks) {
            if (emitComma) {
              yield ',\n';
              counter += 2;
            }
            yield chunk;
            counter += chunk.length;
            emitComma = true;
          }
        }
        yield '\n}\n';
      })(),
    );

    const actual = await parse(readable);
    expect(actual).toEqual(expected);
  });

  test('invalid JSON input', () => {
    const readable = new PassThrough();
    readable.end('{"bad": "JSON",');

    return expect(parse(readable)).rejects.toThrowErrorMatchingInlineSnapshot(
      `"Parser cannot parse input: expected an object key"`,
    );
  });
});

describe(stringify, () => {
  test('small value', async () => {
    const value = { foo: 'bar', baz: 123 };
    const jsonText = JSON.stringify(value);

    const chunks = new Array<Buffer>();
    const writable = new Writable({
      write: (chunk, _encoding, callback) => {
        chunks.push(Buffer.from(chunk));
        callback(null);
      },
    });

    await stringify(value, writable);
    expect(Buffer.concat(chunks).toString('utf-8')).toBe(jsonText);
  });

  test('value too large for JSON text to fit in a string', async () => {
    const value = { key: 'X'.repeat(kStringMaxLength) };

    const chunks = new Array<Buffer>();
    const writable = new Writable({
      write: (chunk, _encoding, callback) => {
        chunks.push(Buffer.from(chunk));
        callback(null);
      },
    });

    await stringify(value, writable);

    expect(headBytes(chunks, 10).toString('utf-8')).toBe('{"key":"XX');
    expect(tailBytes(chunks, 10).toString('utf-8')).toBe('XXXXXXXX"}');
  });
});

function headBytes(chunks: readonly Buffer[], count: number): Buffer {
  if (chunks.length === 0) {
    return Buffer.alloc(0);
  }
  const [head, ...tail] = chunks;
  const headSlice = head.slice(0, count);
  if (headSlice.length === count) {
    return headSlice;
  }

  const tailSlice = headBytes(tail, count - headSlice.length);
  return Buffer.concat([headSlice, tailSlice]);
}

function tailBytes(chunks: readonly Buffer[], count: number): Buffer {
  if (chunks.length === 0) {
    return Buffer.alloc(0);
  }

  const tail = chunks[chunks.length - 1];
  const tailSlice = tail.slice(Math.max(0, tail.length - count), tail.length);
  if (tailSlice.length === count) {
    return tailSlice;
  }

  const headSlice = tailBytes(chunks.slice(0, chunks.length - 1), count - tailSlice.length);
  return Buffer.concat([headSlice, tailSlice]);
}
27 changes: 27 additions & 0 deletions yarn.lock

