diff --git a/hydro_deploy/core/src/custom_service.rs b/hydro_deploy/core/src/custom_service.rs index 2a9d707838f8..cf06285138f2 100644 --- a/hydro_deploy/core/src/custom_service.rs +++ b/hydro_deploy/core/src/custom_service.rs @@ -1,6 +1,6 @@ use std::any::Any; use std::ops::Deref; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, OnceLock, Weak}; use anyhow::{bail, Result}; use async_trait::async_trait; @@ -11,6 +11,7 @@ use super::hydroflow_crate::ports::{ HydroflowServer, HydroflowSink, HydroflowSource, ServerConfig, SourcePath, }; use super::{Host, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, Service}; +use crate::hydroflow_crate::ports::ReverseSinkInstantiator; /// Represents an unknown, third-party service that is not part of the Hydroflow ecosystem. pub struct CustomService { @@ -81,20 +82,20 @@ impl Service for CustomService { pub struct CustomClientPort { pub on: Weak>, - client_port: Option, + client_port: OnceLock, } impl CustomClientPort { pub fn new(on: Weak>) -> Self { Self { on, - client_port: None, + client_port: OnceLock::new(), } } pub async fn server_port(&self) -> ServerPort { self.client_port - .as_ref() + .get() .unwrap() .load_instantiated(&|p| p) .await @@ -102,7 +103,7 @@ impl CustomClientPort { pub async fn connect(&self) -> ConnectedDirect { self.client_port - .as_ref() + .get() .unwrap() .load_instantiated(&|p| p) .await @@ -125,17 +126,20 @@ impl HydroflowSource for CustomClientPort { panic!("Custom services cannot be used as the server") } - fn record_server_config(&mut self, config: ServerConfig) { - self.client_port = Some(config); + fn record_server_config(&self, config: ServerConfig) { + self.client_port + .set(config) + .map_err(drop) // `ServerConfig` doesn't implement `Debug` for `.expect()`. + .expect("Cannot call `record_server_config()` multiple times."); } - fn record_server_strategy(&mut self, _config: ServerStrategy) { + fn record_server_strategy(&self, _config: ServerStrategy) { panic!("Custom services cannot be used as the server") } } impl HydroflowSink for CustomClientPort { - fn as_any_mut(&mut self) -> &mut dyn Any { + fn as_any(&self) -> &dyn Any { self } @@ -148,7 +152,7 @@ impl HydroflowSink for CustomClientPort { server_host: &Arc>, server_sink: Arc, wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig, - ) -> Result ServerStrategy>> { + ) -> Result { let client = self.on.upgrade().unwrap(); let client_read = client.try_read().unwrap(); @@ -163,7 +167,9 @@ impl HydroflowSink for CustomClientPort { let server_host_clone = server_host_clone.clone(); Ok(Box::new(move |me| { let mut server_host = server_host_clone.try_write().unwrap(); - me.downcast_mut::().unwrap().client_port = Some(client_port); + me.downcast_ref::() + .unwrap() + .record_server_config(client_port); bind_type(server_host.as_any_mut()) })) } diff --git a/hydro_deploy/core/src/hydroflow_crate/ports.rs b/hydro_deploy/core/src/hydroflow_crate/ports.rs index b72a8130ae06..dd7bac51c710 100644 --- a/hydro_deploy/core/src/hydroflow_crate/ports.rs +++ b/hydro_deploy/core/src/hydroflow_crate/ports.rs @@ -15,17 +15,17 @@ use crate::{ClientStrategy, Host, HostStrategyGetter, LaunchedHost, ServerStrate pub trait HydroflowSource: Send + Sync { fn source_path(&self) -> SourcePath; - fn record_server_config(&mut self, config: ServerConfig); + fn record_server_config(&self, config: ServerConfig); fn host(&self) -> Arc>; fn server(&self) -> Arc; - fn record_server_strategy(&mut self, config: ServerStrategy); + fn record_server_strategy(&self, config: ServerStrategy); fn wrap_reverse_server_config(&self, config: ServerConfig) -> ServerConfig { config } - fn send_to(&mut self, sink: &mut dyn HydroflowSink) { + fn send_to(&self, sink: &dyn HydroflowSink) { let forward_res = sink.instantiate(&self.source_path()); if let Ok(instantiated) = forward_res { self.record_server_config(instantiated()); @@ -36,7 +36,7 @@ pub trait HydroflowSource: Send + Sync { self.wrap_reverse_server_config(p) }) .unwrap(); - self.record_server_strategy(instantiated(sink.as_any_mut())); + self.record_server_strategy(instantiated(sink.as_any())); } } } @@ -47,10 +47,10 @@ pub trait HydroflowServer: DynClone + Send + Sync { async fn launched_host(&self) -> Arc; } -pub type ReverseSinkInstantiator = Box ServerStrategy>; +pub type ReverseSinkInstantiator = Box ServerStrategy>; pub trait HydroflowSink: Send + Sync { - fn as_any_mut(&mut self) -> &mut dyn Any; + fn as_any(&self) -> &dyn Any; /// Instantiate the sink as the source host connecting to the sink host. /// Returns a thunk that can be called to perform mutations that instantiate the sink. @@ -67,42 +67,33 @@ pub trait HydroflowSink: Send + Sync { } pub struct TaggedSource { - pub source: Arc>, + pub source: Arc, pub tag: u32, } impl HydroflowSource for TaggedSource { fn source_path(&self) -> SourcePath { - SourcePath::Tagged( - Box::new(self.source.try_read().unwrap().source_path()), - self.tag, - ) + SourcePath::Tagged(Box::new(self.source.source_path()), self.tag) } - fn record_server_config(&mut self, config: ServerConfig) { - self.source - .try_write() - .unwrap() - .record_server_config(config); + fn record_server_config(&self, config: ServerConfig) { + self.source.record_server_config(config); } fn host(&self) -> Arc> { - self.source.try_read().unwrap().host() + self.source.host() } fn server(&self) -> Arc { - self.source.try_read().unwrap().server() + self.source.server() } fn wrap_reverse_server_config(&self, config: ServerConfig) -> ServerConfig { ServerConfig::Tagged(Box::new(config), self.tag) } - fn record_server_strategy(&mut self, config: ServerStrategy) { - self.source - .try_write() - .unwrap() - .record_server_strategy(config); + fn record_server_strategy(&self, config: ServerStrategy) { + self.source.record_server_strategy(config); } } @@ -121,12 +112,12 @@ impl HydroflowSource for NullSourceSink { panic!("null source has no server") } - fn record_server_config(&mut self, _config: ServerConfig) {} - fn record_server_strategy(&mut self, _config: ServerStrategy) {} + fn record_server_config(&self, _config: ServerConfig) {} + fn record_server_strategy(&self, _config: ServerStrategy) {} } impl HydroflowSink for NullSourceSink { - fn as_any_mut(&mut self) -> &mut dyn Any { + fn as_any(&self) -> &dyn Any { self } @@ -145,18 +136,18 @@ impl HydroflowSink for NullSourceSink { } pub struct DemuxSink { - pub demux: HashMap>>, + pub demux: HashMap>, } impl HydroflowSink for DemuxSink { - fn as_any_mut(&mut self) -> &mut dyn Any { + fn as_any(&self) -> &dyn Any { self } fn instantiate(&self, client_host: &SourcePath) -> Result ServerConfig>> { let mut thunk_map = HashMap::new(); for (key, target) in &self.demux { - thunk_map.insert(*key, target.try_read().unwrap().instantiate(client_host)?); + thunk_map.insert(*key, target.instantiate(client_host)?); } Ok(Box::new(move || { @@ -174,12 +165,12 @@ impl HydroflowSink for DemuxSink { server_host: &Arc>, server_sink: Arc, wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig, - ) -> Result ServerStrategy>> { + ) -> Result { let mut thunk_map = HashMap::new(); for (key, target) in &self.demux { thunk_map.insert( *key, - target.try_write().unwrap().instantiate_reverse( + target.instantiate_reverse( server_host, server_sink.clone(), // the parent wrapper selects the demux port for the parent defn, so do that first @@ -189,22 +180,10 @@ impl HydroflowSink for DemuxSink { } Ok(Box::new(move |me| { - let me = me.downcast_mut::().unwrap(); + let me = me.downcast_ref::().unwrap(); let instantiated_map = thunk_map .into_iter() - .map(|(key, thunk)| { - ( - key, - thunk( - me.demux - .get_mut(&key) - .unwrap() - .try_write() - .unwrap() - .as_any_mut(), - ), - ) - }) + .map(|(key, thunk)| (key, thunk(me.demux.get(&key).unwrap().as_any()))) .collect(); ServerStrategy::Demux(instantiated_map) @@ -263,7 +242,7 @@ impl HydroflowSource for HydroflowPortConfig { }) } - fn record_server_config(&mut self, config: ServerConfig) { + fn record_server_config(&self, config: ServerConfig) { let from = self.service.upgrade().unwrap(); let mut from_write = from.try_write().unwrap(); @@ -272,7 +251,7 @@ impl HydroflowSource for HydroflowPortConfig { from_write.port_to_server.insert(self.port.clone(), config); } - fn record_server_strategy(&mut self, config: ServerStrategy) { + fn record_server_strategy(&self, config: ServerStrategy) { let from = self.service.upgrade().unwrap(); let mut from_write = from.try_write().unwrap(); @@ -334,7 +313,7 @@ impl SourcePath { } impl HydroflowSink for HydroflowPortConfig { - fn as_any_mut(&mut self) -> &mut dyn Any { + fn as_any(&self) -> &dyn Any { self } @@ -380,7 +359,7 @@ impl HydroflowSink for HydroflowPortConfig { server_host: &Arc>, server_sink: Arc, wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig, - ) -> Result ServerStrategy>> { + ) -> Result { let client = self.service.upgrade().unwrap(); let client_read = client.try_read().unwrap(); diff --git a/hydro_deploy/core/src/hydroflow_crate/service.rs b/hydro_deploy/core/src/hydroflow_crate/service.rs index eb3df054c500..3b829bf67f03 100644 --- a/hydro_deploy/core/src/hydroflow_crate/service.rs +++ b/hydro_deploy/core/src/hydroflow_crate/service.rs @@ -108,7 +108,7 @@ impl HydroflowCrateService { &mut self, self_arc: &Arc>, my_port: String, - sink: &mut dyn HydroflowSink, + sink: &dyn HydroflowSink, ) -> Result<()> { let forward_res = sink.instantiate(&SourcePath::Direct(self.on.clone())); if let Ok(instantiated) = forward_res { @@ -132,7 +132,7 @@ impl HydroflowCrateService { assert!(!self.port_to_bind.contains_key(&my_port)); self.port_to_bind - .insert(my_port, instantiated(sink.as_any_mut())); + .insert(my_port, instantiated(sink.as_any())); Ok(()) } diff --git a/hydro_deploy/hydro_cli/src/lib.rs b/hydro_deploy/hydro_cli/src/lib.rs index c2c3e8cffa4c..88ca19915254 100644 --- a/hydro_deploy/hydro_cli/src/lib.rs +++ b/hydro_deploy/hydro_cli/src/lib.rs @@ -4,7 +4,7 @@ use core::hydroflow_crate::ports::HydroflowSource; use std::cell::OnceCell; use std::collections::HashMap; -use std::ops::DerefMut; +use std::ops::Deref; use std::path::PathBuf; use std::pin::Pin; use std::sync::{Arc, OnceLock}; @@ -143,7 +143,7 @@ impl AnyhowWrapper { #[pyclass(subclass)] #[derive(Clone)] struct HydroflowSink { - underlying: Arc>, + underlying: Arc, } #[pyclass(name = "Deployment")] @@ -508,8 +508,8 @@ struct CustomService { #[pymethods] impl CustomService { fn client_port(&self, py: Python<'_>) -> PyResult> { - let arc = Arc::new(RwLock::new(core::custom_service::CustomClientPort::new( - Arc::downgrade(&self.underlying), + let arc = Arc::new(core::custom_service::CustomClientPort::new(Arc::downgrade( + &self.underlying, ))); Ok(Py::new( @@ -526,31 +526,27 @@ impl CustomService { #[pyclass(extends=HydroflowSink, subclass)] #[derive(Clone)] struct CustomClientPort { - underlying: Arc>, + underlying: Arc, } #[pymethods] impl CustomClientPort { - fn send_to(&mut self, to: &HydroflowSink) { - self.underlying - .try_write() - .unwrap() - .send_to(to.underlying.try_write().unwrap().deref_mut()); + fn send_to(&self, to: &HydroflowSink) { + self.underlying.send_to(to.underlying.deref()); } fn tagged(&self, tag: u32) -> TaggedSource { TaggedSource { - underlying: Arc::new(RwLock::new(core::hydroflow_crate::ports::TaggedSource { + underlying: Arc::new(core::hydroflow_crate::ports::TaggedSource { source: self.underlying.clone(), tag, - })), + }), } } fn server_port<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> { let underlying = self.underlying.clone(); interruptible_future_to_py(py, async move { - let underlying = underlying.read().await; Ok(ServerPort { underlying: underlying.server_port().await, }) @@ -610,12 +606,12 @@ struct HydroflowCratePorts { #[pymethods] impl HydroflowCratePorts { fn __getattribute__(&self, name: String, py: Python<'_>) -> PyResult> { - let arc = Arc::new(RwLock::new( + let arc = Arc::new( self.underlying .try_read() .unwrap() .get_port(name, &self.underlying), - )); + ); Ok(Py::new( py, @@ -631,15 +627,13 @@ impl HydroflowCratePorts { #[pyclass(extends=HydroflowSink, subclass)] #[derive(Clone)] struct HydroflowCratePort { - underlying: Arc>, + underlying: Arc, } #[pymethods] impl HydroflowCratePort { fn merge(&self, py: Python<'_>) -> PyResult> { - let arc = Arc::new(RwLock::new( - self.underlying.try_read().unwrap().clone().merge(), - )); + let arc = Arc::new(self.underlying.clone().merge()); Ok(Py::new( py, @@ -651,19 +645,16 @@ impl HydroflowCratePort { .into_py(py)) } - fn send_to(&mut self, to: &HydroflowSink) { - self.underlying - .try_write() - .unwrap() - .send_to(to.underlying.try_write().unwrap().deref_mut()); + fn send_to(&self, to: &HydroflowSink) { + self.underlying.send_to(to.underlying.deref()); } fn tagged(&self, tag: u32) -> TaggedSource { TaggedSource { - underlying: Arc::new(RwLock::new(core::hydroflow_crate::ports::TaggedSource { + underlying: Arc::new(core::hydroflow_crate::ports::TaggedSource { source: self.underlying.clone(), tag, - })), + }), } } } @@ -671,7 +662,7 @@ impl HydroflowCratePort { #[pyfunction] fn demux(mapping: &PyDict) -> HydroflowSink { HydroflowSink { - underlying: Arc::new(RwLock::new(core::hydroflow_crate::ports::DemuxSink { + underlying: Arc::new(core::hydroflow_crate::ports::DemuxSink { demux: mapping .into_iter() .map(|(k, v)| { @@ -680,31 +671,28 @@ fn demux(mapping: &PyDict) -> HydroflowSink { (k, v.underlying) }) .collect(), - })), + }), } } #[pyclass(subclass)] #[derive(Clone)] struct TaggedSource { - underlying: Arc>, + underlying: Arc, } #[pymethods] impl TaggedSource { - fn send_to(&mut self, to: &HydroflowSink) { - self.underlying - .try_write() - .unwrap() - .send_to(to.underlying.try_write().unwrap().deref_mut()); + fn send_to(&self, to: &HydroflowSink) { + self.underlying.send_to(to.underlying.deref()); } fn tagged(&self, tag: u32) -> TaggedSource { TaggedSource { - underlying: Arc::new(RwLock::new(core::hydroflow_crate::ports::TaggedSource { + underlying: Arc::new(core::hydroflow_crate::ports::TaggedSource { source: self.underlying.clone(), tag, - })), + }), } } } @@ -712,31 +700,28 @@ impl TaggedSource { #[pyclass(extends=HydroflowSink, subclass)] #[derive(Clone)] struct HydroflowNull { - underlying: Arc>, + underlying: Arc, } #[pymethods] impl HydroflowNull { - fn send_to(&mut self, to: &HydroflowSink) { - self.underlying - .try_write() - .unwrap() - .send_to(to.underlying.try_write().unwrap().deref_mut()); + fn send_to(&self, to: &HydroflowSink) { + self.underlying.send_to(to.underlying.deref()); } fn tagged(&self, tag: u32) -> TaggedSource { TaggedSource { - underlying: Arc::new(RwLock::new(core::hydroflow_crate::ports::TaggedSource { + underlying: Arc::new(core::hydroflow_crate::ports::TaggedSource { source: self.underlying.clone(), tag, - })), + }), } } } #[pyfunction] fn null(py: Python<'_>) -> PyResult> { - let arc = Arc::new(RwLock::new(core::hydroflow_crate::ports::NullSourceSink)); + let arc = Arc::new(core::hydroflow_crate::ports::NullSourceSink); Ok(Py::new( py, @@ -766,7 +751,7 @@ impl ServerPort { } #[allow(clippy::wrong_self_convention)] - fn into_source<'p>(&mut self, py: Python<'p>) -> PyResult<&'p PyAny> { + fn into_source<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> { let realized = with_tokio_runtime(|| ServerOrBound::Server((&self.underlying).into())); interruptible_future_to_py(py, async move { @@ -779,7 +764,7 @@ impl ServerPort { } #[allow(clippy::wrong_self_convention)] - fn into_sink<'p>(&mut self, py: Python<'p>) -> PyResult<&'p PyAny> { + fn into_sink<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> { let realized = with_tokio_runtime(|| ServerOrBound::Server((&self.underlying).into())); interruptible_future_to_py(py, async move { @@ -800,7 +785,7 @@ struct PythonSink { #[pymethods] impl PythonSink { - fn send<'p>(&mut self, data: Py, py: Python<'p>) -> PyResult<&'p PyAny> { + fn send<'p>(&self, data: Py, py: Python<'p>) -> PyResult<&'p PyAny> { let underlying = self.underlying.clone(); let bytes = Bytes::from(data.as_bytes(py).to_vec()); interruptible_future_to_py(py, async move { diff --git a/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs b/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs index 597eed65f666..4c3382904e39 100644 --- a/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs +++ b/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs @@ -42,14 +42,14 @@ pub trait DeployCrateWrapper { on: &Arc>, ) -> CustomClientPort { let sender_service = deployment.CustomService(on.clone(), vec![]); - let mut sender_port = sender_service.read().await.declare_client(&sender_service); - let mut recipient = self + let sender_port = sender_service.read().await.declare_client(&sender_service); + let recipient = self .underlying() .read() .await .get_port(port.to_string(), &self.underlying()); - sender_port.send_to(&mut recipient); + sender_port.send_to(&recipient); sender_port } @@ -202,19 +202,19 @@ impl HfSendOneToOne for DeployNode { source_port: &DeployPort, recipient_port: &DeployPort, ) { - let mut source_port = self + let source_port = self .underlying .try_read() .unwrap() .get_port(source_port.port.clone(), &self.underlying); - let mut recipient_port = other + let recipient_port = other .underlying .try_read() .unwrap() .get_port(recipient_port.port.clone(), &other.underlying); - source_port.send_to(&mut recipient_port); + source_port.send_to(&recipient_port); } fn gen_sink_statement(&self, _port: &Self::Port) -> syn::Expr { @@ -233,7 +233,7 @@ impl HfSendManyToOne for DeployCluster { source_port: &DeployPort, recipient_port: &DeployPort, ) { - let mut recipient_port = other + let recipient_port = other .underlying .try_read() .unwrap() @@ -248,10 +248,10 @@ impl HfSendManyToOne for DeployCluster { .get_port(source_port.port.clone(), &node.underlying); TaggedSource { - source: Arc::new(RwLock::new(source_port)), + source: Arc::new(source_port), tag: i as u32, } - .send_to(&mut recipient_port); + .send_to(&recipient_port); } } @@ -271,13 +271,13 @@ impl HfSendOneToMany for DeployNode { source_port: &DeployPort, recipient_port: &DeployPort, ) { - let mut source_port = self + let source_port = self .underlying .try_read() .unwrap() .get_port(source_port.port.clone(), &self.underlying); - let mut recipient_port = DemuxSink { + let recipient_port = DemuxSink { demux: other .members .iter() @@ -286,15 +286,14 @@ impl HfSendOneToMany for DeployNode { let n = c.underlying.try_read().unwrap(); ( id as u32, - Arc::new(RwLock::new( - n.get_port(recipient_port.port.clone(), &c.underlying), - )) as Arc>, + Arc::new(n.get_port(recipient_port.port.clone(), &c.underlying)) + as Arc, ) }) .collect(), }; - source_port.send_to(&mut recipient_port); + source_port.send_to(&recipient_port); } fn gen_sink_statement(&self, _port: &Self::Port) -> syn::Expr { @@ -323,7 +322,7 @@ impl HfSendManyToMany for DeployCluster { .unwrap() .get_port(source_port.port.clone(), &sender.underlying); - let mut recipient_port = DemuxSink { + let recipient_port = DemuxSink { demux: other .members .iter() @@ -332,21 +331,20 @@ impl HfSendManyToMany for DeployCluster { let n = c.underlying.try_read().unwrap(); ( id as u32, - Arc::new(RwLock::new( + Arc::new( n.get_port(recipient_port.port.clone(), &c.underlying) .merge(), - )) - as Arc>, + ) as Arc, ) }) .collect(), }; TaggedSource { - source: Arc::new(RwLock::new(source_port)), + source: Arc::new(source_port), tag: i as u32, } - .send_to(&mut recipient_port); + .send_to(&recipient_port); } }