Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Results for comm messages sent from widgets should get handled by the same cell (part 3) #10157

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 130 additions & 97 deletions TELEMETRY.md

Large diffs are not rendered by default.

64 changes: 44 additions & 20 deletions src/kernels/common/baseJupyterSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { ChainingExecuteRequester } from './chainingExecuteRequester';
import { getResourceType } from '../../platform/common/utils';
import { KernelProgressReporter } from '../../platform/progress/kernelProgressReporter';
import { isTestExecution } from '../../platform/common/constants';
import { KernelConnectionWrapper } from './kernelConnectionWrapper';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function suppressShutdownErrors(realKernel: any) {
Expand Down Expand Up @@ -71,6 +72,13 @@ export class JupyterSessionStartError extends WrappedError {
}

export abstract class BaseJupyterSession implements IJupyterSession {
/**
* Keep a single instance of KernelConnectionWrapper.
* This way when sessions change, we still have a single Kernel.IKernelConnection proxy (wrapper),
* which will have all of the event handlers bound to it.
* This allows consumers to add event handlers hand not worry about internals & can use the lower level Jupyter API.
*/
private _wrappedKernel?: KernelConnectionWrapper;
private _isDisposed?: boolean;
private readonly _disposed = new EventEmitter<void>();
protected readonly disposables: IDisposable[] = [];
Expand All @@ -83,28 +91,30 @@ export abstract class BaseJupyterSession implements IJupyterSession {
protected get session(): ISessionWithSocket | undefined {
return this._session;
}
public get kernelId(): string {
return this.session?.kernel?.id || '';
}
public get kernel(): Kernel.IKernelConnection | undefined {
return this._session?.kernel || undefined;
if (this._wrappedKernel) {
return this._wrappedKernel;
}
if (!this._session?.kernel) {
return;
}
this._wrappedKernel = new KernelConnectionWrapper(this._session.kernel, this.disposables);
return this._wrappedKernel;
}

public get kernelSocket(): Observable<KernelSocketInformation | undefined> {
return this._kernelSocket;
}
public get onSessionStatusChanged(): Event<KernelMessage.Status> {
return this.onStatusChangedEvent.event;
}
public get onIOPubMessage(): Event<KernelMessage.IIOPubMessage> {
if (!this.ioPubEventEmitter) {
this.ioPubEventEmitter = new EventEmitter<KernelMessage.IIOPubMessage>();
}
return this.ioPubEventEmitter.event;
}

public get status(): KernelMessage.Status {
return this.getServerStatus();
}

public abstract get kernelId(): string;

public get isConnected(): boolean {
return this.connected;
}
Expand All @@ -115,11 +125,9 @@ export abstract class BaseJupyterSession implements IJupyterSession {
protected restartSessionPromise?: { token: CancellationTokenSource; promise: Promise<ISessionWithSocket> };
private _session: ISessionWithSocket | undefined;
private _kernelSocket = new ReplaySubject<KernelSocketInformation | undefined>();
private ioPubEventEmitter = new EventEmitter<KernelMessage.IIOPubMessage>();
private ioPubHandler: Slot<ISessionWithSocket, KernelMessage.IIOPubMessage>;
private unhandledMessageHandler: Slot<ISessionWithSocket, KernelMessage.IMessage>;
private chainingExecute = new ChainingExecuteRequester();

private previousAnyMessageHandler?: IDisposable;
constructor(
public readonly kind: 'localRaw' | 'remoteJupyter' | 'localJupyter',
protected resource: Resource,
Expand All @@ -128,7 +136,6 @@ export abstract class BaseJupyterSession implements IJupyterSession {
private readonly interruptTimeout: number
) {
this.statusHandler = this.onStatusChanged.bind(this);
this.ioPubHandler = (_s, m) => this.ioPubEventEmitter.fire(m);
this.unhandledMessageHandler = (_s, m) => {
traceInfo(`Unhandled message found: ${m.header.msg_type}`);
};
Expand Down Expand Up @@ -345,10 +352,8 @@ export abstract class BaseJupyterSession implements IJupyterSession {
// Changes the current session.
protected setSession(session: ISessionWithSocket | undefined, forceUpdateKernelSocketInfo: boolean = false) {
const oldSession = this._session;
this.previousAnyMessageHandler?.dispose();
if (oldSession) {
if (this.ioPubHandler) {
oldSession.iopubMessage.disconnect(this.ioPubHandler);
}
if (this.unhandledMessageHandler) {
oldSession.unhandledMessage.disconnect(this.unhandledMessageHandler);
}
Expand All @@ -358,11 +363,29 @@ export abstract class BaseJupyterSession implements IJupyterSession {
}
this._session = session;
if (session) {
if (session.kernel && this._wrappedKernel) {
this._wrappedKernel.changeKernel(session.kernel);
}

// Listen for session status changes
session.statusChanged.connect(this.statusHandler);

if (session.iopubMessage) {
session.iopubMessage.connect(this.ioPubHandler);
if (session.kernelSocketInformation.socket?.onAnyMessage) {
// These messages are sent directly to the kernel bypassing the Jupyter lab npm libraries.
// As a result, we don't get any notification that messages were sent (on the anymessage signal).
// To ensure those signals can still be used to monitor such messages, send them via a callback so that we can emit these messages on the anymessage signal.
this.previousAnyMessageHandler = session.kernelSocketInformation.socket?.onAnyMessage((msg) => {
try {
if (this._wrappedKernel) {
const jupyterLabSerialize =
require('@jupyterlab/services/lib/kernel/serialize') as typeof import('@jupyterlab/services/lib/kernel/serialize'); // NOSONAR
const message =
typeof msg.msg === 'string' ? jupyterLabSerialize.deserialize(msg.msg) : msg.msg;
this._wrappedKernel.anyMessage.emit({ direction: msg.direction, msg: message });
}
} catch (ex) {
traceWarning(`failed to deserialize message to broadcast anymessage signal`);
}
});
}
if (session.unhandledMessage) {
session.unhandledMessage.connect(this.unhandledMessageHandler);
Expand Down Expand Up @@ -443,6 +466,7 @@ export abstract class BaseJupyterSession implements IJupyterSession {
this._disposed.fire();
this._disposed.dispose();
this.onStatusChangedEvent.dispose();
this.previousAnyMessageHandler?.dispose();
}
disposeAllDisposables(this.disposables);
traceVerbose('Shutdown session -- complete');
Expand Down
73 changes: 73 additions & 0 deletions src/kernels/common/kernelConnectionWrapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import type { Kernel } from '@jupyterlab/services';
import { IDisposable } from '../../platform/common/types';
import { BaseKernelConnectionWrapper } from '../jupyter/baseKernelConnectionWrapper';

export class KernelConnectionWrapper extends BaseKernelConnectionWrapper {
/**
* Use `kernelConnection` to access the value as its not a constant (can change over time).
* E.g. when restarting kernels or the like.
*/
private _kernelConnection!: Kernel.IKernelConnection;
protected get possibleKernelConnection(): undefined | Kernel.IKernelConnection {
return this._kernelConnection;
}
public get kernel() {
return this._kernelConnection;
}

constructor(kernel: Kernel.IKernelConnection, disposables: IDisposable[]) {
super(kernel, disposables);
this._kernelConnection = kernel;
}
public changeKernel(kernel: Kernel.IKernelConnection) {
if (this.kernel === kernel) {
return;
}
this.stopHandlingKernelMessages(this.possibleKernelConnection!);
this._kernelConnection = kernel;
this.startHandleKernelMessages(kernel);
}
async shutdown(): Promise<void> {
await this._kernelConnection.shutdown();
}
dispose(): void {
this._kernelConnection.dispose();
}
async interrupt(): Promise<void> {
await this._kernelConnection.interrupt();
}
async restart(): Promise<void> {
await this._kernelConnection.restart();
}
protected override startHandleKernelMessages(kernelConnection: Kernel.IKernelConnection) {
super.startHandleKernelMessages(kernelConnection);
this._kernelConnection = kernelConnection;
kernelConnection.connectionStatusChanged.connect(this.onConnectionStatusChanged, this);
kernelConnection.statusChanged.connect(this.onStatusChanged, this);
kernelConnection.disposed.connect(this.onDisposed, this);
}
protected override stopHandlingKernelMessages(kernelConnection: Kernel.IKernelConnection): void {
super.stopHandlingKernelMessages(kernelConnection);
kernelConnection.connectionStatusChanged.disconnect(this.onConnectionStatusChanged, this);
kernelConnection.statusChanged.disconnect(this.onStatusChanged, this);
kernelConnection.disposed.disconnect(this.onDisposed, this);
}
private onDisposed(connection: Kernel.IKernelConnection) {
if (connection === this.possibleKernelConnection) {
this.disposed.emit();
}
}
private onStatusChanged(connection: Kernel.IKernelConnection, args: Kernel.Status) {
if (connection === this.possibleKernelConnection) {
this.statusChanged.emit(args);
}
}
private onConnectionStatusChanged(connection: Kernel.IKernelConnection, args: Kernel.ConnectionStatus) {
if (connection === this.possibleKernelConnection) {
this.connectionStatusChanged.emit(args);
}
}
}
4 changes: 4 additions & 0 deletions src/kernels/common/kernelSocketWrapper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import { EventEmitter } from 'vscode';
import * as WebSocketWS from 'ws';
import { ClassType } from '../../platform/ioc/types';
import { traceError } from '../../platform/logging';
Expand Down Expand Up @@ -59,6 +60,8 @@ export function KernelSocketWrapper<T extends ClassType<IWebSocketLike>>(SuperCl
this.receiveHooks = [];
this.sendHooks = [];
}
private _onAnyMessage = new EventEmitter<{ msg: string; direction: 'send' }>();
public onAnyMessage = this._onAnyMessage.event;

protected patchSuperEmit(patch: (event: string | symbol, ...args: any[]) => boolean) {
super.emit = patch;
Expand All @@ -67,6 +70,7 @@ export function KernelSocketWrapper<T extends ClassType<IWebSocketLike>>(SuperCl
public sendToRealKernel(data: any, a2: any) {
// This will skip the send hooks. It's coming from
// the UI side.
this._onAnyMessage.fire({ msg: data, direction: 'send' });
super.send(data, a2);
}

Expand Down
27 changes: 15 additions & 12 deletions src/kernels/debugger/kernelDebugAdapterBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
DebugAdapter,
DebugProtocolMessage,
DebugSession,
Disposable,
Event,
EventEmitter,
NotebookCell,
Expand All @@ -33,7 +34,6 @@ import {
IDebugInfoResponse
} from './types';
import { sendTelemetryEvent } from '../../telemetry';
import { IDisposable } from '../../platform/common/types';
import { traceError, traceInfo, traceInfoIfCI, traceVerbose } from '../../platform/logging';
import {
assertIsDebugConfig,
Expand All @@ -42,6 +42,7 @@ import {
getMessageSourceAndHookIt
} from '../../notebooks/debugger/helper';
import { ResourceMap } from '../../platform/vscode-path/map';
import { IDisposable } from '../../platform/common/types';

/**
* For info on the custom requests implemented by jupyter see:
Expand Down Expand Up @@ -89,18 +90,9 @@ export abstract class KernelDebugAdapterBase implements DebugAdapter, IKernelDeb
this.debugCell = notebookDocument.cellAt(configuration.__cellIndex!);
}

this.jupyterSession.kernel?.iopubMessage.connect(this.onIOPubMessage, this);
this.disposables.push(
this.jupyterSession.onIOPubMessage(async (msg: KernelMessage.IIOPubMessage) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const anyMsg = msg as any;
traceInfoIfCI(`Debug IO Pub message: ${JSON.stringify(msg)}`);
if (anyMsg.header.msg_type === 'debug_event') {
this.trace('event', JSON.stringify(msg));
if (!(await this.delegate?.willSendEvent(anyMsg))) {
this.sendMessage.fire(msg.content);
}
}
})
new Disposable(() => this.jupyterSession.kernel?.iopubMessage.disconnect(this.onIOPubMessage, this))
);

if (this.kernel) {
Expand Down Expand Up @@ -157,6 +149,17 @@ export abstract class KernelDebugAdapterBase implements DebugAdapter, IKernelDeb
traceVerbose(`[Debug] ${tag}: ${msg}`);
}

async onIOPubMessage(_: unknown, msg: KernelMessage.IIOPubMessage) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const anyMsg = msg as any;
traceInfoIfCI(`Debug IO Pub message: ${JSON.stringify(msg)}`);
if (anyMsg.header.msg_type === 'debug_event') {
this.trace('event', JSON.stringify(msg));
if (!(await this.delegate?.willSendEvent(anyMsg))) {
this.sendMessage.fire(msg.content);
}
}
}
async handleMessage(message: DebugProtocol.ProtocolMessage) {
try {
traceInfoIfCI(`KernelDebugAdapter::handleMessage ${JSON.stringify(message, undefined, ' ')}`);
Expand Down
Loading