Skip to content

Commit

Permalink
Create a macro to build new socket names
Browse files Browse the repository at this point in the history
Then we can re-use it throughout without affecting
the underlying API too much
  • Loading branch information
jonathanrainer committed Jun 4, 2024
1 parent e4724b3 commit 821d5ea
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 54 deletions.
16 changes: 8 additions & 8 deletions src/command/dev/do_dev.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use anyhow::{anyhow, Context};
use camino::Utf8PathBuf;
use rover_std::Emoji;
use crossbeam_channel::bounded as sync_channel;

use super::protocol::{FollowerChannel, FollowerMessenger, LeaderChannel, LeaderSession};
use super::router::RouterConfigHandler;
use super::Dev;
use rover_std::Emoji;

use crate::command::dev::protocol::FollowerMessage;
use crate::utils::client::StudioClientConfig;
use crate::{RoverError, RoverOutput, RoverResult};

use crossbeam_channel::bounded as sync_channel;
use super::protocol::{FollowerChannel, FollowerMessenger, LeaderChannel, LeaderSession};
use super::router::RouterConfigHandler;
use super::Dev;

pub fn log_err_and_continue(err: RoverError) -> RoverError {
let _ = err.print();
Expand All @@ -29,7 +29,7 @@ impl Dev {

let router_config_handler = RouterConfigHandler::try_from(&self.opts.supergraph_opts)?;
let router_address = router_config_handler.get_router_address();
let ipc_socket_addr = router_config_handler.get_ipc_address()?;
let raw_socket_name = router_config_handler.get_raw_socket_name();
let leader_channel = LeaderChannel::new();
let follower_channel = FollowerChannel::new();

Expand Down Expand Up @@ -116,14 +116,14 @@ impl Dev {
.join()
.expect("could not wait for subgraph watcher thread");
} else {
let follower_messenger = FollowerMessenger::from_attached_session(&ipc_socket_addr);
let follower_messenger = FollowerMessenger::from_attached_session(&raw_socket_name);
let mut subgraph_refresher = self.opts.subgraph_opts.get_subgraph_watcher(
router_address,
&client_config,
follower_messenger.clone(),
)?;
tracing::info!(
"connecting to existing `rover dev` process by communicating via the interprocess socket located at {ipc_socket_addr}"
"connecting to existing `rover dev` process by communicating via the interprocess socket located at {raw_socket_name}",
);

// start the interprocess socket health check in the background
Expand Down
29 changes: 17 additions & 12 deletions src/command/dev/protocol/follower/messenger.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::{fmt::Debug, io::BufReader, time::Duration};

use anyhow::anyhow;
use apollo_federation_types::build::SubgraphDefinition;
use crossbeam_channel::{Receiver, Sender};
use interprocess::local_socket::LocalSocketStream;
use std::{fmt::Debug, io::BufReader, time::Duration};

use crate::{RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION};
use interprocess::local_socket::traits::Stream;
use interprocess::local_socket::{
GenericFilePath, GenericNamespaced, NameType, ToFsName, ToNsName,
};

use crate::command::dev::protocol::{
socket_read, socket_write, FollowerMessage, LeaderMessageKind, SubgraphKeys, SubgraphName,
create_socket_name, socket_read, socket_write, FollowerMessage, LeaderMessageKind,
SubgraphKeys, SubgraphName,
};
use crate::{RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION};

#[derive(Clone, Debug)]
pub struct FollowerMessenger {
Expand All @@ -30,9 +34,9 @@ impl FollowerMessenger {
}

/// Create a [`FollowerMessenger`] for an attached session that can talk to the main session via a socket.
pub fn from_attached_session(ipc_socket_addr: &str) -> Self {
pub fn from_attached_session(raw_socket_name: &str) -> Self {
Self {
kind: FollowerMessengerKind::from_attached_session(ipc_socket_addr.to_string()),
kind: FollowerMessengerKind::from_attached_session(raw_socket_name.to_string()),
}
}

Expand Down Expand Up @@ -108,7 +112,7 @@ enum FollowerMessengerKind {
leader_message_receiver: Receiver<LeaderMessageKind>,
},
FromAttachedSession {
ipc_socket_addr: String,
raw_socket_name: String,
},
}

Expand All @@ -123,8 +127,8 @@ impl FollowerMessengerKind {
}
}

fn from_attached_session(ipc_socket_addr: String) -> Self {
Self::FromAttachedSession { ipc_socket_addr }
fn from_attached_session(raw_socket_name: String) -> Self {
Self::FromAttachedSession { raw_socket_name }
}

fn message_leader(
Expand All @@ -149,8 +153,9 @@ impl FollowerMessengerKind {

leader_message
}
FromAttachedSession { ipc_socket_addr } => {
let stream = LocalSocketStream::connect(&**ipc_socket_addr).map_err(|_| {
FromAttachedSession { raw_socket_name } => {
let socket_name = create_socket_name!(raw_socket_name);
let stream = Stream::connect(socket_name).map_err(|_| {
let mut err = RoverError::new(anyhow!(
"there is not a main `rover dev` process to report updates to"
));
Expand Down
74 changes: 44 additions & 30 deletions src/command/dev/protocol/leader.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
use std::{collections::HashMap, fmt::Debug, io::BufReader, net::TcpListener};

use anyhow::{anyhow, Context};
use apollo_federation_types::{
build::SubgraphDefinition,
config::{FederationVersion, SupergraphConfig},
};
use camino::Utf8PathBuf;
use crossbeam_channel::{bounded, Receiver, Sender};
use interprocess::local_socket::traits::{ListenerExt, Stream};
use interprocess::local_socket::{
GenericFilePath, GenericNamespaced, ListenerOptions, NameType, ToFsName, ToNsName,
};
use semver::Version;
use serde::{Deserialize, Serialize};

use rover_std::Emoji;

use crate::{
command::dev::{
compose::ComposeRunner,
Expand All @@ -9,21 +27,9 @@ use crate::{
utils::client::StudioClientConfig,
RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION,
};
use anyhow::{anyhow, Context};
use apollo_federation_types::{
build::SubgraphDefinition,
config::{FederationVersion, SupergraphConfig},
};
use camino::Utf8PathBuf;
use crossbeam_channel::{bounded, Receiver, Sender};
use interprocess::local_socket::{LocalSocketListener, LocalSocketStream};
use rover_std::Emoji;
use semver::Version;
use serde::{Deserialize, Serialize};

use std::{collections::HashMap, fmt::Debug, io::BufReader, net::TcpListener};

use super::{
create_socket_name,
socket::{handle_socket_error, socket_read, socket_write},
types::{
CompositionResult, SubgraphEntry, SubgraphKey, SubgraphKeys, SubgraphName, SubgraphSdl,
Expand All @@ -34,7 +40,7 @@ use super::{
#[derive(Debug)]
pub struct LeaderSession {
subgraphs: HashMap<SubgraphKey, SubgraphSdl>,
ipc_socket_addr: String,
raw_socket_name: String,
compose_runner: ComposeRunner,
router_runner: RouterRunner,
follower_channel: FollowerChannel,
Expand All @@ -58,10 +64,12 @@ impl LeaderSession {
plugin_opts: PluginOpts,
router_config_handler: RouterConfigHandler,
) -> RoverResult<Option<Self>> {
let ipc_socket_addr = router_config_handler.get_ipc_address()?;
let raw_socket_name = router_config_handler.get_raw_socket_name();
let router_socket_addr = router_config_handler.get_router_address();
if let Ok(stream) = LocalSocketStream::connect(&*ipc_socket_addr) {
// write to the socket so we don't make the other session deadlock waiting on a message
let socket_name = create_socket_name!(raw_socket_name);

if let Ok(stream) = Stream::connect(socket_name.clone()) {
// write to the socket, so we don't make the other session deadlock waiting on a message
let mut stream = BufReader::new(stream);
socket_write(&FollowerMessage::health_check(false)?, &mut stream)?;
let _ = LeaderSession::socket_read(&mut stream);
Expand All @@ -75,7 +83,7 @@ impl LeaderSession {
//
// remove the socket file before starting in case it was here from last time
// if we can't connect to it, it's safe to remove
let _ = std::fs::remove_file(&ipc_socket_addr);
let _ = std::fs::remove_file(&raw_socket_name);

if TcpListener::bind(router_socket_addr).is_err() {
let mut err =
Expand Down Expand Up @@ -121,7 +129,7 @@ impl LeaderSession {

Ok(Some(Self {
subgraphs: HashMap::new(),
ipc_socket_addr,
raw_socket_name,
compose_runner,
router_runner,
follower_channel,
Expand Down Expand Up @@ -160,15 +168,19 @@ impl LeaderSession {

/// Listen on the socket for incoming [`FollowerMessageKind`] messages.
fn receive_messages_from_attached_sessions(&self) -> RoverResult<()> {
let listener = LocalSocketListener::bind(&*self.ipc_socket_addr).with_context(|| {
format!(
"could not start local socket server at {}",
&self.ipc_socket_addr
)
})?;
let socket_name = create_socket_name!(self.raw_socket_name);
let listener = ListenerOptions::new()
.name(socket_name)
.create_sync()
.with_context(|| {
format!(
"could not start local socket server at {:?}",
&self.raw_socket_name
)
})?;
tracing::info!(
"connected to socket {}, waiting for messages",
&self.ipc_socket_addr
"connected to socket {:?}, waiting for messages",
&self.raw_socket_name
);

let follower_message_sender = self.follower_channel.sender.clone();
Expand Down Expand Up @@ -306,7 +318,9 @@ impl LeaderSession {
}

/// Reads a [`FollowerMessage`] from an open socket connection.
fn socket_read(stream: &mut BufReader<LocalSocketStream>) -> RoverResult<FollowerMessage> {
fn socket_read(
stream: &mut BufReader<interprocess::local_socket::Stream>,
) -> RoverResult<FollowerMessage> {
socket_read(stream)
.map(|message| {
tracing::debug!("leader received message {:?}", &message);
Expand All @@ -321,7 +335,7 @@ impl LeaderSession {
/// Writes a [`LeaderMessageKind`] to an open socket connection.
fn socket_write(
message: LeaderMessageKind,
stream: &mut BufReader<LocalSocketStream>,
stream: &mut BufReader<interprocess::local_socket::Stream>,
) -> RoverResult<()> {
tracing::debug!("leader sending message {:?}", message);
socket_write(&message, stream)
Expand Down Expand Up @@ -349,7 +363,7 @@ impl LeaderSession {
/// Shuts the router down, removes the socket file, and exits the process.
pub fn shutdown(&mut self) {
let _ = self.router_runner.kill().map_err(log_err_and_continue);
let _ = std::fs::remove_file(&self.ipc_socket_addr);
let _ = std::fs::remove_file(&self.raw_socket_name);
std::process::exit(1)
}

Expand Down
26 changes: 22 additions & 4 deletions src/command/dev/protocol/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
pub use follower::*;
pub use leader::*;
pub(crate) use socket::*;
pub use types::*;

mod follower;
mod leader;
mod socket;
mod types;

pub use follower::*;
pub use leader::*;
pub(crate) use socket::*;
pub use types::*;
macro_rules! create_socket_name {
($raw_socket_name:expr) => {
if GenericFilePath::is_supported() {
$raw_socket_name
.clone()
.to_fs_name::<GenericFilePath>()
.unwrap()
} else {
$raw_socket_name
.clone()
.to_ns_name::<GenericNamespaced>()
.unwrap()
}
};
}

pub(crate) use create_socket_name;

0 comments on commit 821d5ea

Please sign in to comment.