Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared kernelconnection wrapper #10141

Merged
merged 1 commit into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 130 additions & 97 deletions TELEMETRY.md

Large diffs are not rendered by default.

63 changes: 44 additions & 19 deletions src/kernels/common/baseJupyterSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { ChainingExecuteRequester } from './chainingExecuteRequester';
import { getResourceType } from '../../platform/common/utils';
import { KernelProgressReporter } from '../../platform/progress/kernelProgressReporter';
import { isTestExecution } from '../../platform/common/constants';
import { KernelConnectionWrapper } from './kernelConnectionWrapper';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function suppressShutdownErrors(realKernel: any) {
Expand Down Expand Up @@ -71,6 +72,13 @@ export class JupyterSessionStartError extends WrappedError {
}

export abstract class BaseJupyterSession implements IJupyterSession {
/**
* Keep a single instance of KernelConnectionWrapper.
* This way when sessions change, we still have a single Kernel.IKernelConnection proxy (wrapper),
* which will have all of the event handlers bound to it.
* This allows consumers to add event handlers hand not worry about internals & can use the lower level Jupyter API.
*/
private _wrappedKernel?: KernelConnectionWrapper;
private _isDisposed?: boolean;
private readonly _disposed = new EventEmitter<void>();
protected readonly disposables: IDisposable[] = [];
Expand All @@ -83,28 +91,30 @@ export abstract class BaseJupyterSession implements IJupyterSession {
protected get session(): ISessionWithSocket | undefined {
return this._session;
}
public get kernelId(): string {
return this.session?.kernel?.id || '';
}
public get kernel(): Kernel.IKernelConnection | undefined {
return this._session?.kernel || undefined;
if (this._wrappedKernel) {
return this._wrappedKernel;
}
if (!this._session?.kernel) {
return;
}
this._wrappedKernel = new KernelConnectionWrapper(this._session.kernel, this.disposables);
return this._wrappedKernel;
}

public get kernelSocket(): Observable<KernelSocketInformation | undefined> {
return this._kernelSocket;
}
public get onSessionStatusChanged(): Event<KernelMessage.Status> {
return this.onStatusChangedEvent.event;
}
public get onIOPubMessage(): Event<KernelMessage.IIOPubMessage> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a way to hook up to the iopub and anymessage events in the underlyiing Jupyter kernel connection.
This property onIOPubMessage is a custom event we wire up. If I were to introduce anyMessage event then i'd have to handle kernel restarts all over again, tomorrow if we wanted to handle other events, then all over gain etc...

Also, as we expose the underlying jupyter kernel (Kernel.IKenrelConnection) everywhere it makes sense to just hook to those events.

Hence created a kernel wrapper so that we have a single instance, of the Kernel.KernleConnection on which we can monitor the events (basically moved onIOPubMessage & similar events into the kernel.IKernelConnection and have that keep track of all the event handlers even when kernels restart)

if (!this.ioPubEventEmitter) {
this.ioPubEventEmitter = new EventEmitter<KernelMessage.IIOPubMessage>();
}
return this.ioPubEventEmitter.event;
}

public get status(): KernelMessage.Status {
return this.getServerStatus();
}

public abstract get kernelId(): string;

public get isConnected(): boolean {
return this.connected;
}
Expand All @@ -115,10 +125,9 @@ export abstract class BaseJupyterSession implements IJupyterSession {
protected restartSessionPromise?: { token: CancellationTokenSource; promise: Promise<ISessionWithSocket> };
private _session: ISessionWithSocket | undefined;
private _kernelSocket = new ReplaySubject<KernelSocketInformation | undefined>();
private ioPubEventEmitter = new EventEmitter<KernelMessage.IIOPubMessage>();
private ioPubHandler: Slot<ISessionWithSocket, KernelMessage.IIOPubMessage>;
private unhandledMessageHandler: Slot<ISessionWithSocket, KernelMessage.IMessage>;
private chainingExecute = new ChainingExecuteRequester();
private previousAnyMessageHandler?: IDisposable;

constructor(
public readonly kind: 'localRaw' | 'remoteJupyter' | 'localJupyter',
Expand All @@ -128,7 +137,6 @@ export abstract class BaseJupyterSession implements IJupyterSession {
private readonly interruptTimeout: number
) {
this.statusHandler = this.onStatusChanged.bind(this);
this.ioPubHandler = (_s, m) => this.ioPubEventEmitter.fire(m);
this.unhandledMessageHandler = (_s, m) => {
traceInfo(`Unhandled message found: ${m.header.msg_type}`);
};
Expand Down Expand Up @@ -345,10 +353,8 @@ export abstract class BaseJupyterSession implements IJupyterSession {
// Changes the current session.
protected setSession(session: ISessionWithSocket | undefined, forceUpdateKernelSocketInfo: boolean = false) {
const oldSession = this._session;
this.previousAnyMessageHandler?.dispose();
if (oldSession) {
if (this.ioPubHandler) {
oldSession.iopubMessage.disconnect(this.ioPubHandler);
}
if (this.unhandledMessageHandler) {
oldSession.unhandledMessage.disconnect(this.unhandledMessageHandler);
}
Expand All @@ -358,11 +364,29 @@ export abstract class BaseJupyterSession implements IJupyterSession {
}
this._session = session;
if (session) {
if (session.kernel && this._wrappedKernel) {
this._wrappedKernel.changeKernel(session.kernel);
}

// Listen for session status changes
session.statusChanged.connect(this.statusHandler);

if (session.iopubMessage) {
session.iopubMessage.connect(this.ioPubHandler);
if (session.kernelSocketInformation.socket?.onAnyMessage) {
// These messages are sent directly to the kernel bypassing the Jupyter lab npm libraries.
// As a result, we don't get any notification that messages were sent (on the anymessage signal).
// To ensure those signals can still be used to monitor such messages, send them via a callback so that we can emit these messages on the anymessage signal.
this.previousAnyMessageHandler = session.kernelSocketInformation.socket?.onAnyMessage((msg) => {
try {
if (this._wrappedKernel) {
const jupyterLabSerialize =
require('@jupyterlab/services/lib/kernel/serialize') as typeof import('@jupyterlab/services/lib/kernel/serialize'); // NOSONAR
const message =
typeof msg.msg === 'string' ? jupyterLabSerialize.deserialize(msg.msg) : msg.msg;
this._wrappedKernel.anyMessage.emit({ direction: msg.direction, msg: message });
}
} catch (ex) {
traceWarning(`failed to deserialize message to broadcast anymessage signal`);
}
});
}
if (session.unhandledMessage) {
session.unhandledMessage.connect(this.unhandledMessageHandler);
Expand Down Expand Up @@ -443,6 +467,7 @@ export abstract class BaseJupyterSession implements IJupyterSession {
this._disposed.fire();
this._disposed.dispose();
this.onStatusChangedEvent.dispose();
this.previousAnyMessageHandler?.dispose();
}
disposeAllDisposables(this.disposables);
traceVerbose('Shutdown session -- complete');
Expand Down
73 changes: 73 additions & 0 deletions src/kernels/common/kernelConnectionWrapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import type { Kernel } from '@jupyterlab/services';
import { IDisposable } from '../../platform/common/types';
import { BaseKernelConnectionWrapper } from '../jupyter/baseKernelConnectionWrapper';

export class KernelConnectionWrapper extends BaseKernelConnectionWrapper {
/**
* Use `kernelConnection` to access the value as its not a constant (can change over time).
* E.g. when restarting kernels or the like.
*/
private _kernelConnection!: Kernel.IKernelConnection;
protected get possibleKernelConnection(): undefined | Kernel.IKernelConnection {
return this._kernelConnection;
}
public get kernel() {
return this._kernelConnection;
}

constructor(kernel: Kernel.IKernelConnection, disposables: IDisposable[]) {
super(kernel, disposables);
this._kernelConnection = kernel;
}
public changeKernel(kernel: Kernel.IKernelConnection) {
if (this.kernel === kernel) {
return;
}
this.stopHandlingKernelMessages(this.possibleKernelConnection!);
this._kernelConnection = kernel;
this.startHandleKernelMessages(kernel);
}
async shutdown(): Promise<void> {
await this._kernelConnection.shutdown();
}
dispose(): void {
this._kernelConnection.dispose();
}
async interrupt(): Promise<void> {
await this._kernelConnection.interrupt();
}
async restart(): Promise<void> {
await this._kernelConnection.restart();
}
protected override startHandleKernelMessages(kernelConnection: Kernel.IKernelConnection) {
super.startHandleKernelMessages(kernelConnection);
this._kernelConnection = kernelConnection;
kernelConnection.connectionStatusChanged.connect(this.onConnectionStatusChanged, this);
kernelConnection.statusChanged.connect(this.onStatusChanged, this);
kernelConnection.disposed.connect(this.onDisposed, this);
}
protected override stopHandlingKernelMessages(kernelConnection: Kernel.IKernelConnection): void {
super.stopHandlingKernelMessages(kernelConnection);
kernelConnection.connectionStatusChanged.disconnect(this.onConnectionStatusChanged, this);
kernelConnection.statusChanged.disconnect(this.onStatusChanged, this);
kernelConnection.disposed.disconnect(this.onDisposed, this);
}
private onDisposed(connection: Kernel.IKernelConnection) {
if (connection === this.possibleKernelConnection) {
this.disposed.emit();
}
}
private onStatusChanged(connection: Kernel.IKernelConnection, args: Kernel.Status) {
if (connection === this.possibleKernelConnection) {
this.statusChanged.emit(args);
}
}
private onConnectionStatusChanged(connection: Kernel.IKernelConnection, args: Kernel.ConnectionStatus) {
if (connection === this.possibleKernelConnection) {
this.connectionStatusChanged.emit(args);
}
}
}
5 changes: 4 additions & 1 deletion src/kernels/common/kernelSocketWrapper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
import { EventEmitter } from 'vscode';
import * as WebSocketWS from 'ws';
import { ClassType } from '../../platform/ioc/types';
import { traceError } from '../../platform/logging';
Expand Down Expand Up @@ -50,7 +51,8 @@ export function KernelSocketWrapper<T extends ClassType<IWebSocketLike>>(SuperCl
private sendHooks: ((data: any, cb?: (err?: Error) => void) => Promise<void>)[];
private msgChain: Promise<any>;
private sendChain: Promise<any>;

private _onAnyMessage = new EventEmitter<{ msg: string; direction: 'send' }>();
public onAnyMessage = this._onAnyMessage.event;
constructor(...rest: any[]) {
super(...rest);
// Make sure the message chain is initialized
Expand All @@ -67,6 +69,7 @@ export function KernelSocketWrapper<T extends ClassType<IWebSocketLike>>(SuperCl
public sendToRealKernel(data: any, a2: any) {
// This will skip the send hooks. It's coming from
// the UI side.
this._onAnyMessage.fire({ msg: data, direction: 'send' });
super.send(data, a2);
}

Expand Down
27 changes: 15 additions & 12 deletions src/kernels/debugger/kernelDebugAdapterBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
DebugAdapter,
DebugProtocolMessage,
DebugSession,
Disposable,
Event,
EventEmitter,
NotebookCell,
Expand All @@ -33,7 +34,6 @@ import {
IDebugInfoResponse
} from './types';
import { sendTelemetryEvent } from '../../telemetry';
import { IDisposable } from '../../platform/common/types';
import { traceError, traceInfo, traceInfoIfCI, traceVerbose } from '../../platform/logging';
import {
assertIsDebugConfig,
Expand All @@ -42,6 +42,7 @@ import {
getMessageSourceAndHookIt
} from '../../notebooks/debugger/helper';
import { ResourceMap } from '../../platform/vscode-path/map';
import { IDisposable } from '../../platform/common/types';

/**
* For info on the custom requests implemented by jupyter see:
Expand Down Expand Up @@ -89,18 +90,9 @@ export abstract class KernelDebugAdapterBase implements DebugAdapter, IKernelDeb
this.debugCell = notebookDocument.cellAt(configuration.__cellIndex!);
}

this.jupyterSession.kernel?.iopubMessage.connect(this.onIOPubMessage, this);
this.disposables.push(
this.jupyterSession.onIOPubMessage(async (msg: KernelMessage.IIOPubMessage) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const anyMsg = msg as any;
traceInfoIfCI(`Debug IO Pub message: ${JSON.stringify(msg)}`);
if (anyMsg.header.msg_type === 'debug_event') {
this.trace('event', JSON.stringify(msg));
if (!(await this.delegate?.willSendEvent(anyMsg))) {
this.sendMessage.fire(msg.content);
}
}
})
new Disposable(() => this.jupyterSession.kernel?.iopubMessage.disconnect(this.onIOPubMessage, this))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we can hook up to the underlying Jupyter API events.

);

if (this.kernel) {
Expand Down Expand Up @@ -157,6 +149,17 @@ export abstract class KernelDebugAdapterBase implements DebugAdapter, IKernelDeb
traceVerbose(`[Debug] ${tag}: ${msg}`);
}

async onIOPubMessage(_: unknown, msg: KernelMessage.IIOPubMessage) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const anyMsg = msg as any;
traceInfoIfCI(`Debug IO Pub message: ${JSON.stringify(msg)}`);
if (anyMsg.header.msg_type === 'debug_event') {
this.trace('event', JSON.stringify(msg));
if (!(await this.delegate?.willSendEvent(anyMsg))) {
this.sendMessage.fire(msg.content);
}
}
}
async handleMessage(message: DebugProtocol.ProtocolMessage) {
try {
traceInfoIfCI(`KernelDebugAdapter::handleMessage ${JSON.stringify(message, undefined, ' ')}`);
Expand Down
Loading