Skip to content

Commit

Permalink
chore(deps): update rust crate interprocess to v2 (#1915)
Browse files Browse the repository at this point in the history
  • Loading branch information
renovate[bot] authored Jun 10, 2024
1 parent 9007bd7 commit 9ac07b2
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 99 deletions.
30 changes: 18 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ humantime = "2.1.0"
http = "1.1.0"
httpmock = "0.7"
hyper = "1.0"
interprocess = { version = "1", default-features = false }
interprocess = { version = "2", default-features = false }
indoc = "2"
lazycell = "1"
lazy_static = "1.4"
Expand Down
16 changes: 8 additions & 8 deletions src/command/dev/do_dev.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use anyhow::{anyhow, Context};
use camino::Utf8PathBuf;
use rover_std::Emoji;
use crossbeam_channel::bounded as sync_channel;

use super::protocol::{FollowerChannel, FollowerMessenger, LeaderChannel, LeaderSession};
use super::router::RouterConfigHandler;
use super::Dev;
use rover_std::Emoji;

use crate::command::dev::protocol::FollowerMessage;
use crate::utils::client::StudioClientConfig;
use crate::{RoverError, RoverOutput, RoverResult};

use crossbeam_channel::bounded as sync_channel;
use super::protocol::{FollowerChannel, FollowerMessenger, LeaderChannel, LeaderSession};
use super::router::RouterConfigHandler;
use super::Dev;

pub fn log_err_and_continue(err: RoverError) -> RoverError {
let _ = err.print();
Expand All @@ -29,7 +29,7 @@ impl Dev {

let router_config_handler = RouterConfigHandler::try_from(&self.opts.supergraph_opts)?;
let router_address = router_config_handler.get_router_address();
let ipc_socket_addr = router_config_handler.get_ipc_address()?;
let raw_socket_name = router_config_handler.get_raw_socket_name();
let leader_channel = LeaderChannel::new();
let follower_channel = FollowerChannel::new();

Expand Down Expand Up @@ -116,14 +116,14 @@ impl Dev {
.join()
.expect("could not wait for subgraph watcher thread");
} else {
let follower_messenger = FollowerMessenger::from_attached_session(&ipc_socket_addr);
let follower_messenger = FollowerMessenger::from_attached_session(&raw_socket_name);
let mut subgraph_refresher = self.opts.subgraph_opts.get_subgraph_watcher(
router_address,
&client_config,
follower_messenger.clone(),
)?;
tracing::info!(
"connecting to existing `rover dev` process by communicating via the interprocess socket located at {ipc_socket_addr}"
"connecting to existing `rover dev` process by communicating via the interprocess socket located at {raw_socket_name}",
);

// start the interprocess socket health check in the background
Expand Down
26 changes: 14 additions & 12 deletions src/command/dev/protocol/follower/messenger.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::{fmt::Debug, io::BufReader, time::Duration};

use anyhow::anyhow;
use apollo_federation_types::build::SubgraphDefinition;
use crossbeam_channel::{Receiver, Sender};
use interprocess::local_socket::LocalSocketStream;
use std::{fmt::Debug, io::BufReader, time::Duration};

use crate::{RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION};
use interprocess::local_socket::traits::Stream;

use crate::command::dev::protocol::{
socket_read, socket_write, FollowerMessage, LeaderMessageKind, SubgraphKeys, SubgraphName,
create_socket_name, socket_read, socket_write, FollowerMessage, LeaderMessageKind,
SubgraphKeys, SubgraphName,
};
use crate::{RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION};

#[derive(Clone, Debug)]
pub struct FollowerMessenger {
Expand All @@ -30,9 +31,9 @@ impl FollowerMessenger {
}

/// Create a [`FollowerMessenger`] for an attached session that can talk to the main session via a socket.
pub fn from_attached_session(ipc_socket_addr: &str) -> Self {
pub fn from_attached_session(raw_socket_name: &str) -> Self {
Self {
kind: FollowerMessengerKind::from_attached_session(ipc_socket_addr.to_string()),
kind: FollowerMessengerKind::from_attached_session(raw_socket_name.to_string()),
}
}

Expand Down Expand Up @@ -108,7 +109,7 @@ enum FollowerMessengerKind {
leader_message_receiver: Receiver<LeaderMessageKind>,
},
FromAttachedSession {
ipc_socket_addr: String,
raw_socket_name: String,
},
}

Expand All @@ -123,8 +124,8 @@ impl FollowerMessengerKind {
}
}

fn from_attached_session(ipc_socket_addr: String) -> Self {
Self::FromAttachedSession { ipc_socket_addr }
fn from_attached_session(raw_socket_name: String) -> Self {
Self::FromAttachedSession { raw_socket_name }
}

fn message_leader(
Expand All @@ -149,8 +150,9 @@ impl FollowerMessengerKind {

leader_message
}
FromAttachedSession { ipc_socket_addr } => {
let stream = LocalSocketStream::connect(&**ipc_socket_addr).map_err(|_| {
FromAttachedSession { raw_socket_name } => {
let socket_name = create_socket_name(raw_socket_name)?;
let stream = Stream::connect(socket_name).map_err(|_| {
let mut err = RoverError::new(anyhow!(
"there is not a main `rover dev` process to report updates to"
));
Expand Down
78 changes: 45 additions & 33 deletions src/command/dev/protocol/leader.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
use crate::{
command::dev::{
compose::ComposeRunner,
do_dev::log_err_and_continue,
router::{RouterConfigHandler, RouterRunner},
OVERRIDE_DEV_COMPOSITION_VERSION,
},
options::PluginOpts,
utils::client::StudioClientConfig,
RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION,
use std::{
collections::{hash_map::Entry::Vacant, HashMap},
fmt::Debug,
io::BufReader,
net::TcpListener,
};

use anyhow::{anyhow, Context};
use apollo_federation_types::{
build::SubgraphDefinition,
config::{FederationVersion, SupergraphConfig},
};
use camino::Utf8PathBuf;
use crossbeam_channel::{bounded, Receiver, Sender};
use interprocess::local_socket::{LocalSocketListener, LocalSocketStream};
use rover_std::Emoji;
use interprocess::local_socket::traits::{ListenerExt, Stream};
use interprocess::local_socket::ListenerOptions;
use semver::Version;
use serde::{Deserialize, Serialize};

use std::{
collections::{hash_map::Entry::Vacant, HashMap},
fmt::Debug,
io::BufReader,
net::TcpListener,
use rover_std::Emoji;

use crate::{
command::dev::{
compose::ComposeRunner,
do_dev::log_err_and_continue,
router::{RouterConfigHandler, RouterRunner},
OVERRIDE_DEV_COMPOSITION_VERSION,
},
options::PluginOpts,
utils::client::StudioClientConfig,
RoverError, RoverErrorSuggestion, RoverResult, PKG_VERSION,
};

use super::{
create_socket_name,
socket::{handle_socket_error, socket_read, socket_write},
types::{
CompositionResult, SubgraphEntry, SubgraphKey, SubgraphKeys, SubgraphName, SubgraphSdl,
Expand All @@ -39,7 +43,7 @@ use super::{
#[derive(Debug)]
pub struct LeaderSession {
subgraphs: HashMap<SubgraphKey, SubgraphSdl>,
ipc_socket_addr: String,
raw_socket_name: String,
compose_runner: ComposeRunner,
router_runner: RouterRunner,
follower_channel: FollowerChannel,
Expand All @@ -63,10 +67,12 @@ impl LeaderSession {
plugin_opts: PluginOpts,
router_config_handler: RouterConfigHandler,
) -> RoverResult<Option<Self>> {
let ipc_socket_addr = router_config_handler.get_ipc_address()?;
let raw_socket_name = router_config_handler.get_raw_socket_name();
let router_socket_addr = router_config_handler.get_router_address();
if let Ok(stream) = LocalSocketStream::connect(&*ipc_socket_addr) {
// write to the socket so we don't make the other session deadlock waiting on a message
let socket_name = create_socket_name(&raw_socket_name)?;

if let Ok(stream) = Stream::connect(socket_name.clone()) {
// write to the socket, so we don't make the other session deadlock waiting on a message
let mut stream = BufReader::new(stream);
socket_write(&FollowerMessage::health_check(false)?, &mut stream)?;
let _ = LeaderSession::socket_read(&mut stream);
Expand All @@ -80,7 +86,7 @@ impl LeaderSession {
//
// remove the socket file before starting in case it was here from last time
// if we can't connect to it, it's safe to remove
let _ = std::fs::remove_file(&ipc_socket_addr);
let _ = std::fs::remove_file(&raw_socket_name);

if TcpListener::bind(router_socket_addr).is_err() {
let mut err =
Expand Down Expand Up @@ -126,7 +132,7 @@ impl LeaderSession {

Ok(Some(Self {
subgraphs: HashMap::new(),
ipc_socket_addr,
raw_socket_name,
compose_runner,
router_runner,
follower_channel,
Expand Down Expand Up @@ -165,15 +171,19 @@ impl LeaderSession {

/// Listen on the socket for incoming [`FollowerMessageKind`] messages.
fn receive_messages_from_attached_sessions(&self) -> RoverResult<()> {
let listener = LocalSocketListener::bind(&*self.ipc_socket_addr).with_context(|| {
format!(
"could not start local socket server at {}",
&self.ipc_socket_addr
)
})?;
let socket_name = create_socket_name(&self.raw_socket_name)?;
let listener = ListenerOptions::new()
.name(socket_name)
.create_sync()
.with_context(|| {
format!(
"could not start local socket server at {:?}",
&self.raw_socket_name
)
})?;
tracing::info!(
"connected to socket {}, waiting for messages",
&self.ipc_socket_addr
&self.raw_socket_name
);

let follower_message_sender = self.follower_channel.sender.clone();
Expand Down Expand Up @@ -307,7 +317,9 @@ impl LeaderSession {
}

/// Reads a [`FollowerMessage`] from an open socket connection.
fn socket_read(stream: &mut BufReader<LocalSocketStream>) -> RoverResult<FollowerMessage> {
fn socket_read(
stream: &mut BufReader<interprocess::local_socket::Stream>,
) -> RoverResult<FollowerMessage> {
socket_read(stream)
.map(|message| {
tracing::debug!("leader received message {:?}", &message);
Expand All @@ -322,7 +334,7 @@ impl LeaderSession {
/// Writes a [`LeaderMessageKind`] to an open socket connection.
fn socket_write(
message: LeaderMessageKind,
stream: &mut BufReader<LocalSocketStream>,
stream: &mut BufReader<interprocess::local_socket::Stream>,
) -> RoverResult<()> {
tracing::debug!("leader sending message {:?}", message);
socket_write(&message, stream)
Expand Down Expand Up @@ -350,7 +362,7 @@ impl LeaderSession {
/// Shuts the router down, removes the socket file, and exits the process.
pub fn shutdown(&mut self) {
let _ = self.router_runner.kill().map_err(log_err_and_continue);
let _ = std::fs::remove_file(&self.ipc_socket_addr);
let _ = std::fs::remove_file(&self.raw_socket_name);
std::process::exit(1)
}

Expand Down
14 changes: 10 additions & 4 deletions src/command/dev/protocol/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
mod follower;
mod leader;
mod socket;
mod types;
use interprocess::local_socket::{GenericFilePath, Name, ToFsName};

pub use follower::*;
pub use leader::*;
pub(crate) use socket::*;
pub use types::*;

mod follower;
mod leader;
mod socket;
mod types;

pub(crate) fn create_socket_name(raw_socket_name: &str) -> std::io::Result<Name> {
raw_socket_name.to_fs_name::<GenericFilePath>()
}
Loading

0 comments on commit 9ac07b2

Please sign in to comment.