Skip to content

Commit

Permalink
Make a timeout for interrupt ask for a restart (#4414)
Browse files Browse the repository at this point in the history
* Make a timeout for interrupt ask for a restart

* Fix linter

* Another file to not ignore

* Code review feedback
  • Loading branch information
rchiodo authored Jan 19, 2021
1 parent df8b967 commit 27cfffe
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 47 deletions.
7 changes: 0 additions & 7 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,6 @@ module.exports = {
'src/client/application/diagnostics/commands/types.ts',
'src/client/application/diagnostics/commands/launchBrowser.ts',
'src/client/application/misc/joinMailingListPrompt.ts',
'src/client/datascience/baseJupyterSession.ts',
'src/client/datascience/data-viewing/jupyterVariableDataProviderFactory.ts',
'src/client/datascience/data-viewing/dataViewerMessageListener.ts',
'src/client/datascience/data-viewing/jupyterVariableDataProvider.ts',
Expand Down Expand Up @@ -1082,7 +1081,6 @@ module.exports = {
'src/client/datascience/notebook/contentProvider.ts',
'src/client/datascience/notebook/notebookDisposeService.ts',
'src/client/datascience/notebook/serviceRegistry.ts',
'src/client/datascience/notebook/notebookEditor.ts',
'src/client/datascience/notebook/notebookEditorCompatibilitySupport.ts',
'src/client/datascience/notebook/kernelProvider.ts',
'src/client/datascience/notebook/integration.ts',
Expand Down Expand Up @@ -1119,16 +1117,11 @@ module.exports = {
'src/client/datascience/jupyter/kernels/kernelSelector.ts',
'src/client/datascience/jupyter/kernels/jupyterKernelPromiseFailedError.ts',
'src/client/datascience/jupyter/kernels/helpers.ts',
'src/client/datascience/jupyter/kernels/kernelExecution.ts',
'src/client/datascience/jupyter/kernels/kernelSwitcher.ts',
'src/client/datascience/jupyter/kernels/kernelService.ts',
'src/client/datascience/jupyter/kernels/kernelProvider.ts',
'src/client/datascience/jupyter/kernels/types.ts',
'src/client/datascience/jupyter/kernels/kernelSelections.ts',
'src/client/datascience/jupyter/kernels/jupyterKernelSpec.ts',
'src/client/datascience/jupyter/kernels/kernelDependencyService.ts',
'src/client/datascience/jupyter/kernels/kernel.ts',
'src/client/datascience/jupyter/kernels/cellExecution.ts',
'src/client/datascience/jupyter/jupyterNotebook.ts',
'src/client/datascience/jupyter/jupyterExecutionFactory.ts',
'src/client/datascience/jupyter/jupyterSession.ts',
Expand Down
1 change: 1 addition & 0 deletions news/2 Fixes/4369.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
If a kernel refuses to interrupt ask the user if they want to restart instead.
9 changes: 5 additions & 4 deletions src/client/datascience/baseJupyterSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export abstract class BaseJupyterSession implements IJupyterSession {
}
public async interrupt(timeout: number): Promise<void> {
if (this.session && this.session.kernel) {
traceInfo(`Interrupting kernel: ${this.session.kernel.name}`);
// Listen for session status changes
this.session.statusChanged.connect(this.statusHandler);

Expand Down Expand Up @@ -323,7 +324,7 @@ export abstract class BaseJupyterSession implements IJupyterSession {

// Sub classes need to implement their own restarting specific code
protected abstract startRestartSession(): void;
protected abstract async createRestartSession(
protected abstract createRestartSession(
kernelConnection: KernelConnectionMetadata | undefined,
session: ISessionWithSocket,
cancelToken?: CancellationToken
Expand Down Expand Up @@ -362,17 +363,17 @@ export abstract class BaseJupyterSession implements IJupyterSession {
};

let statusChangeHandler: Slot<ISessionWithSocket, Kernel.Status> | undefined;
const kernelStatusChangedPromise = new Promise((resolve, reject) => {
const kernelStatusChangedPromise = new Promise<void>((resolve, reject) => {
statusChangeHandler = (_: ISessionWithSocket, e: Kernel.Status) => statusHandler(resolve, reject, e);
session.statusChanged.connect(statusChangeHandler);
});
let kernelChangedHandler: Slot<ISessionWithSocket, Session.IKernelChangedArgs> | undefined;
const statusChangedPromise = new Promise((resolve, reject) => {
const statusChangedPromise = new Promise<void>((resolve, reject) => {
kernelChangedHandler = (_: ISessionWithSocket, e: Session.IKernelChangedArgs) =>
statusHandler(resolve, reject, e.newValue?.status);
session.kernelChanged.connect(kernelChangedHandler);
});
const checkStatusPromise = new Promise(async (resolve) => {
const checkStatusPromise = new Promise<void>(async (resolve) => {
// This function seems to cause CI builds to timeout randomly on
// different tests. Waiting for status to go idle doesn't seem to work and
// in the past, waiting on the ready promise doesn't work either. Check status with a maximum of 5 seconds
Expand Down
91 changes: 87 additions & 4 deletions src/client/datascience/jupyter/kernels/cellExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import type {
NotebookEditor as VSCNotebookEditor
} from '../../../../../types/vscode-proposed';
import { concatMultilineString, formatStreamText } from '../../../../datascience-ui/common';
import { ServerStatus } from '../../../../datascience-ui/interactive-common/mainState';
import { IApplicationShell, IVSCodeNotebook } from '../../../common/application/types';
import { traceError, traceErrorIf, traceInfoIf, traceWarning } from '../../../common/logger';
import { RefBool } from '../../../common/refBool';
import { IDisposable, IExtensionContext } from '../../../common/types';
import { createDeferred, Deferred } from '../../../common/utils/async';
import { createDeferred, Deferred, waitForPromise } from '../../../common/utils/async';
import { swallowExceptions } from '../../../common/utils/decorators';
import { noop } from '../../../common/utils/misc';
import { StopWatch } from '../../../common/utils/stopWatch';
Expand Down Expand Up @@ -45,7 +46,8 @@ import {
IJupyterSession,
INotebook,
INotebookEditorProvider,
INotebookExecutionLogger
INotebookExecutionLogger,
InterruptResult
} from '../../types';
import { translateCellFromNative } from '../../utils';
import { IKernel } from './types';
Expand Down Expand Up @@ -114,10 +116,12 @@ export class CellExecution {
private started?: boolean;

private _completed?: boolean;
private _interruptPromise?: Promise<InterruptResult>;
private readonly initPromise: Promise<void>;
private disposables: IDisposable[] = [];
private cancelHandled = false;
private requestHandlerChain = Promise.resolve();
private activeExecution: { execution: Promise<void>; session: IJupyterSession } | undefined = undefined;
private constructor(
public readonly editor: VSCNotebookEditor,
public readonly cell: NotebookCell,
Expand Down Expand Up @@ -165,6 +169,26 @@ export class CellExecution {
);
}

public async interrupt(timeoutMs: number): Promise<InterruptResult> {
// Skip if already interrupted
if (this._completed || !this.activeExecution) {
return InterruptResult.Success;
}
// Interrupt the active execution
const result = this._interruptPromise
? await this._interruptPromise
: await (this._interruptPromise = this.interruptExecution(
this.activeExecution.session,
this.activeExecution.execution,
timeoutMs
));

// Done interrrupting, clear interrupt promise
this._interruptPromise = undefined;

return result;
}

public async start(kernelPromise: Promise<IKernel>, notebook: INotebook) {
if (this.cancelHandled) {
traceCellMessage(this.cell, 'Not starting as it was cancelled');
Expand Down Expand Up @@ -228,12 +252,69 @@ export class CellExecution {
*/
private dispose() {
traceCellMessage(this.cell, 'Execution disposed');
this.activeExecution = undefined;
this.disposables.forEach((d) => d.dispose());
const deferred = CellExecution.cellsCompletedForTesting.get(this.cell);
if (deferred) {
deferred.resolve();
}
}

private async interruptExecution(
session: IJupyterSession,
execution: Promise<void>,
timeoutMS: number
): Promise<InterruptResult> {
// Create a deferred promise that resolves if we have a failure
const restarted = createDeferred<boolean>();

// Listen to status change events so we can tell if we're restarting
const restartHandler = (e: ServerStatus) => {
if (e === ServerStatus.Restarting) {
// We restarted the kernel.
traceWarning('Kernel restarting during interrupt');

// Indicate we restarted the race below
restarted.resolve(true);
}
};
const restartHandlerToken = session.onSessionStatusChanged(restartHandler);

// Start our interrupt. If it fails, indicate a restart
session.interrupt(timeoutMS).catch((exc) => {
traceWarning(`Error during interrupt: ${exc}`);
restarted.resolve(true);
});

try {
// Wait for all of the pending cells to finish or the timeout to fire
const result = await waitForPromise(Promise.race([execution, restarted.promise]), timeoutMS);

// See if we restarted or not
if (restarted.completed) {
return InterruptResult.Restarted;
}

if (result === null) {
// We timed out. You might think we should stop our pending list, but that's not
// up to us. The cells are still executing. The user has to request a restart or try again
return InterruptResult.TimedOut;
}

// Indicate the interrupt worked.
return InterruptResult.Success;
} catch (exc) {
// Something failed. See if we restarted or not.
if (restarted.completed) {
return InterruptResult.Restarted;
}

// Otherwise a real error occurred.
throw exc;
} finally {
restartHandlerToken.dispose();
}
}
private handleKernelRestart(kernel: IKernel) {
kernel.onRestarted(
async () => {
Expand Down Expand Up @@ -394,7 +475,9 @@ export class CellExecution {
private async execute(session: IJupyterSession, loggers: INotebookExecutionLogger[]) {
const code = this.cell.document.getText();
traceCellMessage(this.cell, 'Send code for execution');
return this.executeCodeCell(code, session, loggers);
const execution = this.executeCodeCell(code, session, loggers);
this.activeExecution = { execution, session };
await execution;
}

private async executeCodeCell(code: string, session: IJupyterSession, loggers: INotebookExecutionLogger[]) {
Expand Down Expand Up @@ -581,7 +664,7 @@ export class CellExecution {
// Ask the user for input
if (msg.content && 'prompt' in msg.content) {
const hasPassword = msg.content.password !== null && (msg.content.password as boolean);
this.applicationService
void this.applicationService
.showInputBox({
prompt: msg.content.prompt ? msg.content.prompt.toString() : '',
ignoreFocusOut: true,
Expand Down
27 changes: 25 additions & 2 deletions src/client/datascience/jupyter/kernels/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ export class Kernel implements IKernel {
get onDisposed(): Event<void> {
return this._onDisposed.event;
}
get onInterruptTimedOut(): Event<void> {
return this._onInterruptTimedOut.event;
}
private _info?: KernelMessage.IInfoReplyMsg['content'];
get info(): KernelMessage.IInfoReplyMsg['content'] | undefined {
return this._info;
Expand All @@ -70,6 +73,7 @@ export class Kernel implements IKernel {
private readonly _onStatusChanged = new EventEmitter<ServerStatus>();
private readonly _onRestarted = new EventEmitter<void>();
private readonly _onDisposed = new EventEmitter<void>();
private readonly _onInterruptTimedOut = new EventEmitter<void>();
private _notebookPromise?: Promise<INotebook | undefined>;
private readonly hookedNotebookForEvents = new WeakSet<INotebook>();
private restarting?: Deferred<void>;
Expand All @@ -82,6 +86,7 @@ export class Kernel implements IKernel {
private readonly notebookProvider: INotebookProvider,
private readonly disposables: IDisposableRegistry,
private readonly launchTimeout: number,
private readonly interruptTimeout: number,
commandManager: ICommandManager,
private readonly errorHandler: IDataScienceErrorHandler,
private readonly editorProvider: INotebookEditorProvider,
Expand Down Expand Up @@ -153,14 +158,32 @@ export class Kernel implements IKernel {
await this.initializeAfterStart();
}
}
public async interrupt(): Promise<InterruptResult> {

public async interruptCell(cell: NotebookCell): Promise<InterruptResult> {
if (this.restarting) {
await this.restarting.promise;
}
if (!this.notebook) {
throw new Error('No notebook to interrupt');
}
return this.notebook.interruptKernel(this.launchTimeout);
const result = await this.kernelExecution.interruptCell(cell, this.interruptTimeout);
if (result === InterruptResult.TimedOut) {
this._onInterruptTimedOut.fire();
}
return result;
}
public async interruptAllCells(document: NotebookDocument): Promise<InterruptResult> {
if (this.restarting) {
await this.restarting.promise;
}
if (!this.notebook) {
throw new Error('No notebook to interrupt');
}
const result = await this.kernelExecution.interruptAllCells(document, this.interruptTimeout);
if (result === InterruptResult.TimedOut) {
this._onInterruptTimedOut.fire();
}
return result;
}
public async dispose(): Promise<void> {
this.restarting = undefined;
Expand Down
28 changes: 28 additions & 0 deletions src/client/datascience/jupyter/kernels/kernelExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
IDataScienceErrorHandler,
INotebook,
INotebookEditorProvider,
InterruptResult,
IRawNotebookSupportedService
} from '../../types';
import { CellExecution, CellExecutionFactory } from './cellExecution';
Expand Down Expand Up @@ -174,6 +175,33 @@ export class KernelExecution implements IDisposable {
traceInfo('Cancel document execution');
document.cells.forEach((cell) => this.cancelCell(cell));
}

public async interruptCell(cell: NotebookCell, timeoutMs: number): Promise<InterruptResult> {
const execution = this.cellExecutions.get(cell);
if (execution) {
this.cellExecutions.delete(cell);
traceCellMessage(cell, 'Cancel cell from Kernel Execution');
return execution.interrupt(timeoutMs);
} else {
traceCellMessage(cell, 'Cannot cancel cell execution from Kernel Execution');
}

return InterruptResult.Success;
}
public async interruptAllCells(document: NotebookDocument, timeoutMs: number): Promise<InterruptResult> {
traceInfo('Interrupt document execution');
const results = await Promise.all(document.cells.map((cell) => this.interruptCell(cell, timeoutMs)));

// Flatten the results
if (results.includes(InterruptResult.Restarted)) {
return InterruptResult.Restarted;
}
if (results.includes(InterruptResult.TimedOut)) {
return InterruptResult.TimedOut;
}

return InterruptResult.Success;
}
public dispose() {
this.disposables.forEach((d) => d.dispose());
}
Expand Down
17 changes: 14 additions & 3 deletions src/client/datascience/jupyter/kernels/kernelProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import * as fastDeepEqual from 'fast-deep-equal';
import { inject, injectable } from 'inversify';
import { Uri } from 'vscode';
import { Disposable, Event, EventEmitter, Uri } from 'vscode';
import { IApplicationShell, ICommandManager, IVSCodeNotebook } from '../../../common/application/types';
import { traceInfo, traceWarning } from '../../../common/logger';
import { IFileSystem } from '../../../common/platform/types';
Expand All @@ -29,8 +29,15 @@ import { IKernel, IKernelProvider, IKernelSelectionUsage, KernelOptions } from '

@injectable()
export class KernelProvider implements IKernelProvider {
private readonly kernelsByUri = new Map<string, { options: KernelOptions; kernel: IKernel }>();
get onInterruptTimedOut(): Event<IKernel> {
return this._interruptTimedOut.event;
}
private readonly kernelsByUri = new Map<
string,
{ options: KernelOptions; kernel: IKernel; disposables: Disposable[] }
>();
private readonly pendingDisposables = new Set<IAsyncDisposable>();
private _interruptTimedOut = new EventEmitter<IKernel>();
constructor(
@inject(IAsyncDisposableRegistry) private asyncDisposables: IAsyncDisposableRegistry,
@inject(IDisposableRegistry) private disposables: IDisposableRegistry,
Expand Down Expand Up @@ -79,12 +86,14 @@ export class KernelProvider implements IKernelProvider {
this.disposeOldKernel(uri);

const waitForIdleTimeout = this.configService.getSettings(uri).jupyterLaunchTimeout;
const interruptTimeout = this.configService.getSettings(uri).jupyterInterruptTimeout;
const kernel = new Kernel(
uri,
options.metadata,
this.notebookProvider,
this.disposables,
waitForIdleTimeout,
interruptTimeout,
this.commandManager,
this.errorHandler,
this.editorProvider,
Expand All @@ -97,7 +106,8 @@ export class KernelProvider implements IKernelProvider {
this.context
);
this.asyncDisposables.push(kernel);
this.kernelsByUri.set(uri.toString(), { options, kernel });
const interruptTimedOutDisposable = kernel.onInterruptTimedOut(() => this._interruptTimedOut.fire(kernel));
this.kernelsByUri.set(uri.toString(), { options, kernel, disposables: [interruptTimedOutDisposable] });
this.deleteMappingIfKernelIsDisposed(uri, kernel);
return kernel;
}
Expand All @@ -124,6 +134,7 @@ export class KernelProvider implements IKernelProvider {
const kernelToDispose = this.kernelsByUri.get(uri.toString());
if (kernelToDispose) {
this.pendingDisposables.add(kernelToDispose.kernel);
kernelToDispose.disposables.forEach((d) => d.dispose());
kernelToDispose.kernel
.dispose()
.catch((ex) => traceWarning('Failed to dispose old kernel', ex))
Expand Down
Loading

0 comments on commit 27cfffe

Please sign in to comment.