Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(hydro_deploy)!: make HydroflowSource, HydroflowSink traits use &self interior mutability to remove RwLock wrappings #1346

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions hydro_deploy/core/src/custom_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::any::Any;
use std::ops::Deref;
use std::sync::{Arc, Weak};
use std::sync::{Arc, OnceLock, Weak};

use anyhow::{bail, Result};
use async_trait::async_trait;
Expand All @@ -11,6 +11,7 @@ use super::hydroflow_crate::ports::{
HydroflowServer, HydroflowSink, HydroflowSource, ServerConfig, SourcePath,
};
use super::{Host, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, Service};
use crate::hydroflow_crate::ports::ReverseSinkInstantiator;

/// Represents an unknown, third-party service that is not part of the Hydroflow ecosystem.
pub struct CustomService {
Expand Down Expand Up @@ -81,28 +82,28 @@ impl Service for CustomService {

pub struct CustomClientPort {
pub on: Weak<RwLock<CustomService>>,
client_port: Option<ServerConfig>,
client_port: OnceLock<ServerConfig>,
}

impl CustomClientPort {
pub fn new(on: Weak<RwLock<CustomService>>) -> Self {
Self {
on,
client_port: None,
client_port: OnceLock::new(),
}
}

pub async fn server_port(&self) -> ServerPort {
self.client_port
.as_ref()
.get()
.unwrap()
.load_instantiated(&|p| p)
.await
}

pub async fn connect(&self) -> ConnectedDirect {
self.client_port
.as_ref()
.get()
.unwrap()
.load_instantiated(&|p| p)
.await
Expand All @@ -125,17 +126,20 @@ impl HydroflowSource for CustomClientPort {
panic!("Custom services cannot be used as the server")
}

fn record_server_config(&mut self, config: ServerConfig) {
self.client_port = Some(config);
fn record_server_config(&self, config: ServerConfig) {
self.client_port
.set(config)
.map_err(drop) // `ServerConfig` doesn't implement `Debug` for `.expect()`.
.expect("Cannot call `record_server_config()` multiple times.");
MingweiSamuel marked this conversation as resolved.
Show resolved Hide resolved
}

fn record_server_strategy(&mut self, _config: ServerStrategy) {
fn record_server_strategy(&self, _config: ServerStrategy) {
panic!("Custom services cannot be used as the server")
}
}

impl HydroflowSink for CustomClientPort {
fn as_any_mut(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}

Expand All @@ -148,7 +152,7 @@ impl HydroflowSink for CustomClientPort {
server_host: &Arc<RwLock<dyn Host>>,
server_sink: Arc<dyn HydroflowServer>,
wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
) -> Result<Box<dyn FnOnce(&mut dyn Any) -> ServerStrategy>> {
) -> Result<ReverseSinkInstantiator> {
let client = self.on.upgrade().unwrap();
let client_read = client.try_read().unwrap();

Expand All @@ -163,7 +167,9 @@ impl HydroflowSink for CustomClientPort {
let server_host_clone = server_host_clone.clone();
Ok(Box::new(move |me| {
let mut server_host = server_host_clone.try_write().unwrap();
me.downcast_mut::<CustomClientPort>().unwrap().client_port = Some(client_port);
me.downcast_ref::<CustomClientPort>()
.unwrap()
.record_server_config(client_port);
bind_type(server_host.as_any_mut())
}))
}
Expand Down
77 changes: 28 additions & 49 deletions hydro_deploy/core/src/hydroflow_crate/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ use crate::{ClientStrategy, Host, HostStrategyGetter, LaunchedHost, ServerStrate

pub trait HydroflowSource: Send + Sync {
fn source_path(&self) -> SourcePath;
fn record_server_config(&mut self, config: ServerConfig);
fn record_server_config(&self, config: ServerConfig);

fn host(&self) -> Arc<RwLock<dyn Host>>;
fn server(&self) -> Arc<dyn HydroflowServer>;
fn record_server_strategy(&mut self, config: ServerStrategy);
fn record_server_strategy(&self, config: ServerStrategy);

fn wrap_reverse_server_config(&self, config: ServerConfig) -> ServerConfig {
config
}

fn send_to(&mut self, sink: &mut dyn HydroflowSink) {
fn send_to(&self, sink: &dyn HydroflowSink) {
let forward_res = sink.instantiate(&self.source_path());
if let Ok(instantiated) = forward_res {
self.record_server_config(instantiated());
Expand All @@ -36,7 +36,7 @@ pub trait HydroflowSource: Send + Sync {
self.wrap_reverse_server_config(p)
})
.unwrap();
self.record_server_strategy(instantiated(sink.as_any_mut()));
self.record_server_strategy(instantiated(sink.as_any()));
}
}
}
Expand All @@ -47,10 +47,10 @@ pub trait HydroflowServer: DynClone + Send + Sync {
async fn launched_host(&self) -> Arc<dyn LaunchedHost>;
}

pub type ReverseSinkInstantiator = Box<dyn FnOnce(&mut dyn Any) -> ServerStrategy>;
pub type ReverseSinkInstantiator = Box<dyn FnOnce(&dyn Any) -> ServerStrategy>;

pub trait HydroflowSink: Send + Sync {
fn as_any_mut(&mut self) -> &mut dyn Any;
fn as_any(&self) -> &dyn Any;

/// Instantiate the sink as the source host connecting to the sink host.
/// Returns a thunk that can be called to perform mutations that instantiate the sink.
Expand All @@ -67,42 +67,33 @@ pub trait HydroflowSink: Send + Sync {
}

pub struct TaggedSource {
pub source: Arc<RwLock<dyn HydroflowSource>>,
pub source: Arc<dyn HydroflowSource>,
pub tag: u32,
}

impl HydroflowSource for TaggedSource {
fn source_path(&self) -> SourcePath {
SourcePath::Tagged(
Box::new(self.source.try_read().unwrap().source_path()),
self.tag,
)
SourcePath::Tagged(Box::new(self.source.source_path()), self.tag)
}

fn record_server_config(&mut self, config: ServerConfig) {
self.source
.try_write()
.unwrap()
.record_server_config(config);
fn record_server_config(&self, config: ServerConfig) {
self.source.record_server_config(config);
}

fn host(&self) -> Arc<RwLock<dyn Host>> {
self.source.try_read().unwrap().host()
self.source.host()
}

fn server(&self) -> Arc<dyn HydroflowServer> {
self.source.try_read().unwrap().server()
self.source.server()
}

fn wrap_reverse_server_config(&self, config: ServerConfig) -> ServerConfig {
ServerConfig::Tagged(Box::new(config), self.tag)
}

fn record_server_strategy(&mut self, config: ServerStrategy) {
self.source
.try_write()
.unwrap()
.record_server_strategy(config);
fn record_server_strategy(&self, config: ServerStrategy) {
self.source.record_server_strategy(config);
}
}

Expand All @@ -121,12 +112,12 @@ impl HydroflowSource for NullSourceSink {
panic!("null source has no server")
}

fn record_server_config(&mut self, _config: ServerConfig) {}
fn record_server_strategy(&mut self, _config: ServerStrategy) {}
fn record_server_config(&self, _config: ServerConfig) {}
fn record_server_strategy(&self, _config: ServerStrategy) {}
}

impl HydroflowSink for NullSourceSink {
fn as_any_mut(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}

Expand All @@ -145,18 +136,18 @@ impl HydroflowSink for NullSourceSink {
}

pub struct DemuxSink {
pub demux: HashMap<u32, Arc<RwLock<dyn HydroflowSink>>>,
pub demux: HashMap<u32, Arc<dyn HydroflowSink>>,
}

impl HydroflowSink for DemuxSink {
fn as_any_mut(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}

fn instantiate(&self, client_host: &SourcePath) -> Result<Box<dyn FnOnce() -> ServerConfig>> {
let mut thunk_map = HashMap::new();
for (key, target) in &self.demux {
thunk_map.insert(*key, target.try_read().unwrap().instantiate(client_host)?);
thunk_map.insert(*key, target.instantiate(client_host)?);
}

Ok(Box::new(move || {
Expand All @@ -174,12 +165,12 @@ impl HydroflowSink for DemuxSink {
server_host: &Arc<RwLock<dyn Host>>,
server_sink: Arc<dyn HydroflowServer>,
wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
) -> Result<Box<dyn FnOnce(&mut dyn Any) -> ServerStrategy>> {
) -> Result<ReverseSinkInstantiator> {
let mut thunk_map = HashMap::new();
for (key, target) in &self.demux {
thunk_map.insert(
*key,
target.try_write().unwrap().instantiate_reverse(
target.instantiate_reverse(
server_host,
server_sink.clone(),
// the parent wrapper selects the demux port for the parent defn, so do that first
Expand All @@ -189,22 +180,10 @@ impl HydroflowSink for DemuxSink {
}

Ok(Box::new(move |me| {
let me = me.downcast_mut::<DemuxSink>().unwrap();
let me = me.downcast_ref::<DemuxSink>().unwrap();
let instantiated_map = thunk_map
.into_iter()
.map(|(key, thunk)| {
(
key,
thunk(
me.demux
.get_mut(&key)
.unwrap()
.try_write()
.unwrap()
.as_any_mut(),
),
)
})
.map(|(key, thunk)| (key, thunk(me.demux.get(&key).unwrap().as_any())))
.collect();

ServerStrategy::Demux(instantiated_map)
Expand Down Expand Up @@ -263,7 +242,7 @@ impl HydroflowSource for HydroflowPortConfig {
})
}

fn record_server_config(&mut self, config: ServerConfig) {
fn record_server_config(&self, config: ServerConfig) {
let from = self.service.upgrade().unwrap();
let mut from_write = from.try_write().unwrap();

Expand All @@ -272,7 +251,7 @@ impl HydroflowSource for HydroflowPortConfig {
from_write.port_to_server.insert(self.port.clone(), config);
}

fn record_server_strategy(&mut self, config: ServerStrategy) {
fn record_server_strategy(&self, config: ServerStrategy) {
let from = self.service.upgrade().unwrap();
let mut from_write = from.try_write().unwrap();

Expand Down Expand Up @@ -334,7 +313,7 @@ impl SourcePath {
}

impl HydroflowSink for HydroflowPortConfig {
fn as_any_mut(&mut self) -> &mut dyn Any {
fn as_any(&self) -> &dyn Any {
self
}

Expand Down Expand Up @@ -380,7 +359,7 @@ impl HydroflowSink for HydroflowPortConfig {
server_host: &Arc<RwLock<dyn Host>>,
server_sink: Arc<dyn HydroflowServer>,
wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
) -> Result<Box<dyn FnOnce(&mut dyn Any) -> ServerStrategy>> {
) -> Result<ReverseSinkInstantiator> {
let client = self.service.upgrade().unwrap();
let client_read = client.try_read().unwrap();

Expand Down
4 changes: 2 additions & 2 deletions hydro_deploy/core/src/hydroflow_crate/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl HydroflowCrateService {
&mut self,
self_arc: &Arc<RwLock<HydroflowCrateService>>,
my_port: String,
sink: &mut dyn HydroflowSink,
sink: &dyn HydroflowSink,
) -> Result<()> {
let forward_res = sink.instantiate(&SourcePath::Direct(self.on.clone()));
if let Ok(instantiated) = forward_res {
Expand All @@ -132,7 +132,7 @@ impl HydroflowCrateService {

assert!(!self.port_to_bind.contains_key(&my_port));
self.port_to_bind
.insert(my_port, instantiated(sink.as_any_mut()));
.insert(my_port, instantiated(sink.as_any()));

Ok(())
}
Expand Down
Loading
Loading