Skip to content

Commit

Permalink
Improvements to deployment setup
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Sep 18, 2023
1 parent c218bd5 commit a959b63
Show file tree
Hide file tree
Showing 17 changed files with 214 additions and 100 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion hydro_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ nanoid = "0.4.0"
ctrlc = "3.2.5"
nix = "0.26.2"
hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.3.0" }
indicatif = "0.17.3"
indicatif = "0.17.6"
cargo_metadata = "0.15.4"

[dev-dependencies]
4 changes: 3 additions & 1 deletion hydro_cli/src/core/custom_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ impl Service for CustomService {
Ok(())
}

async fn start(&mut self) {}
/// No-op start: this implementation performs no work here and always returns `Ok(())`.
async fn start(&mut self) -> Result<()> {
Ok(())
}

async fn stop(&mut self) -> Result<()> {
Ok(())
Expand Down
18 changes: 10 additions & 8 deletions hydro_cli/src/core/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct Deployment {

impl Deployment {
pub async fn deploy(&mut self) -> Result<()> {
progress::ProgressTracker::with_group("deploy", || async {
progress::ProgressTracker::with_group("deploy", None, || async {
let mut resource_batch = super::ResourceBatch::new();
let active_services = self
.services
Expand All @@ -41,7 +41,7 @@ impl Deployment {
}

let result = Arc::new(
progress::ProgressTracker::with_group("provision", || async {
progress::ProgressTracker::with_group("provision", None, || async {
resource_batch
.provision(&mut self.resource_pool, self.last_resource_result.clone())
.await
Expand All @@ -50,7 +50,7 @@ impl Deployment {
);
self.last_resource_result = Some(result.clone());

progress::ProgressTracker::with_group("provision", || {
progress::ProgressTracker::with_group("provision", None, || {
let hosts_provisioned =
self.hosts
.iter_mut()
Expand All @@ -61,7 +61,7 @@ impl Deployment {
})
.await;

progress::ProgressTracker::with_group("deploy", || {
progress::ProgressTracker::with_group("deploy", None, || {
let services_future =
self.services
.iter_mut()
Expand All @@ -79,7 +79,7 @@ impl Deployment {
})
.await;

progress::ProgressTracker::with_group("ready", || {
progress::ProgressTracker::with_group("ready", None, || {
let all_services_ready =
self.services
.iter()
Expand All @@ -97,7 +97,7 @@ impl Deployment {
.await
}

pub async fn start(&mut self) {
pub async fn start(&mut self) -> Result<()> {
let active_services = self
.services
.iter()
Expand All @@ -110,10 +110,12 @@ impl Deployment {
self.services
.iter()
.map(|service: &Weak<RwLock<dyn Service>>| async {
service.upgrade().unwrap().write().await.start().await;
service.upgrade().unwrap().write().await.start().await?;
Ok(()) as Result<()>
});

futures::future::join_all(all_services_start).await;
futures::future::try_join_all(all_services_start).await?;
Ok(())
}

pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(
Expand Down
67 changes: 38 additions & 29 deletions hydro_cli/src/core/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,35 +92,44 @@ impl LaunchedSSHHost for LaunchedComputeEngine {
22,
);

let res = ProgressTracker::leaf(
format!(
"connecting to host @ {}",
self.external_ip.as_ref().unwrap()
),
async_retry(
&|| async {
let mut config = SessionConfiguration::new();
config.set_compress(true);

let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config)).await?;

session.handshake().await?;

session
.userauth_pubkey_file(
self.user.as_str(),
None,
self.ssh_key_path().as_path(),
None,
)
.await?;

Ok(session)
},
10,
Duration::from_secs(1),
),
let mut attempt_count = 0;

let res = async_retry(
|| {
attempt_count += 1;
ProgressTracker::leaf(
format!(
"connecting to host @ {} (attempt: {})",
self.external_ip.as_ref().unwrap(),
attempt_count
),
async {
let mut config = SessionConfiguration::new();
config.set_compress(true);

let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config)).await?;

tokio::time::timeout(Duration::from_secs(15), async move {
session.handshake().await?;

session
.userauth_pubkey_file(
self.user.as_str(),
None,
self.ssh_key_path().as_path(),
None,
)
.await?;

Ok(session)
})
.await?
},
)
},
10,
Duration::from_secs(1),
)
.await?;

Expand Down
27 changes: 24 additions & 3 deletions hydro_cli/src/core/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ impl Service for HydroflowCrate {
.display_id
.clone()
.unwrap_or_else(|| format!("service/{}", self.id)),
None,
|| async {
let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result);
Expand All @@ -237,6 +238,7 @@ impl Service for HydroflowCrate {
.display_id
.clone()
.unwrap_or_else(|| format!("service/{}", self.id)),
None,
|| async {
let launched_host = self.launched_host.as_ref().unwrap();

Expand All @@ -261,7 +263,7 @@ impl Service for HydroflowCrate {
let formatted_bind_config = serde_json::to_string(&bind_config).unwrap();

// request stdout before sending config so we don't miss the "ready" response
let stdout_receiver = binary.write().await.stdout().await;
let stdout_receiver = binary.write().await.cli_stdout().await;

binary
.write()
Expand Down Expand Up @@ -291,9 +293,9 @@ impl Service for HydroflowCrate {
.await
}

async fn start(&mut self) {
async fn start(&mut self) -> Result<()> {
if self.started {
return;
return Ok(());
}

let mut sink_ports = HashMap::new();
Expand All @@ -303,6 +305,15 @@ impl Service for HydroflowCrate {

let formatted_defns = serde_json::to_string(&sink_ports).unwrap();

let stdout_receiver = self
.launched_binary
.as_mut()
.unwrap()
.write()
.await
.cli_stdout()
.await;

self.launched_binary
.as_mut()
.unwrap()
Expand All @@ -314,7 +325,17 @@ impl Service for HydroflowCrate {
.await
.unwrap();

let start_ack_line = ProgressTracker::leaf(
"waiting for ack start".to_string(),
tokio::time::timeout(Duration::from_secs(60), stdout_receiver.recv()),
)
.await??;
if !start_ack_line.starts_with("ack start") {
bail!("expected ack start");
}

self.started = true;
Ok(())
}

async fn stop(&mut self) -> Result<()> {
Expand Down
44 changes: 37 additions & 7 deletions hydro_cli/src/core/localhost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::{
/// Handle to a binary running as a local child process, bundling the process
/// itself with the channel endpoints used to exchange lines with it.
///
/// Note that the `*_receivers` fields hold the *sending* halves of channels;
/// the matching receiving halves are what get handed out to subscribers.
struct LaunchedLocalhostBinary {
/// The spawned child process.
child: RwLock<async_process::Child>,
/// Sender that `stdin()` clones and returns to callers.
stdin_sender: Sender<String>,
/// Subscribers registered through `cli_stdout()`. The stdout broadcast task
/// offers each line to these first; a line delivered to at least one of them
/// is not forwarded to `stdout_receivers`.
stdout_cli_receivers: Arc<RwLock<Vec<Sender<String>>>>,
/// Subscribers registered through `stdout()`; they see the stdout lines that
/// no `stdout_cli_receivers` subscriber accepted.
stdout_receivers: Arc<RwLock<Vec<Sender<String>>>>,
/// Subscriber list produced by the broadcast task reading the child's stderr.
stderr_receivers: Arc<RwLock<Vec<Sender<String>>>>,
}
Expand All @@ -32,6 +33,13 @@ impl LaunchedBinary for LaunchedLocalhostBinary {
self.stdin_sender.clone()
}

/// Registers a new subscriber on the `stdout_cli_receivers` list.
///
/// Creates an unbounded `String` channel, stores its sending half in
/// `stdout_cli_receivers`, and returns the receiving half to the caller.
async fn cli_stdout(&self) -> Receiver<String> {
    let (line_tx, line_rx) = async_channel::unbounded::<String>();
    // The write guard is a temporary, so the lock is released right after the push.
    self.stdout_cli_receivers.write().await.push(line_tx);
    line_rx
}

async fn stdout(&self) -> Receiver<String> {
let mut receivers = self.stdout_receivers.write().await;
let (sender, receiver) = async_channel::unbounded::<String>();
Expand Down Expand Up @@ -72,14 +80,34 @@ struct LaunchedLocalhost {}
pub fn create_broadcast<T: AsyncRead + Send + Unpin + 'static>(
source: T,
default: impl Fn(String) + Send + 'static,
) -> Arc<RwLock<Vec<Sender<String>>>> {
) -> (
Arc<RwLock<Vec<Sender<String>>>>,
Arc<RwLock<Vec<Sender<String>>>>,
) {
let cli_receivers = Arc::new(RwLock::new(Vec::<Sender<String>>::new()));
let receivers = Arc::new(RwLock::new(Vec::<Sender<String>>::new()));

let weak_cli_receivers = Arc::downgrade(&cli_receivers);
let weak_receivers = Arc::downgrade(&receivers);

tokio::spawn(async move {
let mut lines = BufReader::new(source).lines();

while let Some(Result::Ok(line)) = lines.next().await {
'line_loop: while let Some(Result::Ok(line)) = lines.next().await {
if let Some(cli_receivers) = weak_cli_receivers.upgrade() {
let mut cli_receivers = cli_receivers.write().await;
let mut successful_send = false;
for r in cli_receivers.iter() {
successful_send |= r.send(line.clone()).await.is_ok();
}

cli_receivers.retain(|r| !r.is_closed());

if successful_send {
continue 'line_loop;
}
}

if let Some(receivers) = weak_receivers.upgrade() {
let mut receivers = receivers.write().await;
let mut successful_send = false;
Expand All @@ -98,7 +126,7 @@ pub fn create_broadcast<T: AsyncRead + Send + Unpin + 'static>(
}
});

receivers
(cli_receivers, receivers)
}

#[async_trait]
Expand Down Expand Up @@ -158,16 +186,18 @@ impl LaunchedHost for LaunchedLocalhost {
});

let id_clone = id.clone();
let stdout_receivers = create_broadcast(child.stdout.take().unwrap(), move |s| {
println!("[{id_clone}] {s}")
});
let stderr_receivers = create_broadcast(child.stderr.take().unwrap(), move |s| {
let (stdout_cli_receivers, stdout_receivers) =
create_broadcast(child.stdout.take().unwrap(), move |s| {
println!("[{id_clone}] {s}")
});
let (_, stderr_receivers) = create_broadcast(child.stderr.take().unwrap(), move |s| {
eprintln!("[{id}] {s}")
});

Ok(Arc::new(RwLock::new(LaunchedLocalhostBinary {
child: RwLock::new(child),
stdin_sender,
stdout_cli_receivers,
stdout_receivers,
stderr_receivers,
})))
Expand Down
3 changes: 2 additions & 1 deletion hydro_cli/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub struct ResourceResult {
#[async_trait]
pub trait LaunchedBinary: Send + Sync {
async fn stdin(&self) -> Sender<String>;
async fn cli_stdout(&self) -> Receiver<String>;
async fn stdout(&self) -> Receiver<String>;
async fn stderr(&self) -> Receiver<String>;

Expand Down Expand Up @@ -186,7 +187,7 @@ pub trait Service: Send + Sync {
async fn ready(&mut self) -> Result<()>;

/// Starts the service by having it connect to other services and start computations.
async fn start(&mut self);
async fn start(&mut self) -> Result<()>;

/// Stops the service by having it disconnect from other services and stop computations.
async fn stop(&mut self) -> Result<()>;
Expand Down
Loading

0 comments on commit a959b63

Please sign in to comment.