Skip to content

Commit

Permalink
fix(hydro_deploy): only instantiate Localhost once
Browse files Browse the repository at this point in the history
Fixes #841
  • Loading branch information
shadaj committed Aug 20, 2024
1 parent 5f2789a commit db297a5
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 26 deletions.
5 changes: 2 additions & 3 deletions docs/docs/hydroflow_plus/quickstart/clusters.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,22 @@ use hydroflow_plus_cli_integration::{DeployProcessSpec, DeployClusterSpec};
#[tokio::main]
async fn main() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();
let profile = "dev";

let builder = hydroflow_plus::FlowBuilder::new();
let (leader, workers) = flow::broadcast::broadcast(&builder);

flow.with_default_optimize()
.with_process(&leader, DeployProcessSpec::new({
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("broadcast")
.profile(profile)
.display_name("leader")
}))
.with_cluster(&workers, DeployClusterSpec::new({
(0..2)
.map(|idx| {
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("broadcast")
.profile(profile)
.display_name(format!("worker/{}", idx))
Expand Down
25 changes: 21 additions & 4 deletions hydro_deploy/core/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,41 @@ use super::{
};
use crate::{AzureHost, ServiceBuilder};

#[derive(Default)]
pub struct Deployment {
pub hosts: Vec<Weak<dyn Host>>,
pub services: Vec<Weak<RwLock<dyn Service>>>,
pub resource_pool: ResourcePool,
localhost_host: Option<Arc<LocalhostHost>>,
last_resource_result: Option<Arc<ResourceResult>>,
next_host_id: usize,
next_service_id: usize,
}

impl Default for Deployment {
fn default() -> Self {
Self::new()
}
}

impl Deployment {
pub fn new() -> Self {
Self::default()
let mut ret = Self {
hosts: Vec::new(),
services: Vec::new(),
resource_pool: ResourcePool::default(),
localhost_host: None,
last_resource_result: None,
next_host_id: 0,
next_service_id: 0,
};

ret.localhost_host = Some(ret.add_host(LocalhostHost::new));
ret
}

#[allow(non_snake_case)]
pub fn Localhost(&mut self) -> Arc<LocalhostHost> {
self.add_host(LocalhostHost::new)
pub fn Localhost(&self) -> Arc<LocalhostHost> {
self.localhost_host.clone().unwrap()
}

#[allow(non_snake_case)]
Expand Down
4 changes: 1 addition & 3 deletions hydro_deploy/core/src/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,8 @@ mod tests {
async fn test_crate_panic() {
let mut deployment = deployment::Deployment::new();

let localhost = deployment.Localhost();

let service = deployment.add_service(
HydroflowCrate::new("../hydro_cli_examples", localhost.clone())
HydroflowCrate::new("../hydro_cli_examples", deployment.Localhost())
.example("panic_program")
.profile("dev"),
);
Expand Down
4 changes: 2 additions & 2 deletions hydro_deploy/hydro_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ impl Deployment {
#[new]
fn new() -> Self {
Deployment {
underlying: Arc::new(RwLock::new(core::Deployment::default())),
underlying: Arc::new(RwLock::new(core::Deployment::new())),
}
}

#[allow(non_snake_case)]
fn Localhost(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let arc = self.underlying.blocking_write().Localhost();
let arc = self.underlying.blocking_read().Localhost();

Ok(Py::new(
py,
Expand Down
3 changes: 1 addition & 2 deletions hydroflow_plus_test/src/cluster/many_to_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ mod tests {
#[tokio::test]
async fn many_to_many() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::FlowBuilder::new();
let cluster = super::many_to_many(&builder);
Expand All @@ -47,7 +46,7 @@ mod tests {
DeployClusterSpec::new({
(0..2)
.map(|_| {
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("many_to_many")
.profile("dev")
})
Expand Down
5 changes: 2 additions & 3 deletions hydroflow_plus_test/src/cluster/simple_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ mod tests {
#[tokio::test]
async fn simple_cluster() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::FlowBuilder::new();
let (node, cluster) = super::simple_cluster(&builder);
Expand All @@ -62,7 +61,7 @@ mod tests {
.with_process(
&node,
DeployProcessSpec::new({
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("simple_cluster")
.profile("dev")
}),
Expand All @@ -72,7 +71,7 @@ mod tests {
DeployClusterSpec::new({
(0..2)
.map(|_| {
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("simple_cluster")
.profile("dev")
})
Expand Down
5 changes: 2 additions & 3 deletions hydroflow_plus_test/src/distributed/first_ten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ mod tests {
#[tokio::test]
async fn first_ten_distributed() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let builder = hydroflow_plus::FlowBuilder::new();
let (first_node, second_node) = super::first_ten_distributed(&builder);
Expand All @@ -60,15 +59,15 @@ mod tests {
.with_process(
&first_node,
DeployProcessSpec::new({
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("first_ten_distributed")
.profile("dev")
}),
)
.with_process(
&second_node,
DeployProcessSpec::new({
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("first_ten_distributed")
.profile("dev")
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use hydroflow_plus_cli_integration::DeployProcessSpec;
#[tokio::main]
async fn main() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let flow = hydroflow_plus::FlowBuilder::new();
let (p1, p2) = flow::first_ten_distributed::first_ten_distributed(&flow);
Expand All @@ -14,15 +13,15 @@ async fn main() {
.with_process(
&p1,
DeployProcessSpec::new({
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("first_ten_distributed")
.profile("dev")
}),
)
.with_process(
&p2,
DeployProcessSpec::new({
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("first_ten_distributed")
.profile("dev")
}),
Expand Down
5 changes: 2 additions & 3 deletions template/hydroflow_plus/flow/src/first_ten_distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ mod tests {
#[tokio::test]
async fn first_ten_distributed() {
let mut deployment = Deployment::new();
let localhost = deployment.Localhost();

let flow = hydroflow_plus::FlowBuilder::new();
let (p1, p2) = super::first_ten_distributed(&flow);
Expand All @@ -51,15 +50,15 @@ mod tests {
.with_process(
&p1,
DeployProcessSpec::new({
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("first_ten_distributed")
.profile("dev")
}),
)
.with_process(
&p2,
DeployProcessSpec::new({
HydroflowCrate::new(".", localhost.clone())
HydroflowCrate::new(".", deployment.Localhost())
.bin("first_ten_distributed")
.profile("dev")
}),
Expand Down

0 comments on commit db297a5

Please sign in to comment.