diff --git a/python_files/vscode_pytest/__init__.py b/python_files/vscode_pytest/__init__.py index 9f4b6e9afe00..9c9c6c770587 100644 --- a/python_files/vscode_pytest/__init__.py +++ b/python_files/vscode_pytest/__init__.py @@ -20,17 +20,6 @@ import pytest - -# sys.path.append("/Users/eleanorboyd/vscode-python/.nox/install_python_libs/lib/python3.10") -# sys.path.append("/Users/eleanorboyd/vscode-python-debugger") -# sys.path.append("/Users/eleanorboyd/vscode-python-debugger/bundled") -# sys.path.append("/Users/eleanorboyd/vscode-python-debugger/bundled/libs") - -# import debugpy # noqa: E402 - -# debugpy.connect(5678) -# debugpy.breakpoint() # noqa: E702 - if TYPE_CHECKING: from pluggy import Result @@ -161,7 +150,7 @@ def pytest_exception_interact(node, call, report): collected_test = TestRunResultDict() collected_test[node_id] = item_result cwd = pathlib.Path.cwd() - execution_post( + send_execution_message( os.fsdecode(cwd), "success", collected_test if collected_test else None, @@ -285,7 +274,7 @@ def pytest_report_teststatus(report, config): # noqa: ARG001 ) collected_test = TestRunResultDict() collected_test[absolute_node_id] = item_result - execution_post( + send_execution_message( os.fsdecode(cwd), "success", collected_test if collected_test else None, @@ -319,7 +308,7 @@ def pytest_runtest_protocol(item, nextitem): # noqa: ARG001 ) collected_test = TestRunResultDict() collected_test[absolute_node_id] = item_result - execution_post( + send_execution_message( os.fsdecode(cwd), "success", collected_test if collected_test else None, @@ -390,7 +379,7 @@ def pytest_sessionfinish(session, exitstatus): "children": [], "id_": "", } - post_response(os.fsdecode(cwd), error_node) + send_discovery_message(os.fsdecode(cwd), error_node) try: session_node: TestNode | None = build_test_tree(session) if not session_node: @@ -398,7 +387,7 @@ def pytest_sessionfinish(session, exitstatus): "Something went wrong following pytest finish, \ no session node was created" ) - post_response(os.fsdecode(cwd), session_node) + send_discovery_message(os.fsdecode(cwd), session_node) except Exception as e: ERRORS.append( f"Error Occurred, traceback: {(traceback.format_exc() if e.__traceback__ else '')}" @@ -410,7 +399,7 @@ def pytest_sessionfinish(session, exitstatus): "children": [], "id_": "", } - post_response(os.fsdecode(cwd), error_node) + send_discovery_message(os.fsdecode(cwd), error_node) else: if exitstatus == 0 or exitstatus == 1: exitstatus_bool = "success" @@ -420,7 +409,7 @@ def pytest_sessionfinish(session, exitstatus): ) exitstatus_bool = "error" - execution_post( + send_execution_message( os.fsdecode(cwd), exitstatus_bool, None, @@ -428,7 +417,7 @@ def pytest_sessionfinish(session, exitstatus): # send end of transmission token command_type = "discovery" if IS_DISCOVERY else "execution" payload: EOTPayloadDict = {"command_type": command_type, "eot": True} - send_post_request(payload) + send_message(payload) def build_test_tree(session: pytest.Session) -> TestNode: @@ -790,8 +779,10 @@ def get_node_path(node: Any) -> pathlib.Path: atexit.register(lambda: __writer.close() if __writer else None) -def execution_post(cwd: str, status: Literal["success", "error"], tests: TestRunResultDict | None): - """Sends a POST request with execution payload details. +def send_execution_message( + cwd: str, status: Literal["success", "error"], tests: TestRunResultDict | None +): + """Sends message execution payload details. Args: cwd (str): Current working directory. @@ -803,10 +794,10 @@ def execution_post(cwd: str, status: Literal["success", "error"], tests: TestRun ) if ERRORS: payload["error"] = ERRORS - send_post_request(payload) + send_message(payload) -def post_response(cwd: str, session_node: TestNode) -> None: +def send_discovery_message(cwd: str, session_node: TestNode) -> None: """ Sends a POST request with test session details in payload. @@ -822,7 +813,7 @@ def post_response(cwd: str, session_node: TestNode) -> None: } if ERRORS is not None: payload["error"] = ERRORS - send_post_request(payload, cls_encoder=PathEncoder) + send_message(payload, cls_encoder=PathEncoder) class PathEncoder(json.JSONEncoder): @@ -834,7 +825,7 @@ def default(self, obj): return super().default(obj) -def send_post_request( +def send_message( payload: ExecutionPayloadDict | DiscoveryPayloadDict | EOTPayloadDict, cls_encoder=None, ): @@ -845,7 +836,6 @@ def send_post_request( payload -- the payload data to be sent. cls_encoder -- a custom encoder if needed. """ - print("EJFB into send post request!") if not TEST_RUN_PIPE: error_msg = ( "PYTEST ERROR: TEST_RUN_PIPE is not set at the time of pytest starting. " @@ -860,10 +850,8 @@ def send_post_request( if __writer is None: try: - print("EJFB attemping writer open") __writer = open(TEST_RUN_PIPE, "w", encoding="utf-8", newline="\r\n") # noqa: SIM115, PTH123 except Exception as error: - print("EJFB error in writer open") error_msg = f"Error attempting to connect to extension named pipe {TEST_RUN_PIPE}[vscode-pytest]: {error}" print(error_msg, file=sys.stderr) print( @@ -880,16 +868,11 @@ def send_post_request( "params": payload, } data = json.dumps(rpc, cls=cls_encoder) - print(f"EJFB Plugin info[vscode-pytest]: sending data: \n{data}\n") try: if __writer: request = f"""content-length: {len(data)}\ncontent-type: application/json\n\n{data}""" __writer.write(request) __writer.flush() - print( - f"EJFB Plugin info[vscode-pytest]: data sent successfully[vscode-pytest]: \n{data}\n" - ) - # __writer.close() else: print( f"Plugin error connection error[vscode-pytest], writer is None \n[vscode-pytest] data: \n{data} \n", diff --git a/src/client/common/pipes/namedPipes.ts b/src/client/common/pipes/namedPipes.ts index 099d73ba9010..d796cbee8096 100644 --- a/src/client/common/pipes/namedPipes.ts +++ b/src/client/common/pipes/namedPipes.ts @@ -3,12 +3,12 @@ import * as cp from 'child_process'; import * as crypto from 'crypto'; -import * as fs from 'fs'; +import * as fs from 'fs-extra'; import * as net from 'net'; import * as os from 'os'; import * as path from 'path'; import * as rpc from 'vscode-jsonrpc/node'; -import { CancellationError, CancellationToken } from 'vscode'; +import { CancellationError, CancellationToken, Disposable } from 'vscode'; import { traceVerbose } from '../../logging'; import { isWindows } from '../platform/platformService'; import { createDeferred } from '../utils/async'; @@ -73,6 +73,9 @@ export async function createWriterPipe(pipeName: string, token?: CancellationTok } // linux implementation of FIFO await mkfifo(pipeName); + try { + await fs.chmod(pipeName, 0o666); + } catch {} const writer = fs.createWriteStream(pipeName, { encoding: 'utf-8', }); @@ -86,14 +89,14 @@ class CombinedReader implements rpc.MessageReader { private _onPartialMessage = new rpc.Emitter(); - private _listeners = new rpc.Emitter(); - - private _readers: rpc.MessageReader[] = []; + private _callback: rpc.DataCallback = () => {}; private _disposables: rpc.Disposable[] = []; + private _readers: rpc.MessageReader[] = []; + constructor() { - this._disposables.push(this._onClose, this._onError, this._onPartialMessage, this._listeners); + this._disposables.push(this._onClose, this._onError, this._onPartialMessage); } onError: rpc.Event = this._onError.event; @@ -103,34 +106,41 @@ class CombinedReader implements rpc.MessageReader { onPartialMessage: rpc.Event = this._onPartialMessage.event; listen(callback: rpc.DataCallback): rpc.Disposable { - return this._listeners.event(callback); + this._callback = callback; + return new Disposable(() => (this._callback = () => {})); } add(reader: rpc.MessageReader): void { this._readers.push(reader); - this._disposables.push( - reader.onError((error) => this._onError.fire(error)), - reader.onClose(() => this.dispose()), - reader.onPartialMessage((info) => this._onPartialMessage.fire(info)), - reader.listen((msg) => { - this._listeners.fire(msg as rpc.NotificationMessage); - }), - ); + reader.listen((msg) => { + this._callback(msg as rpc.NotificationMessage); + }); + this._disposables.push(reader); + reader.onClose(() => { + this.remove(reader); + if (this._readers.length === 0) { + this._onClose.fire(); + } + }); + reader.onError((e) => { + this.remove(reader); + this._onError.fire(e); + }); } - error(error: Error): void { - this._onError.fire(error); + remove(reader: rpc.MessageReader): void { + const found = this._readers.find((r) => r === reader); + if (found) { + this._readers = this._readers.filter((r) => r !== reader); + reader.dispose(); + } } dispose(): void { - this._onClose.fire(); - this._disposables.forEach((disposable) => { - try { - disposable.dispose(); - } catch (e) { - /* noop */ - } - }); + this._readers.forEach((r) => r.dispose()); + this._readers = []; + this._disposables.forEach((disposable) => disposable.dispose()); + this._disposables = []; } } @@ -166,10 +176,11 @@ export async function createReaderPipe(pipeName: string, token?: CancellationTok deferred.resolve(combined); return deferred.promise; } - // linux implementation of FIFO + // mac/linux implementation of FIFO await mkfifo(pipeName); - const reader = fs.createReadStream(pipeName, { - encoding: 'utf-8', - }); + try { + await fs.chmod(pipeName, 0o666); + } catch {} + const reader = fs.createReadStream(pipeName, { encoding: 'utf-8' }); return new rpc.StreamMessageReader(reader, 'utf-8'); } diff --git a/src/client/testing/testController/common/utils.ts b/src/client/testing/testController/common/utils.ts index 19cb9b20ce88..241de6d8c9d4 100644 --- a/src/client/testing/testController/common/utils.ts +++ b/src/client/testing/testController/common/utils.ts @@ -230,44 +230,48 @@ export async function startRunResultNamedPipe( dataReceivedCallback: (payload: ExecutionTestPayload | EOTTestPayload) => void, deferredTillServerClose: Deferred, cancellationToken?: CancellationToken, -): Promise<{ name: string } & Disposable> { +): Promise { traceVerbose('Starting Test Result named pipe'); - const pipeName: string = generateRandomPipeName('python-test-results'); // '/Users/eleanorboyd/testingFiles/inc_dec_example/temp.txt'; // + const pipeName: string = generateRandomPipeName('python-test-results'); - let disposeOfServer: () => void = () => { - deferredTillServerClose.resolve(); - /* noop */ - }; - const reader = await createReaderPipe(pipeName, cancellationToken); - traceVerbose(`Test Discovery named pipe ${pipeName} connected`); - let perConnectionDisposables: (Disposable | undefined)[] = [reader]; - // create a function to dispose of the server - disposeOfServer = () => { - // dispose of all data listeners and cancelation listeners - perConnectionDisposables.forEach((d) => d?.dispose()); - perConnectionDisposables = []; + const reader = await createReaderPipe(pipeName, cancellationToken); + traceVerbose(`Test Results named pipe ${pipeName} connected`); + let disposables: Disposable[] = []; + const disposable = new Disposable(() => { + traceVerbose(`Test Results named pipe ${pipeName} disposed`); + disposables.forEach((d) => d.dispose()); + disposables = []; deferredTillServerClose.resolve(); - }; - perConnectionDisposables.push( - cancellationToken?.onCancellationRequested(() => { - console.log(`Test Result named pipe ${pipeName} cancelled`); - // if cancel is called on one connection, dispose of all connections - disposeOfServer(); - }), + }); + + if (cancellationToken) { + disposables.push( + cancellationToken?.onCancellationRequested(() => { + console.log(`Test Result named pipe ${pipeName} cancelled`); + disposable.dispose(); + }), + ); + } + disposables.push( + reader, reader.listen((data: Message) => { traceVerbose(`Test Result named pipe ${pipeName} received data`); // if EOT, call decrement connection count (callback) dataReceivedCallback((data as ExecutionResultMessage).params as ExecutionTestPayload | EOTTestPayload); }), + reader.onClose(() => { + // this is called once the server close, once per run instance + traceVerbose(`Test Result named pipe ${pipeName} closed. Disposing of listener/s.`); + // dispose of all data listeners and cancelation listeners + disposable.dispose(); + }), + reader.onError((error) => { + traceError(`Test Results named pipe ${pipeName} error:`, error); + }), ); - reader.onClose(() => { - // this is called once the server close, once per run instance - traceVerbose(`Test Result named pipe ${pipeName} closed. Disposing of listener/s.`); - // dispose of all data listeners and cancelation listeners - disposeOfServer(); - }); - return { name: pipeName, dispose: disposeOfServer }; + + return pipeName; } interface DiscoveryResultMessage extends Message { @@ -277,31 +281,31 @@ interface DiscoveryResultMessage extends Message { export async function startDiscoveryNamedPipe( callback: (payload: DiscoveredTestPayload | EOTTestPayload) => void, cancellationToken?: CancellationToken, -): Promise<{ name: string } & Disposable> { +): Promise { traceVerbose('Starting Test Discovery named pipe'); // const pipeName: string = '/Users/eleanorboyd/testingFiles/inc_dec_example/temp33.txt'; const pipeName: string = generateRandomPipeName('python-test-discovery'); - let dispose: () => void = () => { - /* noop */ - }; const reader = await createReaderPipe(pipeName, cancellationToken); - reader.listen((data: Message) => { - traceVerbose(`Test Discovery named pipe ${pipeName} received data`); - callback((data as DiscoveryResultMessage).params as DiscoveredTestPayload | EOTTestPayload); - }); traceVerbose(`Test Discovery named pipe ${pipeName} connected`); - let disposables: (Disposable | undefined)[] = [reader]; - dispose = () => { + let disposables: Disposable[] = []; + const disposable = new Disposable(() => { traceVerbose(`Test Discovery named pipe ${pipeName} disposed`); - disposables.forEach((d) => d?.dispose()); + disposables.forEach((d) => d.dispose()); disposables = []; - }; + }); + + if (cancellationToken) { + disposables.push( + cancellationToken.onCancellationRequested(() => { + traceVerbose(`Test Discovery named pipe ${pipeName} cancelled`); + disposable.dispose(); + }), + ); + } + disposables.push( - cancellationToken?.onCancellationRequested(() => { - traceVerbose(`Test Discovery named pipe ${pipeName} cancelled`); - dispose(); - }), + reader, reader.listen((data: Message) => { traceVerbose(`Test Discovery named pipe ${pipeName} received data`); callback((data as DiscoveryResultMessage).params as DiscoveredTestPayload | EOTTestPayload); @@ -309,14 +313,13 @@ export async function startDiscoveryNamedPipe( reader.onClose(() => { callback(createEOTPayload(false)); traceVerbose(`Test Discovery named pipe ${pipeName} closed`); - dispose(); + disposable.dispose(); }), reader.onError((error) => { traceError(`Test Discovery named pipe ${pipeName} error:`, error); - dispose(); }), ); - return { name: pipeName, dispose }; + return pipeName; } export async function startTestIdServer(testIds: string[]): Promise { diff --git a/src/client/testing/testController/pytest/pytestDiscoveryAdapter.ts b/src/client/testing/testController/pytest/pytestDiscoveryAdapter.ts index dd7bc9b21847..adc79829bc57 100644 --- a/src/client/testing/testController/pytest/pytestDiscoveryAdapter.ts +++ b/src/client/testing/testController/pytest/pytestDiscoveryAdapter.ts @@ -39,7 +39,7 @@ export class PytestTestDiscoveryAdapter implements ITestDiscoveryAdapter { async discoverTests(uri: Uri, executionFactory?: IPythonExecutionFactory): Promise { const deferredTillEOT: Deferred = createDeferred(); - const { name, dispose } = await startDiscoveryNamedPipe((data: DiscoveredTestPayload | EOTTestPayload) => { + const name = await startDiscoveryNamedPipe((data: DiscoveredTestPayload | EOTTestPayload) => { this.resultResolver?.resolveDiscovery(data, deferredTillEOT); }); @@ -48,7 +48,6 @@ export class PytestTestDiscoveryAdapter implements ITestDiscoveryAdapter { } finally { await deferredTillEOT.promise; traceVerbose('deferredTill EOT resolved'); - dispose(); } // this is only a placeholder to handle function overloading until rewrite is finished const discoveryPayload: DiscoveredTestPayload = { cwd: uri.fsPath, status: 'success' }; diff --git a/src/client/testing/testController/pytest/pytestExecutionAdapter.ts b/src/client/testing/testController/pytest/pytestExecutionAdapter.ts index 9d48003525d6..18d49c72ac4a 100644 --- a/src/client/testing/testController/pytest/pytestExecutionAdapter.ts +++ b/src/client/testing/testController/pytest/pytestExecutionAdapter.ts @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -import { TestRun, Uri } from 'vscode'; +import { CancellationToken, CancellationTokenSource, TestRun, Uri } from 'vscode'; import * as path from 'path'; import { ChildProcess } from 'child_process'; import { IConfigurationService, ITestOutputChannel } from '../../../common/types'; @@ -48,16 +48,18 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter { traceError(`No run instance found, cannot resolve execution, for workspace ${uri.fsPath}.`); } }; - const { name, dispose: serverDispose } = await utils.startRunResultNamedPipe( + const cSource = new CancellationTokenSource(); + runInstance?.token.onCancellationRequested(() => cSource.cancel()); + + const name = await utils.startRunResultNamedPipe( dataReceivedCallback, // callback to handle data received deferredTillServerClose, // deferred to resolve when server closes - runInstance?.token, // token to cancel + cSource.token, // token to cancel ); runInstance?.token.onCancellationRequested(() => { traceInfo(`Test run cancelled, resolving 'till EOT' deferred for ${uri.fsPath}.`); // if canceled, stop listening for results deferredTillEOT.resolve(); - serverDispose(); // this will resolve deferredTillServerClose const executionPayload: ExecutionTestPayload = { cwd: uri.fsPath, @@ -73,7 +75,7 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter { testIds, name, deferredTillEOT, - serverDispose, + cSource, runInstance, debugBool, executionFactory, @@ -100,7 +102,7 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter { testIds: string[], resultNamedPipeName: string, deferredTillEOT: Deferred, - serverDispose: () => void, + serverCancel: CancellationTokenSource, runInstance?: TestRun, debugBool?: boolean, executionFactory?: IPythonExecutionFactory, @@ -167,7 +169,7 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter { }; traceInfo(`Running DEBUG pytest with arguments: ${testArgs} for workspace ${uri.fsPath} \r\n`); await debugLauncher!.launchDebugger(launchOptions, () => { - serverDispose(); // this will resolve deferredTillServerClose + serverCancel.cancel(); deferredTillEOT?.resolve(); }); } else { @@ -238,11 +240,12 @@ export class PytestTestExecutionAdapter implements ITestExecutionAdapter { } // this doesn't work, it instead directs us to the noop one which is defined first // potentially this is due to the server already being close, if this is the case? - serverDispose(); // this will resolve deferredTillServerClose } + // deferredTillEOT is resolved when all data sent on stdout and stderr is received, close event is only called when this occurs // due to the sync reading of the output. deferredTillExecClose.resolve(); + serverCancel.cancel(); }); await deferredTillExecClose.promise; }