Skip to content

Commit

Permalink
Shared kernelconnection wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
DonJayamanne committed May 30, 2022
1 parent 51fad38 commit edb849d
Show file tree
Hide file tree
Showing 18 changed files with 637 additions and 322 deletions.
227 changes: 130 additions & 97 deletions TELEMETRY.md

Large diffs are not rendered by default.

44 changes: 24 additions & 20 deletions src/kernels/common/baseJupyterSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { ChainingExecuteRequester } from './chainingExecuteRequester';
import { getResourceType } from '../../platform/common/utils';
import { KernelProgressReporter } from '../../platform/progress/kernelProgressReporter';
import { isTestExecution } from '../../platform/common/constants';
import { KernelConnectionWrapper } from './kernelConnectionWrapper';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function suppressShutdownErrors(realKernel: any) {
Expand Down Expand Up @@ -71,6 +72,13 @@ export class JupyterSessionStartError extends WrappedError {
}

export abstract class BaseJupyterSession implements IJupyterSession {
/**
* Keep a single instance of KernelConnectionWrapper.
* This way when sessions change, we still have a single Kernel.IKernelConnection proxy (wrapper),
* which will have all of the event handlers bound to it.
* This allows consumers to add event handlers hand not worry about internals & can use the lower level Jupyter API.
*/
private _wrappedKernel?: KernelConnectionWrapper;
private _isDisposed?: boolean;
private readonly _disposed = new EventEmitter<void>();
protected readonly disposables: IDisposable[] = [];
Expand All @@ -83,28 +91,30 @@ export abstract class BaseJupyterSession implements IJupyterSession {
protected get session(): ISessionWithSocket | undefined {
return this._session;
}
public get kernelId(): string {
return this.session?.kernel?.id || '';
}
public get kernel(): Kernel.IKernelConnection | undefined {
return this._session?.kernel || undefined;
if (this._wrappedKernel) {
return this._wrappedKernel;
}
if (!this._session?.kernel) {
return;
}
this._wrappedKernel = new KernelConnectionWrapper(this._session.kernel, this.disposables);
return this._wrappedKernel;
}

public get kernelSocket(): Observable<KernelSocketInformation | undefined> {
return this._kernelSocket;
}
public get onSessionStatusChanged(): Event<KernelMessage.Status> {
return this.onStatusChangedEvent.event;
}
public get onIOPubMessage(): Event<KernelMessage.IIOPubMessage> {
if (!this.ioPubEventEmitter) {
this.ioPubEventEmitter = new EventEmitter<KernelMessage.IIOPubMessage>();
}
return this.ioPubEventEmitter.event;
}

public get status(): KernelMessage.Status {
return this.getServerStatus();
}

public abstract get kernelId(): string;

public get isConnected(): boolean {
return this.connected;
}
Expand All @@ -115,8 +125,6 @@ export abstract class BaseJupyterSession implements IJupyterSession {
protected restartSessionPromise?: { token: CancellationTokenSource; promise: Promise<ISessionWithSocket> };
private _session: ISessionWithSocket | undefined;
private _kernelSocket = new ReplaySubject<KernelSocketInformation | undefined>();
private ioPubEventEmitter = new EventEmitter<KernelMessage.IIOPubMessage>();
private ioPubHandler: Slot<ISessionWithSocket, KernelMessage.IIOPubMessage>;
private unhandledMessageHandler: Slot<ISessionWithSocket, KernelMessage.IMessage>;
private chainingExecute = new ChainingExecuteRequester();

Expand All @@ -128,7 +136,6 @@ export abstract class BaseJupyterSession implements IJupyterSession {
private readonly interruptTimeout: number
) {
this.statusHandler = this.onStatusChanged.bind(this);
this.ioPubHandler = (_s, m) => this.ioPubEventEmitter.fire(m);
this.unhandledMessageHandler = (_s, m) => {
traceInfo(`Unhandled message found: ${m.header.msg_type}`);
};
Expand Down Expand Up @@ -346,9 +353,6 @@ export abstract class BaseJupyterSession implements IJupyterSession {
protected setSession(session: ISessionWithSocket | undefined, forceUpdateKernelSocketInfo: boolean = false) {
const oldSession = this._session;
if (oldSession) {
if (this.ioPubHandler) {
oldSession.iopubMessage.disconnect(this.ioPubHandler);
}
if (this.unhandledMessageHandler) {
oldSession.unhandledMessage.disconnect(this.unhandledMessageHandler);
}
Expand All @@ -358,12 +362,12 @@ export abstract class BaseJupyterSession implements IJupyterSession {
}
this._session = session;
if (session) {
if (session.kernel && this._wrappedKernel) {
this._wrappedKernel.changeKernel(session.kernel);
}

// Listen for session status changes
session.statusChanged.connect(this.statusHandler);

if (session.iopubMessage) {
session.iopubMessage.connect(this.ioPubHandler);
}
if (session.unhandledMessage) {
session.unhandledMessage.connect(this.unhandledMessageHandler);
}
Expand Down
73 changes: 73 additions & 0 deletions src/kernels/common/kernelConnectionWrapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import type { Kernel } from '@jupyterlab/services';
import { IDisposable } from '../../platform/common/types';
import { BaseKernelConnectionWrapper } from '../jupyter/baseKernelConnectionWrapper';

export class KernelConnectionWrapper extends BaseKernelConnectionWrapper {
/**
* Use `kernelConnection` to access the value as its not a constant (can change over time).
* E.g. when restarting kernels or the like.
*/
private _kernelConnection!: Kernel.IKernelConnection;
protected get possibleKernelConnection(): undefined | Kernel.IKernelConnection {
return this._kernelConnection;
}
public get kernel() {
return this._kernelConnection;
}

constructor(kernel: Kernel.IKernelConnection, disposables: IDisposable[]) {
super(kernel, disposables);
this._kernelConnection = kernel;
}
public changeKernel(kernel: Kernel.IKernelConnection) {
if (this.kernel === kernel) {
return;
}
this.stopHandlingKernelMessages(this.possibleKernelConnection!);
this._kernelConnection = kernel;
this.startHandleKernelMessages(kernel);
}
async shutdown(): Promise<void> {
await this._kernelConnection.shutdown();
}
dispose(): void {
this._kernelConnection.dispose();
}
async interrupt(): Promise<void> {
await this._kernelConnection.interrupt();
}
async restart(): Promise<void> {
await this._kernelConnection.restart();
}
protected override startHandleKernelMessages(kernelConnection: Kernel.IKernelConnection) {
super.startHandleKernelMessages(kernelConnection);
this._kernelConnection = kernelConnection;
kernelConnection.connectionStatusChanged.connect(this.onConnectionStatusChanged, this);
kernelConnection.statusChanged.connect(this.onStatusChanged, this);
kernelConnection.disposed.connect(this.onDisposed, this);
}
protected override stopHandlingKernelMessages(kernelConnection: Kernel.IKernelConnection): void {
super.stopHandlingKernelMessages(kernelConnection);
kernelConnection.connectionStatusChanged.disconnect(this.onConnectionStatusChanged, this);
kernelConnection.statusChanged.disconnect(this.onStatusChanged, this);
kernelConnection.disposed.disconnect(this.onDisposed, this);
}
private onDisposed(connection: Kernel.IKernelConnection) {
if (connection === this.possibleKernelConnection) {
this.disposed.emit();
}
}
private onStatusChanged(connection: Kernel.IKernelConnection, args: Kernel.Status) {
if (connection === this.possibleKernelConnection) {
this.statusChanged.emit(args);
}
}
private onConnectionStatusChanged(connection: Kernel.IKernelConnection, args: Kernel.ConnectionStatus) {
if (connection === this.possibleKernelConnection) {
this.connectionStatusChanged.emit(args);
}
}
}
27 changes: 15 additions & 12 deletions src/kernels/debugger/kernelDebugAdapterBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
DebugAdapter,
DebugProtocolMessage,
DebugSession,
Disposable,
Event,
EventEmitter,
NotebookCell,
Expand All @@ -33,7 +34,6 @@ import {
IDebugInfoResponse
} from './types';
import { sendTelemetryEvent } from '../../telemetry';
import { IDisposable } from '../../platform/common/types';
import { traceError, traceInfo, traceInfoIfCI, traceVerbose } from '../../platform/logging';
import {
assertIsDebugConfig,
Expand All @@ -42,6 +42,7 @@ import {
getMessageSourceAndHookIt
} from '../../notebooks/debugger/helper';
import { ResourceMap } from '../../platform/vscode-path/map';
import { IDisposable } from '../../platform/common/types';

/**
* For info on the custom requests implemented by jupyter see:
Expand Down Expand Up @@ -89,18 +90,9 @@ export abstract class KernelDebugAdapterBase implements DebugAdapter, IKernelDeb
this.debugCell = notebookDocument.cellAt(configuration.__cellIndex!);
}

this.jupyterSession.kernel?.iopubMessage.connect(this.onIOPubMessage, this);
this.disposables.push(
this.jupyterSession.onIOPubMessage(async (msg: KernelMessage.IIOPubMessage) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const anyMsg = msg as any;
traceInfoIfCI(`Debug IO Pub message: ${JSON.stringify(msg)}`);
if (anyMsg.header.msg_type === 'debug_event') {
this.trace('event', JSON.stringify(msg));
if (!(await this.delegate?.willSendEvent(anyMsg))) {
this.sendMessage.fire(msg.content);
}
}
})
new Disposable(() => this.jupyterSession.kernel?.iopubMessage.disconnect(this.onIOPubMessage, this))
);

if (this.kernel) {
Expand Down Expand Up @@ -157,6 +149,17 @@ export abstract class KernelDebugAdapterBase implements DebugAdapter, IKernelDeb
traceVerbose(`[Debug] ${tag}: ${msg}`);
}

async onIOPubMessage(_: unknown, msg: KernelMessage.IIOPubMessage) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const anyMsg = msg as any;
traceInfoIfCI(`Debug IO Pub message: ${JSON.stringify(msg)}`);
if (anyMsg.header.msg_type === 'debug_event') {
this.trace('event', JSON.stringify(msg));
if (!(await this.delegate?.willSendEvent(anyMsg))) {
this.sendMessage.fire(msg.content);
}
}
}
async handleMessage(message: DebugProtocol.ProtocolMessage) {
try {
traceInfoIfCI(`KernelDebugAdapter::handleMessage ${JSON.stringify(message, undefined, ' ')}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ import { ISpecModel } from '@jupyterlab/services/lib/kernelspec/restapi';
import { JSONObject } from '@lumino/coreutils';
import { Signal } from '@lumino/signaling';
import { Disposable } from 'vscode';
import { IDisposable } from '../platform/common/types';
import { noop } from '../platform/common/utils/misc';
import { IKernel } from './types';
import { IDisposable } from '../../platform/common/types';

export class KernelConnectionWrapper implements Kernel.IKernelConnection {
export abstract class BaseKernelConnectionWrapper implements Kernel.IKernelConnection {
public readonly statusChanged = new Signal<this, Kernel.Status>(this);
public readonly connectionStatusChanged = new Signal<this, Kernel.ConnectionStatus>(this);
public readonly iopubMessage = new Signal<this, IIOPubMessage<IOPubMessageType>>(this);
Expand All @@ -48,23 +46,8 @@ export class KernelConnectionWrapper implements Kernel.IKernelConnection {
return (this.possibleKernelConnection || this._previousKernelConnection).serverSettings;
}
public readonly disposed = new Signal<this, void>(this);
/**
* Use `kernelConnection` to access the value as its not a constant (can change over time).
* E.g. when restarting kernels or the like.
*/
private _kernelConnection!: Kernel.IKernelConnection;
private readonly _previousKernelConnection: Kernel.IKernelConnection;
// private _isRestarting?: boolean;
private get possibleKernelConnection(): undefined | Kernel.IKernelConnection {
if (this.kernel.session?.kernel === this._kernelConnection) {
return this._kernelConnection;
}
this.stopHandlingKernelMessages(this._kernelConnection);
if (this.kernel.session?.kernel) {
this.startHandleKernelMessages(this.kernel.session.kernel);
return this._kernelConnection;
}
}
protected abstract get possibleKernelConnection(): undefined | Kernel.IKernelConnection;
private getKernelConnection(): Kernel.IKernelConnection {
if (!this.possibleKernelConnection) {
throw new Error(
Expand All @@ -74,27 +57,8 @@ export class KernelConnectionWrapper implements Kernel.IKernelConnection {
return this.possibleKernelConnection;
}

constructor(readonly kernel: IKernel, disposables: IDisposable[]) {
const emiStatusChangeEvents = () => {
this.statusChanged.emit(kernel.status);
if (kernel.status === 'dead' && !kernel.disposed && !kernel.disposing) {
this.connectionStatusChanged.emit('disconnected');
}
};
kernel.onDisposed(
() => {
// this._isRestarting = false;
emiStatusChangeEvents();
this.disposed.emit();
},
this,
disposables
);
kernel.onStarted(emiStatusChangeEvents, this, disposables);
kernel.onRestarted(emiStatusChangeEvents, this, disposables);
kernel.onStatusChanged(emiStatusChangeEvents, this, disposables);
this._previousKernelConnection = kernel.session!.kernel!;
this.startHandleKernelMessages(kernel.session!.kernel!);
constructor(private _previousKernelConnection: Kernel.IKernelConnection, disposables: IDisposable[]) {
this.startHandleKernelMessages(_previousKernelConnection);
disposables.push(
new Disposable(() => {
if (this.possibleKernelConnection) {
Expand All @@ -103,14 +67,19 @@ export class KernelConnectionWrapper implements Kernel.IKernelConnection {
})
);
}
abstract shutdown(): Promise<void>;
abstract dispose(): void;
abstract interrupt(): Promise<void>;
abstract restart(): Promise<void>;

public get id(): string {
return (this.possibleKernelConnection || this._previousKernelConnection).id;
}
public get name(): string {
return (this.possibleKernelConnection || this._previousKernelConnection).name;
}
public get isDisposed(): boolean {
return this.kernel.disposed;
return this.possibleKernelConnection ? this.possibleKernelConnection?.isDisposed === true : true;
}

public get model(): Kernel.IModel {
Expand Down Expand Up @@ -233,55 +202,18 @@ export class KernelConnectionWrapper implements Kernel.IKernelConnection {
): void {
return this.getKernelConnection().removeMessageHook(msgId, hook);
}
async shutdown(): Promise<void> {
if (
this.kernel.kernelConnectionMetadata.kind === 'startUsingRemoteKernelSpec' ||
this.kernel.kernelConnectionMetadata.kind === 'connectToLiveRemoteKernel'
) {
await this.kernel.session?.shutdown();
}
await this.kernel.dispose();
}

clone(
_options?: Pick<Kernel.IKernelConnection.IOptions, 'clientId' | 'username' | 'handleComms'>
): Kernel.IKernelConnection {
throw new Error('Method not implemented.');
}
dispose(): void {
this.kernel.dispose().catch(noop);
}
async interrupt(): Promise<void> {
// Sometimes we end up starting a new session.
// Hence assume a new session was created, meaning we need to bind to the kernel connection all over again.
this.stopHandlingKernelMessages(this.possibleKernelConnection!);

await this.kernel.interrupt();

if (!this.kernel.session?.kernel) {
throw new Error('Restart failed');
}
this.startHandleKernelMessages(this.kernel.session?.kernel);
}
async restart(): Promise<void> {
if (this.possibleKernelConnection) {
this.stopHandlingKernelMessages(this.possibleKernelConnection);
}

// If this is a remote, then we do something special.
await this.kernel.restart();

if (!this.kernel.session?.kernel) {
throw new Error('Restart failed');
}
this.startHandleKernelMessages(this.kernel.session?.kernel);
}
private startHandleKernelMessages(kernelConnection: Kernel.IKernelConnection) {
this._kernelConnection = kernelConnection;
protected startHandleKernelMessages(kernelConnection: Kernel.IKernelConnection) {
kernelConnection.anyMessage.connect(this.onAnyMessage, this);
kernelConnection.iopubMessage.connect(this.onIOPubMessage, this);
kernelConnection.unhandledMessage.connect(this.onUnhandledMessage, this);
}
private stopHandlingKernelMessages(kernelConnection: Kernel.IKernelConnection) {
protected stopHandlingKernelMessages(kernelConnection: Kernel.IKernelConnection) {
kernelConnection.anyMessage.disconnect(this.onAnyMessage, this);
kernelConnection.iopubMessage.disconnect(this.onIOPubMessage, this);
kernelConnection.unhandledMessage.disconnect(this.onUnhandledMessage, this);
Expand Down
Loading

0 comments on commit edb849d

Please sign in to comment.