From fc9f78059d460a0cc980c9db631742c1f4d0e751 Mon Sep 17 00:00:00 2001 From: Anton Kosyakov Date: Wed, 8 Jul 2020 12:10:50 +0000 Subject: [PATCH 1/4] [filesystem]: prompt opening binary files Signed-off-by: Anton Kosyakov --- .../filesystem/src/browser/file-resource.ts | 37 +++++++++++++++++-- .../filesystem/src/browser/file-service.ts | 29 ++++++++++++++- 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/packages/filesystem/src/browser/file-resource.ts b/packages/filesystem/src/browser/file-resource.ts index 51024639811e2..845b817fab951 100644 --- a/packages/filesystem/src/browser/file-resource.ts +++ b/packages/filesystem/src/browser/file-resource.ts @@ -20,7 +20,7 @@ import { DisposableCollection } from '@theia/core/lib/common/disposable'; import { Emitter, Event } from '@theia/core/lib/common/event'; import URI from '@theia/core/lib/common/uri'; import { FileOperation, FileOperationError, FileOperationResult, ETAG_DISABLED, FileSystemProviderCapabilities } from '../common/files'; -import { FileService } from './file-service'; +import { FileService, TextFileOperationError, TextFileOperationResult } from './file-service'; import { ConfirmDialog } from '@theia/core/lib/browser/dialogs'; import { LabelProvider } from '@theia/core/lib/browser/label-provider'; @@ -37,10 +37,13 @@ export namespace FileResourceVersion { export interface FileResourceOptions { shouldOverwrite: () => Promise + shouldOpenAsText: (error: string) => Promise } export class FileResource implements Resource { + protected acceptTextOnly = true; + protected readonly toDispose = new DisposableCollection(); protected readonly onDidChangeContentsEmitter = new Emitter(); readonly onDidChangeContents: Event = this.onDidChangeContentsEmitter.event; @@ -89,7 +92,11 @@ export class FileResource implements Resource { async readContents(options?: { encoding?: string }): Promise { try { const encoding = options?.encoding || this.version?.encoding; - const stat = await this.fileService.read(this.uri, { encoding, etag: ETAG_DISABLED }); + const stat = await this.fileService.read(this.uri, { + encoding, + etag: ETAG_DISABLED, + acceptTextOnly: this.acceptTextOnly + }); this._version = { encoding: stat.encoding, etag: stat.etag, @@ -97,6 +104,14 @@ export class FileResource implements Resource { }; return stat.value; } catch (e) { + if (e instanceof TextFileOperationError && e.textFileOperationResult === TextFileOperationResult.FILE_IS_BINARY) { + if (await this.shouldOpenAsText(e.message)) { + this.acceptTextOnly = false; + return this.readContents(options); + } else { + throw e; + } + } if (e instanceof FileOperationError && e.fileOperationResult === FileOperationResult.FILE_NOT_FOUND) { this._version = undefined; const { message, stack } = e; @@ -181,6 +196,7 @@ export class FileResource implements Resource { }; async guessEncoding(): Promise { + // TODO limit size const content = await this.fileService.read(this.uri, { autoGuessEncoding: true }); return content.encoding; } @@ -204,6 +220,10 @@ export class FileResource implements Resource { return this.options.shouldOverwrite(); } + protected async shouldOpenAsText(error: string): Promise { + return this.options.shouldOpenAsText(error); + } + } @injectable() @@ -228,7 +248,8 @@ export class FileResourceResolver implements ResourceResolver { throw new Error('The given uri is a directory: ' + this.labelProvider.getLongName(uri)); } return new FileResource(uri, this.fileService, { - shouldOverwrite: () => this.shouldOverwrite(uri) + shouldOverwrite: () => 
this.shouldOverwrite(uri), + shouldOpenAsText: error => this.shouldOpenAsText(uri, error) }); } @@ -242,4 +263,14 @@ export class FileResourceResolver implements ResourceResolver { return !!await dialog.open(); } + protected async shouldOpenAsText(uri: URI, error: string): Promise { + const dialog = new ConfirmDialog({ + title: error, + msg: `Do you want to open '${this.labelProvider.getLongName(uri)}' anyway?`, + ok: 'Yes', + cancel: 'No' + }); + return !!await dialog.open(); + } + } diff --git a/packages/filesystem/src/browser/file-service.ts b/packages/filesystem/src/browser/file-service.ts index e83fb62e82f94..7d928162a9ea9 100644 --- a/packages/filesystem/src/browser/file-service.ts +++ b/packages/filesystem/src/browser/file-service.ts @@ -106,7 +106,13 @@ export interface WriteEncodingOptions { overwriteEncoding?: boolean; } -export interface ReadTextFileOptions extends ReadEncodingOptions, ReadFileOptions { } +export interface ReadTextFileOptions extends ReadEncodingOptions, ReadFileOptions { + /** + * The optional acceptTextOnly parameter allows to fail this request early if the file + * contents are not textual. + */ + acceptTextOnly?: boolean; +} export interface TextFileContent extends BaseStatWithMetadata { @@ -188,6 +194,23 @@ export interface FileSystemProviderActivationEvent extends WaitUntilEvent { scheme: string; } +export const enum TextFileOperationResult { + FILE_IS_BINARY +} + +export class TextFileOperationError extends FileOperationError { + + constructor( + message: string, + public textFileOperationResult: TextFileOperationResult, + public options?: ReadTextFileOptions & WriteTextFileOptions + ) { + super(message, FileOperationResult.FILE_OTHER_ERROR); + Object.setPrototypeOf(this, TextFileOperationError.prototype); + } + +} + @injectable() export class FileService { @@ -547,7 +570,11 @@ export class FileService { autoGuessEncoding: typeof options?.autoGuessEncoding === 'boolean' ? 
options.autoGuessEncoding : this.preferences['files.autoGuessEncoding'] }; const content = await this.readFile(resource, options); + // TODO stream const detected = await this.encodingService.detectEncoding(content.value, options.autoGuessEncoding); + if (options?.acceptTextOnly && detected.seemsBinary) { + throw new TextFileOperationError('File seems to be binary and cannot be opened as text', TextFileOperationResult.FILE_IS_BINARY, options); + } const encoding = await this.getReadEncoding(resource, options, detected.encoding); const value = this.encodingService.decode(content.value, encoding); return { ...content, encoding, value }; From 649b0899ee15a1e65697c2205e61b61d87ce35d0 Mon Sep 17 00:00:00 2001 From: Anton Kosyakov Date: Wed, 8 Jul 2020 13:27:01 +0000 Subject: [PATCH 2/4] fix #4731: prevent opening too large files Signed-off-by: Anton Kosyakov --- .../filesystem/src/browser/file-resource.ts | 25 ++++++++---- .../filesystem/src/browser/file-service.ts | 39 +++++++++++++++++-- .../src/browser/filesystem-preferences.ts | 13 +++++++ packages/filesystem/src/common/files.ts | 33 ++++++++++++++++ 4 files changed, 98 insertions(+), 12 deletions(-) diff --git a/packages/filesystem/src/browser/file-resource.ts b/packages/filesystem/src/browser/file-resource.ts index 845b817fab951..274267dfaea88 100644 --- a/packages/filesystem/src/browser/file-resource.ts +++ b/packages/filesystem/src/browser/file-resource.ts @@ -19,10 +19,11 @@ import { Resource, ResourceVersion, ResourceResolver, ResourceError, ResourceSav import { DisposableCollection } from '@theia/core/lib/common/disposable'; import { Emitter, Event } from '@theia/core/lib/common/event'; import URI from '@theia/core/lib/common/uri'; -import { FileOperation, FileOperationError, FileOperationResult, ETAG_DISABLED, FileSystemProviderCapabilities } from '../common/files'; +import { FileOperation, FileOperationError, FileOperationResult, ETAG_DISABLED, FileSystemProviderCapabilities, FileReadStreamOptions, BinarySize } from '../common/files'; import { FileService, TextFileOperationError, TextFileOperationResult } from './file-service'; import { ConfirmDialog } from '@theia/core/lib/browser/dialogs'; import { LabelProvider } from '@theia/core/lib/browser/label-provider'; +import { GENERAL_MAX_FILE_SIZE_MB } from './filesystem-preferences'; export interface FileResourceVersion extends ResourceVersion { readonly encoding: string; @@ -43,6 +44,7 @@ export interface FileResourceOptions { export class FileResource implements Resource { protected acceptTextOnly = true; + protected limits: FileReadStreamOptions['limits']; protected readonly toDispose = new DisposableCollection(); protected readonly onDidChangeContentsEmitter = new Emitter(); @@ -95,7 +97,8 @@ export class FileResource implements Resource { const stat = await this.fileService.read(this.uri, { encoding, etag: ETAG_DISABLED, - acceptTextOnly: this.acceptTextOnly + acceptTextOnly: this.acceptTextOnly, + limits: this.limits }); this._version = { encoding: stat.encoding, @@ -105,14 +108,20 @@ export class FileResource implements Resource { return stat.value; } catch (e) { if (e instanceof TextFileOperationError && e.textFileOperationResult === TextFileOperationResult.FILE_IS_BINARY) { - if (await this.shouldOpenAsText(e.message)) { + if (await this.shouldOpenAsText('The file is either binary or uses an unsupported text encoding.')) { this.acceptTextOnly = false; return this.readContents(options); - } else { - throw e; } - } - if (e instanceof FileOperationError && e.fileOperationResult 
=== FileOperationResult.FILE_NOT_FOUND) { + } else if (e instanceof FileOperationError && e.fileOperationResult === FileOperationResult.FILE_TOO_LARGE) { + const stat = await this.fileService.resolve(this.uri, { resolveMetadata: true }); + const maxFileSize = GENERAL_MAX_FILE_SIZE_MB * 1024 * 1024; + if (this.limits?.size !== maxFileSize && await this.shouldOpenAsText(`The file is too large (${BinarySize.formatSize(stat.size)}).`)) { + this.limits = { + size: maxFileSize + }; + return this.readContents(options); + } + } else if (e instanceof FileOperationError && e.fileOperationResult === FileOperationResult.FILE_NOT_FOUND) { this._version = undefined; const { message, stack } = e; throw ResourceError.NotFound({ @@ -266,7 +275,7 @@ export class FileResourceResolver implements ResourceResolver { protected async shouldOpenAsText(uri: URI, error: string): Promise { const dialog = new ConfirmDialog({ title: error, - msg: `Do you want to open '${this.labelProvider.getLongName(uri)}' anyway?`, + msg: `Opening it might take some time and might make the IDE unresponsive. Do you want to open '${this.labelProvider.getLongName(uri)}' anyway?`, ok: 'Yes', cancel: 'No' }); diff --git a/packages/filesystem/src/browser/file-service.ts b/packages/filesystem/src/browser/file-service.ts index 7d928162a9ea9..294ac4b084c9a 100644 --- a/packages/filesystem/src/browser/file-service.ts +++ b/packages/filesystem/src/browser/file-service.ts @@ -63,6 +63,7 @@ import type { TextDocumentContentChangeEvent } from 'vscode-languageserver-proto import { EncodingRegistry } from '@theia/core/lib/browser/encoding-registry'; import { UTF8, UTF8_with_bom } from '@theia/core/lib/common/encodings'; import { EncodingService, ResourceEncoding } from '@theia/core/lib/common/encoding-service'; +import { Mutable } from '@theia/core/lib/common/types'; export interface FileOperationParticipant { @@ -565,10 +566,7 @@ export class FileService { } async read(resource: URI, options?: ReadTextFileOptions): Promise { - options = { - ...options, - autoGuessEncoding: typeof options?.autoGuessEncoding === 'boolean' ? options.autoGuessEncoding : this.preferences['files.autoGuessEncoding'] - }; + options = this.resolveReadOptions(options); const content = await this.readFile(resource, options); // TODO stream const detected = await this.encodingService.detectEncoding(content.value, options.autoGuessEncoding); @@ -580,6 +578,18 @@ export class FileService { return { ...content, encoding, value }; } + protected resolveReadOptions(options?: ReadTextFileOptions): ReadTextFileOptions { + options = { + ...options, + autoGuessEncoding: typeof options?.autoGuessEncoding === 'boolean' ? 
options.autoGuessEncoding : this.preferences['files.autoGuessEncoding'] + }; + const limits: Mutable = options.limits = options.limits || {}; + if (typeof limits.size !== 'number') { + limits.size = this.preferences['files.maxFileSizeMB'] * 1024 * 1024; + } + return options; + } + async update(resource: URI, changes: TextDocumentContentChangeEvent[], options: UpdateTextFileOptions): Promise { const provider = this.throwIfFileSystemIsReadonly(await this.withWriteProvider(resource), resource); try { @@ -828,9 +838,30 @@ export class FileService { throw new FileOperationError('File not modified since', FileOperationResult.FILE_NOT_MODIFIED_SINCE, options); } + // Throw if file is too large to load + this.validateReadFileLimits(resource, stat.size, options); + return stat; } + private validateReadFileLimits(resource: URI, size: number, options?: ReadFileOptions): void { + if (options?.limits) { + let tooLargeErrorResult: FileOperationResult | undefined = undefined; + + if (typeof options.limits.memory === 'number' && size > options.limits.memory) { + tooLargeErrorResult = FileOperationResult.FILE_EXCEEDS_MEMORY_LIMIT; + } + + if (typeof options.limits.size === 'number' && size > options.limits.size) { + tooLargeErrorResult = FileOperationResult.FILE_TOO_LARGE; + } + + if (typeof tooLargeErrorResult === 'number') { + throw new FileOperationError(`Unable to read file '${this.resourceForError(resource)}' that is too large to open`, tooLargeErrorResult); + } + } + } + // #endregion // #region Move/Copy/Delete/Create Folder diff --git a/packages/filesystem/src/browser/filesystem-preferences.ts b/packages/filesystem/src/browser/filesystem-preferences.ts index 57044a67fd0c9..5c8e872b6dc14 100644 --- a/packages/filesystem/src/browser/filesystem-preferences.ts +++ b/packages/filesystem/src/browser/filesystem-preferences.ts @@ -23,6 +23,13 @@ import { PreferenceContribution } from '@theia/core/lib/browser/preferences'; import { SUPPORTED_ENCODINGS } from '@theia/core/lib/browser/supported-encodings'; +import { environment } from '@theia/application-package/lib/environment'; + +// See https://github.com/Microsoft/vscode/issues/30180 +export const WIN32_MAX_FILE_SIZE_MB = 300; // 300 MB +export const GENERAL_MAX_FILE_SIZE_MB = 16 * 1024; // 16 GB + +export const MAX_FILE_SIZE_MB = environment.electron.is() ? process.arch === 'ia32' ? WIN32_MAX_FILE_SIZE_MB : GENERAL_MAX_FILE_SIZE_MB : 32; export const filesystemPreferenceSchema: PreferenceSchema = { 'type': 'object', @@ -66,6 +73,11 @@ These have precedence over the default associations of the languages installed.' type: 'number', default: 5000, markdownDescription: 'Timeout in milliseconds after which file participants for create, rename, and delete are cancelled. Use `0` to disable participants.' + }, + 'files.maxFileSizeMB': { + type: 'number', + default: MAX_FILE_SIZE_MB, + markdownDescription: 'Controls the max file size in MB which is possible to open.' } } }; @@ -78,6 +90,7 @@ export interface FileSystemConfiguration { 'files.encoding': string; 'files.autoGuessEncoding': boolean; 'files.participants.timeout': number; + 'files.maxFileSizeMB': number; } export const FileSystemPreferences = Symbol('FileSystemPreferences'); diff --git a/packages/filesystem/src/common/files.ts b/packages/filesystem/src/common/files.ts index 193c2c66526cf..b521ca006fa61 100644 --- a/packages/filesystem/src/common/files.ts +++ b/packages/filesystem/src/common/files.ts @@ -443,6 +443,14 @@ export interface FileReadStreamOptions { * will be read. 
*/ readonly length?: number; + + /** + * If provided, the size of the file will be checked against the limits. + */ + limits?: { + readonly size?: number; + readonly memory?: number; + }; } export interface FileUpdateOptions { @@ -695,3 +703,28 @@ export function etag(stat: { mtime: number | undefined, size: number | undefined return stat.mtime.toString(29) + stat.size.toString(31); } +/** + * Helper to format a raw byte size into a human readable label. + */ +export class BinarySize { + static readonly KB = 1024; + static readonly MB = BinarySize.KB * BinarySize.KB; + static readonly GB = BinarySize.MB * BinarySize.KB; + static readonly TB = BinarySize.GB * BinarySize.KB; + + static formatSize(size: number): string { + if (size < BinarySize.KB) { + return size + 'B'; + } + if (size < BinarySize.MB) { + return (size / BinarySize.KB).toFixed(2) + 'KB'; + } + if (size < BinarySize.GB) { + return (size / BinarySize.MB).toFixed(2) + 'MB'; + } + if (size < BinarySize.TB) { + return (size / BinarySize.GB).toFixed(2) + 'GB'; + } + return (size / BinarySize.TB).toFixed(2) + 'TB'; + } +} From a46bba88e0d98a5dcd77a0648620073b53c66b92 Mon Sep 17 00:00:00 2001 From: Anton Kosyakov Date: Thu, 9 Jul 2020 14:12:03 +0000 Subject: [PATCH 3/4] [filesystem] streaming support by fs providers Signed-off-by: Anton Kosyakov --- packages/core/src/common/buffer.ts | 37 ++- packages/core/src/common/encoding-service.ts | 160 +++++++++++- packages/core/src/common/stream.ts | 236 ++++++++++++++++-- .../filesystem/src/browser/file-service.ts | 208 +++++++++++---- .../common/delegating-file-system-provider.ts | 14 +- packages/filesystem/src/common/files.ts | 21 ++ packages/filesystem/src/common/io.ts | 69 +++-- .../src/common/remote-file-system-provider.ts | 80 +++++- .../src/node/disk-file-system-provider.ts | 19 +- 9 files changed, 752 insertions(+), 92 deletions(-) diff --git a/packages/core/src/common/buffer.ts b/packages/core/src/common/buffer.ts index 58c01d26837b2..b7763cf454f27 100644 --- a/packages/core/src/common/buffer.ts +++ b/packages/core/src/common/buffer.ts @@ -19,6 +19,8 @@ *--------------------------------------------------------------------------------------------*/ // based on https://github.com/microsoft/vscode/blob/04c36be045a94fee58e5f8992d3e3fd980294a84/src/vs/base/common/buffer.ts +/* eslint-disable no-null/no-null */ + import { Buffer as SaferBuffer } from 'safer-buffer'; import * as iconv from 'iconv-lite'; import * as streams from './stream'; @@ -175,6 +177,19 @@ export namespace BinaryBufferReadable { export function fromBuffer(buffer: BinaryBuffer): BinaryBufferReadable { return streams.toReadable(buffer); } + export function fromReadable(readable: streams.Readable): BinaryBufferReadable { + return { + read(): BinaryBuffer | null { + const value = readable.read(); + + if (typeof value === 'string') { + return BinaryBuffer.fromString(value); + } + + return null; + } + }; + } } export interface BinaryBufferReadableStream extends streams.ReadableStream { } @@ -187,9 +202,27 @@ export namespace BinaryBufferReadableStream { } } +export interface BinaryBufferReadableBufferedStream extends streams.ReadableBufferedStream { } +export namespace BinaryBufferReadableBufferedStream { + export async function toBuffer(bufferedStream: streams.ReadableBufferedStream): Promise { + if (bufferedStream.ended) { + return BinaryBuffer.concat(bufferedStream.buffer); + } + + return BinaryBuffer.concat([ + + // Include already read chunks... 
+ ...bufferedStream.buffer, + + // ...and all additional chunks + await BinaryBufferReadableStream.toBuffer(bufferedStream.stream) + ]); + } +} + export interface BinaryBufferWriteableStream extends streams.WriteableStream { } export namespace BinaryBufferWriteableStream { - export function create(): BinaryBufferWriteableStream { - return streams.newWriteableStream(chunks => BinaryBuffer.concat(chunks)); + export function create(options?: streams.WriteableStreamOptions): BinaryBufferWriteableStream { + return streams.newWriteableStream(chunks => BinaryBuffer.concat(chunks), options); } } diff --git a/packages/core/src/common/encoding-service.ts b/packages/core/src/common/encoding-service.ts index c50148007f048..847936af41c39 100644 --- a/packages/core/src/common/encoding-service.ts +++ b/packages/core/src/common/encoding-service.ts @@ -17,16 +17,20 @@ * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ - // based on https://github.com/microsoft/vscode/blob/04c36be045a94fee58e5f8992d3e3fd980294a84/src/vs/workbench/services/textfile/common/encoding.ts +/* eslint-disable no-null/no-null */ + import * as iconv from 'iconv-lite'; import { Buffer } from 'safer-buffer'; import { injectable } from 'inversify'; -import { BinaryBuffer } from './buffer'; +import { BinaryBuffer, BinaryBufferReadableStream, BinaryBufferReadable } from './buffer'; import { UTF8, UTF8_with_bom, UTF16be, UTF16le, UTF16be_BOM, UTF16le_BOM, UTF8_BOM } from './encodings'; +import { newWriteableStream, ReadableStream, Readable } from './stream'; const ZERO_BYTE_DETECTION_BUFFER_MAX_LEN = 512; // number of bytes to look at to decide about a file being binary or not +const NO_ENCODING_GUESS_MIN_BYTES = 512; // when not auto guessing the encoding, small number of bytes are enough +const AUTO_ENCODING_GUESS_MIN_BYTES = 512 * 8; // with auto guessing we want a lot more content to be read for guessing const AUTO_ENCODING_GUESS_MAX_BYTES = 512 * 128; // set an upper limit for the number of bytes we pass on to jschardet // we explicitly ignore a specific set of encodings from auto guessing @@ -46,6 +50,17 @@ export interface DetectedEncoding { seemsBinary?: boolean } +export interface DecodeStreamOptions { + guessEncoding?: boolean; + minBytesRequiredForDetection?: number; + + overwriteEncoding(detectedEncoding: string | undefined): Promise; +} +export interface DecodeStreamResult { + stream: ReadableStream; + detected: DetectedEncoding; +} + @injectable() export class EncodingService { @@ -221,4 +236,145 @@ export class EncodingService { return this.toIconvEncoding(guessed.encoding); } + decodeStream(source: BinaryBufferReadableStream, options: DecodeStreamOptions): Promise { + const minBytesRequiredForDetection = options.minBytesRequiredForDetection ?? options.guessEncoding ? 
AUTO_ENCODING_GUESS_MIN_BYTES : NO_ENCODING_GUESS_MIN_BYTES; + + return new Promise((resolve, reject) => { + const target = newWriteableStream(strings => strings.join('')); + + const bufferedChunks: BinaryBuffer[] = []; + let bytesBuffered = 0; + + let decoder: iconv.DecoderStream | undefined = undefined; + + const createDecoder = async () => { + try { + + // detect encoding from buffer + const detected = await this.detectEncoding(BinaryBuffer.concat(bufferedChunks), options.guessEncoding); + + // ensure to respect overwrite of encoding + detected.encoding = await options.overwriteEncoding(detected.encoding); + + // decode and write buffered content + decoder = iconv.getDecoder(this.toIconvEncoding(detected.encoding)); + const decoded = decoder.write(Buffer.from(BinaryBuffer.concat(bufferedChunks).buffer)); + target.write(decoded); + + bufferedChunks.length = 0; + bytesBuffered = 0; + + // signal to the outside our detected encoding and final decoder stream + resolve({ + stream: target, + detected + }); + } catch (error) { + reject(error); + } + }; + + // Stream error: forward to target + source.on('error', error => target.error(error)); + + // Stream data + source.on('data', async chunk => { + + // if the decoder is ready, we just write directly + if (decoder) { + target.write(decoder.write(Buffer.from(chunk.buffer))); + } else { + bufferedChunks.push(chunk); + bytesBuffered += chunk.byteLength; + + // buffered enough data for encoding detection, create stream + if (bytesBuffered >= minBytesRequiredForDetection) { + + // pause stream here until the decoder is ready + source.pause(); + + await createDecoder(); + + // resume stream now that decoder is ready but + // outside of this stack to reduce recursion + setTimeout(() => source.resume()); + } + } + }); + + // Stream end + source.on('end', async () => { + + // we were still waiting for data to do the encoding + // detection. thus, wrap up starting the stream even + // without all the data to get things going + if (!decoder) { + await createDecoder(); + } + + // end the target with the remainders of the decoder + target.end(decoder?.end()); + }); + }); + } + + encodeStream(value: string | Readable, options?: ResourceEncoding): Promise + encodeStream(value?: string | Readable, options?: ResourceEncoding): Promise; + async encodeStream(value: string | Readable | undefined, options?: ResourceEncoding): Promise { + let encoding = options?.encoding; + const addBOM = options?.hasBOM; + encoding = this.toIconvEncoding(encoding); + if (encoding === UTF8 && !addBOM) { + return value === undefined ? undefined : typeof value === 'string' ? + BinaryBuffer.fromString(value) : BinaryBufferReadable.fromReadable(value); + } + + value = value || ''; + const readable = typeof value === 'string' ? Readable.fromString(value) : value; + const encoder = iconv.getEncoder(encoding, { addBOM }); + + let bytesWritten = false; + let done = false; + + return { + read(): BinaryBuffer | null { + if (done) { + return null; + } + + const chunk = readable.read(); + if (typeof chunk !== 'string') { + done = true; + + // If we are instructed to add a BOM but we detect that no + // bytes have been written, we must ensure to return the BOM + // ourselves so that we comply with the contract. 
+ if (!bytesWritten && addBOM) { + switch (encoding) { + case UTF8: + case UTF8_with_bom: + return BinaryBuffer.wrap(Uint8Array.from(UTF8_BOM)); + case UTF16be: + return BinaryBuffer.wrap(Uint8Array.from(UTF16be_BOM)); + case UTF16le: + return BinaryBuffer.wrap(Uint8Array.from(UTF16le_BOM)); + } + } + + const leftovers = encoder.end(); + if (leftovers && leftovers.length > 0) { + bytesWritten = true; + return BinaryBuffer.wrap(leftovers); + } + + return null; + } + + bytesWritten = true; + + return BinaryBuffer.wrap(encoder.write(chunk)); + } + }; + } + } diff --git a/packages/core/src/common/stream.ts b/packages/core/src/common/stream.ts index 8ddc9e2e95387..615065d6f8054 100644 --- a/packages/core/src/common/stream.ts +++ b/packages/core/src/common/stream.ts @@ -25,6 +25,8 @@ /* eslint-disable @typescript-eslint/tslint/config */ /* eslint-disable @typescript-eslint/no-explicit-any */ +import { DisposableCollection, Disposable } from './disposable'; + export interface ReadableStreamEvents { /** @@ -66,6 +68,11 @@ export interface ReadableStream extends ReadableStreamEvents { * Destroys the stream and stops emitting any event. */ destroy(): void; + + /** + * Allows to remove a listener that was previously added. + */ + removeListener(event: string, callback: Function): void; } /** @@ -80,6 +87,23 @@ export interface Readable { */ read(): T | null; } +export namespace Readable { + export function fromString(value: string): Readable { + let done = false; + + return { + read(): string | null { + if (!done) { + done = true; + + return value; + } + + return null; + } + }; + } +} /** * A interface that emulates the API shape of a node.js writeable @@ -91,8 +115,14 @@ export interface WriteableStream extends ReadableStream { * Writing data to the stream will trigger the on('data') * event listener if the stream is flowing and buffer the * data otherwise until the stream is flowing. + * + * If a `highWaterMark` is configured and writing to the + * stream reaches this mark, a promise will be returned + * that should be awaited on before writing more data. + * Otherwise there is a risk of buffering a large number + * of data chunks without consumer. */ - write(data: T): void; + write(data: T): void | Promise; /** * Signals an error to the consumer of the stream via the @@ -112,33 +142,74 @@ export interface WriteableStream extends ReadableStream { end(result?: T | Error): void; } +/** + * A stream that has a buffer already read. Returns the original stream + * that was read as well as the chunks that got read. + * + * The `ended` flag indicates if the stream has been fully consumed. + */ +export interface ReadableBufferedStream { + + /** + * The original stream that is being read. + */ + stream: ReadableStream; + + /** + * An array of chunks already read from this stream. + */ + buffer: T[]; + + /** + * Signals if the stream has ended or not. If not, consumers + * should continue to read from the stream until consumed. 
+ */ + ended: boolean; +} + export function isReadableStream(obj: unknown): obj is ReadableStream { const candidate = obj as ReadableStream; return candidate && [candidate.on, candidate.pause, candidate.resume, candidate.destroy].every(fn => typeof fn === 'function'); } -export interface IReducer { +export function isReadableBufferedStream(obj: unknown): obj is ReadableBufferedStream { + const candidate = obj as ReadableBufferedStream; + + return candidate && isReadableStream(candidate.stream) && Array.isArray(candidate.buffer) && typeof candidate.ended === 'boolean'; +} + +export interface Reducer { (data: T[]): T; } -export interface IDataTransformer { +export interface DataTransformer { (data: Original): Transformed; } -export interface IErrorTransformer { +export interface ErrorTransformer { (error: Error): Error; } export interface ITransformer { - data: IDataTransformer; - error?: IErrorTransformer; + data: DataTransformer; + error?: ErrorTransformer; } -export function newWriteableStream(reducer: IReducer): WriteableStream { +export function newWriteableStream(reducer: Reducer, options?: WriteableStreamOptions): WriteableStream { return new WriteableStreamImpl(reducer); } +export interface WriteableStreamOptions { + + /** + * The number of objects to buffer before WriteableStream#write() + * signals back that the buffer is full. Can be used to reduce + * the memory pressure when the stream is not flowing. + */ + highWaterMark?: number; +} + class WriteableStreamImpl implements WriteableStream { private readonly state = { @@ -158,7 +229,9 @@ class WriteableStreamImpl implements WriteableStream { end: [] as { (): void }[] }; - constructor(private reducer: IReducer) { } + private readonly pendingWritePromises: Function[] = []; + + constructor(private reducer: Reducer, private options?: WriteableStreamOptions) { } pause(): void { if (this.state.destroyed) { @@ -183,7 +256,7 @@ class WriteableStreamImpl implements WriteableStream { } } - write(data: T): void { + write(data: T): void | Promise { if (this.state.destroyed) { return; } @@ -196,6 +269,11 @@ class WriteableStreamImpl implements WriteableStream { // not yet flowing: buffer data until flowing else { this.buffer.data.push(data); + + // highWaterMark: if configured, signal back when buffer reached limits + if (typeof this.options?.highWaterMark === 'number' && this.buffer.data.length > this.options.highWaterMark) { + return new Promise(resolve => this.pendingWritePromises.push(resolve)); + } } } @@ -284,6 +362,35 @@ class WriteableStreamImpl implements WriteableStream { } } + removeListener(event: string, callback: Function): void { + if (this.state.destroyed) { + return; + } + + let listeners: unknown[] | undefined = undefined; + + switch (event) { + case 'data': + listeners = this.listeners.data; + break; + + case 'end': + listeners = this.listeners.end; + break; + + case 'error': + listeners = this.listeners.error; + break; + } + + if (listeners) { + const index = listeners.indexOf(callback); + if (index >= 0) { + listeners.splice(index, 1); + } + } + } + private flowData(): void { if (this.buffer.data.length > 0) { const fullDataBuffer = this.reducer(this.buffer.data); @@ -291,6 +398,11 @@ class WriteableStreamImpl implements WriteableStream { this.listeners.data.forEach(listener => listener(fullDataBuffer)); this.buffer.data.length = 0; + + // When the buffer is empty, resolve all pending writers + const pendingWritePromises = [...this.pendingWritePromises]; + this.pendingWritePromises.length = 0; + 
pendingWritePromises.forEach(pendingWritePromise => pendingWritePromise()); } } @@ -325,6 +437,8 @@ class WriteableStreamImpl implements WriteableStream { this.listeners.data.length = 0; this.listeners.error.length = 0; this.listeners.end.length = 0; + + this.pendingWritePromises.length = 0; } } } @@ -332,7 +446,7 @@ class WriteableStreamImpl implements WriteableStream { /** * Helper to fully read a T readable into a T. */ -export function consumeReadable(readable: Readable, reducer: IReducer): T { +export function consumeReadable(readable: Readable, reducer: Reducer): T { const chunks: T[] = []; let chunk: T | null; @@ -348,7 +462,55 @@ export function consumeReadable(readable: Readable, reducer: IReducer): * reached, will return a readable instead to ensure all data can still * be read. */ -export function consumeReadableWithLimit(readable: Readable, reducer: IReducer, maxChunks: number): T | Readable { +export function consumeReadableWithLimit(readable: Readable, reducer: Reducer, maxChunks: number): T | Readable { + const chunks: T[] = []; + + let chunk: T | null | undefined = undefined; + while ((chunk = readable.read()) !== null && chunks.length < maxChunks) { + chunks.push(chunk); + } + + // If the last chunk is null, it means we reached the end of + // the readable and return all the data at once + if (chunk === null && chunks.length > 0) { + return reducer(chunks); + } + + // Otherwise, we still have a chunk, it means we reached the maxChunks + // value and as such we return a new Readable that first returns + // the existing read chunks and then continues with reading from + // the underlying readable. + return { + read: () => { + + // First consume chunks from our array + if (chunks.length > 0) { + return chunks.shift()!; + } + + // Then ensure to return our last read chunk + if (typeof chunk !== 'undefined') { + const lastReadChunk = chunk; + + // explicitly use undefined here to indicate that we consumed + // the chunk, which could have either been null or valued. + chunk = undefined; + + return lastReadChunk; + } + + // Finally delegate back to the Readable + return readable.read(); + } + }; +} + +/** + * Helper to read a T readable up to a maximum of chunks. If the limit is + * reached, will return a readable instead to ensure all data can still + * be read. + */ +export function peekReadable(readable: Readable, reducer: Reducer, maxChunks: number): T | Readable { const chunks: T[] = []; let chunk: T | null | undefined = undefined; @@ -394,7 +556,7 @@ export function consumeReadableWithLimit(readable: Readable, reducer: IRed /** * Helper to fully read a T stream into a T. */ -export function consumeStream(stream: ReadableStream, reducer: IReducer): Promise { +export function consumeStream(stream: ReadableStream, reducer: Reducer): Promise { return new Promise((resolve, reject) => { const chunks: T[] = []; @@ -404,12 +566,56 @@ export function consumeStream(stream: ReadableStream, reducer: IReducer }); } +/** + * Helper to peek up to `maxChunks` into a stream. The return type signals if + * the stream has ended or not. If not, caller needs to add a `data` listener + * to continue reading. 
+ */ +export function peekStream(stream: ReadableStream, maxChunks: number): Promise> { + return new Promise((resolve, reject) => { + const streamListeners = new DisposableCollection(); + + // Data Listener + const buffer: T[] = []; + const dataListener = (chunk: T) => { + + // Add to buffer + buffer.push(chunk); + + // We reached maxChunks and thus need to return + if (buffer.length > maxChunks) { + + // Dispose any listeners and ensure to pause the + // stream so that it can be consumed again by caller + streamListeners.dispose(); + stream.pause(); + + return resolve({ stream, buffer, ended: false }); + } + }; + + streamListeners.push(Disposable.create(() => stream.removeListener('data', dataListener))); + stream.on('data', dataListener); + + // Error Listener + const errorListener = (error: Error) => reject(error); + + streamListeners.push(Disposable.create(() => stream.removeListener('error', errorListener))); + stream.on('error', errorListener); + + const endListener = () => resolve({ stream, buffer, ended: true }); + + streamListeners.push(Disposable.create(() => stream.removeListener('end', endListener))); + stream.on('end', endListener); + }); +} + /** * Helper to read a T stream up to a maximum of chunks. If the limit is * reached, will return a stream instead to ensure all data can still * be read. */ -export function consumeStreamWithLimit(stream: ReadableStream, reducer: IReducer, maxChunks: number): Promise> { +export function consumeStreamWithLimit(stream: ReadableStream, reducer: Reducer, maxChunks: number): Promise> { return new Promise((resolve, reject) => { const chunks: T[] = []; @@ -463,7 +669,7 @@ export function consumeStreamWithLimit(stream: ReadableStream, reducer: IR /** * Helper to create a readable stream from an existing T. */ -export function toStream(t: T, reducer: IReducer): ReadableStream { +export function toStream(t: T, reducer: Reducer): ReadableStream { const stream = newWriteableStream(reducer); stream.end(t); @@ -493,7 +699,7 @@ export function toReadable(t: T): Readable { /** * Helper to transform a readable stream into another stream. 
*/ -export function transform(stream: ReadableStreamEvents, transformer: ITransformer, reducer: IReducer): ReadableStream { +export function transform(stream: ReadableStreamEvents, transformer: ITransformer, reducer: Reducer): ReadableStream { const target = newWriteableStream(reducer); stream.on('data', data => target.write(transformer.data(data))); diff --git a/packages/filesystem/src/browser/file-service.ts b/packages/filesystem/src/browser/file-service.ts index 294ac4b084c9a..6b5c8b9e58bdd 100644 --- a/packages/filesystem/src/browser/file-service.ts +++ b/packages/filesystem/src/browser/file-service.ts @@ -50,11 +50,11 @@ import { Stat, WatchOptions, WriteFileOptions, toFileOperationResult, toFileSystemProviderErrorCode, ResolveFileResult, ResolveFileResultWithMetadata, - MoveFileOptions, CopyFileOptions, BaseStatWithMetadata, FileDeleteOptions, FileOperationOptions, hasAccessCapability, hasUpdateCapability + MoveFileOptions, CopyFileOptions, BaseStatWithMetadata, FileDeleteOptions, FileOperationOptions, hasAccessCapability, hasUpdateCapability, + hasFileReadStreamCapability, FileSystemProviderWithFileReadStreamCapability } from '../common/files'; -import { createReadStream } from '../common/io'; -import { BinaryBuffer, BinaryBufferReadable, BinaryBufferReadableStream } from '@theia/core/lib/common/buffer'; -import { isReadableStream, ReadableStreamEvents, transform, consumeStreamWithLimit, consumeReadableWithLimit } from '@theia/core/lib/common/stream'; +import { BinaryBuffer, BinaryBufferReadable, BinaryBufferReadableStream, BinaryBufferReadableBufferedStream, BinaryBufferWriteableStream } from '@theia/core/lib/common/buffer'; +import { ReadableStream, isReadableStream, isReadableBufferedStream, transform, consumeStream, peekStream, peekReadable, Readable } from '@theia/core/lib/common/stream'; import { LabelProvider } from '@theia/core/lib/browser/label-provider'; import { FileSystemPreferences } from './filesystem-preferences'; import { ProgressService } from '@theia/core/lib/common/progress-service'; @@ -62,8 +62,9 @@ import { DelegatingFileSystemProvider } from '../common/delegating-file-system-p import type { TextDocumentContentChangeEvent } from 'vscode-languageserver-protocol'; import { EncodingRegistry } from '@theia/core/lib/browser/encoding-registry'; import { UTF8, UTF8_with_bom } from '@theia/core/lib/common/encodings'; -import { EncodingService, ResourceEncoding } from '@theia/core/lib/common/encoding-service'; +import { EncodingService, ResourceEncoding, DecodeStreamResult } from '@theia/core/lib/common/encoding-service'; import { Mutable } from '@theia/core/lib/common/types'; +import { readFileIntoStream } from '../common/io'; export interface FileOperationParticipant { @@ -115,12 +116,15 @@ export interface ReadTextFileOptions extends ReadEncodingOptions, ReadFileOption acceptTextOnly?: boolean; } -export interface TextFileContent extends BaseStatWithMetadata { +interface BaseTextFileContent extends BaseStatWithMetadata { - /** - * The encoding of the content if known. - */ + /** + * The encoding of the content if known. + */ encoding: string; +} + +export interface TextFileContent extends BaseTextFileContent { /** * The content of a text file. @@ -128,6 +132,14 @@ export interface TextFileContent extends BaseStatWithMetadata { value: string; } +export interface TextFileStreamContent extends BaseTextFileContent { + + /** + * The line grouped content of a text file. 
+ */ + value: ReadableStream; +} + export interface CreateTextFileOptions extends WriteEncodingOptions, CreateFileOptions { } export interface WriteTextFileOptions extends WriteEncodingOptions, WriteFileOptions { } @@ -531,7 +543,7 @@ export class FileService { // #region Text File Reading/Writing - async create(resource: URI, value?: string, options?: CreateTextFileOptions): Promise { + async create(resource: URI, value?: string | Readable, options?: CreateTextFileOptions): Promise { if (options?.fromUserGesture === false) { return this.doCreate(resource, value, options); } @@ -553,29 +565,72 @@ export class FileService { return stat; } - protected async doCreate(resource: URI, value?: string, options?: CreateTextFileOptions): Promise { + protected async doCreate(resource: URI, value?: string | Readable, options?: CreateTextFileOptions): Promise { const encoding = await this.getWriteEncoding(resource, options); - const encoded = this.encodingService.encode(value || '', encoding); + const encoded = await this.encodingService.encodeStream(value, encoding); return this.createFile(resource, encoded, options); } - async write(resource: URI, value: string, options?: WriteTextFileOptions): Promise { + async write(resource: URI, value: string | Readable, options?: WriteTextFileOptions): Promise { const encoding = await this.getWriteEncoding(resource, options); - const encoded = this.encodingService.encode(value, encoding); + const encoded = await this.encodingService.encodeStream(value, encoding); return Object.assign(await this.writeFile(resource, encoded, options), { encoding: encoding.encoding }); } async read(resource: URI, options?: ReadTextFileOptions): Promise { + const [bufferStream, decoder] = await this.doRead(resource, { + ...options, + // optimization: since we know that the caller does not + // care about buffering, we indicate this to the reader. + // this reduces all the overhead the buffered reading + // has (open, read, close) if the provider supports + // unbuffered reading. 
+ preferUnbuffered: true + }); + + return { + ...bufferStream, + encoding: decoder.detected.encoding || UTF8, + value: await consumeStream(decoder.stream, strings => strings.join('')) + }; + } + + async readStream(resource: URI, options?: ReadTextFileOptions): Promise { + const [bufferStream, decoder] = await this.doRead(resource, options); + + return { + ...bufferStream, + encoding: decoder.detected.encoding || UTF8, + value: decoder.stream + }; + } + + private async doRead(resource: URI, options?: ReadTextFileOptions & { preferUnbuffered?: boolean }): Promise<[FileStreamContent, DecodeStreamResult]> { options = this.resolveReadOptions(options); - const content = await this.readFile(resource, options); - // TODO stream - const detected = await this.encodingService.detectEncoding(content.value, options.autoGuessEncoding); - if (options?.acceptTextOnly && detected.seemsBinary) { + + // read stream raw (either buffered or unbuffered) + let bufferStream: FileStreamContent; + if (options?.preferUnbuffered) { + const content = await this.readFile(resource, options); + bufferStream = { + ...content, + value: BinaryBufferReadableStream.fromBuffer(content.value) + }; + } else { + bufferStream = await this.readFileStream(resource, options); + } + + const decoder = await this.encodingService.decodeStream(bufferStream.value, { + guessEncoding: options.autoGuessEncoding, + overwriteEncoding: detectedEncoding => this.getReadEncoding(resource, options, detectedEncoding) + }); + + // validate binary + if (options?.acceptTextOnly && decoder.detected.seemsBinary) { throw new TextFileOperationError('File seems to be binary and cannot be opened as text', TextFileOperationResult.FILE_IS_BINARY, options); } - const encoding = await this.getReadEncoding(resource, options, detected.encoding); - const value = this.encodingService.decode(content.value, encoding); - return { ...content, encoding, value }; + + return [bufferStream, decoder]; } protected resolveReadOptions(options?: ReadTextFileOptions): ReadTextFileOptions { @@ -647,22 +702,30 @@ export class FileService { // to write is a Readable, we consume up to 3 chunks and try to write the data // unbuffered to reduce the overhead. If the Readable has more data to provide // we continue to write buffered. 
+ let bufferOrReadableOrStreamOrBufferedStream: BinaryBuffer | BinaryBufferReadable | BinaryBufferReadableStream | BinaryBufferReadableBufferedStream; if (hasReadWriteCapability(provider) && !(bufferOrReadableOrStream instanceof BinaryBuffer)) { if (isReadableStream(bufferOrReadableOrStream)) { - bufferOrReadableOrStream = await consumeStreamWithLimit(bufferOrReadableOrStream, data => BinaryBuffer.concat(data), 3); + const bufferedStream = await peekStream(bufferOrReadableOrStream, 3); + if (bufferedStream.ended) { + bufferOrReadableOrStreamOrBufferedStream = BinaryBuffer.concat(bufferedStream.buffer); + } else { + bufferOrReadableOrStreamOrBufferedStream = bufferedStream; + } } else { - bufferOrReadableOrStream = consumeReadableWithLimit(bufferOrReadableOrStream, data => BinaryBuffer.concat(data), 3); + bufferOrReadableOrStreamOrBufferedStream = peekReadable(bufferOrReadableOrStream, data => BinaryBuffer.concat(data), 3); } + } else { + bufferOrReadableOrStreamOrBufferedStream = bufferOrReadableOrStream; } // write file: unbuffered (only if data to write is a buffer, or the provider has no buffered write capability) - if (!hasOpenReadWriteCloseCapability(provider) || (hasReadWriteCapability(provider) && bufferOrReadableOrStream instanceof BinaryBuffer)) { - await this.doWriteUnbuffered(provider, resource, bufferOrReadableOrStream); + if (!hasOpenReadWriteCloseCapability(provider) || (hasReadWriteCapability(provider) && bufferOrReadableOrStreamOrBufferedStream instanceof BinaryBuffer)) { + await this.doWriteUnbuffered(provider, resource, bufferOrReadableOrStreamOrBufferedStream); } // write file: buffered else { - await this.doWriteBuffered(provider, resource, bufferOrReadableOrStream instanceof BinaryBuffer ? BinaryBufferReadable.fromBuffer(bufferOrReadableOrStream) : bufferOrReadableOrStream); + await this.doWriteBuffered(provider, resource, bufferOrReadableOrStreamOrBufferedStream instanceof BinaryBuffer ? 
BinaryBufferReadable.fromBuffer(bufferOrReadableOrStreamOrBufferedStream) : bufferOrReadableOrStreamOrBufferedStream); } } catch (error) { this.rethrowAsFileOperationError('Unable to write file', resource, error, options); @@ -764,9 +827,15 @@ export class FileService { let fileStreamPromise: Promise; // read unbuffered (only if either preferred, or the provider has no buffered read capability) - if (!hasOpenReadWriteCloseCapability(provider) || (hasReadWriteCapability(provider) && options?.preferUnbuffered)) { + if (!(hasOpenReadWriteCloseCapability(provider) || hasFileReadStreamCapability(provider)) || (hasReadWriteCapability(provider) && options?.preferUnbuffered)) { fileStreamPromise = this.readFileUnbuffered(provider, resource, options); } + + // read streamed (always prefer over primitive buffered read) + else if (hasFileReadStreamCapability(provider)) { + fileStreamPromise = Promise.resolve(this.readFileStreamed(provider, resource, cancellableSource.token, options)); + } + // read buffered else { fileStreamPromise = Promise.resolve(this.readFileBuffered(provider, resource, cancellableSource.token, options)); @@ -783,22 +852,27 @@ export class FileService { } } - private readFileBuffered(provider: FileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, token: CancellationToken, options: ReadFileOptions = Object.create(null)): BinaryBufferReadableStream { - const fileStream = createReadStream(provider, resource, { - ...options, - bufferSize: this.BUFFER_SIZE - }, token); - - return this.transformFileReadStream(resource, fileStream, options); - } + private readFileStreamed(provider: FileSystemProviderWithFileReadStreamCapability, resource: URI, token: CancellationToken, options: ReadFileOptions = Object.create(null)): BinaryBufferReadableStream { + const fileStream = provider.readFileStream(resource, options, token); - private transformFileReadStream(resource: URI, stream: ReadableStreamEvents, options: ReadFileOptions): BinaryBufferReadableStream { - return transform(stream, { + return transform(fileStream, { data: data => data instanceof BinaryBuffer ? data : BinaryBuffer.wrap(data), error: error => this.asFileOperationError('Unable to read file', resource, error, options) }, data => BinaryBuffer.concat(data)); } + private readFileBuffered(provider: FileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, token: CancellationToken, options: ReadFileOptions = Object.create(null)): BinaryBufferReadableStream { + const stream = BinaryBufferWriteableStream.create(); + + readFileIntoStream(provider, resource, stream, data => data, { + ...options, + bufferSize: this.BUFFER_SIZE, + errorTransformer: error => this.asFileOperationError('Unable to read file', resource, error, options) + }, token); + + return stream; + } + protected rethrowAsFileOperationError(message: string, resource: URI, error: Error, options?: ReadFileOptions & WriteFileOptions & CreateFileOptions): never { throw this.asFileOperationError(message, resource, error, options); } @@ -822,6 +896,9 @@ export class FileService { buffer = buffer.slice(0, options.length); } + // Throw if file is too large to load + this.validateReadFileLimits(resource, buffer.byteLength, options); + return BinaryBufferReadableStream.fromBuffer(BinaryBuffer.wrap(buffer)); } @@ -1277,7 +1354,7 @@ export class FileService { return isPathCaseSensitive ? 
resource.toString() : resource.toString().toLowerCase(); } - private async doWriteBuffered(provider: FileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readableOrStream: BinaryBufferReadable | BinaryBufferReadableStream): Promise { + private async doWriteBuffered(provider: FileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, readableOrStreamOrBufferedStream: BinaryBufferReadable | BinaryBufferReadableStream | BinaryBufferReadableBufferedStream): Promise { return this.ensureWriteQueue(provider, resource, async () => { // open handle @@ -1285,10 +1362,10 @@ export class FileService { // write into handle until all bytes from buffer have been written try { - if (isReadableStream(readableOrStream)) { - await this.doWriteStreamBufferedQueued(provider, handle, readableOrStream); + if (isReadableStream(readableOrStreamOrBufferedStream) || isReadableBufferedStream(readableOrStreamOrBufferedStream)) { + await this.doWriteStreamBufferedQueued(provider, handle, readableOrStreamOrBufferedStream); } else { - await this.doWriteReadableBufferedQueued(provider, handle, readableOrStream); + await this.doWriteReadableBufferedQueued(provider, handle, readableOrStreamOrBufferedStream); } } catch (error) { throw ensureFileSystemProviderError(error); @@ -1300,9 +1377,34 @@ export class FileService { }); } - private doWriteStreamBufferedQueued(provider: FileSystemProviderWithOpenReadWriteCloseCapability, handle: number, stream: BinaryBufferReadableStream): Promise { - return new Promise((resolve, reject) => { - let posInFile = 0; + private async doWriteStreamBufferedQueued(provider: FileSystemProviderWithOpenReadWriteCloseCapability, handle: number, streamOrBufferedStream: BinaryBufferReadableStream | BinaryBufferReadableBufferedStream): Promise { + let posInFile = 0; + let stream: BinaryBufferReadableStream; + + // Buffered stream: consume the buffer first by writing + // it to the target before reading from the stream. 
+ if (isReadableBufferedStream(streamOrBufferedStream)) { + if (streamOrBufferedStream.buffer.length > 0) { + const chunk = BinaryBuffer.concat(streamOrBufferedStream.buffer); + await this.doWriteBuffer(provider, handle, chunk, chunk.byteLength, posInFile, 0); + + posInFile += chunk.byteLength; + } + + // If the stream has been consumed, return early + if (streamOrBufferedStream.ended) { + return; + } + + stream = streamOrBufferedStream.stream; + } + + // Unbuffered stream - just take as is + else { + stream = streamOrBufferedStream; + } + + return new Promise(async (resolve, reject) => { stream.on('data', async chunk => { @@ -1348,18 +1450,20 @@ export class FileService { } } - private async doWriteUnbuffered(provider: FileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStream: BinaryBuffer | BinaryBufferReadable | BinaryBufferReadableStream): Promise { - return this.ensureWriteQueue(provider, resource, () => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadableOrStream)); + private async doWriteUnbuffered(provider: FileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStreamOrBufferedStream: BinaryBuffer | BinaryBufferReadable | BinaryBufferReadableStream | BinaryBufferReadableBufferedStream): Promise { + return this.ensureWriteQueue(provider, resource, () => this.doWriteUnbufferedQueued(provider, resource, bufferOrReadableOrStreamOrBufferedStream)); } - private async doWriteUnbufferedQueued(provider: FileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStream: BinaryBuffer | BinaryBufferReadable | BinaryBufferReadableStream): Promise { + private async doWriteUnbufferedQueued(provider: FileSystemProviderWithFileReadWriteCapability, resource: URI, bufferOrReadableOrStreamOrBufferedStream: BinaryBuffer | BinaryBufferReadable | BinaryBufferReadableStream | BinaryBufferReadableBufferedStream): Promise { let buffer: BinaryBuffer; - if (bufferOrReadableOrStream instanceof BinaryBuffer) { - buffer = bufferOrReadableOrStream; - } else if (isReadableStream(bufferOrReadableOrStream)) { - buffer = await BinaryBufferReadableStream.toBuffer(bufferOrReadableOrStream); + if (bufferOrReadableOrStreamOrBufferedStream instanceof BinaryBuffer) { + buffer = bufferOrReadableOrStreamOrBufferedStream; + } else if (isReadableStream(bufferOrReadableOrStreamOrBufferedStream)) { + buffer = await BinaryBufferReadableStream.toBuffer(bufferOrReadableOrStreamOrBufferedStream); + } else if (isReadableBufferedStream(bufferOrReadableOrStreamOrBufferedStream)) { + buffer = await BinaryBufferReadableBufferedStream.toBuffer(bufferOrReadableOrStreamOrBufferedStream); } else { - buffer = BinaryBufferReadable.toBuffer(bufferOrReadableOrStream); + buffer = BinaryBufferReadable.toBuffer(bufferOrReadableOrStreamOrBufferedStream); } return provider.writeFile(resource, buffer.buffer, { create: true, overwrite: true }); diff --git a/packages/filesystem/src/common/delegating-file-system-provider.ts b/packages/filesystem/src/common/delegating-file-system-provider.ts index eb0f58b613fce..2078392131f10 100644 --- a/packages/filesystem/src/common/delegating-file-system-provider.ts +++ b/packages/filesystem/src/common/delegating-file-system-provider.ts @@ -15,13 +15,16 @@ ********************************************************************************/ import URI from '@theia/core/lib/common/uri'; -import { Event, Emitter } from '@theia/core/lib/common'; +import { Event, Emitter, CancellationToken } from '@theia/core/lib/common'; import { 
Disposable, DisposableCollection } from '@theia/core/lib/common/disposable'; import { FileSystemProvider, FileSystemProviderCapabilities, WatchOptions, FileDeleteOptions, FileOverwriteOptions, FileWriteOptions, FileOpenOptions, FileChange, Stat, FileType, - hasReadWriteCapability, hasFileFolderCopyCapability, hasOpenReadWriteCloseCapability, hasAccessCapability, FileUpdateOptions, hasUpdateCapability, FileUpdateResult + hasReadWriteCapability, hasFileFolderCopyCapability, hasOpenReadWriteCloseCapability, hasAccessCapability, FileUpdateOptions, hasUpdateCapability, FileUpdateResult, + FileReadStreamOptions, + hasFileReadStreamCapability } from './files'; import type { TextDocumentContentChangeEvent } from 'vscode-languageserver-protocol'; +import { ReadableStreamEvents } from '@theia/core/lib/common/stream'; export class DelegatingFileSystemProvider implements Required, Disposable { @@ -92,6 +95,13 @@ export class DelegatingFileSystemProvider implements Required { + if (hasFileReadStreamCapability(this.delegate)) { + return this.delegate.readFileStream(this.options.uriConverter.to(resource), opts, token); + } + throw new Error('not supported'); + } + readdir(resource: URI): Promise<[string, FileType][]> { return this.delegate.readdir(this.options.uriConverter.to(resource)); } diff --git a/packages/filesystem/src/common/files.ts b/packages/filesystem/src/common/files.ts index b521ca006fa61..49eeb7c2b0aab 100644 --- a/packages/filesystem/src/common/files.ts +++ b/packages/filesystem/src/common/files.ts @@ -24,6 +24,8 @@ import { Event } from '@theia/core/lib/common/event'; import { Disposable as IDisposable } from '@theia/core/lib/common/disposable'; import { BinaryBuffer, BinaryBufferReadableStream } from '@theia/core/lib/common/buffer'; import type { TextDocumentContentChangeEvent } from 'vscode-languageserver-protocol'; +import { ReadableStreamEvents } from '@theia/core/lib/common/stream'; +import { CancellationToken } from '@theia/core/lib/common/cancellation'; export const enum FileOperation { CREATE, @@ -507,6 +509,7 @@ export interface WatchOptions { export const enum FileSystemProviderCapabilities { FileReadWrite = 1 << 1, FileOpenReadWriteClose = 1 << 2, + FileReadStream = 1 << 4, FileFolderCopy = 1 << 3, @@ -524,6 +527,8 @@ export enum FileSystemProviderErrorCode { FileNotFound = 'EntryNotFound', FileNotADirectory = 'EntryNotADirectory', FileIsADirectory = 'EntryIsADirectory', + FileExceedsMemoryLimit = 'EntryExceedsMemoryLimit', + FileTooLarge = 'EntryTooLarge', NoPermissions = 'NoPermissions', Unavailable = 'Unavailable', Unknown = 'Unknown' @@ -572,6 +577,8 @@ export interface FileSystemProvider { readFile?(resource: URI): Promise; writeFile?(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise; + readFileStream?(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents; + open?(resource: URI, opts: FileOpenOptions): Promise; close?(fd: number): Promise; read?(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise; @@ -628,6 +635,14 @@ export function hasOpenReadWriteCloseCapability(provider: FileSystemProvider): p return !!(provider.capabilities & FileSystemProviderCapabilities.FileOpenReadWriteClose); } +export interface FileSystemProviderWithFileReadStreamCapability extends FileSystemProvider { + readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents; +} + +export function hasFileReadStreamCapability(provider: FileSystemProvider): provider is 
FileSystemProviderWithFileReadStreamCapability { + return !!(provider.capabilities & FileSystemProviderCapabilities.FileReadStream); +} + export function markAsFileSystemProviderError(error: Error, code: FileSystemProviderErrorCode): Error { error.name = code ? `${code} (FileSystemError)` : 'FileSystemError'; @@ -658,6 +673,8 @@ export function toFileSystemProviderErrorCode(error: Error | undefined | null): case FileSystemProviderErrorCode.FileIsADirectory: return FileSystemProviderErrorCode.FileIsADirectory; case FileSystemProviderErrorCode.FileNotADirectory: return FileSystemProviderErrorCode.FileNotADirectory; case FileSystemProviderErrorCode.FileNotFound: return FileSystemProviderErrorCode.FileNotFound; + case FileSystemProviderErrorCode.FileExceedsMemoryLimit: return FileSystemProviderErrorCode.FileExceedsMemoryLimit; + case FileSystemProviderErrorCode.FileTooLarge: return FileSystemProviderErrorCode.FileTooLarge; case FileSystemProviderErrorCode.NoPermissions: return FileSystemProviderErrorCode.NoPermissions; case FileSystemProviderErrorCode.Unavailable: return FileSystemProviderErrorCode.Unavailable; } @@ -684,6 +701,10 @@ export function toFileOperationResult(error: Error): FileOperationResult { return FileOperationResult.FILE_PERMISSION_DENIED; case FileSystemProviderErrorCode.FileExists: return FileOperationResult.FILE_MOVE_CONFLICT; + case FileSystemProviderErrorCode.FileExceedsMemoryLimit: + return FileOperationResult.FILE_EXCEEDS_MEMORY_LIMIT; + case FileSystemProviderErrorCode.FileTooLarge: + return FileOperationResult.FILE_TOO_LARGE; default: return FileOperationResult.FILE_OTHER_ERROR; } diff --git a/packages/filesystem/src/common/io.ts b/packages/filesystem/src/common/io.ts index 347013e496abf..3af27d4e9900b 100644 --- a/packages/filesystem/src/common/io.ts +++ b/packages/filesystem/src/common/io.ts @@ -22,9 +22,10 @@ /* eslint-disable max-len */ import URI from '@theia/core/lib/common/uri'; -import { BinaryBuffer, BinaryBufferWriteableStream, BinaryBufferReadableStream } from '@theia/core/lib/common//buffer'; +import { BinaryBuffer } from '@theia/core/lib/common//buffer'; import { CancellationToken, cancelled as canceled } from '@theia/core/lib/common/cancellation'; -import { FileSystemProviderWithOpenReadWriteCloseCapability, FileReadStreamOptions, ensureFileSystemProviderError } from './files'; +import { FileSystemProviderWithOpenReadWriteCloseCapability, FileReadStreamOptions, ensureFileSystemProviderError, createFileSystemProviderError, FileSystemProviderErrorCode } from './files'; +import { WriteableStream, ErrorTransformer, DataTransformer } from '@theia/core/lib/common/stream'; export interface CreateReadStreamOptions extends FileReadStreamOptions { @@ -32,20 +33,40 @@ export interface CreateReadStreamOptions extends FileReadStreamOptions { * The size of the buffer to use before sending to the stream. */ bufferSize: number; -} -export function createReadStream(provider: FileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, options: CreateReadStreamOptions, token?: CancellationToken): BinaryBufferReadableStream { - const stream = BinaryBufferWriteableStream.create(); + /** + * Allows to massage any possibly error that happens during reading. + */ + errorTransformer?: ErrorTransformer; +} - // do not await reading but simply return the stream directly since it operates - // via events. finally end the stream and send through the possible error +/** + * A helper to read a file from a provider with open/read/close capability into a stream. 
+ */ +export async function readFileIntoStream( + provider: FileSystemProviderWithOpenReadWriteCloseCapability, + resource: URI, + target: WriteableStream, + transformer: DataTransformer, + options: CreateReadStreamOptions, + token: CancellationToken +): Promise { + let error: Error | undefined = undefined; - doReadFileIntoStream(provider, resource, stream, options, token).then(() => stream.end(), error => stream.end(error)); + try { + await doReadFileIntoStream(provider, resource, target, transformer, options, token); + } catch (err) { + error = err; + } finally { + if (error && options.errorTransformer) { + error = options.errorTransformer(error); + } - return stream; + target.end(error); + } } -async function doReadFileIntoStream(provider: FileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, stream: BinaryBufferWriteableStream, options: CreateReadStreamOptions, token?: CancellationToken): Promise { +async function doReadFileIntoStream(provider: FileSystemProviderWithOpenReadWriteCloseCapability, resource: URI, target: WriteableStream, transformer: DataTransformer, options: CreateReadStreamOptions, token: CancellationToken): Promise { // Check for cancellation throwIfCancelled(token); @@ -57,6 +78,7 @@ async function doReadFileIntoStream(provider: FileSystemProviderWithOpenReadWrit throwIfCancelled(token); try { + let totalBytesRead = 0; let bytesRead = 0; let allowedRemainingBytes = (options && typeof options.length === 'number') ? options.length : undefined; @@ -71,6 +93,7 @@ async function doReadFileIntoStream(provider: FileSystemProviderWithOpenReadWrit posInFile += bytesRead; posInBuffer += bytesRead; + totalBytesRead += bytesRead; if (typeof allowedRemainingBytes === 'number') { allowedRemainingBytes -= bytesRead; @@ -78,13 +101,13 @@ async function doReadFileIntoStream(provider: FileSystemProviderWithOpenReadWrit // when buffer full, create a new one and emit it through stream if (posInBuffer === buffer.byteLength) { - stream.write(buffer); + await target.write(transformer(buffer)); buffer = BinaryBuffer.alloc(Math.min(options.bufferSize, typeof allowedRemainingBytes === 'number' ? 
allowedRemainingBytes : options.bufferSize)); posInBuffer = 0; } - } while (bytesRead > 0 && (typeof allowedRemainingBytes !== 'number' || allowedRemainingBytes > 0) && throwIfCancelled(token)); + } while (bytesRead > 0 && (typeof allowedRemainingBytes !== 'number' || allowedRemainingBytes > 0) && throwIfCancelled(token) && throwIfTooLarge(totalBytesRead, options)); // wrap up with last buffer (also respect maxBytes if provided) if (posInBuffer > 0) { @@ -93,7 +116,7 @@ async function doReadFileIntoStream(provider: FileSystemProviderWithOpenReadWrit lastChunkLength = Math.min(posInBuffer, allowedRemainingBytes); } - stream.write(buffer.slice(0, lastChunkLength)); + target.write(transformer(buffer.slice(0, lastChunkLength))); } } catch (error) { throw ensureFileSystemProviderError(error); @@ -102,10 +125,26 @@ async function doReadFileIntoStream(provider: FileSystemProviderWithOpenReadWrit } } -function throwIfCancelled(token?: CancellationToken): boolean { - if (token && token.isCancellationRequested) { +function throwIfCancelled(token: CancellationToken): boolean { + if (token.isCancellationRequested) { throw canceled(); } return true; } + +function throwIfTooLarge(totalBytesRead: number, options: CreateReadStreamOptions): boolean { + + // Return early if file is too large to load and we have configured limits + if (options?.limits) { + if (typeof options.limits.memory === 'number' && totalBytesRead > options.limits.memory) { + throw createFileSystemProviderError('To open a file of this size, you need to restart and allow it to use more memory', FileSystemProviderErrorCode.FileExceedsMemoryLimit); + } + + if (typeof options.limits.size === 'number' && totalBytesRead > options.limits.size) { + throw createFileSystemProviderError('File is too large to open', FileSystemProviderErrorCode.FileTooLarge); + } + } + + return true; +} diff --git a/packages/filesystem/src/common/remote-file-system-provider.ts b/packages/filesystem/src/common/remote-file-system-provider.ts index d2617de3e9c2c..a9cf15e4b06ef 100644 --- a/packages/filesystem/src/common/remote-file-system-provider.ts +++ b/packages/filesystem/src/common/remote-file-system-provider.ts @@ -23,12 +23,14 @@ import { FileWriteOptions, FileOpenOptions, FileChangeType, FileSystemProviderCapabilities, FileChange, Stat, FileOverwriteOptions, WatchOptions, FileType, FileSystemProvider, FileDeleteOptions, hasOpenReadWriteCloseCapability, hasFileFolderCopyCapability, hasReadWriteCapability, hasAccessCapability, - FileSystemProviderError, FileSystemProviderErrorCode, FileUpdateOptions, hasUpdateCapability, FileUpdateResult + FileSystemProviderError, FileSystemProviderErrorCode, FileUpdateOptions, hasUpdateCapability, FileUpdateResult, FileReadStreamOptions, hasFileReadStreamCapability } from './files'; import { JsonRpcServer, JsonRpcProxy, JsonRpcProxyFactory } from '@theia/core/lib/common/messaging/proxy-factory'; import { ApplicationError } from '@theia/core/lib/common/application-error'; import { Deferred } from '@theia/core/lib/common/promise-util'; import type { TextDocumentContentChangeEvent } from 'vscode-languageserver-protocol'; +import { newWriteableStream, ReadableStreamEvents } from '@theia/core/lib/common/stream'; +import { CancellationToken, cancelled } from '@theia/core/lib/common/cancellation'; export const remoteFileSystemPath = '/services/remote-filesystem'; @@ -41,6 +43,7 @@ export interface RemoteFileSystemServer extends JsonRpcServer; close(fd: number): Promise; read(fd: number, pos: number, length: number): Promise<{ bytes: 
number[]; bytesRead: number; }>; + readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise; readFile(resource: string): Promise; write(fd: number, pos: number, data: number[], offset: number, length: number): Promise; writeFile(resource: string, content: number[], opts: FileWriteOptions): Promise; @@ -59,9 +62,15 @@ export interface RemoteFileChange { readonly resource: string; } +export interface RemoteFileStreamError extends Error { + code?: FileSystemProviderErrorCode +} + export interface RemoteFileSystemClient { notifyDidChangeFile(event: { changes: RemoteFileChange[] }): void; notifyDidChangeCapabilities(capabilities: FileSystemProviderCapabilities): void; + onFileStreamData(handle: number, data: number[]): void; + onFileStreamEnd(handle: number, error: RemoteFileStreamError | undefined): void; } export const RemoteFileSystemProviderError = ApplicationError.declare(-33005, @@ -102,9 +111,17 @@ export class RemoteFileSystemProvider implements Required, D private readonly onDidChangeCapabilitiesEmitter = new Emitter(); readonly onDidChangeCapabilities = this.onDidChangeCapabilitiesEmitter.event; + private readonly onFileStreamDataEmitter = new Emitter<[number, Uint8Array]>(); + private readonly onFileStreamData = this.onFileStreamDataEmitter.event; + + private readonly onFileStreamEndEmitter = new Emitter<[number, Error | FileSystemProviderError | undefined]>(); + private readonly onFileStreamEnd = this.onFileStreamEndEmitter.event; + protected readonly toDispose = new DisposableCollection( this.onDidChangeFileEmitter, - this.onDidChangeCapabilitiesEmitter + this.onDidChangeCapabilitiesEmitter, + this.onFileStreamDataEmitter, + this.onFileStreamEndEmitter ); protected watcherSequence = 0; @@ -134,7 +151,9 @@ export class RemoteFileSystemProvider implements Required, D notifyDidChangeFile: ({ changes }) => { this.onDidChangeFileEmitter.fire(changes.map(event => ({ resource: new URI(event.resource), type: event.type }))); }, - notifyDidChangeCapabilities: capabilities => this.setCapabilities(capabilities) + notifyDidChangeCapabilities: capabilities => this.setCapabilities(capabilities), + onFileStreamData: (handle, data) => this.onFileStreamDataEmitter.fire([handle, Uint8Array.from(data)]), + onFileStreamEnd: (handle, error) => this.onFileStreamEndEmitter.fire([handle, error]) }); const onInitialized = this.server.onDidOpenConnection(() => { // skip reconnection on the first connection @@ -191,6 +210,42 @@ export class RemoteFileSystemProvider implements Required, D return Uint8Array.from(bytes); } + readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): ReadableStreamEvents { + const capturedError = new Error(); + // eslint-disable-next-line no-shadow + const stream = newWriteableStream(data => BinaryBuffer.concat(data.map(data => BinaryBuffer.wrap(data))).buffer); + this.server.readFileStream(resource.toString(), opts, token).then(streamHandle => { + if (token.isCancellationRequested) { + stream.end(cancelled()); + return; + } + const toDispose = new DisposableCollection( + token.onCancellationRequested(() => stream.end(cancelled())), + this.onFileStreamData(([handle, data]) => { + if (streamHandle === handle) { + stream.write(data); + } + }), + this.onFileStreamEnd(([handle, error]) => { + if (streamHandle === handle) { + if (error) { + const code = ('code' in error && error.code) || FileSystemProviderErrorCode.Unknown; + const fileOperationError = new FileSystemProviderError(error.message, code); + 
fileOperationError.name = error.name; + const capturedStack = capturedError.stack || ''; + fileOperationError.stack = `${capturedStack}\nCaused by: ${error.stack}`; + stream.end(fileOperationError); + } else { + stream.end(); + } + } + }) + ); + stream.on('end', () => toDispose.dispose()); + }, error => stream.end(error)); + return stream; + } + write(fd: number, pos: number, data: Uint8Array, offset: number, length: number): Promise { return this.server.write(fd, pos, [...data.values()], offset, length); } @@ -399,4 +454,23 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { } } + protected readFileStreamSeq = 0; + + async readFileStream(resource: string, opts: FileReadStreamOptions, token: CancellationToken): Promise { + if (hasFileReadStreamCapability(this.provider)) { + const handle = this.readFileStreamSeq++; + const stream = this.provider.readFileStream(new URI(resource), opts, token); + stream.on('data', data => this.client?.onFileStreamData(handle, [...data.values()])); + stream.on('error', error => { + const code = error instanceof FileSystemProviderError ? error.code : undefined; + const { name, message, stack } = error; + // eslint-disable-next-line no-unused-expressions + this.client?.onFileStreamEnd(handle, { code, name, message, stack }); + }); + stream.on('end', () => this.client?.onFileStreamEnd(handle, undefined)); + return handle; + } + throw new Error('not supported'); + } + } diff --git a/packages/filesystem/src/node/disk-file-system-provider.ts b/packages/filesystem/src/node/disk-file-system-provider.ts index 7d14ac60ae53b..e9e4032f0636a 100644 --- a/packages/filesystem/src/node/disk-file-system-provider.ts +++ b/packages/filesystem/src/node/disk-file-system-provider.ts @@ -54,7 +54,7 @@ import { FileSystemProviderError, FileChange, WatchOptions, - FileUpdateOptions, FileUpdateResult + FileUpdateOptions, FileUpdateResult, FileReadStreamOptions } from '../common/files'; import { FileSystemWatcherServer } from '../common/filesystem-watcher-protocol'; import trash = require('trash'); @@ -62,6 +62,9 @@ import { TextDocumentContentChangeEvent } from 'vscode-languageserver-protocol'; import { TextDocument } from 'vscode-languageserver-textdocument'; import { EncodingService } from '@theia/core/lib/common/encoding-service'; import { BinaryBuffer } from '@theia/core/lib/common/buffer'; +import { ReadableStreamEvents, newWriteableStream } from '@theia/core/lib/common/stream'; +import { CancellationToken } from '@theia/core/lib/common/cancellation'; +import { readFileIntoStream } from '../common/io'; export namespace DiskFileSystemProvider { export interface StatAndLink { @@ -87,6 +90,8 @@ export class DiskFileSystemProvider implements Disposable, FileSystemProviderWithOpenReadWriteCloseCapability, FileSystemProviderWithFileFolderCopyCapability { + private readonly BUFFER_SIZE = 64 * 1024; + private readonly onDidChangeFileEmitter = new Emitter(); readonly onDidChangeFile = this.onDidChangeFileEmitter.event; @@ -121,6 +126,7 @@ export class DiskFileSystemProvider implements Disposable, this._capabilities = FileSystemProviderCapabilities.FileReadWrite | FileSystemProviderCapabilities.FileOpenReadWriteClose | + FileSystemProviderCapabilities.FileReadStream | FileSystemProviderCapabilities.FileFolderCopy | FileSystemProviderCapabilities.Access | FileSystemProviderCapabilities.Trash | @@ -255,6 +261,17 @@ export class DiskFileSystemProvider implements Disposable, } } + readFileStream(resource: URI, opts: FileReadStreamOptions, token: CancellationToken): 
ReadableStreamEvents { + const stream = newWriteableStream(data => BinaryBuffer.concat(data.map(data => BinaryBuffer.wrap(data))).buffer); + + readFileIntoStream(this, resource, stream, data => data.buffer, { + ...opts, + bufferSize: this.BUFFER_SIZE + }, token); + + return stream; + } + async writeFile(resource: URI, content: Uint8Array, opts: FileWriteOptions): Promise { let handle: number | undefined = undefined; try { From b2097c14881c962a71dea86e04a99b4aa9904e10 Mon Sep 17 00:00:00 2001 From: Anton Kosyakov Date: Fri, 10 Jul 2020 13:22:32 +0000 Subject: [PATCH 4/4] [monaco] stream content to avoid blocking the backend and network by large files Signed-off-by: Anton Kosyakov --- packages/core/src/common/resource.ts | 39 +++++++++++-- packages/core/src/common/stream.ts | 8 +++ .../src/browser/editor-preview-manager.ts | 11 +++- .../filesystem/src/browser/file-resource.ts | 57 ++++++++++++++++++- .../monaco/src/browser/monaco-editor-model.ts | 57 ++++++++++++------- packages/monaco/src/browser/monaco-loader.ts | 6 +- packages/monaco/src/typings/monaco/index.d.ts | 31 ++++++++++ 7 files changed, 177 insertions(+), 32 deletions(-) diff --git a/packages/core/src/common/resource.ts b/packages/core/src/common/resource.ts index 6530f4c7b8e2e..ffec243ec0ead 100644 --- a/packages/core/src/common/resource.ts +++ b/packages/core/src/common/resource.ts @@ -23,6 +23,7 @@ import { Disposable } from './disposable'; import { MaybePromise } from './types'; import { CancellationToken } from './cancellation'; import { ApplicationError } from './application-error'; +import { ReadableStream, Readable } from './stream'; export interface ResourceVersion { } @@ -62,6 +63,15 @@ export interface Resource extends Disposable { * @throws `ResourceError.NotFound` if a resource not found */ readContents(options?: ResourceReadOptions): Promise; + /** + * Stream latest content of this resource. + * + * If a resource supports versioning it updates version to latest. + * If a resource supports encoding it updates encoding to latest. + * + * @throws `ResourceError.NotFound` if a resource not found + */ + readStream?(options?: ResourceReadOptions): Promise>; /** * Rewrites the complete content for this resource. * If a resource does not exist it will be created. @@ -74,6 +84,18 @@ export interface Resource extends Disposable { * @throws `ResourceError.OutOfSync` if latest resource version is out of sync with the given */ saveContents?(content: string, options?: ResourceSaveOptions): Promise; + /** + * Rewrites the complete content for this resource. + * If a resource does not exist it will be created. + * + * If a resource supports versioning clients can pass some version + * to check against it, if it is not provided latest version is used. + * + * It updates version and encoding to latest. + * + * @throws `ResourceError.OutOfSync` if latest resource version is out of sync with the given + */ + saveStream?(content: Readable, options?: ResourceSaveOptions): Promise; /** * Applies incremental content changes to this resource. 
* @@ -90,7 +112,8 @@ export interface Resource extends Disposable { } export namespace Resource { export interface SaveContext { - content: string + contentLength: number + content: string | Readable changes?: TextDocumentContentChangeEvent[] options?: ResourceSaveOptions } @@ -104,10 +127,15 @@ export namespace Resource { if (token && token.isCancellationRequested) { return; } - await resource.saveContents(context.content, context.options); + if (typeof context.content !== 'string' && resource.saveStream) { + await resource.saveStream(context.content, context.options); + } else { + const content = typeof context.content === 'string' ? context.content : Readable.toString(context.content); + await resource.saveContents(content, context.options); + } } export async function trySaveContentChanges(resource: Resource, context: SaveContext): Promise { - if (!context.changes || !resource.saveContentChanges || shouldSaveContent(context)) { + if (!context.changes || !resource.saveContentChanges || shouldSaveContent(resource, context)) { return false; } try { @@ -120,12 +148,11 @@ export namespace Resource { return false; } } - export function shouldSaveContent({ content, changes }: SaveContext): boolean { - if (!changes) { + export function shouldSaveContent(resource: Resource, { contentLength, changes }: SaveContext): boolean { + if (!changes || (resource.saveStream && contentLength > 32 * 1024 * 1024)) { return true; } let contentChangesLength = 0; - const contentLength = content.length; for (const change of changes) { contentChangesLength += JSON.stringify(change).length; if (contentChangesLength > contentLength) { diff --git a/packages/core/src/common/stream.ts b/packages/core/src/common/stream.ts index 615065d6f8054..3f5131d2e4293 100644 --- a/packages/core/src/common/stream.ts +++ b/packages/core/src/common/stream.ts @@ -103,6 +103,14 @@ export namespace Readable { } }; } + export function toString(readable: Readable): string { + let result = ''; + let chunk: string | null; + while ((chunk = readable.read()) != null) { + result += chunk; + } + return result; + } } /** diff --git a/packages/editor-preview/src/browser/editor-preview-manager.ts b/packages/editor-preview/src/browser/editor-preview-manager.ts index 2079e1ac257b1..2515fa59e7b67 100644 --- a/packages/editor-preview/src/browser/editor-preview-manager.ts +++ b/packages/editor-preview/src/browser/editor-preview-manager.ts @@ -140,8 +140,15 @@ export class EditorPreviewManager extends WidgetOpenHandler { - return this.currentEditorPreview = super.open(uri, options) as Promise; + protected openNewPreview(uri: URI, options: PreviewEditorOpenerOptions): Promise { + const result = super.open(uri, options); + this.currentEditorPreview = result.then(widget => { + if (widget instanceof EditorPreviewWidget) { + return widget; + } + return undefined; + }, () => undefined); + return result; } protected createWidgetOptions(uri: URI, options?: WidgetOpenerOptions): EditorPreviewWidgetOptions { diff --git a/packages/filesystem/src/browser/file-resource.ts b/packages/filesystem/src/browser/file-resource.ts index 274267dfaea88..ccb98f764e747 100644 --- a/packages/filesystem/src/browser/file-resource.ts +++ b/packages/filesystem/src/browser/file-resource.ts @@ -18,6 +18,7 @@ import { injectable, inject } from 'inversify'; import { Resource, ResourceVersion, ResourceResolver, ResourceError, ResourceSaveOptions } from '@theia/core/lib/common/resource'; import { DisposableCollection } from '@theia/core/lib/common/disposable'; import { Emitter, Event } 
from '@theia/core/lib/common/event'; +import { Readable, ReadableStream } from '@theia/core/lib/common/stream'; import URI from '@theia/core/lib/common/uri'; import { FileOperation, FileOperationError, FileOperationResult, ETAG_DISABLED, FileSystemProviderCapabilities, FileReadStreamOptions, BinarySize } from '../common/files'; import { FileService, TextFileOperationError, TextFileOperationResult } from './file-service'; @@ -135,7 +136,59 @@ export class FileResource implements Resource { } } - async saveContents(content: string, options?: ResourceSaveOptions): Promise { + async readStream(options?: { encoding?: string }): Promise> { + try { + const encoding = options?.encoding || this.version?.encoding; + const stat = await this.fileService.readStream(this.uri, { + encoding, + etag: ETAG_DISABLED, + acceptTextOnly: this.acceptTextOnly, + limits: this.limits + }); + this._version = { + encoding: stat.encoding, + etag: stat.etag, + mtime: stat.mtime + }; + return stat.value; + } catch (e) { + if (e instanceof TextFileOperationError && e.textFileOperationResult === TextFileOperationResult.FILE_IS_BINARY) { + if (await this.shouldOpenAsText('The file is either binary or uses an unsupported text encoding.')) { + this.acceptTextOnly = false; + return this.readStream(options); + } + } else if (e instanceof FileOperationError && e.fileOperationResult === FileOperationResult.FILE_TOO_LARGE) { + const stat = await this.fileService.resolve(this.uri, { resolveMetadata: true }); + const maxFileSize = GENERAL_MAX_FILE_SIZE_MB * 1024 * 1024; + if (this.limits?.size !== maxFileSize && await this.shouldOpenAsText(`The file is too large (${BinarySize.formatSize(stat.size)}).`)) { + this.limits = { + size: maxFileSize + }; + return this.readStream(options); + } + } else if (e instanceof FileOperationError && e.fileOperationResult === FileOperationResult.FILE_NOT_FOUND) { + this._version = undefined; + const { message, stack } = e; + throw ResourceError.NotFound({ + message, stack, + data: { + uri: this.uri + } + }); + } + throw e; + } + } + + saveContents(content: string, options?: ResourceSaveOptions): Promise { + return this.doWrite(content, options); + } + + saveStream(content: Readable, options?: ResourceSaveOptions): Promise { + return this.doWrite(content, options); + } + + protected async doWrite(content: string | Readable, options?: ResourceSaveOptions): Promise { const version = options?.version || this._version; const current = FileResourceVersion.is(version) ? 
version : undefined; const etag = current?.etag; @@ -154,7 +207,7 @@ export class FileResource implements Resource { } catch (e) { if (e instanceof FileOperationError && e.fileOperationResult === FileOperationResult.FILE_MODIFIED_SINCE) { if (etag !== ETAG_DISABLED && await this.shouldOverwrite()) { - return this.saveContents(content, { ...options, version: { stat: { ...current, etag: ETAG_DISABLED } } }); + return this.doWrite(content, { ...options, version: { stat: { ...current, etag: ETAG_DISABLED } } }); } const { message, stack } = e; throw ResourceError.OutOfSync({ message, stack, data: { uri: this.uri } }); diff --git a/packages/monaco/src/browser/monaco-editor-model.ts b/packages/monaco/src/browser/monaco-editor-model.ts index 22e7fac12d522..6e45ecf41bd87 100644 --- a/packages/monaco/src/browser/monaco-editor-model.ts +++ b/packages/monaco/src/browser/monaco-editor-model.ts @@ -92,8 +92,7 @@ export class MonacoEditorModel implements ITextEditorModel, TextEditorDocument { this.toDispose.push(Disposable.create(() => this.cancelSave())); this.toDispose.push(Disposable.create(() => this.cancelSync())); this.resolveModel = this.readContents().then( - content => this.initialize(content || ''), - e => console.error(`Failed to initialize for '${this.resource.uri.toString()}':`, e) + content => this.initialize(content || '') ); } @@ -143,9 +142,21 @@ export class MonacoEditorModel implements ITextEditorModel, TextEditorDocument { * Only this method can create an instance of `monaco.editor.IModel`, * there should not be other calls to `monaco.editor.createModel`. */ - protected initialize(content: string): void { + protected initialize(value: string | monaco.editor.ITextBufferFactory): void { if (!this.toDispose.disposed) { - this.model = monaco.editor.createModel(content, undefined, monaco.Uri.parse(this.resource.uri.toString())); + const uri = monaco.Uri.parse(this.resource.uri.toString()); + let firstLine; + if (typeof value === 'string') { + firstLine = value; + const firstLF = value.indexOf('\n'); + if (firstLF !== -1) { + firstLine = value.substring(0, firstLF); + } + } else { + firstLine = value.getFirstLineText(1000); + } + const languageSelection = monaco.services.StaticServices.modeService.get().createByFilepathOrFirstLine(uri, firstLine); + this.model = monaco.services.StaticServices.modelService.get().createModel(value, languageSelection, uri); this.resourceVersion = this.resource.version; this.updateSavedVersionId(); this.toDispose.push(this.model); @@ -304,29 +315,30 @@ export class MonacoEditorModel implements ITextEditorModel, TextEditorDocument { return; } - const newText = await this.readContents(); - if (newText === undefined || token.isCancellationRequested || this._dirty) { - return; - } - this.resourceVersion = this.resource.version; - - const value = this.model.getValue(); - if (value === newText) { + const value = await this.readContents(); + if (value === undefined || token.isCancellationRequested || this._dirty) { return; } - const range = this.model.getFullModelRange(); - this.applyEdits([{ range, text: newText }], { + this.resourceVersion = this.resource.version; + this.updateModel(() => monaco.services.StaticServices.modelService.get().updateModel(this.model, value), { ignoreDirty: true, ignoreContentChanges: true }); } - protected async readContents(): Promise { + protected async readContents(): Promise { try { - const content = await this.resource.readContents({ encoding: this.getEncoding() }); + const options = { encoding: this.getEncoding() }; + const content 
= await (this.resource.readStream ? this.resource.readStream(options) : this.resource.readContents(options)); + let value; + if (typeof content === 'string') { + value = content; + } else { + value = monaco.textModel.createTextBufferFactoryFromStream(content); + } this.updateContentEncoding(); this.setValid(true); - return content; + return value; } catch (e) { this.setValid(false); if (ResourceError.NotFound.is(e)) { @@ -404,6 +416,10 @@ export class MonacoEditorModel implements ITextEditorModel, TextEditorDocument { operations: monaco.editor.IIdentifiedSingleEditOperation[], options?: Partial ): monaco.editor.IIdentifiedSingleEditOperation[] { + return this.updateModel(() => this.model.applyEdits(operations), options); + } + + protected updateModel(doUpdate: () => T, options?: Partial): T { const resolvedOptions: MonacoEditorModel.ApplyEditsOptions = { ignoreDirty: false, ignoreContentChanges: false, @@ -413,7 +429,7 @@ export class MonacoEditorModel implements ITextEditorModel, TextEditorDocument { this.ignoreDirtyEdits = resolvedOptions.ignoreDirty; this.ignoreContentChanges = resolvedOptions.ignoreContentChanges; try { - return this.model.applyEdits(operations); + return doUpdate(); } finally { this.ignoreDirtyEdits = ignoreDirtyEdits; this.ignoreContentChanges = ignoreContentChanges; @@ -435,11 +451,12 @@ export class MonacoEditorModel implements ITextEditorModel, TextEditorDocument { return; } - const content = this.model.getValue(); + const contentLength = this.model.getValueLength(); + const content = this.model.createSnapshot() || this.model.getValue(); try { const encoding = this.getEncoding(); const version = this.resourceVersion; - await Resource.save(this.resource, { changes, content, options: { encoding, overwriteEncoding, version } }, token); + await Resource.save(this.resource, { changes, content, contentLength, options: { encoding, overwriteEncoding, version } }, token); this.contentChanges.splice(0, changes.length); this.resourceVersion = this.resource.version; this.updateContentEncoding(); diff --git a/packages/monaco/src/browser/monaco-loader.ts b/packages/monaco/src/browser/monaco-loader.ts index 244593a32dec6..a077f8d280850 100644 --- a/packages/monaco/src/browser/monaco-loader.ts +++ b/packages/monaco/src/browser/monaco-loader.ts @@ -78,7 +78,8 @@ export function loadMonaco(vsRequire: any): Promise { 'vs/platform/contextkey/browser/contextKeyService', 'vs/editor/common/model/wordHelper', 'vs/base/common/errors', - 'vs/base/common/path' + 'vs/base/common/path', + 'vs/editor/common/model/textModel' ], (commands: any, actions: any, keybindingsRegistry: any, keybindingResolver: any, resolvedKeybinding: any, keybindingLabels: any, keyCodes: any, mime: any, editorExtensions: any, simpleServices: any, @@ -91,7 +92,7 @@ export function loadMonaco(vsRequire: any): Promise { markerService: any, contextKey: any, contextKeyService: any, wordHelper: any, - error: any, path: any) => { + error: any, path: any, textModel: any) => { const global: any = self; global.monaco.commands = commands; global.monaco.actions = actions; @@ -114,6 +115,7 @@ export function loadMonaco(vsRequire: any): Promise { global.monaco.wordHelper = wordHelper; global.monaco.error = error; global.monaco.path = path; + global.monaco.textModel = textModel; resolve(); }); }); diff --git a/packages/monaco/src/typings/monaco/index.d.ts b/packages/monaco/src/typings/monaco/index.d.ts index cd6add44bfd48..688d8e5b43914 100644 --- a/packages/monaco/src/typings/monaco/index.d.ts +++ 
b/packages/monaco/src/typings/monaco/index.d.ts @@ -24,8 +24,24 @@ declare module monaco.instantiation { } } +declare module monaco.textModel { + interface ITextStream { + on(event: 'data', callback: (data: string) => void): void; + on(event: 'error', callback: (err: Error) => void): void; + on(event: 'end', callback: () => void): void; + on(event: string, callback: any): void; + } + // https://github.com/microsoft/vscode/blob/e683ace9e5acadba0e8bde72d793cb2cb83e58a7/src/vs/editor/common/model/textModel.ts#L58 + export function createTextBufferFactoryFromStream(stream: ITextStream, filter?: (chunk: any) => string, validator?: (chunk: any) => Error | undefined): Promise; +} + declare module monaco.editor { + // https://github.com/microsoft/vscode/blob/e683ace9e5acadba0e8bde72d793cb2cb83e58a7/src/vs/editor/common/model.ts#L1263 + export interface ITextBufferFactory { + getFirstLineText(lengthLimit: number): string; + } + export interface ICodeEditor { protected readonly _instantiationService: monaco.instantiation.IInstantiationService; @@ -344,6 +360,11 @@ declare module monaco.editor { after?: IContentDecorationRenderOptions; } + // https://github.com/microsoft/vscode/blob/e683ace9e5acadba0e8bde72d793cb2cb83e58a7/src/vs/editor/common/model.ts#L522 + export interface ITextSnapshot { + read(): string | null; + } + export interface ITextModel { /** * Get the tokens for the line `lineNumber`. @@ -359,6 +380,9 @@ declare module monaco.editor { */ // https://github.com/theia-ide/vscode/blob/standalone/0.19.x/src/vs/editor/common/model.ts#L806-L810 forceTokenization(lineNumber: number): void; + + // https://github.com/microsoft/vscode/blob/e683ace9e5acadba0e8bde72d793cb2cb83e58a7/src/vs/editor/common/model.ts#L623 + createSnapshot(): ITextSnapshot | null; } } @@ -718,6 +742,12 @@ declare module monaco.services { read(filter?: { owner?: string; resource?: monaco.Uri; severities?: number, take?: number; }): editor.IMarker[]; } + // https://github.com/microsoft/vscode/blob/e683ace9e5acadba0e8bde72d793cb2cb83e58a7/src/vs/editor/common/services/modelService.ts#L18 + export interface IModelService { + createModel(value: string | monaco.editor.ITextBufferFactory, languageSelection: ILanguageSelection | null, resource?: monaco.URI, isForSimpleWidget?: boolean): monaco.editor.ITextModel; + updateModel(model: monaco.editor.ITextModel, value: string | monaco.editor.ITextBufferFactory): void; + } + // https://github.com/theia-ide/vscode/blob/standalone/0.19.x/src/vs/editor/standalone/browser/standaloneServices.ts#L56 export module StaticServices { export function init(overrides: monaco.editor.IEditorOverrideServices): [ServiceCollection, monaco.instantiation.IInstantiationService]; @@ -728,6 +758,7 @@ declare module monaco.services { export const resourcePropertiesService: LazyStaticService; export const instantiationService: LazyStaticService; export const markerService: LazyStaticService; + export const modelService: LazyStaticService; } }
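The following usage sketch is not part of the patch; it only illustrates how a consumer might read a file through the FileReadStream capability introduced above. It assumes the published import paths ('@theia/filesystem/lib/common/files', '@theia/core/lib/common/...'), a provider such as DiskFileSystemProvider that advertises FileSystemProviderCapabilities.FileReadStream, and that the position, length and limits fields of FileReadStreamOptions are optional, as the io.ts helper above suggests.

import URI from '@theia/core/lib/common/uri';
import { CancellationToken } from '@theia/core/lib/common/cancellation';
import { BinaryBuffer } from '@theia/core/lib/common/buffer';
import { FileSystemProviderWithFileReadStreamCapability } from '@theia/filesystem/lib/common/files';

// Reads a file through the streaming capability, collecting the emitted chunks.
// If the file exceeds `maxBytes`, the read ends with the EntryTooLarge provider
// error raised by throwIfTooLarge above instead of buffering the whole file.
export function readWithSizeLimit(
    provider: FileSystemProviderWithFileReadStreamCapability,
    resource: URI,
    maxBytes: number,
    token: CancellationToken
): Promise<Uint8Array> {
    const stream = provider.readFileStream(resource, { limits: { size: maxBytes } }, token);
    return new Promise<Uint8Array>((resolve, reject) => {
        const chunks: Uint8Array[] = [];
        stream.on('data', chunk => chunks.push(chunk));
        stream.on('error', error => reject(error));
        // Concatenate the chunks the same way the providers above reduce them.
        stream.on('end', () => resolve(BinaryBuffer.concat(chunks.map(c => BinaryBuffer.wrap(c))).buffer));
    });
}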
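Likewise, a sketch (again not part of the patch) of consuming the new optional Resource.readStream with a fallback to readContents, mirroring the pattern MonacoEditorModel.readContents uses above. It assumes the published '@theia/core/lib/common' import paths and that ResourceReadOptions accepts an optional encoding, as FileResource does.

import { Resource } from '@theia/core/lib/common/resource';
import { ReadableStream } from '@theia/core/lib/common/stream';

// Reads the full text of a resource, preferring the streaming API when the
// resource implements it (e.g. FileResource) and falling back to readContents().
export async function readResourceText(resource: Resource, encoding?: string): Promise<string> {
    if (!resource.readStream) {
        return resource.readContents({ encoding });
    }
    const stream: ReadableStream<string> = await resource.readStream({ encoding });
    return new Promise<string>((resolve, reject) => {
        let text = '';
        stream.on('data', chunk => { text += chunk; });
        stream.on('error', error => reject(error));
        stream.on('end', () => resolve(text));
    });
}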