diff --git a/src/notebooks/execution/cellExecutionMessageHandler.ts b/src/notebooks/execution/cellExecutionMessageHandler.ts index cd21f352ac95..e9c5fac48cc9 100644 --- a/src/notebooks/execution/cellExecutionMessageHandler.ts +++ b/src/notebooks/execution/cellExecutionMessageHandler.ts @@ -5,7 +5,7 @@ import * as fastDeepEqual from 'fast-deep-equal'; import type * as nbformat from '@jupyterlab/nbformat'; -import type * as KernelMessage from '@jupyterlab/services/lib/kernel/messages'; +import * as KernelMessage from '@jupyterlab/services/lib/kernel/messages'; import { NotebookCell, NotebookCellExecution, @@ -30,7 +30,6 @@ import { CellExecutionCreator } from './cellExecutionCreator'; import { IApplicationShell } from '../../platform/common/application/types'; import { disposeAllDisposables } from '../../platform/common/helpers'; import { traceError, traceWarning } from '../../platform/logging'; -import { RefBool } from '../../platform/common/refBool'; import { IDisposable, IExtensionContext } from '../../platform/common/types'; import { traceCellMessage, cellOutputToVSCCellOutput, translateCellDisplayOutput, isJupyterNotebook } from '../helpers'; import { formatStreamText, concatMultilineString } from '../../webviews/webview-side/common'; @@ -39,6 +38,7 @@ import { noop } from '../../platform/common/utils/misc'; import { ITracebackFormatter } from '../../kernels/types'; import { handleTensorBoardDisplayDataOutput } from './executionHelpers'; import { WIDGET_MIMETYPE } from '../../kernels/ipywidgets-message-coordination/constants'; +import { CellOutputMimeTypes } from '../types'; // Helper interface for the set_next_input execute reply payload interface ISetNextInputPayload { @@ -60,6 +60,26 @@ type DisplayData = nbformat.IDisplayData & { */ export const activeNotebookCellExecution = new WeakMap(); +function getParentHeaderMsgId(msg: KernelMessage.IMessage): string | undefined { + if (msg.parent_header && 'msg_id' in msg.parent_header) { + return msg.parent_header.msg_id; + } + return undefined; +} + +function canMimeTypeBeRenderedByWidgetManager(mime: string) { + if (mime == CellOutputMimeTypes.stderr || mime == CellOutputMimeTypes.stdout || mime == CellOutputMimeTypes.error) { + // These are plain text mimetypes that can be rendered by the Jupyter Lab widget manager. + return true; + } + if (mime.startsWith('application/vnd')) { + // Custom vendored mimetypes cannot be rendered by the widget manager, it relies on the output renderers. + return false; + } + // Everything else can be rendered by the Jupyter Lab widget manager. + return true; +} + /** * Responsible for handling of jupyter messages as a result of execution of individual cells. */ @@ -73,10 +93,12 @@ export class CellExecutionMessageHandler implements IDisposable { */ private completedExecution?: boolean; /** - * Listen to messages and update our cell execution state appropriately - * Keep track of our clear state + * Jupyter can sent a `clear_output` message which indicates the output of a cell should be cleared. + * If the flag `wait` is set to `true`, then we should wait for the next output before clearing the output. + * I.e. if the value for `wait` is false (default) then clear the cell output immediately. + * https://ipywidgets.readthedocs.io/en/latest/examples/Output%20Widget.html#Output-widgets:-leveraging-Jupyter's-display-system */ - private readonly clearState = new RefBool(false); + private clearOutputOnNextUpdateToOutput?: boolean; private execution?: NotebookCellExecution; private readonly _onErrorHandlingIOPubMessage = new EventEmitter<{ @@ -98,6 +120,25 @@ export class CellExecutionMessageHandler implements IDisposable { * Because if after the stream we have an image, then the stream is not the last output item, hence its cleared. */ private lastUsedStreamOutput?: { stream: 'stdout' | 'stderr'; text: string; output: NotebookCellOutput }; + private outputsAreSpecificToAWidget?: { + /** + * Comm Id (or model_id) of widget that will handle all messages and render them via the widget manager. + * This could be a widget in another cell. + */ + handlingCommId: string; + /** + * All messages that have a parent_header.msg_id = msg_id will be swallowed and handled by the widget with model_id = this.handlingCommId. + * These requests could be from another cell, ie messages can original from one cell and end up getting displayed in another. + * E.g. widget is in cell 1 and output will be redirected from cell 2 into widget 1. + */ + msgIdsToSwallow: string; + /** + * If true, then we should clear all of the output owned by the widget defined by the commId. + * By owned, we mean the output added after the widget widget output and not the widget itself. + */ + clearOutputOnNextUpdateToOutput?: boolean; + }; + private commIdsMappedToParentWidgetModel = new Map(); private readonly disposables: IDisposable[] = []; private readonly prompts = new Set(); /** @@ -105,11 +146,12 @@ export class CellExecutionMessageHandler implements IDisposable { * or for any subsequent requests as a result of outputs sending custom messages. */ private readonly ownedCommIds = new Set(); + private readonly outputsOwnedByWidgetModel = new Map>(); /** * List of msg_ids of requests sent either as part of request_execute * or for any subsequent requests as a result of outputs sending custom messages. */ - private readonly ownedRequestIds = new Set(); + private readonly ownedRequestMsgIds = new Set(); constructor( public readonly cell: NotebookCell, private readonly applicationService: IApplicationShell, @@ -122,7 +164,7 @@ export class CellExecutionMessageHandler implements IDisposable { cellExecution: NotebookCellExecution ) { this.executeRequestMessageId = request.msg.header.msg_id; - this.ownedRequestIds.add(request.msg.header.msg_id); + this.ownedRequestMsgIds.add(request.msg.header.msg_id); workspace.onDidChangeNotebookDocument( (e) => { if (!isJupyterNotebook(e.notebook)) { @@ -142,16 +184,16 @@ export class CellExecutionMessageHandler implements IDisposable { this.disposables ); this.execution = cellExecution; - request.onIOPub = (msg) => { + // We're in all messages. + // When using the `interact` function in Python, we can get outputs from comm messages even before execution has completed. + // See https://github.com/microsoft/vscode-jupyter/issues/9503 for more information on why we need to monitor anyMessage and iopubMessage signals. + this.kernel.anyMessage.connect(this.onKernelAnyMessage, this); + this.kernel.iopubMessage.connect(this.onKernelIOPubMessage, this); + + request.onIOPub = () => { // Cell has been deleted or the like. - if (this.cell.document.isClosed) { + if (this.cell.document.isClosed && !this.completedExecution) { request.dispose(); - return; - } - try { - this.handleIOPub(msg); - } catch (ex) { - this._onErrorHandlingIOPubMessage.fire(ex); } }; request.onReply = (msg) => { @@ -166,11 +208,6 @@ export class CellExecutionMessageHandler implements IDisposable { request.done .finally(() => { this.completedExecution = true; - // We're only interested in messages after execution has completed. - // See https://github.com/microsoft/vscode-jupyter/issues/9503 for more information. - // this.kernel.anyMessage.connect(this.onKernelAnyMessage, this); - // this.kernel.iopubMessage.connect(this.onKernelIOPubMessage, this); - this.endCellExecution(); }) .catch(noop); @@ -201,7 +238,7 @@ export class CellExecutionMessageHandler implements IDisposable { this.clearLastUsedStreamOutput(); this.execution = undefined; - if (this.ownedCommIds.size === 0 && this.completedExecution) { + if (this.cell.document.isClosed || (this.ownedCommIds.size === 0 && this.completedExecution)) { // If no comms channels were opened as a result of any outputs of this cell, // this means we don't have any widgets that can send comm message back to the kernel. // Hence no point listening to any of the iopub messages & the like, i.e. we can stop listening to everything in this class. @@ -209,6 +246,10 @@ export class CellExecutionMessageHandler implements IDisposable { } } private onKernelAnyMessage(_: unknown, { direction, msg }: Kernel.IAnyMessageArgs) { + if (this.cell.document.isClosed) { + return this.endCellExecution(); + } + // We're only interested in messages after execution has completed. // See https://github.com/microsoft/vscode-jupyter/issues/9503 for more information. if (direction !== 'send' || !this.completedExecution) { @@ -219,10 +260,13 @@ export class CellExecutionMessageHandler implements IDisposable { if (jupyterLab.KernelMessage.isCommMsgMsg(msg) && this.ownedCommIds.has(msg.content.comm_id)) { // Looks like we have a comm msg request sent by some output or the like. // See https://github.com/microsoft/vscode-jupyter/issues/9503 for more information. - this.ownedRequestIds.add(msg.header.msg_id); + this.ownedRequestMsgIds.add(msg.header.msg_id); } } private onKernelIOPubMessage(_: unknown, msg: KernelMessage.IIOPubMessage) { + if (this.cell.document.isClosed) { + return this.endCellExecution(); + } // We're only interested in messages after execution has completed. // See https://github.com/microsoft/vscode-jupyter/issues/9503 for more information. @@ -230,11 +274,9 @@ export class CellExecutionMessageHandler implements IDisposable { // comm message (requests) sent by an output widget. // See https://github.com/microsoft/vscode-jupyter/issues/9503 for more information. if ( - !this.completedExecution || !msg.parent_header || !('msg_id' in msg.parent_header) || - msg.parent_header.msg_id === this.executeRequestMessageId || - !this.ownedRequestIds.has(msg.parent_header.msg_id) || + !this.ownedRequestMsgIds.has(msg.parent_header.msg_id) || msg.channel !== 'iopub' ) { return; @@ -243,6 +285,11 @@ export class CellExecutionMessageHandler implements IDisposable { this.handleIOPub(msg); } catch (ex) { traceError(`Failed to handle iopub message as a result of some comm message`, msg, ex); + if (!this.completedExecution && !this.cell.document.isClosed) { + // If there are problems handling the execution, then bubble those to the calling code. + // Else just log the errors. + this._onErrorHandlingIOPubMessage.fire(ex); + } } } private clearLastUsedStreamOutput() { @@ -321,7 +368,7 @@ export class CellExecutionMessageHandler implements IDisposable { } else if (jupyterLab.KernelMessage.isCommOpenMsg(msg)) { // Noop. } else if (jupyterLab.KernelMessage.isCommMsgMsg(msg)) { - // Noop. + this.handleCommMsg(msg); } else if (jupyterLab.KernelMessage.isCommCloseMsg(msg)) { // Noop. } else { @@ -333,8 +380,76 @@ export class CellExecutionMessageHandler implements IDisposable { this.execution.executionOrder = msg.content.execution_count; } } + private handleCommMsg(msg: KernelMessage.ICommMsgMsg) { + const data = msg.content.data as Partial<{ + method: 'update'; + state: { msg_id: string } | { children: string[] }; + }>; + if (!isObject(data) || data.method !== 'update' || !isObject(data.state)) { + return; + } - private addToCellData(output: ExecuteResult | DisplayData | nbformat.IStream | nbformat.IError | nbformat.IOutput) { + if ('msg_id' in data.state && typeof data.state.msg_id === 'string') { + // When such a comm message is received, then + // the kernel is instructing the front end (UI widget manager) + // to handle all of the messages that would have other wise been handled as regular execution messages for msg_id. + const parentHeader = 'msg_id' in msg.parent_header ? msg.parent_header : undefined; + if ( + this.ownedRequestMsgIds.has(msg.content.comm_id) || + (parentHeader && this.ownedRequestMsgIds.has(parentHeader.msg_id)) + ) { + if (data.state.msg_id) { + // Any future messages sent from `parent_header.msg_id = msg_id` must be handled by the widget with the `mode_id = msg.content.comm_id`. + this.outputsAreSpecificToAWidget = { + handlingCommId: msg.content.comm_id, + msgIdsToSwallow: data.state.msg_id + }; + } else if (this.outputsAreSpecificToAWidget?.handlingCommId === msg.content.comm_id) { + // Handle all messages the normal way. + this.outputsAreSpecificToAWidget = undefined; + } + } + } else if ( + 'children' in data.state && + Array.isArray(data.state.children) && + this.ownedCommIds.has(msg.content.comm_id) + ) { + // This is the kernel instructing the widget manager that some outputs (comm_ids) + // are in fact children of another output (comm). + // We need to keep track of this so that we know who the common parent is. + const IPY_MODEL_PREFIX = 'IPY_MODEL_'; + data.state.children.forEach((item) => { + if (typeof item !== 'string') { + return traceWarning(`Came across a comm update message a child that isn't a string`, item); + } + if (!item.startsWith(IPY_MODEL_PREFIX)) { + return traceWarning( + `Came across a comm update message a child that start start with ${IPY_MODEL_PREFIX}`, + item + ); + } + const commId = item.substring(IPY_MODEL_PREFIX.length); + this.ownedCommIds.add(commId); + this.commIdsMappedToParentWidgetModel.set(commId, msg.content.comm_id); + }); + } + } + private clearOutputIfNecessary(execution: NotebookCellExecution | undefined): { + previousValueOfClearOutputOnNextUpdateToOutput: boolean; + } { + if (this.clearOutputOnNextUpdateToOutput) { + traceCellMessage(this.cell, 'Clear cell output'); + this.clearLastUsedStreamOutput(); + execution?.clearOutput().then(noop, noop); + this.clearOutputOnNextUpdateToOutput = false; + return { previousValueOfClearOutputOnNextUpdateToOutput: true }; + } + return { previousValueOfClearOutputOnNextUpdateToOutput: false }; + } + private addToCellData( + output: ExecuteResult | DisplayData | nbformat.IStream | nbformat.IError | nbformat.IOutput, + originalMessage: KernelMessage.IMessage + ) { if ( this.context.extensionMode === ExtensionMode.Test && output.data && @@ -358,12 +473,8 @@ export class CellExecutionMessageHandler implements IDisposable { } traceCellMessage(this.cell, 'Update output'); // Clear if necessary - if (this.clearState.value) { - this.clearLastUsedStreamOutput(); - this.execution?.clearOutput().then(noop, noop); - this.clearState.update(false); - } - // Keep track of the displa_id against the output item, we might need this to update this later. + this.clearOutputIfNecessary(this.execution); + // Keep track of the display_id against the output item, we might need this to update this later. if (displayId) { this.outputDisplayIdTracker.trackOutputByDisplayId(this.cell, displayId, cellOutput); } @@ -375,10 +486,99 @@ export class CellExecutionMessageHandler implements IDisposable { const task = this.execution || this.createTemporaryTask(); this.clearLastUsedStreamOutput(); traceCellMessage(this.cell, 'Append output in addToCellData'); - task?.appendOutput([cellOutput]).then(noop, noop); + // If the output belongs to a widget, then add the output to that specific widget (i.e. just below the widget). + let outputShouldBeAppended = true; + const parentHeaderMsgId = getParentHeaderMsgId(originalMessage); + if ( + this.outputsAreSpecificToAWidget && + this.outputsAreSpecificToAWidget?.msgIdsToSwallow === parentHeaderMsgId && + cellOutput.items.every((item) => canMimeTypeBeRenderedByWidgetManager(item.mime)) + ) { + // Plain text outputs will be displayed by the widget. + outputShouldBeAppended = false; + } else if ( + this.outputsAreSpecificToAWidget && + this.outputsAreSpecificToAWidget?.msgIdsToSwallow === parentHeaderMsgId + ) { + const result = this.updateWidgetOwnedOutput( + { commId: this.outputsAreSpecificToAWidget.handlingCommId, outputToAppend: cellOutput }, + task + ); + + if (result?.outputAdded) { + outputShouldBeAppended = false; + } + } + if (outputShouldBeAppended) { + task?.appendOutput([cellOutput]).then(noop, noop); + } this.endTemporaryTask(); } + private updateWidgetOwnedOutput( + options: { outputToAppend: NotebookCellOutput; commId: string } | { clearOutput: true }, + task?: NotebookCellExecution + ): { outputAdded: true } | undefined { + const commId = 'commId' in options ? options.commId : this.outputsAreSpecificToAWidget?.handlingCommId; + if (!commId) { + return; + } + const outputToAppend = 'outputToAppend' in options ? options.outputToAppend : undefined; + const expectedModelId = this.commIdsMappedToParentWidgetModel.get(commId) || commId; + const widgetOutput = this.cell.outputs.find((output) => { + return output.items.find((outputItem) => { + if (outputItem.mime !== WIDGET_MIMETYPE) { + return false; + } + try { + const value = JSON.parse(Buffer.from(outputItem.data).toString()) as { model_id?: string }; + return value.model_id === expectedModelId; + } catch (ex) { + traceWarning(`Failed to deserialize the widget data`, ex); + } + return false; + }); + }); + if (!widgetOutput) { + return; + } + const outputsOwnedByWidgetModel = this.outputsOwnedByWidgetModel.get(expectedModelId) || new Set(); + + // We have some new outputs, that need to be placed immediately after the widget and before any other output + // that doesn't belong to the widget. + const clearWidgetOutput = this.outputsAreSpecificToAWidget?.clearOutputOnNextUpdateToOutput === true; + if (this.outputsAreSpecificToAWidget) { + this.outputsAreSpecificToAWidget.clearOutputOnNextUpdateToOutput = false; + } + const newOutputs = this.cell.outputs.slice().filter((item) => { + if (clearWidgetOutput) { + // If we're supposed to clear the output, then clear all of the output that's + // specific to this widget. + // These are tracked further below. + return !outputsOwnedByWidgetModel.has(item.id); + } else { + return true; + } + }); + + const outputsUptoWidget = newOutputs.slice(0, newOutputs.indexOf(widgetOutput) + 1); + const outputsAfterWidget = newOutputs.slice(newOutputs.indexOf(widgetOutput) + 1); + + this.outputsOwnedByWidgetModel.set(expectedModelId, outputsOwnedByWidgetModel); + if (outputToAppend) { + // Keep track of the output added that belongs to the widget. + // Next time when we need to clear the output belonging to this widget, all we need to do is + // filter out (exclude) these outputs. + outputsOwnedByWidgetModel.add(outputToAppend.id); + } + + // Ensure the new output is added just after the widget. + const newOutput = outputToAppend + ? outputsUptoWidget.concat(outputToAppend).concat(outputsAfterWidget) + : outputsUptoWidget.concat(outputsAfterWidget); + task?.replaceOutput(newOutput).then(noop, noop); + return { outputAdded: true }; + } private async handleInputRequest(msg: KernelMessage.IStdinMessage) { // Ask the user for input if (msg.content && 'prompt' in msg.content) { @@ -405,14 +605,17 @@ export class CellExecutionMessageHandler implements IDisposable { // See this for docs on the messages: // https://jupyter-client.readthedocs.io/en/latest/messaging.html#messaging-in-jupyter private handleExecuteResult(msg: KernelMessage.IExecuteResultMsg) { - this.addToCellData({ - output_type: 'execute_result', - data: msg.content.data, - metadata: msg.content.metadata, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - transient: msg.content.transient as any, // NOSONAR - execution_count: msg.content.execution_count - }); + this.addToCellData( + { + output_type: 'execute_result', + data: msg.content.data, + metadata: msg.content.metadata, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + transient: msg.content.transient as any, // NOSONAR + execution_count: msg.content.execution_count + }, + msg + ); } private handleExecuteReply(msg: KernelMessage.IExecuteReplyMsg) { @@ -428,15 +631,18 @@ export class CellExecutionMessageHandler implements IDisposable { this.handleSetNextInput(payload as unknown as ISetNextInputPayload); } if (payload.data && payload.data.hasOwnProperty('text/plain')) { - this.addToCellData({ - // Mark as stream output so the text is formatted because it likely has ansi codes in it. - output_type: 'stream', - // eslint-disable-next-line @typescript-eslint/no-explicit-any - text: (payload.data as any)['text/plain'].toString(), - name: 'stdout', - metadata: {}, - execution_count: reply.execution_count - }); + this.addToCellData( + { + // Mark as stream output so the text is formatted because it likely has ansi codes in it. + output_type: 'stream', + // eslint-disable-next-line @typescript-eslint/no-explicit-any + text: (payload.data as any)['text/plain'].toString(), + name: 'stdout', + metadata: {}, + execution_count: reply.execution_count + }, + msg + ); } }); } @@ -477,6 +683,14 @@ export class CellExecutionMessageHandler implements IDisposable { traceCellMessage(this.cell, `Kernel switching to ${msg.content.execution_state}`); } private handleStreamMessage(msg: KernelMessage.IStreamMsg) { + if ( + getParentHeaderMsgId(msg) && + this.outputsAreSpecificToAWidget?.msgIdsToSwallow == getParentHeaderMsgId(msg) + ) { + // Stream messages will be handled by the widget output. + return; + } + // eslint-disable-next-line complexity traceCellMessage(this.cell, 'Update streamed output'); // Possible execution of cell has completed (the task would have been disposed). @@ -485,13 +699,7 @@ export class CellExecutionMessageHandler implements IDisposable { const task = this.execution || this.createTemporaryTask(); // Clear output if waiting for a clear - const clearOutput = this.clearState.value; - if (clearOutput) { - traceCellMessage(this.cell, 'Clear cell output'); - this.clearLastUsedStreamOutput(); - task?.clearOutput().then(noop, noop); - this.clearState.update(false); - } + const { previousValueOfClearOutputOnNextUpdateToOutput } = this.clearOutputIfNecessary(task); // Ensure we append to previous output, only if the streams as the same & // If the last output is the desired stream type. if (this.lastUsedStreamOutput?.stream === msg.content.name) { @@ -524,7 +732,7 @@ export class CellExecutionMessageHandler implements IDisposable { }); traceCellMessage(this.cell, `Replace output items ${this.lastUsedStreamOutput.text.substring(0, 100)}`); task?.replaceOutputItems(output.items, this.lastUsedStreamOutput.output).then(noop, noop); - } else if (clearOutput) { + } else if (previousValueOfClearOutputOnNextUpdateToOutput) { // Replace the current outputs with a single new output. const text = formatStreamText(concatMultilineString(msg.content.text)); const output = cellOutputToVSCCellOutput({ @@ -558,14 +766,28 @@ export class CellExecutionMessageHandler implements IDisposable { // eslint-disable-next-line @typescript-eslint/no-explicit-any transient: msg.content.transient as any // NOSONAR }; - this.addToCellData(output); + this.addToCellData(output, msg); } private handleClearOutput(msg: KernelMessage.IClearOutputMsg) { - // If the message says wait, add every message type to our clear state. This will - // make us wait for this type of output before we clear it. - if (msg && msg.content.wait) { - this.clearState.update(true); + // Check if this message should be handled by a specific Widget output. + if ( + this.outputsAreSpecificToAWidget && + this.outputsAreSpecificToAWidget.msgIdsToSwallow === getParentHeaderMsgId(msg) + ) { + if (msg.content.wait) { + this.outputsAreSpecificToAWidget.clearOutputOnNextUpdateToOutput = true; + } else { + const task = this.execution || this.createTemporaryTask(); + this.updateWidgetOwnedOutput({ clearOutput: true }, task); + this.endTemporaryTask(); + } + return; + } + + // Regular output. + if (msg.content.wait) { + this.clearOutputOnNextUpdateToOutput = true; } else { // Possible execution of cell has completed (the task would have been disposed). // This message could have come from a background thread. @@ -590,7 +812,7 @@ export class CellExecutionMessageHandler implements IDisposable { traceback }; - this.addToCellData(output); + this.addToCellData(output, msg); this.cellHasErrorsInOutput = true; } diff --git a/src/platform/common/refBool.ts b/src/platform/common/refBool.ts deleted file mode 100644 index 22b0421b5e9f..000000000000 --- a/src/platform/common/refBool.ts +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -export class RefBool { - constructor(private val: boolean) {} - - public get value(): boolean { - return this.val; - } - - public update(newVal: boolean) { - this.val = newVal; - } -}