Skip to content

Commit

Permalink
feat(hydro_cli): added basic wrapper for hydro deploy Maelstrom integ…
Browse files Browse the repository at this point in the history
…ration
  • Loading branch information
Ryan Alameddine committed Nov 18, 2023
1 parent 3136e0f commit f84c542
Show file tree
Hide file tree
Showing 19 changed files with 1,406 additions and 12 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"benches",
"hydro_cli",
"hydro_cli_examples",
"hydro_cli_maelstrom",
"hydroflow",
"hydroflow_cli_integration",
"hydroflow_datalog",
Expand Down
11 changes: 10 additions & 1 deletion hydro_cli/src/core/custom_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::any::Any;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, Weak};

Expand Down Expand Up @@ -65,11 +66,19 @@ impl Service for CustomService {
Ok(())
}

async fn start(&mut self) {}
async fn start(&mut self, names: &HashMap<usize, String>) {}

Check warning on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused variable: `names`

Check warning on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused variable: `names`

Check warning on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--lib --bins, pinned-nightly)

unused variable: `names`

Check warning on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--benches, pinned-nightly)

unused variable: `names`

Check warning on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--tests, pinned-nightly)

unused variable: `names`

async fn stop(&mut self) -> Result<()> {
Ok(())
}

fn name(&self) -> String {
self._id.to_string()
}

fn id(&self) -> usize {
self._id
}
}

pub struct CustomClientPort {
Expand Down
22 changes: 20 additions & 2 deletions hydro_cli/src/core/deployment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::sync::{Arc, Weak};

use anyhow::Result;
use futures::future::join_all;
use tokio::sync::RwLock;

use super::{progress, Host, ResourcePool, ResourceResult, Service};
Expand Down Expand Up @@ -106,14 +108,30 @@ impl Deployment {
.collect::<Vec<_>>();
self.services = active_services;

let node_names: HashMap<usize, String> =
join_all(self.services.iter().map(|service| async {
let service = service.upgrade().unwrap();
let service = service.read().await;
(service.id(), service.name())
}))
.await
.into_iter()
.collect();

let all_services_start =
self.services
.iter()
.map(|service: &Weak<RwLock<dyn Service>>| async {
service.upgrade().unwrap().write().await.start().await;
service
.upgrade()
.unwrap()
.write()
.await
.start(&node_names)
.await;
});

futures::future::join_all(all_services_start).await;
join_all(all_services_start).await;
}

pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(
Expand Down
18 changes: 15 additions & 3 deletions hydro_cli/src/core/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl Service for HydroflowCrate {
.await
}

async fn start(&mut self) {
async fn start(&mut self, names: &HashMap<usize, String>) {
if self.started {
return;
}
Expand All @@ -296,7 +296,9 @@ impl Service for HydroflowCrate {
sink_ports.insert(port_name.clone(), outgoing.load_instantiated(&|p| p).await);
}

let formatted_defns = serde_json::to_string(&sink_ports).unwrap();
let payload = (&sink_ports, self.id, names);

let formatted_start = serde_json::to_string(&payload).unwrap();

self.launched_binary
.as_mut()
Expand All @@ -305,7 +307,7 @@ impl Service for HydroflowCrate {
.await
.stdin()
.await
.send(format!("start: {formatted_defns}\n"))
.send(format!("start: {formatted_start}\n"))
.await
.unwrap();

Expand Down Expand Up @@ -333,4 +335,14 @@ impl Service for HydroflowCrate {

Ok(())
}

fn name(&self) -> String {
self.display_id
.clone()
.unwrap_or_else(|| format!("service/{}", self.id))
}

fn id(&self) -> usize {
self.id
}
}
8 changes: 7 additions & 1 deletion hydro_cli/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,14 @@ pub trait Service: Send + Sync {
async fn ready(&mut self) -> Result<()>;

/// Starts the service by having it connect to other services and start computations.
async fn start(&mut self);
/// Takes in a map from service id to service name for all services.
async fn start(&mut self, names: &HashMap<usize, String>);

/// Stops the service by having it disconnect from other services and stop computations.
async fn stop(&mut self) -> Result<()>;

/// Returns the id of the service
fn id(&self) -> usize;
/// Returns the display name of the service
fn name(&self) -> String;
}
6 changes: 6 additions & 0 deletions hydro_cli_examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ name = "pn_counter_delta"
[[example]]
name = "ws_chat_server"

[[example]]
name = "maelstrom_unique_id"

[[example]]
name = "maelstrom_broadcast"

[dev-dependencies]
hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] }
hydroflow_datalog = { path = "../hydroflow_datalog" }
Expand Down
71 changes: 71 additions & 0 deletions hydro_cli_examples/examples/echo/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use hydroflow::hydroflow_syntax;
use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource};
use hydroflow::util::serialize_to_bytes;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EchoMsg {
pub msg_id: Value,
pub echo: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EchoOkMsg {
pub echo: String,
pub in_reply_to: Value,
}

impl EchoMsg {
/// Generate EchoOkMsg response to this EchoMsg
fn response(
EchoMsg {
echo,
msg_id: source_msg_id,
}: Self,
) -> EchoOkMsg {
EchoOkMsg {
echo,
in_reply_to: source_msg_id,
}
}
}

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;

// TODO: use ConnectedDemux?
let echo_in = ports
.port("echo_in")
.connect::<ConnectedDirect>()
.await
.into_source();
let echo_out = ports
.port("echo_out")
.connect::<ConnectedDirect>()
.await
.into_sink();

let df = hydroflow_syntax! {
input = source_stream(echo_in)
-> map(Result::unwrap)
-> map(|x| x.to_vec())
-> map(String::from_utf8)
-> map(Result::unwrap);

output = map(|x| serde_json::to_string(&x))
-> map(Result::unwrap)
-> map(serialize_to_bytes)
-> dest_sink(echo_out);


input
-> map(|x| serde_json::from_str::<EchoMsg>(&x).unwrap())
//-> map(|x| EchoMsg {msg_id: x.msg_id, echo: x.echo + "hi"})
-> map(EchoMsg::response)
-> output;
};

hydroflow::util::cli::launch_flow(df).await;
}
Loading

0 comments on commit f84c542

Please sign in to comment.