Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement back end -> front end comm messages #218

Merged
merged 28 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
66da9d4
add function for emitting a comm channel msg from kernel
jmcphers Feb 23, 2023
bb40cfc
FnOnce based emitter
jmcphers Feb 23, 2023
b6bef59
make message emitter static
jmcphers Feb 23, 2023
bce362e
use closure to send message over iopub socket
jmcphers Feb 23, 2023
0c55078
Merge remote-tracking branch 'origin/main' into feature/comm-channel-msg
jmcphers Feb 23, 2023
2876028
send_request -> handle_request
jmcphers Feb 23, 2023
ea7ef59
use channels exclusively for delivery of msgs to front end
jmcphers Feb 24, 2023
affe88e
use message types so we can emit close commands
jmcphers Feb 24, 2023
e8b1d6c
emit and handle closed/open messages
jmcphers Feb 24, 2023
bee652d
more robust error handling, comments
jmcphers Feb 24, 2023
aaddaef
graceful error handling for comm sockets
jmcphers Feb 25, 2023
4e58f7d
clarify methods & data in comm socket
jmcphers Feb 25, 2023
043f2ac
implement test comm
jmcphers Feb 25, 2023
90a01dd
plumb initial client creation params through ext host
jmcphers Feb 27, 2023
6c5d9e8
Merge remote-tracking branch 'origin/main' into feature/comm-channel-msg
jmcphers Feb 27, 2023
3b041a6
promote lsp invocation to adapter level
jmcphers Feb 28, 2023
d12bfb2
implement client message and state emitters
jmcphers Feb 28, 2023
1fcae1d
some plumbing for comm data
jmcphers Feb 28, 2023
eb6e419
deliver comm messages from kernel to adapter
jmcphers Feb 28, 2023
3c9ef0a
separate message for comm closed from server side
jmcphers Feb 28, 2023
c956d7f
finish plumbing for close messages
jmcphers Feb 28, 2023
379dd34
make comm open an async task
jmcphers Mar 1, 2023
4f74011
port backoff-retry LSP connection to positron-r
jmcphers Mar 2, 2023
bad940d
Merge remote-tracking branch 'origin/main' into feature/comm-channel-msg
jmcphers Mar 2, 2023
a6276a6
don't mix runtimes in adapter output
jmcphers Mar 2, 2023
e35a090
use kernel's log in more places
jmcphers Mar 2, 2023
ffb7ecb
clarify lsp comm receiver
jmcphers Mar 6, 2023
2b376ba
improve typing on comm data payload
jmcphers Mar 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions extensions/jupyter-adapter/src/JupyterCommClose.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@
export interface JupyterCommClose {
/** The ID of the comm to tear down (as a GUID) */
comm_id: string; // eslint-disable-line

/** The message payload */
data: object;
}
151 changes: 67 additions & 84 deletions extensions/jupyter-adapter/src/JupyterKernel.ts

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions extensions/jupyter-adapter/src/JupyterSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export class JupyterSocket implements vscode.Disposable {
* @param _channel The output channel to use for debugging
*/
constructor(title: string, socket: zmq.Socket,
private readonly _channel: vscode.OutputChannel) {
private readonly _logger: (msg: string) => void) {
this._socket = socket;
this._title = title;

Expand Down Expand Up @@ -48,7 +48,7 @@ export class JupyterSocket implements vscode.Disposable {
const maxTries = 10;
this._port = port;
this._addr = 'tcp://127.0.0.1:' + port.toString();
this._channel.appendLine(`${this._title} socket connecting to ${this._addr}...`);
this._logger(`${this._title} socket connecting to ${this._addr}...`);

// Monitor the socket for events; this is necessary to get events like
// `connect` to fire (otherwise we just get `message` events from the
Expand All @@ -68,14 +68,14 @@ export class JupyterSocket implements vscode.Disposable {
// Resolve the promise when the socket connects
return new Promise<void>((resolve, reject) => {
this._socket.on('connect', (_evt, addr) => {
this._channel.appendLine(`${this._title} socket connected to ${addr}`);
this._logger(`${this._title} socket connected to ${addr}`);
resolve();
});

// If the socket fails to connect, reject the promise
this._socket.on('connect_delay', (_evt, addr) => {
if (triesLeft-- === 0) {
this._channel.appendLine(`${this._title} socket failed to connect to ${addr} after ${maxTries} attempts`);
this._logger(`${this._title} socket failed to connect to ${addr} after ${maxTries} attempts`);
reject();
}
});
Expand Down
101 changes: 88 additions & 13 deletions extensions/jupyter-adapter/src/LanguageRuntimeAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import { JupyterIsCompleteRequest } from './JupyterIsCompleteRequest';
import { JupyterKernelInfoRequest } from './JupyterKernelInfoRequest';
import { JupyterHistoryReply } from './JupyterHistoryReply';
import { JupyterHistoryRequest } from './JupyterHistoryRequest';
import { findAvailablePort } from './PortFinder';
import { JupyterCommMsg } from './JupyterCommMsg';
import { JupyterCommClose } from './JupyterCommClose';

/**
* LangaugeRuntimeAdapter wraps a JupyterKernel in a LanguageRuntime compatible interface.
Expand Down Expand Up @@ -75,7 +78,7 @@ export class LanguageRuntimeAdapter
// Extract the first 32 characters of the hash as the runtime ID
const runtimeId = digest.digest('hex').substring(0, 32);

this._kernel = new JupyterKernel(this._context, this._spec, runtimeId, this._channel, this._lsp);
this._kernel = new JupyterKernel(this._context, this._spec, runtimeId, this._channel);

// Generate kernel metadata and ID
this.metadata = {
Expand Down Expand Up @@ -121,7 +124,7 @@ export class LanguageRuntimeAdapter
mode: positron.RuntimeCodeExecutionMode,
errorBehavior: positron.RuntimeErrorBehavior): void {

this._channel.appendLine(`Sending code to ${this.metadata.languageName}: ${code}`);
this._kernel.log(`Sending code to ${this.metadata.languageName}: ${code}`);

// Forward execution request to the kernel
this._kernel.execute(code, id, mode, errorBehavior);
Expand Down Expand Up @@ -155,7 +158,7 @@ export class LanguageRuntimeAdapter
* @param reply The user's response
*/
public replyToPrompt(id: string, reply: string): void {
this._channel.appendLine(`Sending reply to prompt ${id}: ${reply}`);
this._kernel.log(`Sending reply to prompt ${id}: ${reply}`);
this._kernel.replyToPrompt(id, reply);
}

Expand All @@ -174,7 +177,7 @@ export class LanguageRuntimeAdapter
throw new Error('Cannot interrupt kernel; it has already exited.');
}

this._channel.appendLine(`Interrupting ${this.metadata.languageName}`);
this._kernel.log(`Interrupting ${this.metadata.languageName}`);

// We are interrupting the kernel, so it's possible that message IDs
// that are currently being processed will never be completed. Clear the
Expand Down Expand Up @@ -248,32 +251,38 @@ export class LanguageRuntimeAdapter
* Creates a new client instance.
*
* @param type The type of client to create
* @param params The parameters for the client; the format of this object is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any restrictions on this type (e.g. it must be a JavaScript object? it must be parsable as JSON?) I'm just wondering if there's something mildly more typed than any that we could use. But any is probably fine.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. I checked the spec and it says that data is a dict, so I think that implies it has to be an object of some kind. I've updated the relevant types.

* specific to the client type
* @returns A new client instance, or empty string if the type is not supported
*/
public createClient(type: positron.RuntimeClientType): string {
if (type === positron.RuntimeClientType.Environment) {
public async createClient(type: positron.RuntimeClientType, params: object): Promise<string> {
if (type === positron.RuntimeClientType.Environment ||
type === positron.RuntimeClientType.Lsp) {
// Currently the only supported client type
this._channel.appendLine(`Creating ${type} client for ${this.metadata.languageName}`);
this._kernel.log(`Creating '${type}' client for ${this.metadata.languageName}`);

// Create a new client adapter to wrap the comm channel
const adapter = new RuntimeClientAdapter(type, this._kernel);
const adapter = new RuntimeClientAdapter(type, params, this._kernel);

// Ensure we clean up the client from our internal state when it disconnects
adapter.onDidChangeClientState((e) => {
if (e === positron.RuntimeClientState.Closed) {
if (!this._comms.delete(adapter.getId())) {
this._channel.appendLine(`Warn: Runtime client adapater ${adapter.getId()} (${adapter.getClientType()}) not found`);
this._kernel.log(`Warn: Runtime client adapater ${adapter.getId()} (${adapter.getClientType()}) not found`);
}
}
});

// Open the client (this will send the comm_open message; wait for it to complete)
await adapter.open();

// Add the client to the map
this._comms.set(adapter.getId(), adapter);

// Return the ID of the new client
return adapter.getId();
} else {
this._channel.appendLine(`Info: can't create ${type} client for ${this.metadata.languageName} (not supported)`);
this._kernel.log(`Info: can't create ${type} client for ${this.metadata.languageName} (not supported)`);
}
return '';
}
Expand All @@ -286,10 +295,10 @@ export class LanguageRuntimeAdapter
removeClient(id: string): void {
const comm = this._comms.get(id);
if (comm) {
this._channel.appendLine(`Removing "${comm.getClientType()}" client ${comm.getClientId()} for ${this.metadata.languageName}`);
this._kernel.log(`Removing "${comm.getClientType()}" client ${comm.getClientId()} for ${this.metadata.languageName}`);
comm.dispose();
} else {
this._channel.appendLine(`Error: can't remove client ${id} (not found)`);
this._kernel.log(`Error: can't remove client ${id} (not found)`);
}
}

Expand Down Expand Up @@ -386,6 +395,10 @@ export class LanguageRuntimeAdapter
case 'input_request':
this.onInputRequest(msg, message as JupyterInputRequest);
break;
case 'comm_msg':
this.onCommMessage(msg, message as JupyterCommMsg);
case 'comm_close':
this.onCommClose(msg, message as JupyterCommClose);
}
}

Expand All @@ -407,6 +420,37 @@ export class LanguageRuntimeAdapter
} as positron.LanguageRuntimePrompt);
}

/**
* Delivers a comm_msg message from the kernel to the appropriate client instance.
*
* @param message The outer message packet
* @param msg The inner comm_msg message
*/
private onCommMessage(message: JupyterMessagePacket, msg: JupyterCommMsg): void {
this._messages.fire({
id: message.msgId,
parent_id: message.originId,
when: message.when,
type: positron.LanguageRuntimeMessageType.CommData,
data: msg.data,
} as positron.LanguageRuntimeCommMessage);
}

/**
* Notifies the client that a comm has been closed from the kernel side.
*
* @param message The outer message packet
* @param close The inner comm_msg message
*/
private onCommClose(message: JupyterMessagePacket, msg: JupyterCommMsg): void {
this._messages.fire({
id: message.msgId,
parent_id: message.originId,
when: message.when,
type: positron.LanguageRuntimeMessageType.CommClosed,
data: msg.data,
} as positron.LanguageRuntimeCommClosed);
}
/**
* Converts a Positron event into a language runtime event and emits it.
*
Expand Down Expand Up @@ -580,9 +624,40 @@ export class LanguageRuntimeAdapter
* @param status The new status of the kernel
*/
onStatus(status: positron.RuntimeState) {
this._channel.appendLine(`Kernel status changed to ${status}`);
this._kernel.log(`${this._spec.language} kernel status changed to ${status}`);
this._kernelState = status;
this._state.fire(status);

// When the kernel becomes ready, start the LSP server if it's configured
if (status === positron.RuntimeState.Ready && this._lsp) {
findAvailablePort([], 25).then(port => {
this._kernel.log(`Kernel ready, connecting to ${this._spec.display_name} LSP server on port ${port}...`);
this.startLsp(`127.0.0.1:${port}`);
this._lsp!(port).then(() => {
this._kernel.log(`${this._spec.display_name} LSP server connected`);
});
});
}
}

/**
* Requests that the kernel start a Language Server Protocol server, and
* connect it to the client with the given TCP address.
*
* Note: This is only useful if the kernel hasn't already started an LSP
* server.
*
* @param clientAddress The client's TCP address, e.g. '127.0.0.1:1234'
*/
public startLsp(clientAddress: string) {
// TODO: Should we query the kernel to see if it can create an LSP
// (QueryInterface style) instead of just demanding it?
//
// The Jupyter kernel spec does not provide a way to query for
// supported comms; the only way to know is to try to create one.

this._kernel.log(`Starting LSP server for ${clientAddress}`);
this.createClient(positron.RuntimeClientType.Lsp, { client_address: clientAddress });
}

/**
Expand Down
10 changes: 7 additions & 3 deletions extensions/jupyter-adapter/src/RuntimeClientAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export class RuntimeClientAdapter {

constructor(
private readonly _type: positron.RuntimeClientType,
private readonly _params: object,
private readonly _kernel: JupyterKernel) {

this.id = uuidv4();
Expand All @@ -45,12 +46,15 @@ export class RuntimeClientAdapter {
// Bind to status stream from kernel
this.onStatus = this.onStatus.bind(this);
this._kernel.addListener('status', this.onStatus);
}

/**
* Opens the communications channel between the client and the runtime.
*/
public async open() {
// Ask the kernel to open a comm channel for us
this._state.fire(positron.RuntimeClientState.Opening);
this._kernel.openComm(this._type, this.id, null);

// Consider the client connected once we've opened the comm
await this._kernel.openComm(this._type, this.id, this._params);
this._state.fire(positron.RuntimeClientState.Connected);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@
use serde_json::Value;
use strum_macros::EnumString;

/// Rust trait that defines a custom Jupyter communication channel
pub trait CommChannel: Send {
fn send_request(&self, data: &Value);
fn target_name(&self) -> String;
fn close(&self);
}

#[derive(EnumString, PartialEq)]
#[strum(serialize_all = "snake_case")]
pub enum Comm {
Environment,
Lsp
Lsp,
}

pub enum CommChannelMsg {
Data(Value),
Close,
}

This file was deleted.

45 changes: 23 additions & 22 deletions extensions/positron-r/amalthea/crates/amalthea/src/comm/lsp_comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
use std::sync::Arc;
use std::sync::Mutex;

use crossbeam::channel::Sender;
use serde_json::json;
use serde_json::Value;

use crate::comm::comm_channel::CommChannel;
use crate::comm::comm_channel::CommChannelMsg;
use crate::error::Error;
use crate::language::lsp_handler::LspHandler;

Expand All @@ -23,39 +25,38 @@ pub struct StartLsp {
}

pub struct LspComm {
handler: Arc<Mutex<dyn LspHandler>>
handler: Arc<Mutex<dyn LspHandler>>,
msg_tx: Sender<Value>,
}

/**
* LspComm makes an LSP look like a CommChannel; it's used to start the LSP and
* track the server thread.
*/
impl LspComm {
pub fn new(handler: Arc<Mutex<dyn LspHandler>>) -> LspComm {
LspComm {
handler
}
pub fn new(handler: Arc<Mutex<dyn LspHandler>>, msg_tx: Sender<Value>) -> LspComm {
LspComm { handler, msg_tx }
}

pub fn start(&self, data: &StartLsp) -> Result<(), Error> {
pub fn start(&self, data: &StartLsp) -> Result<(), Error> {
let mut handler = self.handler.lock().unwrap();
handler.start(data.client_address.clone()).unwrap();
self.msg_tx
.send(json!({
"msg_type": "lsp_started",
"content": {}
}))
.unwrap();
Ok(())
}
}

impl CommChannel for LspComm {
fn send_request(&self, _data: &Value) {
// Not implemented; LSP messages are delivered directly to the LSP
// handler via TCP, not proxied here.
}

fn target_name(&self) -> String {
"lsp".to_string()
/**
* Returns a Sender that can accept comm channel messages (required as part of the
* `CommChannel` contract). Because the LSP communicates over its own TCP connection, it does
* not process messages from the comm, and they are discarded here.
*/
pub fn msg_sender(&self) -> Sender<CommChannelMsg> {
let (msg_tx, _msg_rx) = crossbeam::channel::unbounded();
msg_tx
}

fn close(&self) {
// Not implemented; the LSP is closed automatically when the TCP
// connection is closed.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@
*/

pub mod comm_channel;
pub mod environment_comm;
pub mod lsp_comm;
Loading