From a537cfdbe55a4a7a7868f6a18644fe54006db879 Mon Sep 17 00:00:00 2001 From: Don Jayamanne Date: Mon, 30 May 2022 10:39:23 +1000 Subject: [PATCH] Move cell exec result handler into a seprate class --- src/notebooks/execution/cellExecution.ts | 596 +----------------- .../execution/cellExecutionMessageHandler.ts | 574 +++++++++++++++++ .../cellExecutionMessageHandlerFactory.ts | 40 ++ src/notebooks/execution/kernelExecution.ts | 10 +- .../common/{refBool.node.ts => refBool.ts} | 3 + 5 files changed, 660 insertions(+), 563 deletions(-) create mode 100644 src/notebooks/execution/cellExecutionMessageHandler.ts create mode 100644 src/notebooks/execution/cellExecutionMessageHandlerFactory.ts rename src/platform/common/{refBool.node.ts => refBool.ts} (67%) diff --git a/src/notebooks/execution/cellExecution.ts b/src/notebooks/execution/cellExecution.ts index a632032ab536..2db4c7a834f0 100644 --- a/src/notebooks/execution/cellExecution.ts +++ b/src/notebooks/execution/cellExecution.ts @@ -3,99 +3,47 @@ 'use strict'; -import * as fastDeepEqual from 'fast-deep-equal'; -import type * as nbformat from '@jupyterlab/nbformat'; import type * as KernelMessage from '@jupyterlab/services/lib/kernel/messages'; import { NotebookCell, NotebookCellExecution, - NotebookCellKind, - NotebookCellExecutionSummary, - NotebookDocument, workspace, NotebookController, - WorkspaceEdit, - NotebookCellData, - Range, NotebookCellOutput, NotebookCellExecutionState, - CancellationTokenSource, Event, - EventEmitter, - ExtensionMode, - NotebookEdit + EventEmitter } from 'vscode'; import { Kernel } from '@jupyterlab/services'; -import { CellOutputDisplayIdTracker } from './cellDisplayIdTracker'; import { CellExecutionCreator } from './cellExecutionCreator'; -import { IApplicationShell } from '../../platform/common/application/types'; import { analyzeKernelErrors, createOutputWithErrorMessageForDisplay } from '../../platform/errors/errorUtils'; import { BaseError } from '../../platform/errors/types'; import { disposeAllDisposables } from '../../platform/common/helpers'; import { traceError, traceInfoIfCI, traceWarning } from '../../platform/logging'; -import { RefBool } from '../../platform/common/refBool.node'; -import { IDisposable, IExtensionContext } from '../../platform/common/types'; -import { Deferred, createDeferred } from '../../platform/common/utils/async'; +import { IDisposable } from '../../platform/common/types'; +import { createDeferred } from '../../platform/common/utils/async'; import { StopWatch } from '../../platform/common/utils/stopWatch'; -import { - NotebookCellStateTracker, - traceCellMessage, - cellOutputToVSCCellOutput, - translateCellDisplayOutput, - isJupyterNotebook -} from '../helpers'; +import { NotebookCellStateTracker, traceCellMessage } from '../helpers'; import { sendTelemetryEvent } from '../../telemetry'; -import { formatStreamText, concatMultilineString } from '../../webviews/webview-side/common'; import { Telemetry } from '../../webviews/webview-side/common/constants'; -import { swallowExceptions } from '../../platform/common/utils/decorators'; import { noop } from '../../platform/common/utils/misc'; import { getDisplayNameOrNameOfKernelConnection, isPythonKernelConnection } from '../../kernels/helpers'; -import { - IJupyterSession, - ITracebackFormatter, - KernelConnectionMetadata, - NotebookCellRunState -} from '../../kernels/types'; -import { handleTensorBoardDisplayDataOutput } from './executionHelpers'; -import { WIDGET_MIMETYPE } from '../../kernels/ipywidgets-message-coordination/constants'; +import { IJupyterSession, KernelConnectionMetadata, NotebookCellRunState } from '../../kernels/types'; import { getInteractiveCellMetadata } from '../../interactive-window/helpers'; import { isCancellationError } from '../../platform/common/cancellation'; - -// Helper interface for the set_next_input execute reply payload -interface ISetNextInputPayload { - replace: boolean; - source: 'set_next_input'; - text: string; -} - -type ExecuteResult = nbformat.IExecuteResult & { - transient?: { display_id?: string }; -}; -type DisplayData = nbformat.IDisplayData & { - transient?: { display_id?: string }; -}; +import { activeNotebookCellExecution, CellExecutionMessageHandler } from './cellExecutionMessageHandler'; +import { CellExecutionMessageHandlerFactory } from './cellExecutionMessageHandlerFactory'; export class CellExecutionFactory { constructor( - private readonly appShell: IApplicationShell, private readonly controller: NotebookController, - private readonly outputTracker: CellOutputDisplayIdTracker, - private readonly context: IExtensionContext, - private readonly formatters: ITracebackFormatter[] + private readonly factory: CellExecutionMessageHandlerFactory ) {} public create(cell: NotebookCell, metadata: Readonly) { // eslint-disable-next-line @typescript-eslint/no-use-before-define - return CellExecution.fromCell( - cell, - this.appShell, - metadata, - this.controller, - this.outputTracker, - this.context, - this.formatters - ); + return CellExecution.fromCell(cell, metadata, this.controller, this.factory); } } @@ -115,16 +63,6 @@ export class CellExecution implements IDisposable { public get preExecute(): Event { return this._preExecuteEmitter.event; } - /** - * To be used only in tests. - */ - public static cellsCompletedForTesting = new WeakMap>(); - /** - * At any given point in time, we can only have one cell actively running. - * This will keep track of that task. - */ - private static activeNotebookCellExecution = new WeakMap(); - private static sentExecuteCellTelemetry?: boolean; private stopWatch = new StopWatch(); @@ -138,29 +76,16 @@ export class CellExecution implements IDisposable { private startTime?: number; private endTime?: number; private execution?: NotebookCellExecution; - private temporaryExecution?: NotebookCellExecution; - private previousResultsToRestore?: NotebookCellExecutionSummary; private cancelHandled = false; - private cellHasErrorsInOutput?: boolean; - /** - * We keep track of the last output that was used to store stream text. - * We need this so that we can update it later on (when we get new data for the same stream). - * If users clear outputs or if we have a new output other than stream, then clear this item. - * Because if after the stream we have an image, then the stream is not the last output item, hence its cleared. - */ - private lastUsedStreamOutput?: { stream: 'stdout' | 'stderr'; text: string; output: NotebookCellOutput }; private request: Kernel.IShellFuture | undefined; private readonly disposables: IDisposable[] = []; - private readonly prompts = new Set(); private _preExecuteEmitter = new EventEmitter(); + private cellExecutionHandler?: CellExecutionMessageHandler; private constructor( public readonly cell: NotebookCell, - private readonly applicationService: IApplicationShell, private readonly kernelConnection: Readonly, private readonly controller: NotebookController, - private readonly outputDisplayIdTracker: CellOutputDisplayIdTracker, - private readonly context: IExtensionContext, - private readonly formatters: ITracebackFormatter[] + private readonly factory: CellExecutionMessageHandlerFactory ) { workspace.onDidCloseTextDocument( (e) => { @@ -180,23 +105,6 @@ export class CellExecution implements IDisposable { this, this.disposables ); - workspace.onDidChangeNotebookDocument( - (e) => { - if (!isJupyterNotebook(e.notebook)) { - return; - } - const thisCellChange = e.cellChanges.find(({ cell }) => cell === this.cell); - if (!thisCellChange) { - return; - } - if (thisCellChange.outputs?.length === 0) { - // keep track of the fact that user has cleared the output. - this.clearLastUsedStreamOutput(); - } - }, - this, - this.disposables - ); NotebookCellStateTracker.setCellState(cell, NotebookCellExecutionState.Idle); const execution = CellExecutionCreator.get(cell); if (this.canExecuteCell()) { @@ -219,14 +127,11 @@ export class CellExecution implements IDisposable { public static fromCell( cell: NotebookCell, - appService: IApplicationShell, metadata: Readonly, controller: NotebookController, - outputTracker: CellOutputDisplayIdTracker, - context: IExtensionContext, - formatters: ITracebackFormatter[] + factory: CellExecutionMessageHandlerFactory ) { - return new CellExecution(cell, appService, metadata, controller, outputTracker, context, formatters); + return new CellExecution(cell, metadata, controller, factory); } public async start(session: IJupyterSession) { if (this.cancelHandled) { @@ -251,10 +156,9 @@ export class CellExecution implements IDisposable { this.started = true; this.startTime = new Date().getTime(); - CellExecution.activeNotebookCellExecution.set(this.cell.notebook, this.execution); + activeNotebookCellExecution.set(this.cell.notebook, this.execution); this.execution?.start(this.startTime); NotebookCellStateTracker.setCellState(this.cell, NotebookCellExecutionState.Executing); - this.clearLastUsedStreamOutput(); // Await here, so that the UI updates on the progress & we clear the output. // Else when running cells with existing outputs, the outputs don't get cleared & it doesn't look like its running. // Ideally we shouldn't have any awaits, but here we want the UI to get updated. @@ -281,8 +185,6 @@ export class CellExecution implements IDisposable { if (this.cancelHandled) { return; } - // Close all of the prompts (if we any any UI prompts asking user for input). - this.prompts.forEach((item) => item.cancel()); if (this.started && !forced) { // At this point the cell execution can only be stopped from kernel & we should not // stop handling execution results & the like from the kernel. @@ -307,11 +209,6 @@ export class CellExecution implements IDisposable { public dispose() { traceCellMessage(this.cell, 'Execution disposed'); disposeAllDisposables(this.disposables); - this.prompts.forEach((item) => item.dispose()); - this.prompts.clear(); - } - private clearLastUsedStreamOutput() { - this.lastUsedStreamOutput = undefined; } private completedWithErrors(error: Partial) { traceWarning(`Cell completed with errors`, error); @@ -356,7 +253,7 @@ export class CellExecution implements IDisposable { let success: 'success' | 'failed' = 'success'; // If there are any errors in the cell, then change status to error. - if (this.cellHasErrorsInOutput) { + if (this.cellExecutionHandler?.hasErrorOutput) { success = 'failed'; runState = NotebookCellRunState.Error; } @@ -381,58 +278,12 @@ export class CellExecution implements IDisposable { // Undefined for not success or failures this.execution?.end(undefined); } - if (CellExecution.activeNotebookCellExecution.get(this.cell.notebook) === this.execution) { - CellExecution.activeNotebookCellExecution.set(this.cell.notebook, undefined); + if (activeNotebookCellExecution.get(this.cell.notebook) === this.execution) { + activeNotebookCellExecution.set(this.cell.notebook, undefined); } NotebookCellStateTracker.setCellState(this.cell, NotebookCellExecutionState.Idle); this.execution = undefined; } - /** - * Assume we run cell A - * Now run cell B, and this will update cell A. - * The way it works is, the request object created for cell A will get a message saying update your output. - * Cell A has completed, hence there's no execution task, we should create one or re-use an existing one. - * Creating one results in side effects such as execution order getting reset and timers starting. - * Hence where possible re-use an existing cell execution task associated with this document. - */ - private createTemporaryTask() { - if (this.cell.document.isClosed) { - return; - } - // If we have an active task, use that instead of creating a new task. - const existingTask = CellExecution.activeNotebookCellExecution.get(this.cell.notebook); - if (existingTask) { - return existingTask; - } - - // Create a temporary task. - this.previousResultsToRestore = { ...(this.cell.executionSummary || {}) }; - this.temporaryExecution = CellExecutionCreator.getOrCreate(this.cell, this.controller); - this.temporaryExecution?.start(); - if (this.previousResultsToRestore?.executionOrder && this.execution) { - this.execution.executionOrder = this.previousResultsToRestore.executionOrder; - } - return this.temporaryExecution; - } - private endTemporaryTask() { - if (this.previousResultsToRestore?.executionOrder && this.execution) { - this.execution.executionOrder = this.previousResultsToRestore.executionOrder; - } - if (this.previousResultsToRestore && this.temporaryExecution) { - if (this.previousResultsToRestore.executionOrder) { - this.temporaryExecution.executionOrder = this.previousResultsToRestore.executionOrder; - } - this.temporaryExecution.end( - this.previousResultsToRestore.success, - this.previousResultsToRestore.timing?.endTime - ); - } else { - // Undefined for not success or failure - this.temporaryExecution?.end(undefined); - } - this.previousResultsToRestore = undefined; - this.temporaryExecution = undefined; - } private completedDueToCancellation() { traceCellMessage(this.cell, 'Completed due to cancellation'); @@ -503,26 +354,22 @@ export class CellExecution implements IDisposable { traceError(`Cell execution failed without request, for cell Index ${this.cell.index}`, ex); return this.completedWithErrors(ex); } - // Listen to messages and update our cell execution state appropriately - // Keep track of our clear state - const clearState = new RefBool(false); - - const request = this.request; - request.onIOPub = (msg) => { - // Cell has been deleted or the like. - if (this.cell.document.isClosed) { - request.dispose(); - } - this.handleIOPub(clearState, msg); - }; - request.onReply = (msg) => { - // Cell has been deleted or the like. - if (this.cell.document.isClosed) { - request.dispose(); - } - this.handleReply(clearState, msg); - }; - request.onStdin = this.handleInputRequest.bind(this, session); + this.cellExecutionHandler = this.factory.create(this.cell, { + kernel: session.kernel!, + cellExecution: this.execution!, + request: this.request + }); + this.disposables.push(this.cellExecutionHandler); + this.cellExecutionHandler.onErrorHandlingExecuteRequestIOPubMessage( + ({ error }) => { + traceError(`Cell (index = ${this.cell.index}) execution completed with errors (2).`, error); + // If not a restart error, then tell the subscriber + // eslint-disable-next-line @typescript-eslint/no-explicit-any + this.completedWithErrors(error as any); + }, + this, + this.disposables + ); // WARNING: Do not dispose `request`. // Even after request.done & execute_reply is sent we could have more messages coming from iopub. @@ -533,7 +380,7 @@ export class CellExecution implements IDisposable { // request.done resolves even before all iopub messages have been sent through. // Solution is to wait for all messages to get processed. traceCellMessage(this.cell, 'Wait for jupyter execution'); - await request.done; + await this.request.done; traceCellMessage(this.cell, 'Jupyter execution completed'); this.completedSuccessfully(); traceCellMessage(this.cell, 'Executed successfully in executeCell'); @@ -553,379 +400,4 @@ export class CellExecution implements IDisposable { } } } - @swallowExceptions() - private handleIOPub(clearState: RefBool, msg: KernelMessage.IIOPubMessage) { - // eslint-disable-next-line @typescript-eslint/no-require-imports - const jupyterLab = require('@jupyterlab/services') as typeof import('@jupyterlab/services'); - - try { - if (jupyterLab.KernelMessage.isExecuteResultMsg(msg)) { - this.handleExecuteResult(msg as KernelMessage.IExecuteResultMsg, clearState); - } else if (jupyterLab.KernelMessage.isExecuteInputMsg(msg)) { - this.handleExecuteInput(msg as KernelMessage.IExecuteInputMsg, clearState); - } else if (jupyterLab.KernelMessage.isStatusMsg(msg)) { - // Status is handled by the result promise. While it is running we are active. Otherwise we're stopped. - // So ignore status messages. - const statusMsg = msg as KernelMessage.IStatusMsg; - this.handleStatusMessage(statusMsg, clearState); - } else if (jupyterLab.KernelMessage.isStreamMsg(msg)) { - this.handleStreamMessage(msg as KernelMessage.IStreamMsg, clearState); - } else if (jupyterLab.KernelMessage.isDisplayDataMsg(msg)) { - this.handleDisplayData(msg as KernelMessage.IDisplayDataMsg, clearState); - } else if (jupyterLab.KernelMessage.isUpdateDisplayDataMsg(msg)) { - this.handleUpdateDisplayDataMessage(msg); - } else if (jupyterLab.KernelMessage.isClearOutputMsg(msg)) { - this.handleClearOutput(msg as KernelMessage.IClearOutputMsg, clearState); - } else if (jupyterLab.KernelMessage.isErrorMsg(msg)) { - this.handleError(msg as KernelMessage.IErrorMsg, clearState); - } else if (jupyterLab.KernelMessage.isCommOpenMsg(msg)) { - // Noop. - } else if (jupyterLab.KernelMessage.isCommMsgMsg(msg)) { - // Noop. - } else if (jupyterLab.KernelMessage.isCommCloseMsg(msg)) { - // Noop. - } else { - traceWarning(`Unknown message ${msg.header.msg_type} : hasData=${'data' in msg.content}`); - } - - // Set execution count, all messages should have it - if ('execution_count' in msg.content && typeof msg.content.execution_count === 'number' && this.execution) { - this.execution.executionOrder = msg.content.execution_count; - } - } catch (err) { - traceError(`Cell (index = ${this.cell.index}) execution completed with errors (2).`, err); - // If not a restart error, then tell the subscriber - // eslint-disable-next-line @typescript-eslint/no-explicit-any - this.completedWithErrors(err as any); - } - } - - private addToCellData( - output: ExecuteResult | DisplayData | nbformat.IStream | nbformat.IError | nbformat.IOutput, - clearState: RefBool - ) { - if ( - this.context.extensionMode === ExtensionMode.Test && - output.data && - typeof output.data === 'object' && - WIDGET_MIMETYPE in output.data - ) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (output.data[WIDGET_MIMETYPE] as any)['_vsc_test_cellIndex'] = this.cell.index; - } - const cellOutput = cellOutputToVSCCellOutput(output); - const displayId = - 'transient' in output && - typeof output.transient === 'object' && - output.transient && - 'display_id' in output.transient && - typeof output.transient?.display_id === 'string' - ? output.transient?.display_id - : undefined; - if (this.cell.document.isClosed) { - return; - } - traceCellMessage(this.cell, 'Update output'); - // Clear if necessary - if (clearState.value) { - this.clearLastUsedStreamOutput(); - this.execution?.clearOutput().then(noop, noop); - clearState.update(false); - } - // Keep track of the displa_id against the output item, we might need this to update this later. - if (displayId) { - this.outputDisplayIdTracker.trackOutputByDisplayId(this.cell, displayId, cellOutput); - } - - // Append to the data (we would push here but VS code requires a recreation of the array) - // Possible execution of cell has completed (the task would have been disposed). - // This message could have come from a background thread. - // In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output). - const task = this.execution || this.createTemporaryTask(); - this.clearLastUsedStreamOutput(); - traceCellMessage(this.cell, 'Append output in addToCellData'); - task?.appendOutput([cellOutput]).then(noop, noop); - this.endTemporaryTask(); - } - - private async handleInputRequest(session: IJupyterSession, msg: KernelMessage.IStdinMessage) { - // Ask the user for input - if (msg.content && 'prompt' in msg.content) { - const cancelToken = new CancellationTokenSource(); - this.prompts.add(cancelToken); - const hasPassword = msg.content.password !== null && (msg.content.password as boolean); - await this.applicationService - .showInputBox( - { - prompt: msg.content.prompt ? msg.content.prompt.toString() : '', - ignoreFocusOut: true, - password: hasPassword - }, - cancelToken.token - ) - .then((v) => { - session.sendInputReply({ value: v || '', status: 'ok' }); - }, noop); - - this.prompts.delete(cancelToken); - } - } - - // See this for docs on the messages: - // https://jupyter-client.readthedocs.io/en/latest/messaging.html#messaging-in-jupyter - private handleExecuteResult(msg: KernelMessage.IExecuteResultMsg, clearState: RefBool) { - this.addToCellData( - { - output_type: 'execute_result', - data: msg.content.data, - metadata: msg.content.metadata, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - transient: msg.content.transient as any, // NOSONAR - execution_count: msg.content.execution_count - }, - clearState - ); - } - - private handleExecuteReply(msg: KernelMessage.IExecuteReplyMsg, clearState: RefBool) { - const reply = msg.content as KernelMessage.IExecuteReply; - if (reply.payload) { - reply.payload.forEach((payload) => { - if ( - payload.source && - payload.source === 'set_next_input' && - 'text' in payload && - 'replace' in payload - ) { - this.handleSetNextInput(payload as unknown as ISetNextInputPayload); - } - if (payload.data && payload.data.hasOwnProperty('text/plain')) { - this.addToCellData( - { - // Mark as stream output so the text is formatted because it likely has ansi codes in it. - output_type: 'stream', - // eslint-disable-next-line @typescript-eslint/no-explicit-any - text: (payload.data as any)['text/plain'].toString(), - name: 'stdout', - metadata: {}, - execution_count: reply.execution_count - }, - clearState - ); - } - }); - } - } - - // Handle our set_next_input message, which can either replace or insert a new cell with text - private handleSetNextInput(payload: ISetNextInputPayload) { - const edit = new WorkspaceEdit(); - if (payload.replace) { - // Replace the contents of the current cell with text - edit.replace( - this.cell.document.uri, - new Range( - this.cell.document.lineAt(0).range.start, - this.cell.document.lineAt(this.cell.document.lineCount - 1).range.end - ), - payload.text - ); - } else { - // Add a new cell after the current with text - traceCellMessage(this.cell, 'Create new cell after current'); - const cellData = new NotebookCellData(NotebookCellKind.Code, payload.text, this.cell.document.languageId); - cellData.outputs = []; - cellData.metadata = {}; - const nbEdit = NotebookEdit.insertCells(this.cell.index + 1, [cellData]); - edit.set(this.cell.notebook.uri, [nbEdit]); - } - workspace.applyEdit(edit).then(noop, noop); - } - - private handleExecuteInput(msg: KernelMessage.IExecuteInputMsg, _clearState: RefBool) { - if (msg.content.execution_count && this.execution) { - this.execution.executionOrder = msg.content.execution_count; - } - } - - private handleStatusMessage(msg: KernelMessage.IStatusMsg, _clearState: RefBool) { - traceCellMessage(this.cell, `Kernel switching to ${msg.content.execution_state}`); - } - private handleStreamMessage(msg: KernelMessage.IStreamMsg, clearState: RefBool) { - // eslint-disable-next-line complexity - traceCellMessage(this.cell, 'Update streamed output'); - // Possible execution of cell has completed (the task would have been disposed). - // This message could have come from a background thread. - // In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output). - const task = this.execution || this.createTemporaryTask(); - - // Clear output if waiting for a clear - const clearOutput = clearState.value; - if (clearOutput) { - traceCellMessage(this.cell, 'Clear cell output'); - this.clearLastUsedStreamOutput(); - task?.clearOutput().then(noop, noop); - clearState.update(false); - } - // Ensure we append to previous output, only if the streams as the same & - // If the last output is the desired stream type. - if (this.lastUsedStreamOutput?.stream === msg.content.name) { - // Get the jupyter output from the vs code output (so we can concatenate the text ourselves). - let existingOutputText = this.lastUsedStreamOutput.text; - let newContent = msg.content.text; - // Look for the ansi code `[A`. (this means move up) - // Not going to support `[2A` (not for now). - const moveUpCode = `${String.fromCharCode(27)}[A`; - if (msg.content.text.startsWith(moveUpCode)) { - // Split message by lines & strip out the last n lines (where n = number of lines to move cursor up). - const existingOutputLines = existingOutputText.splitLines({ - trim: false, - removeEmptyEntries: false - }); - if (existingOutputLines.length) { - existingOutputLines.pop(); - } - existingOutputText = existingOutputLines.join('\n'); - newContent = newContent.substring(moveUpCode.length); - } - // Create a new output item with the concatenated string. - this.lastUsedStreamOutput.text = formatStreamText( - concatMultilineString(`${existingOutputText}${newContent}`) - ); - const output = cellOutputToVSCCellOutput({ - output_type: 'stream', - name: msg.content.name, - text: this.lastUsedStreamOutput.text - }); - traceCellMessage(this.cell, `Replace output items ${this.lastUsedStreamOutput.text.substring(0, 100)}`); - task?.replaceOutputItems(output.items, this.lastUsedStreamOutput.output).then(noop, noop); - } else if (clearOutput) { - // Replace the current outputs with a single new output. - const text = formatStreamText(concatMultilineString(msg.content.text)); - const output = cellOutputToVSCCellOutput({ - output_type: 'stream', - name: msg.content.name, - text - }); - this.lastUsedStreamOutput = { output, stream: msg.content.name, text }; - traceCellMessage(this.cell, `Replace output ${this.lastUsedStreamOutput.text.substring(0, 100)}`); - task?.replaceOutput([output]).then(noop, noop); - } else { - // Create a new output - const text = formatStreamText(concatMultilineString(msg.content.text)); - const output = cellOutputToVSCCellOutput({ - output_type: 'stream', - name: msg.content.name, - text - }); - this.lastUsedStreamOutput = { output, stream: msg.content.name, text }; - traceCellMessage(this.cell, `Append output ${this.lastUsedStreamOutput.text.substring(0, 100)}`); - task?.appendOutput([output]).then(noop, noop); - } - this.endTemporaryTask(); - } - - private handleDisplayData(msg: KernelMessage.IDisplayDataMsg, clearState: RefBool) { - const output = { - output_type: 'display_data', - data: handleTensorBoardDisplayDataOutput(msg.content.data), - metadata: msg.content.metadata, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - transient: msg.content.transient as any // NOSONAR - }; - this.addToCellData(output, clearState); - } - - private handleClearOutput(msg: KernelMessage.IClearOutputMsg, clearState: RefBool) { - // If the message says wait, add every message type to our clear state. This will - // make us wait for this type of output before we clear it. - if (msg && msg.content.wait) { - clearState.update(true); - } else { - // Possible execution of cell has completed (the task would have been disposed). - // This message could have come from a background thread. - // In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output). - // Clear all outputs and start over again. - const task = this.execution || this.createTemporaryTask(); - this.clearLastUsedStreamOutput(); - task?.clearOutput().then(noop, noop); - this.endTemporaryTask(); - } - } - - private handleError(msg: KernelMessage.IErrorMsg, clearState: RefBool) { - let traceback = msg.content.traceback; - this.formatters.forEach((formatter) => { - traceback = formatter.format(this.cell, traceback); - }); - const output: nbformat.IError = { - output_type: 'error', - ename: msg.content.ename, - evalue: msg.content.evalue, - traceback - }; - - this.addToCellData(output, clearState); - this.cellHasErrorsInOutput = true; - } - - @swallowExceptions() - private handleReply(clearState: RefBool, msg: KernelMessage.IShellControlMessage) { - // eslint-disable-next-line @typescript-eslint/no-require-imports - const jupyterLab = require('@jupyterlab/services') as typeof import('@jupyterlab/services'); - - if (jupyterLab.KernelMessage.isExecuteReplyMsg(msg)) { - this.handleExecuteReply(msg, clearState); - - // Set execution count, all messages should have it - if ('execution_count' in msg.content && typeof msg.content.execution_count === 'number' && this.execution) { - this.execution.executionOrder = msg.content.execution_count; - } - } - } - /** - * Execution of Cell B could result in updates to output in Cell A. - */ - private handleUpdateDisplayDataMessage(msg: KernelMessage.IUpdateDisplayDataMsg) { - const displayId = msg.content.transient.display_id; - if (!displayId) { - return; - } - const outputToBeUpdated = this.outputDisplayIdTracker.getMappedOutput(this.cell.notebook, displayId); - if (!outputToBeUpdated) { - return; - } - const output = translateCellDisplayOutput(outputToBeUpdated); - const newOutput = cellOutputToVSCCellOutput({ - ...output, - data: msg.content.data, - metadata: msg.content.metadata - } as nbformat.IDisplayData); - // If there was no output and still no output, then nothing to do. - if (outputToBeUpdated.items.length === 0 && newOutput.items.length === 0) { - return; - } - // Compare each output item (at the end of the day everything is serializable). - // Hence this is a safe comparison. - if (outputToBeUpdated.items.length === newOutput.items.length) { - let allAllOutputItemsSame = true; - for (let index = 0; index < outputToBeUpdated.items.length; index++) { - if (!fastDeepEqual(outputToBeUpdated.items[index], newOutput.items[index])) { - allAllOutputItemsSame = false; - break; - } - } - if (allAllOutputItemsSame) { - // If everything is still the same, then there's nothing to update. - return; - } - } - // Possible execution of cell has completed (the task would have been disposed). - // This message could have come from a background thread. - // In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output). - const task = this.execution || this.createTemporaryTask(); - traceCellMessage(this.cell, `Replace output items in display data ${newOutput.items.length}`); - task?.replaceOutputItems(newOutput.items, outputToBeUpdated).then(noop, noop); - this.endTemporaryTask(); - } } diff --git a/src/notebooks/execution/cellExecutionMessageHandler.ts b/src/notebooks/execution/cellExecutionMessageHandler.ts new file mode 100644 index 000000000000..81a5789c2f66 --- /dev/null +++ b/src/notebooks/execution/cellExecutionMessageHandler.ts @@ -0,0 +1,574 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +'use strict'; + +import * as fastDeepEqual from 'fast-deep-equal'; +import type * as nbformat from '@jupyterlab/nbformat'; +import type * as KernelMessage from '@jupyterlab/services/lib/kernel/messages'; +import { + NotebookCell, + NotebookCellExecution, + NotebookCellKind, + NotebookCellExecutionSummary, + NotebookDocument, + workspace, + NotebookController, + WorkspaceEdit, + NotebookCellData, + Range, + NotebookCellOutput, + CancellationTokenSource, + EventEmitter, + ExtensionMode, + NotebookEdit +} from 'vscode'; + +import { Kernel } from '@jupyterlab/services'; +import { CellOutputDisplayIdTracker } from './cellDisplayIdTracker'; +import { CellExecutionCreator } from './cellExecutionCreator'; +import { IApplicationShell } from '../../platform/common/application/types'; +import { disposeAllDisposables } from '../../platform/common/helpers'; +import { traceWarning } from '../../platform/logging'; +import { RefBool } from '../../platform/common/refBool'; +import { IDisposable, IExtensionContext } from '../../platform/common/types'; +import { traceCellMessage, cellOutputToVSCCellOutput, translateCellDisplayOutput, isJupyterNotebook } from '../helpers'; +import { formatStreamText, concatMultilineString } from '../../webviews/webview-side/common'; +import { swallowExceptions } from '../../platform/common/utils/decorators'; +import { noop } from '../../platform/common/utils/misc'; +import { ITracebackFormatter } from '../../kernels/types'; +import { handleTensorBoardDisplayDataOutput } from './executionHelpers'; +import { WIDGET_MIMETYPE } from '../../kernels/ipywidgets-message-coordination/constants'; + +// Helper interface for the set_next_input execute reply payload +interface ISetNextInputPayload { + replace: boolean; + source: 'set_next_input'; + text: string; +} + +type ExecuteResult = nbformat.IExecuteResult & { + transient?: { display_id?: string }; +}; +type DisplayData = nbformat.IDisplayData & { + transient?: { display_id?: string }; +}; + +/** + * At any given point in time, we can only have one cell actively running. + * This will keep track of that task. + */ +export const activeNotebookCellExecution = new WeakMap(); + +/** + * Responsible for handling of jupyter messages as a result of execution of individual cells. + */ +export class CellExecutionMessageHandler implements IDisposable { + /** + * Listen to messages and update our cell execution state appropriately + * Keep track of our clear state + */ + private readonly clearState = new RefBool(false); + + private execution?: NotebookCellExecution; + private readonly _onErrorHandlingIOPubMessage = new EventEmitter<{ + error: Error; + msg: KernelMessage.IIOPubMessage; + }>(); + public readonly onErrorHandlingExecuteRequestIOPubMessage = this._onErrorHandlingIOPubMessage.event; + private temporaryExecution?: NotebookCellExecution; + private previousResultsToRestore?: NotebookCellExecutionSummary; + private cellHasErrorsInOutput?: boolean; + + public get hasErrorOutput() { + return this.cellHasErrorsInOutput === true; + } + /** + * We keep track of the last output that was used to store stream text. + * We need this so that we can update it later on (when we get new data for the same stream). + * If users clear outputs or if we have a new output other than stream, then clear this item. + * Because if after the stream we have an image, then the stream is not the last output item, hence its cleared. + */ + private lastUsedStreamOutput?: { stream: 'stdout' | 'stderr'; text: string; output: NotebookCellOutput }; + private readonly disposables: IDisposable[] = []; + private readonly prompts = new Set(); + constructor( + public readonly cell: NotebookCell, + private readonly applicationService: IApplicationShell, + private readonly controller: NotebookController, + private readonly outputDisplayIdTracker: CellOutputDisplayIdTracker, + private readonly context: IExtensionContext, + private readonly formatters: ITracebackFormatter[], + private readonly kernel: Kernel.IKernelConnection, + request: Kernel.IShellFuture, + cellExecution: NotebookCellExecution + ) { + workspace.onDidChangeNotebookDocument( + (e) => { + if (!isJupyterNotebook(e.notebook)) { + return; + } + const thisCellChange = e.cellChanges.find(({ cell }) => cell === this.cell); + if (!thisCellChange) { + return; + } + if (thisCellChange.outputs?.length === 0) { + // keep track of the fact that user has cleared the output. + this.clearLastUsedStreamOutput(); + this.cellHasErrorsInOutput = false; + } + }, + this, + this.disposables + ); + this.execution = cellExecution; + this.startHandlingExecutionMessages(request); + } + /** + * This method is called when all execution has been completed (successfully or failed). + * Or when execution has been cancelled. + */ + public dispose() { + traceCellMessage(this.cell, 'Execution disposed'); + disposeAllDisposables(this.disposables); + this.endCellExecution(); + } + private endCellExecution() { + this.prompts.forEach((item) => item.dispose()); + this.prompts.clear(); + this.clearLastUsedStreamOutput(); + this.execution = undefined; + } + private startHandlingExecutionMessages( + request: Kernel.IShellFuture + ) { + request.onIOPub = (msg) => { + // Cell has been deleted or the like. + if (this.cell.document.isClosed) { + request.dispose(); + } + try { + this.handleIOPub(msg); + } catch (ex) { + this._onErrorHandlingIOPubMessage.fire(ex); + } + }; + request.onReply = (msg) => { + // Cell has been deleted or the like. + if (this.cell.document.isClosed) { + request.dispose(); + } + this.handleReply(msg); + }; + request.onStdin = this.handleInputRequest.bind(this); + request.done.finally(() => this.endCellExecution()); + } + + private clearLastUsedStreamOutput() { + this.lastUsedStreamOutput = undefined; + } + /** + * Assume we run cell A + * Now run cell B, and this will update cell A. + * The way it works is, the request object created for cell A will get a message saying update your output. + * Cell A has completed, hence there's no execution task, we should create one or re-use an existing one. + * Creating one results in side effects such as execution order getting reset and timers starting. + * Hence where possible re-use an existing cell execution task associated with this document. + */ + private createTemporaryTask() { + if (this.cell.document.isClosed) { + return; + } + // If we have an active task, use that instead of creating a new task. + const existingTask = activeNotebookCellExecution.get(this.cell.notebook); + if (existingTask) { + return existingTask; + } + + // Create a temporary task. + this.previousResultsToRestore = { ...(this.cell.executionSummary || {}) }; + this.temporaryExecution = CellExecutionCreator.getOrCreate(this.cell, this.controller); + this.temporaryExecution?.start(); + if (this.previousResultsToRestore?.executionOrder && this.execution) { + this.execution.executionOrder = this.previousResultsToRestore.executionOrder; + } + return this.temporaryExecution; + } + private endTemporaryTask() { + if (this.previousResultsToRestore?.executionOrder && this.execution) { + this.execution.executionOrder = this.previousResultsToRestore.executionOrder; + } + if (this.previousResultsToRestore && this.temporaryExecution) { + if (this.previousResultsToRestore.executionOrder) { + this.temporaryExecution.executionOrder = this.previousResultsToRestore.executionOrder; + } + this.temporaryExecution.end( + this.previousResultsToRestore.success, + this.previousResultsToRestore.timing?.endTime + ); + } else { + // Undefined for not success or failure + this.temporaryExecution?.end(undefined); + } + this.previousResultsToRestore = undefined; + this.temporaryExecution = undefined; + } + private handleIOPub(msg: KernelMessage.IIOPubMessage) { + // eslint-disable-next-line @typescript-eslint/no-require-imports + const jupyterLab = require('@jupyterlab/services') as typeof import('@jupyterlab/services'); + + if (jupyterLab.KernelMessage.isExecuteResultMsg(msg)) { + this.handleExecuteResult(msg as KernelMessage.IExecuteResultMsg); + } else if (jupyterLab.KernelMessage.isExecuteInputMsg(msg)) { + this.handleExecuteInput(msg as KernelMessage.IExecuteInputMsg); + } else if (jupyterLab.KernelMessage.isStatusMsg(msg)) { + // Status is handled by the result promise. While it is running we are active. Otherwise we're stopped. + // So ignore status messages. + const statusMsg = msg as KernelMessage.IStatusMsg; + this.handleStatusMessage(statusMsg); + } else if (jupyterLab.KernelMessage.isStreamMsg(msg)) { + this.handleStreamMessage(msg as KernelMessage.IStreamMsg); + } else if (jupyterLab.KernelMessage.isDisplayDataMsg(msg)) { + this.handleDisplayData(msg as KernelMessage.IDisplayDataMsg); + } else if (jupyterLab.KernelMessage.isUpdateDisplayDataMsg(msg)) { + this.handleUpdateDisplayDataMessage(msg); + } else if (jupyterLab.KernelMessage.isClearOutputMsg(msg)) { + this.handleClearOutput(msg as KernelMessage.IClearOutputMsg); + } else if (jupyterLab.KernelMessage.isErrorMsg(msg)) { + this.handleError(msg as KernelMessage.IErrorMsg); + } else if (jupyterLab.KernelMessage.isCommOpenMsg(msg)) { + // Noop. + } else if (jupyterLab.KernelMessage.isCommMsgMsg(msg)) { + // Noop. + } else if (jupyterLab.KernelMessage.isCommCloseMsg(msg)) { + // Noop. + } else { + traceWarning(`Unknown message ${msg.header.msg_type} : hasData=${'data' in msg.content}`); + } + + // Set execution count, all messages should have it + if ('execution_count' in msg.content && typeof msg.content.execution_count === 'number' && this.execution) { + this.execution.executionOrder = msg.content.execution_count; + } + } + + private addToCellData(output: ExecuteResult | DisplayData | nbformat.IStream | nbformat.IError | nbformat.IOutput) { + if ( + this.context.extensionMode === ExtensionMode.Test && + output.data && + typeof output.data === 'object' && + WIDGET_MIMETYPE in output.data + ) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (output.data[WIDGET_MIMETYPE] as any)['_vsc_test_cellIndex'] = this.cell.index; + } + const cellOutput = cellOutputToVSCCellOutput(output); + const displayId = + 'transient' in output && + typeof output.transient === 'object' && + output.transient && + 'display_id' in output.transient && + typeof output.transient?.display_id === 'string' + ? output.transient?.display_id + : undefined; + if (this.cell.document.isClosed) { + return; + } + traceCellMessage(this.cell, 'Update output'); + // Clear if necessary + if (this.clearState.value) { + this.clearLastUsedStreamOutput(); + this.execution?.clearOutput().then(noop, noop); + this.clearState.update(false); + } + // Keep track of the displa_id against the output item, we might need this to update this later. + if (displayId) { + this.outputDisplayIdTracker.trackOutputByDisplayId(this.cell, displayId, cellOutput); + } + + // Append to the data (we would push here but VS code requires a recreation of the array) + // Possible execution of cell has completed (the task would have been disposed). + // This message could have come from a background thread. + // In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output). + const task = this.execution || this.createTemporaryTask(); + this.clearLastUsedStreamOutput(); + traceCellMessage(this.cell, 'Append output in addToCellData'); + task?.appendOutput([cellOutput]).then(noop, noop); + this.endTemporaryTask(); + } + + private async handleInputRequest(msg: KernelMessage.IStdinMessage) { + // Ask the user for input + if (msg.content && 'prompt' in msg.content) { + const cancelToken = new CancellationTokenSource(); + this.prompts.add(cancelToken); + const hasPassword = msg.content.password !== null && (msg.content.password as boolean); + await this.applicationService + .showInputBox( + { + prompt: msg.content.prompt ? msg.content.prompt.toString() : '', + ignoreFocusOut: true, + password: hasPassword + }, + cancelToken.token + ) + .then((v) => { + this.kernel.sendInputReply({ value: v || '', status: 'ok' }); + }, noop); + + this.prompts.delete(cancelToken); + } + } + + // See this for docs on the messages: + // https://jupyter-client.readthedocs.io/en/latest/messaging.html#messaging-in-jupyter + private handleExecuteResult(msg: KernelMessage.IExecuteResultMsg) { + this.addToCellData({ + output_type: 'execute_result', + data: msg.content.data, + metadata: msg.content.metadata, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + transient: msg.content.transient as any, // NOSONAR + execution_count: msg.content.execution_count + }); + } + + private handleExecuteReply(msg: KernelMessage.IExecuteReplyMsg) { + const reply = msg.content as KernelMessage.IExecuteReply; + if (reply.payload) { + reply.payload.forEach((payload) => { + if ( + payload.source && + payload.source === 'set_next_input' && + 'text' in payload && + 'replace' in payload + ) { + this.handleSetNextInput(payload as unknown as ISetNextInputPayload); + } + if (payload.data && payload.data.hasOwnProperty('text/plain')) { + this.addToCellData({ + // Mark as stream output so the text is formatted because it likely has ansi codes in it. + output_type: 'stream', + // eslint-disable-next-line @typescript-eslint/no-explicit-any + text: (payload.data as any)['text/plain'].toString(), + name: 'stdout', + metadata: {}, + execution_count: reply.execution_count + }); + } + }); + } + } + + // Handle our set_next_input message, which can either replace or insert a new cell with text + private handleSetNextInput(payload: ISetNextInputPayload) { + const edit = new WorkspaceEdit(); + if (payload.replace) { + // Replace the contents of the current cell with text + edit.replace( + this.cell.document.uri, + new Range( + this.cell.document.lineAt(0).range.start, + this.cell.document.lineAt(this.cell.document.lineCount - 1).range.end + ), + payload.text + ); + } else { + // Add a new cell after the current with text + traceCellMessage(this.cell, 'Create new cell after current'); + const cellData = new NotebookCellData(NotebookCellKind.Code, payload.text, this.cell.document.languageId); + cellData.outputs = []; + cellData.metadata = {}; + const nbEdit = NotebookEdit.insertCells(this.cell.index + 1, [cellData]); + edit.set(this.cell.notebook.uri, [nbEdit]); + } + workspace.applyEdit(edit).then(noop, noop); + } + + private handleExecuteInput(msg: KernelMessage.IExecuteInputMsg) { + if (msg.content.execution_count && this.execution) { + this.execution.executionOrder = msg.content.execution_count; + } + } + + private handleStatusMessage(msg: KernelMessage.IStatusMsg) { + traceCellMessage(this.cell, `Kernel switching to ${msg.content.execution_state}`); + } + private handleStreamMessage(msg: KernelMessage.IStreamMsg) { + // eslint-disable-next-line complexity + traceCellMessage(this.cell, 'Update streamed output'); + // Possible execution of cell has completed (the task would have been disposed). + // This message could have come from a background thread. + // In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output). + const task = this.execution || this.createTemporaryTask(); + + // Clear output if waiting for a clear + const clearOutput = this.clearState.value; + if (clearOutput) { + traceCellMessage(this.cell, 'Clear cell output'); + this.clearLastUsedStreamOutput(); + task?.clearOutput().then(noop, noop); + this.clearState.update(false); + } + // Ensure we append to previous output, only if the streams as the same & + // If the last output is the desired stream type. + if (this.lastUsedStreamOutput?.stream === msg.content.name) { + // Get the jupyter output from the vs code output (so we can concatenate the text ourselves). + let existingOutputText = this.lastUsedStreamOutput.text; + let newContent = msg.content.text; + // Look for the ansi code `[A`. (this means move up) + // Not going to support `[2A` (not for now). + const moveUpCode = `${String.fromCharCode(27)}[A`; + if (msg.content.text.startsWith(moveUpCode)) { + // Split message by lines & strip out the last n lines (where n = number of lines to move cursor up). + const existingOutputLines = existingOutputText.splitLines({ + trim: false, + removeEmptyEntries: false + }); + if (existingOutputLines.length) { + existingOutputLines.pop(); + } + existingOutputText = existingOutputLines.join('\n'); + newContent = newContent.substring(moveUpCode.length); + } + // Create a new output item with the concatenated string. + this.lastUsedStreamOutput.text = formatStreamText( + concatMultilineString(`${existingOutputText}${newContent}`) + ); + const output = cellOutputToVSCCellOutput({ + output_type: 'stream', + name: msg.content.name, + text: this.lastUsedStreamOutput.text + }); + traceCellMessage(this.cell, `Replace output items ${this.lastUsedStreamOutput.text.substring(0, 100)}`); + task?.replaceOutputItems(output.items, this.lastUsedStreamOutput.output).then(noop, noop); + } else if (clearOutput) { + // Replace the current outputs with a single new output. + const text = formatStreamText(concatMultilineString(msg.content.text)); + const output = cellOutputToVSCCellOutput({ + output_type: 'stream', + name: msg.content.name, + text + }); + this.lastUsedStreamOutput = { output, stream: msg.content.name, text }; + traceCellMessage(this.cell, `Replace output ${this.lastUsedStreamOutput.text.substring(0, 100)}`); + task?.replaceOutput([output]).then(noop, noop); + } else { + // Create a new output + const text = formatStreamText(concatMultilineString(msg.content.text)); + const output = cellOutputToVSCCellOutput({ + output_type: 'stream', + name: msg.content.name, + text + }); + this.lastUsedStreamOutput = { output, stream: msg.content.name, text }; + traceCellMessage(this.cell, `Append output ${this.lastUsedStreamOutput.text.substring(0, 100)}`); + task?.appendOutput([output]).then(noop, noop); + } + this.endTemporaryTask(); + } + + private handleDisplayData(msg: KernelMessage.IDisplayDataMsg) { + const output = { + output_type: 'display_data', + data: handleTensorBoardDisplayDataOutput(msg.content.data), + metadata: msg.content.metadata, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + transient: msg.content.transient as any // NOSONAR + }; + this.addToCellData(output); + } + + private handleClearOutput(msg: KernelMessage.IClearOutputMsg) { + // If the message says wait, add every message type to our clear state. This will + // make us wait for this type of output before we clear it. + if (msg && msg.content.wait) { + this.clearState.update(true); + } else { + // Possible execution of cell has completed (the task would have been disposed). + // This message could have come from a background thread. + // In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output). + // Clear all outputs and start over again. + const task = this.execution || this.createTemporaryTask(); + this.clearLastUsedStreamOutput(); + task?.clearOutput().then(noop, noop); + this.endTemporaryTask(); + } + } + + private handleError(msg: KernelMessage.IErrorMsg) { + let traceback = msg.content.traceback; + this.formatters.forEach((formatter) => { + traceback = formatter.format(this.cell, traceback); + }); + const output: nbformat.IError = { + output_type: 'error', + ename: msg.content.ename, + evalue: msg.content.evalue, + traceback + }; + + this.addToCellData(output); + this.cellHasErrorsInOutput = true; + } + + @swallowExceptions() + private handleReply(msg: KernelMessage.IShellControlMessage) { + // eslint-disable-next-line @typescript-eslint/no-require-imports + const jupyterLab = require('@jupyterlab/services') as typeof import('@jupyterlab/services'); + + if (jupyterLab.KernelMessage.isExecuteReplyMsg(msg)) { + this.handleExecuteReply(msg); + + // Set execution count, all messages should have it + if ('execution_count' in msg.content && typeof msg.content.execution_count === 'number' && this.execution) { + this.execution.executionOrder = msg.content.execution_count; + } + } + } + /** + * Execution of Cell B could result in updates to output in Cell A. + */ + private handleUpdateDisplayDataMessage(msg: KernelMessage.IUpdateDisplayDataMsg) { + const displayId = msg.content.transient.display_id; + if (!displayId) { + return; + } + const outputToBeUpdated = this.outputDisplayIdTracker.getMappedOutput(this.cell.notebook, displayId); + if (!outputToBeUpdated) { + return; + } + const output = translateCellDisplayOutput(outputToBeUpdated); + const newOutput = cellOutputToVSCCellOutput({ + ...output, + data: msg.content.data, + metadata: msg.content.metadata + } as nbformat.IDisplayData); + // If there was no output and still no output, then nothing to do. + if (outputToBeUpdated.items.length === 0 && newOutput.items.length === 0) { + return; + } + // Compare each output item (at the end of the day everything is serializable). + // Hence this is a safe comparison. + if (outputToBeUpdated.items.length === newOutput.items.length) { + let allAllOutputItemsSame = true; + for (let index = 0; index < outputToBeUpdated.items.length; index++) { + if (!fastDeepEqual(outputToBeUpdated.items[index], newOutput.items[index])) { + allAllOutputItemsSame = false; + break; + } + } + if (allAllOutputItemsSame) { + // If everything is still the same, then there's nothing to update. + return; + } + } + // Possible execution of cell has completed (the task would have been disposed). + // This message could have come from a background thread. + // In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output). + const task = this.execution || this.createTemporaryTask(); + traceCellMessage(this.cell, `Replace output items in display data ${newOutput.items.length}`); + task?.replaceOutputItems(newOutput.items, outputToBeUpdated).then(noop, noop); + this.endTemporaryTask(); + } +} diff --git a/src/notebooks/execution/cellExecutionMessageHandlerFactory.ts b/src/notebooks/execution/cellExecutionMessageHandlerFactory.ts new file mode 100644 index 000000000000..8a78e2ad00d0 --- /dev/null +++ b/src/notebooks/execution/cellExecutionMessageHandlerFactory.ts @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import { Kernel, KernelMessage } from '@jupyterlab/services'; +import { NotebookCell, NotebookCellExecution, NotebookController } from 'vscode'; +import { ITracebackFormatter } from '../../kernels/types'; +import { IApplicationShell } from '../../platform/common/application/types'; +import { IExtensionContext } from '../../platform/common/types'; +import { CellOutputDisplayIdTracker } from './cellDisplayIdTracker'; +import { CellExecutionMessageHandler } from './cellExecutionMessageHandler'; + +export class CellExecutionMessageHandlerFactory { + constructor( + private readonly appShell: IApplicationShell, + private readonly controller: NotebookController, + private readonly outputDisplayIdTracker: CellOutputDisplayIdTracker, + private readonly context: IExtensionContext, + private readonly formatters: ITracebackFormatter[] + ) {} + public create( + cell: NotebookCell, + options: { + kernel: Kernel.IKernelConnection; + request: Kernel.IShellFuture; + cellExecution: NotebookCellExecution; + } + ): CellExecutionMessageHandler { + return new CellExecutionMessageHandler( + cell, + this.appShell, + this.controller, + this.outputDisplayIdTracker, + this.context, + this.formatters, + options.kernel, + options.request, + options.cellExecution + ); + } +} diff --git a/src/notebooks/execution/kernelExecution.ts b/src/notebooks/execution/kernelExecution.ts index 1dfee31bf486..6856b09c0322 100644 --- a/src/notebooks/execution/kernelExecution.ts +++ b/src/notebooks/execution/kernelExecution.ts @@ -27,6 +27,7 @@ import { import { traceCellMessage } from '../helpers'; import { getDisplayPath } from '../../platform/common/platform/fs-paths'; import { getAssociatedNotebookDocument } from '../controllers/kernelSelector'; +import { CellExecutionMessageHandlerFactory } from './cellExecutionMessageHandlerFactory'; /** * Separate class that deals just with kernel execution. @@ -49,7 +50,14 @@ export class KernelExecution implements IDisposable { context: IExtensionContext, formatters: ITracebackFormatter[] ) { - this.executionFactory = new CellExecutionFactory(appShell, controller, outputTracker, context, formatters); + const factory = new CellExecutionMessageHandlerFactory( + appShell, + controller, + outputTracker, + context, + formatters + ); + this.executionFactory = new CellExecutionFactory(controller, factory); } public get onPreExecute() { diff --git a/src/platform/common/refBool.node.ts b/src/platform/common/refBool.ts similarity index 67% rename from src/platform/common/refBool.node.ts rename to src/platform/common/refBool.ts index c0ef43a20777..22b0421b5e9f 100644 --- a/src/platform/common/refBool.node.ts +++ b/src/platform/common/refBool.ts @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + export class RefBool { constructor(private val: boolean) {}