Add support for alternative RegistrationFile #576

Merged
merged 15 commits into from
Oct 10, 2024

Conversation

DavisVaughan
Contributor

@DavisVaughan DavisVaughan commented Oct 8, 2024

Closes #563
Joint work with @lionel-

This PR implements the alternative RegistrationFile approach outlined in JEP 66, which lets the client and server perform a "handshake" on startup. In particular, the server is in charge of picking the ports and binds to each one as soon as it picks it, which avoids the race between choosing a port and binding to it.

If the --connection_file argument provided to ark can be parsed into this structure:

pub struct RegistrationFile {
    /// The transport type to use for ZeroMQ; generally "tcp"
    pub transport: String,

    /// The signature scheme to use for messages; generally "hmac-sha256"
    pub signature_scheme: String,

    /// The IP address to bind to
    pub ip: String,

    /// The HMAC-SHA256 signing key, or an empty string for an unauthenticated
    /// connection
    pub key: String,

    /// ZeroMQ port: Registration messages (handshake)
    pub registration_port: u16,
}

Then we assume we are going to be using the handshake method of connecting. Otherwise we parse into the typical ConnectionFile structure and assume the Client picked the ports.
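
As a rough illustration of that fallback, here is a minimal standard-library sketch. It assumes the file has already been parsed into a map of field names to string values (the real code presumably deserializes JSON directly into the structs). `ConnectionMode` and `detect_mode` are hypothetical names that do not appear in this PR.

```rust
use std::collections::HashMap;

/// Which startup path the kernel should take (hypothetical type).
#[derive(Debug, PartialEq)]
enum ConnectionMode {
    /// A usable `registration_port` was found: the server picks the ports
    /// and reports them through the handshake.
    Handshake { registration_port: u16 },
    /// Anything else is treated as a classic connection file in which the
    /// client already picked the ports.
    ClientPickedPorts,
}

/// Decide the mode from already-parsed fields (a stand-in for JSON parsing).
fn detect_mode(fields: &HashMap<&str, &str>) -> ConnectionMode {
    match fields
        .get("registration_port")
        .and_then(|port| port.parse::<u16>().ok())
    {
        Some(registration_port) => ConnectionMode::Handshake { registration_port },
        None => ConnectionMode::ClientPickedPorts,
    }
}
```

A map containing `registration_port` selects the handshake path. A map with only the usual `shell_port`, `iopub_port`, and so on falls through to `ClientPickedPorts`.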

We expect that the Client binds to a zmq::REP socket on registration_port. Ark, as the Server, will then connect to this registration_port as a zmq::REQ socket.

Ark will pick ports, bind to them, and send this message over the registration socket:

pub struct HandshakeRequest {
    /// ZeroMQ port: Control channel (kernel interrupts)
    pub control_port: u16,

    /// ZeroMQ port: Shell channel (execution, completion)
    pub shell_port: u16,

    /// ZeroMQ port: Standard input channel (prompts)
    pub stdin_port: u16,

    /// ZeroMQ port: IOPub channel (broadcasts input/output)
    pub iopub_port: u16,

    /// ZeroMQ port: Heartbeat messages (echo)
    pub hb_port: u16,
}

Ark will then immediately block, waiting for this HandshakeReply:

pub struct HandshakeReply {
    /// The execution status ("ok" or "error")
    pub status: Status,
}

This is just a receipt from the Client that confirms that it received the socket information.

If ark does not receive this reply within 5 seconds, it will shut itself down.

Ark disconnects from the registration socket after receiving the HandshakeReply, and the kernel proceeds to start up.
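
The request/reply/timeout sequence above can be sketched with the standard library alone. This is only an illustration: `std::sync::mpsc` channels stand in for the ZeroMQ REQ/REP sockets, the `Status` enum is a simplified stand-in for the real type, and `handshake` and `spawn_client` are hypothetical helpers with made-up signatures, not the functions added in this PR.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

/// Ports the server bound to, mirroring `HandshakeRequest` above.
#[derive(Debug, Clone, PartialEq)]
struct HandshakeRequest {
    control_port: u16,
    shell_port: u16,
    stdin_port: u16,
    iopub_port: u16,
    hb_port: u16,
}

/// Simplified stand-in for the real status type.
#[derive(Debug, PartialEq)]
enum Status {
    Ok,
    Error,
}

struct HandshakeReply {
    status: Status,
}

/// Server side: send the ports, then block until a reply or the timeout.
fn handshake(
    to_client: &Sender<HandshakeRequest>,
    from_client: &Receiver<HandshakeReply>,
    request: HandshakeRequest,
    timeout: Duration,
) -> Result<(), String> {
    to_client
        .send(request)
        .map_err(|_| "client hung up before the request was sent".to_string())?;

    match from_client.recv_timeout(timeout) {
        Ok(HandshakeReply { status: Status::Ok }) => Ok(()),
        Ok(HandshakeReply { status: Status::Error }) => {
            Err("client reported an error".to_string())
        },
        Err(RecvTimeoutError::Timeout) => {
            Err("timed out waiting for the handshake reply".to_string())
        },
        Err(RecvTimeoutError::Disconnected) => Err("client disconnected".to_string()),
    }
}

/// Client side: acknowledge the first request received and return it.
fn spawn_client(
    from_server: Receiver<HandshakeRequest>,
    to_server: Sender<HandshakeReply>,
) -> thread::JoinHandle<Option<HandshakeRequest>> {
    thread::spawn(move || {
        let request = from_server.recv().ok()?;
        to_server.send(HandshakeReply { status: Status::Ok }).ok()?;
        Some(request)
    })
}
```

With a responsive client, `handshake()` returns `Ok(())` and the client thread ends up holding the port information. With no reply, it returns an error once the timeout elapses, which is the point at which ark would shut down.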

Co-authored-by: Lionel Henry lionel@posit.co
Co-authored-by: Davis Vaughan davis@posit.co

@lionel- lionel- force-pushed the feature/race-connection branch from 70a04cf to dbbad02 Compare October 9, 2024 12:12
@lionel-

This comment was marked as resolved.

@DavisVaughan DavisVaughan marked this pull request as ready for review October 9, 2024 21:17
Comment on lines +188 to +195
// TODO!: Without this sleep, `IOPub` `Busy` messages sporadically
// don't arrive when running integration tests. I believe this is a result
// of PUB sockets dropping messages while in a "mute" state (i.e. no subscriber
// connected yet). Even though we run `iopub_socket.subscribe()` to subscribe,
// it seems like we can return from this function even before our socket
// has fully subscribed, causing messages to get dropped.
// https://libzmq.readthedocs.io/en/latest/zmq_socket.html
std::thread::sleep(std::time::Duration::from_millis(500));
Contributor Author

Addressed in #577

Comment on lines -84 to -86
// Give the kernel a little time to start up
info!("Waiting 500ms for kernel startup to complete");
std::thread::sleep(std::time::Duration::from_millis(500));
Contributor Author

Notably, I don't think we need this sleep anymore.

Comment on lines +66 to 85
// Start the kernel and REPL in a background thread, does not return and is never joined.
// Must run `start_kernel()` in a background thread because it blocks until it receives
// a `HandshakeReply`, which we send from `from_connection()` below.
stdext::spawn!("dummy_kernel", move || {
    crate::start::start_kernel(
        connection_file,
        Some(registration_file),
        vec![
            String::from("--interactive"),
            String::from("--vanilla"),
            String::from("--no-save"),
            String::from("--no-restore"),
        ],
        None,
        session_mode,
        false,
    );

    RMain::start();
});
Contributor Author

@lionel- since we are moving the kernel setup() back onto the same thread as the RMain::start(), I think we can move R_MAIN_THREAD_ID back into the setup() method to solve posit-dev/positron#4973

In fact, after we merge this we can probably combine the RMain::setup() and RMain::start() again. I see no reason they should be separate now.

Comment on lines +250 to 260
if let Some(registration_file) = registration_file {
    handshake(
        registration_file,
        &ctx,
        &session,
        control_port,
        shell_port,
        stdin_port,
        iopub_port,
        hb_port,
    )?;
Contributor Author

After we bind to all our sockets, we do this optional handshake with the client

Comment on lines +537 to +541
// Wait for the handshake reply with a 5 second timeout.
// If we don't get a handshake reply, we are going to eventually panic and shut down.
if !registration_socket
    .poll_incoming(5000)
    .map_err(|err| Error::ZmqError(registration_socket.name.clone(), err))?
Contributor Author

Ark waits 5 seconds before self imploding

Comment on lines +577 to +617
pub(crate) fn port_from_socket(socket: &Socket) -> crate::Result<u16> {
    let name = socket.name.as_str();

    let address = match socket.socket.get_last_endpoint() {
        Ok(address) => address,
        Err(err) => {
            return Err(crate::anyhow!(
                "Can't access last endpoint of '{name}' socket due to {err:?}"
            ));
        },
    };

    let address = match address {
        Ok(address) => address,
        Err(_) => {
            return Err(crate::anyhow!(
                "Can't access last endpoint of '{name}' socket."
            ));
        },
    };

    // We've got the full address but we only want the port at the very end
    let Some(loc) = address.rfind(":") else {
        return Err(crate::anyhow!(
            "Failed to find port in the '{name}' socket address."
        ));
    };

    let port = &address[(loc + 1)..];

    let port = match port.parse::<u16>() {
        Ok(port) => port,
        Err(err) => {
            return Err(crate::anyhow!(
                "Can't parse port '{port}' into a `u16` due to {err:?}"
            ));
        },
    };

    Ok(port)
}
Contributor Author

This was kind of ugly but should work for TCP sockets. The only other option is to do what the Python implementation does when it picks a random port, which is to choose some random port number and try to bind() to it. If that works, great, and it knows the port number. If it doesn't, it picks another one.

We just use 0 and let the OS pick a port, but it's kind of hard to get at the port number afterwards.
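
For comparison, here is a minimal standard-library sketch of the "bind to port 0 and read the port back" idea, using a plain `TcpListener` rather than a ZeroMQ socket. `bind_any_port` and `port_from_endpoint` are hypothetical helpers. The second one mirrors the string handling in `port_from_socket()` above and assumes an endpoint of the form `tcp://127.0.0.1:5555`.

```rust
use std::net::TcpListener;

/// Bind to port 0 so the OS picks a free port, then read back its choice.
/// The listener is returned so the port stays reserved while it is alive.
fn bind_any_port() -> std::io::Result<(TcpListener, u16)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let port = listener.local_addr()?.port();
    Ok((listener, port))
}

/// Extract the trailing port from an endpoint string.
fn port_from_endpoint(endpoint: &str) -> Result<u16, String> {
    // Only the text after the last ':' is the port.
    let (_, port) = endpoint
        .rsplit_once(':')
        .ok_or_else(|| format!("no port found in endpoint '{endpoint}'"))?;

    port.parse::<u16>()
        .map_err(|err| format!("can't parse port '{port}' into a u16: {err}"))
}
```

With a plain TCP listener the OS-chosen port is available directly from `local_addr()`. The string parsing is only needed when the bound address comes back as an endpoint string, as it does from the socket's last endpoint in the snippet above.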

Co-authored-by: Davis Vaughan <davis@rstudio.com>
None,
connection_file.endpoint(connection_file.shell_port),
)?;
let shell_port = port_finalize(&shell_socket, connection_file.shell_port)?;
Contributor

Since the port might be chosen randomly by the OS, depending on the details of the connection file, we now potentially have to retrieve the port from the socket with port_finalize().
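
The body of `port_finalize()` is not shown in this excerpt. The sketch below is a guess at the behaviour the comment describes, not the actual implementation: the signature differs from the real one (which takes the socket), and a closure stands in for reading the port back from the bound socket.

```rust
/// Return the requested port unless it was 0 (meaning "let the OS choose"),
/// in which case ask the bound socket which port it actually got.
/// Hypothetical reconstruction; `bound_port` stands in for the socket lookup.
fn port_finalize(
    requested: u16,
    bound_port: impl FnOnce() -> Result<u16, String>,
) -> Result<u16, String> {
    if requested == 0 {
        // The OS picked the port, so the socket is the only source of truth.
        bound_port()
    } else {
        // The connection file named a concrete port; no lookup is needed.
        Ok(requested)
    }
}
```

Under this reading, a classic connection file with fixed ports never touches the socket lookup, while a registration-style startup (all ports 0) always does.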

Comment on lines -46 to -70
pub struct Kernel {
    /// The name of the kernel.
    name: String,

    /// The connection metadata.
    connection: ConnectionFile,

    /// The unique session information for this kernel session.
    session: Session,

    /// Sends messages to the IOPub socket. This field is used throughout the
    /// kernel codebase to send events to the frontend; use `create_iopub_tx`
    /// to access it.
    iopub_tx: Sender<IOPubMessage>,

    /// Receives message sent to the IOPub socket
    iopub_rx: Option<Receiver<IOPubMessage>>,

    /// Sends notifications about comm changes and events to the comm manager.
    /// Use `create_comm_manager_tx` to access it.
    comm_manager_tx: Sender<CommManagerEvent>,

    /// Receives notifications about comm changes and events
    comm_manager_rx: Receiver<CommManagerEvent>,
}
Contributor

We were able to remove all of this state from Amalthea's Kernel. For instance, the IOPub and CommManager channels are now created by the caller of connect().

That's why the diff in this file is pretty big. We removed the Kernel struct altogether and all impls are now free functions in the kernel module. Most of the diff comes from the ensuing unnesting.

@lionel- lionel- merged commit 8194c2a into main Oct 10, 2024
6 checks passed
@lionel- lionel- deleted the feature/race-connection branch October 10, 2024 07:31
@github-actions github-actions bot locked and limited conversation to collaborators Oct 10, 2024
Successfully merging this pull request may close these issues.

Fix Jupyter connection race condition
2 participants