Skip to content

Commit

Permalink
[filesystem] streaming support by fs providers
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Kosyakov <anton.kosyakov@typefox.io>
  • Loading branch information
akosyakov committed Jul 10, 2020
1 parent c9a2dac commit d55019f
Show file tree
Hide file tree
Showing 9 changed files with 752 additions and 93 deletions.
37 changes: 35 additions & 2 deletions packages/core/src/common/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
*--------------------------------------------------------------------------------------------*/
// based on https://github.com/microsoft/vscode/blob/04c36be045a94fee58e5f8992d3e3fd980294a84/src/vs/base/common/buffer.ts

/* eslint-disable no-null/no-null */

import { Buffer as SaferBuffer } from 'safer-buffer';
import * as iconv from 'iconv-lite';
import * as streams from './stream';
Expand Down Expand Up @@ -175,6 +177,19 @@ export namespace BinaryBufferReadable {
export function fromBuffer(buffer: BinaryBuffer): BinaryBufferReadable {
return streams.toReadable<BinaryBuffer>(buffer);
}
export function fromReadable(readable: streams.Readable<string>): BinaryBufferReadable {
return {
read(): BinaryBuffer | null {
const value = readable.read();

if (typeof value === 'string') {
return BinaryBuffer.fromString(value);
}

return null;
}
};
}
}

export interface BinaryBufferReadableStream extends streams.ReadableStream<BinaryBuffer> { }
Expand All @@ -187,9 +202,27 @@ export namespace BinaryBufferReadableStream {
}
}

export interface BinaryBufferReadableBufferedStream extends streams.ReadableBufferedStream<BinaryBuffer> { }
export namespace BinaryBufferReadableBufferedStream {
export async function toBuffer(bufferedStream: streams.ReadableBufferedStream<BinaryBuffer>): Promise<BinaryBuffer> {
if (bufferedStream.ended) {
return BinaryBuffer.concat(bufferedStream.buffer);
}

return BinaryBuffer.concat([

// Include already read chunks...
...bufferedStream.buffer,

// ...and all additional chunks
await BinaryBufferReadableStream.toBuffer(bufferedStream.stream)
]);
}
}

export interface BinaryBufferWriteableStream extends streams.WriteableStream<BinaryBuffer> { }
export namespace BinaryBufferWriteableStream {
export function create(): BinaryBufferWriteableStream {
return streams.newWriteableStream<BinaryBuffer>(chunks => BinaryBuffer.concat(chunks));
export function create(options?: streams.WriteableStreamOptions): BinaryBufferWriteableStream {
return streams.newWriteableStream<BinaryBuffer>(chunks => BinaryBuffer.concat(chunks), options);
}
}
159 changes: 157 additions & 2 deletions packages/core/src/common/encoding-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

// based on https://github.com/microsoft/vscode/blob/04c36be045a94fee58e5f8992d3e3fd980294a84/src/vs/workbench/services/textfile/common/encoding.ts

/* eslint-disable no-null/no-null */

import * as iconv from 'iconv-lite';
import { Buffer } from 'safer-buffer';
import { injectable } from 'inversify';
import { BinaryBuffer } from './buffer';
import { BinaryBuffer, BinaryBufferReadableStream, BinaryBufferReadable } from './buffer';
import { UTF8, UTF8_with_bom, UTF16be, UTF16le, UTF16be_BOM, UTF16le_BOM, UTF8_BOM } from './encodings';
import { newWriteableStream, ReadableStream, Readable } from './stream';

const ZERO_BYTE_DETECTION_BUFFER_MAX_LEN = 512; // number of bytes to look at to decide about a file being binary or not
const NO_ENCODING_GUESS_MIN_BYTES = 512; // when not auto guessing the encoding, small number of bytes are enough
const AUTO_ENCODING_GUESS_MIN_BYTES = 512 * 8; // with auto guessing we want a lot more content to be read for guessing
const AUTO_ENCODING_GUESS_MAX_BYTES = 512 * 128; // set an upper limit for the number of bytes we pass on to jschardet

// we explicitly ignore a specific set of encodings from auto guessing
Expand All @@ -46,6 +50,17 @@ export interface DetectedEncoding {
seemsBinary?: boolean
}

export interface DecodeStreamOptions {
guessEncoding?: boolean;
minBytesRequiredForDetection?: number;

overwriteEncoding(detectedEncoding: string | undefined): Promise<string>;
}
export interface DecodeStreamResult {
stream: ReadableStream<string>;
detected: DetectedEncoding;
}

@injectable()
export class EncodingService {

Expand Down Expand Up @@ -221,4 +236,144 @@ export class EncodingService {
return this.toIconvEncoding(guessed.encoding);
}

decodeStream(source: BinaryBufferReadableStream, options: DecodeStreamOptions): Promise<DecodeStreamResult> {
const minBytesRequiredForDetection = options.minBytesRequiredForDetection ?? options.guessEncoding ? AUTO_ENCODING_GUESS_MIN_BYTES : NO_ENCODING_GUESS_MIN_BYTES;

return new Promise<DecodeStreamResult>((resolve, reject) => {
const target = newWriteableStream<string>(strings => strings.join(''));

const bufferedChunks: BinaryBuffer[] = [];
let bytesBuffered = 0;

let decoder: iconv.DecoderStream | undefined = undefined;

const createDecoder = async () => {
try {

// detect encoding from buffer
const detected = await this.detectEncoding(BinaryBuffer.concat(bufferedChunks), options.guessEncoding);

// ensure to respect overwrite of encoding
detected.encoding = await options.overwriteEncoding(detected.encoding);

// decode and write buffered content
decoder = iconv.getDecoder(this.toIconvEncoding(detected.encoding));
const decoded = decoder.write(Buffer.from(BinaryBuffer.concat(bufferedChunks).buffer));
target.write(decoded);

bufferedChunks.length = 0;
bytesBuffered = 0;

// signal to the outside our detected encoding and final decoder stream
resolve({
stream: target,
detected
});
} catch (error) {
reject(error);
}
};

// Stream error: forward to target
source.on('error', error => target.error(error));

// Stream data
source.on('data', async chunk => {

// if the decoder is ready, we just write directly
if (decoder) {
target.write(decoder.write(Buffer.from(chunk.buffer)));
} else {
bufferedChunks.push(chunk);
bytesBuffered += chunk.byteLength;

// buffered enough data for encoding detection, create stream
if (bytesBuffered >= minBytesRequiredForDetection) {

// pause stream here until the decoder is ready
source.pause();

await createDecoder();

// resume stream now that decoder is ready but
// outside of this stack to reduce recursion
setTimeout(() => source.resume());
}
}
});

// Stream end
source.on('end', async () => {

// we were still waiting for data to do the encoding
// detection. thus, wrap up starting the stream even
// without all the data to get things going
if (!decoder) {
await createDecoder();
}

// end the target with the remainders of the decoder
target.end(decoder?.end());
});
});
}

encodeStream(value: string | Readable<string>, options?: ResourceEncoding): Promise<BinaryBuffer | BinaryBufferReadable>
encodeStream(value?: string | Readable<string>, options?: ResourceEncoding): Promise<BinaryBuffer | BinaryBufferReadable | undefined>;
async encodeStream(value: string | Readable<string> | undefined, options?: ResourceEncoding): Promise<BinaryBuffer | BinaryBufferReadable | undefined> {
let encoding = options?.encoding;
const addBOM = options?.hasBOM;
encoding = this.toIconvEncoding(encoding);
if (encoding === UTF8 && !addBOM) {
return value === undefined ? undefined : typeof value === 'string' ?
BinaryBuffer.fromString(value) : BinaryBufferReadable.fromReadable(value);
}

value = value || '';
const readable = typeof value === 'string' ? Readable.fromString(value) : value;
const encoder = iconv.getEncoder(encoding, { addBOM });

let bytesRead = 0;
let done = false;

return {
read(): BinaryBuffer | null {
if (done) {
return null;
}

const chunk = readable.read();
if (typeof chunk !== 'string') {
done = true;

// If we are instructed to add a BOM but we detect that no
// bytes have been read, we must ensure to return the BOM
// ourselves so that we comply with the contract.
if (bytesRead === 0 && addBOM) {
switch (encoding) {
case UTF8:
case UTF8_with_bom:
return BinaryBuffer.wrap(Uint8Array.from(UTF8_BOM));
case UTF16be:
return BinaryBuffer.wrap(Uint8Array.from(UTF16be_BOM));
case UTF16le:
return BinaryBuffer.wrap(Uint8Array.from(UTF16le_BOM));
}
}

const leftovers = encoder.end();
if (leftovers && leftovers.length > 0) {
return BinaryBuffer.wrap(leftovers);
}

return null;
}

bytesRead += chunk.length;

return BinaryBuffer.wrap(encoder.write(chunk));
}
};
}

}
Loading

0 comments on commit d55019f

Please sign in to comment.