Skip to content

Commit f3d24a9

Browse files
committed
feat: implement StreamData trait for local client engine registry
This change addresses trait bound conflicts in the local client system by introducing a new StreamData trait that only requires Send (not Sync) for response types. Key changes: - Add StreamData trait for response types that don't need Sync - Create StreamAnyEngineWrapper for type erasure with StreamData responses - Update LocalClient to use StreamData constraint for response types - Add downcast_stream() method for StreamData-compatible downcasting - Fix clippy warnings about complex types and unused variables - Rewrite local_client_demo.rs to use new endpoint builder pattern This allows ManyOut<T> types (Pin<Box<dyn AsyncEngineStream<T>>>) to work properly with the local engine registry, since streams are consumed sequentially and don't need to be shared between threads. Signed-off-by: Ryan Olson <rolson@nvidia.com>
1 parent 565615d commit f3d24a9

File tree

7 files changed

+204
-61
lines changed

7 files changed

+204
-61
lines changed

lib/runtime/examples/local_client_demo.rs

Lines changed: 74 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,38 @@
33

44
//! Example demonstrating LocalClient functionality
55
6-
use dynamo_runtime::DistributedRuntime;
7-
use dynamo_runtime::component::register_local_engine;
8-
use dynamo_runtime::engine::{AsyncEngine, async_trait};
6+
use dynamo_runtime::component::LocalClient;
7+
use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, async_trait};
8+
use dynamo_runtime::pipeline::network::Ingress;
9+
use dynamo_runtime::pipeline::{Context, ManyOut, ResponseStream, SingleIn};
10+
use dynamo_runtime::protocols::annotated::Annotated;
11+
use dynamo_runtime::{DistributedRuntime, Runtime, distributed::DistributedConfig};
12+
use futures::StreamExt;
913
use std::sync::Arc;
1014

1115
/// Simple test engine that echoes strings
1216
struct SimpleEchoEngine;
1317

1418
#[async_trait]
15-
impl AsyncEngine<String, String, String> for SimpleEchoEngine {
16-
async fn generate(&self, request: String) -> Result<String, String> {
17-
println!("Engine received: {}", request);
18-
Ok(format!("Echo: {}", request))
19+
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error> for SimpleEchoEngine {
20+
async fn generate(
21+
&self,
22+
request: SingleIn<String>,
23+
) -> Result<ManyOut<Annotated<String>>, anyhow::Error> {
24+
println!("Engine received: {}", *request);
25+
26+
let response = Annotated {
27+
data: Some(format!("Echo: {}", *request)),
28+
id: None,
29+
event: None,
30+
comment: None,
31+
};
32+
33+
let context = request.context();
34+
35+
// Create a simple stream that yields the response once
36+
let stream = futures::stream::once(async move { response });
37+
Ok(ResponseStream::new(Box::pin(stream), context))
1938
}
2039
}
2140

@@ -28,16 +47,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2847

2948
// Create runtime and DRT
3049
println!("1. Creating runtime...");
31-
let runtime = dynamo_runtime::Runtime::builder()
32-
.app_name("local-client-demo")
33-
.build()
34-
.await?;
50+
let runtime = Runtime::from_current()?;
3551

36-
let config = dynamo_runtime::DistributedConfig::builder()
37-
.etcd_config(Default::default())
38-
.nats_config(Default::default())
39-
.is_static(true)
40-
.build()?;
52+
let config = DistributedConfig {
53+
etcd_config: Default::default(),
54+
nats_config: Default::default(),
55+
is_static: true,
56+
};
4157

4258
let drt = DistributedRuntime::new(runtime, config).await?;
4359
println!(" ✓ Runtime created\n");
@@ -52,40 +68,51 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5268

5369
// Create and register an engine
5470
println!("3. Creating and registering engine...");
55-
let engine: Arc<dyn AsyncEngine<String, String, String>> = Arc::new(SimpleEchoEngine);
71+
let engine: Arc<dyn AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error>> =
72+
Arc::new(SimpleEchoEngine);
73+
74+
// Wrap the engine in an Ingress to make it a PushWorkHandler
75+
let ingress = Ingress::for_engine(engine)?;
5676

57-
// Register the engine for local access
58-
let key = register_local_engine(&endpoint, engine.clone()).await?;
59-
println!(" ✓ Registered engine with key: {}\n", key);
77+
// Create the endpoint instance with the ingress as handler (setup phase)
78+
let _endpoint_instance = endpoint
79+
.endpoint_builder()
80+
.handler(ingress)
81+
.create()
82+
.await?;
83+
println!(" ✓ Engine registered automatically during endpoint creation\n");
6084

61-
// Demonstrate direct local invocation
62-
println!("4. Testing direct local invocation...");
85+
// Create a LocalClient using the endpoint's convenience method
86+
println!("4. Creating LocalClient...");
87+
let local_client: LocalClient<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error> =
88+
endpoint.local_client().await?;
89+
println!(" ✓ LocalClient created successfully\n");
90+
91+
// Demonstrate local client usage
92+
println!("5. Testing LocalClient invocation...");
6393
println!(" (This bypasses all network layers and invokes the engine directly)");
6494

65-
// Direct invocation through the registered engine
66-
let response = engine
67-
.generate("Hello from direct call!".to_string())
68-
.await?;
69-
println!(" Response: {}\n", response);
70-
71-
// Show what LocalClient would do
72-
println!("5. LocalClient usage (conceptual):");
73-
println!(" - LocalClient::from_endpoint(&endpoint) would retrieve the engine");
74-
println!(" - local_client.generate(request) would call the engine directly");
75-
println!(" - No network overhead, no etcd watching, no instance discovery\n");
76-
77-
// Show the registered engines
78-
println!("6. Registry information:");
79-
println!(" - Key format: namespace/component/endpoint");
80-
println!(" - Registered key: {}", key);
81-
println!(" - Engine is type-erased as AnyAsyncEngine");
82-
println!(" - LocalClient downcasts back to specific types\n");
83-
84-
// Cleanup
85-
println!("7. Cleanup...");
86-
drt.unregister_local_engine(&key).await;
87-
println!(" ✓ Unregistered engine\n");
88-
89-
println!("=== Demo Complete ===");
95+
// Create a request with context
96+
let request = Context::new("Hello from LocalClient!".to_string());
97+
98+
// Generate response using the local client
99+
let mut response_stream = local_client.generate(request).await?;
100+
let response = response_stream.next().await.expect("Expected response");
101+
102+
println!(" Request: Hello from LocalClient!");
103+
if let Some(data) = &response.data {
104+
println!(" Response: {}", data);
105+
}
106+
println!();
107+
108+
// Show the benefits
109+
println!("6. LocalClient Benefits:");
110+
println!(" ✓ No network overhead");
111+
println!(" ✓ No etcd watching required");
112+
println!(" ✓ No instance discovery needed");
113+
println!(" ✓ Direct in-process engine invocation");
114+
println!(" ✓ Perfect for testing and local development\n");
115+
116+
println!("Demo completed successfully!");
90117
Ok(())
91118
}

lib/runtime/src/component.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ impl Endpoint {
443443
) -> Result<local_client::LocalClient<Req, Resp, E>>
444444
where
445445
Req: crate::engine::Data,
446-
Resp: crate::engine::Data + crate::engine::AsyncEngineContextProvider,
446+
Resp: crate::engine::StreamData + crate::engine::AsyncEngineContextProvider,
447447
E: crate::engine::Data,
448448
{
449449
local_client::LocalClient::from_endpoint(self).await

lib/runtime/src/component/local_client.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
66
use crate::engine::{
77
AnyAsyncEngine, AsyncEngine, AsyncEngineContextProvider, Data, DowncastAnyAsyncEngine,
8+
StreamData,
89
};
910
use crate::traits::DistributedRuntimeProvider;
1011
use crate::v2::entity::{ComponentDescriptor, EndpointDescriptor, NamespaceDescriptor};
@@ -18,7 +19,7 @@ use super::Endpoint;
1819
pub struct LocalClient<Req, Resp, E>
1920
where
2021
Req: Data,
21-
Resp: Data + AsyncEngineContextProvider,
22+
Resp: StreamData + AsyncEngineContextProvider,
2223
E: Data,
2324
{
2425
engine: Arc<dyn AsyncEngine<Req, Resp, E>>,
@@ -28,7 +29,7 @@ where
2829
impl<Req, Resp, E> LocalClient<Req, Resp, E>
2930
where
3031
Req: Data,
31-
Resp: Data + AsyncEngineContextProvider,
32+
Resp: StreamData + AsyncEngineContextProvider,
3233
E: Data,
3334
{
3435
/// Create a LocalClient from an endpoint descriptor
@@ -47,7 +48,7 @@ where
4748

4849
// Downcast to the specific types
4950
let engine = any_engine
50-
.downcast::<Req, Resp, E>()
51+
.downcast_stream::<Req, Resp, E>()
5152
.ok_or_else(|| error!("Type mismatch when downcasting local engine for: {}", key))?;
5253

5354
Ok(Self { engine, descriptor })

lib/runtime/src/engine.rs

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ use futures::stream::Stream;
9090
pub trait Data: Send + Sync + 'static {}
9191
impl<T: Send + Sync + 'static> Data for T {}
9292

93+
/// A trait for stream data that only requires Send, not Sync.
94+
/// This is used for response types that are consumed sequentially, not shared between threads.
95+
/// **Do not manually implement this trait** - the blanket implementation covers all valid types.
96+
pub trait StreamData: Send + 'static {}
97+
impl<T: Send + 'static> StreamData for T {}
98+
9399
/// [`DataStream`] is a type alias for a stream of [`Data`] items. This can be adapted to a [`ResponseStream`]
94100
/// by associating it with a [`AsyncEngineContext`].
95101
pub type DataUnary<T> = Pin<Box<dyn Future<Output = T> + Send>>;
@@ -350,6 +356,41 @@ where
350356
}
351357
}
352358

359+
/// A variant of AnyEngineWrapper that uses StreamData for the response type.
360+
/// This allows non-Sync response types (like streams) to be stored in the registry.
361+
struct StreamAnyEngineWrapper<Req, Resp, E>
362+
where
363+
Req: Data,
364+
Resp: StreamData + AsyncEngineContextProvider,
365+
E: Data,
366+
{
367+
engine: Arc<dyn AsyncEngine<Req, Resp, E>>,
368+
_phantom: PhantomData<fn(Req, Resp, E)>,
369+
}
370+
371+
impl<Req, Resp, E> AnyAsyncEngine for StreamAnyEngineWrapper<Req, Resp, E>
372+
where
373+
Req: Data,
374+
Resp: StreamData + AsyncEngineContextProvider,
375+
E: Data,
376+
{
377+
fn request_type_id(&self) -> TypeId {
378+
TypeId::of::<Req>()
379+
}
380+
381+
fn response_type_id(&self) -> TypeId {
382+
TypeId::of::<Resp>()
383+
}
384+
385+
fn error_type_id(&self) -> TypeId {
386+
TypeId::of::<E>()
387+
}
388+
389+
fn as_any(&self) -> &dyn Any {
390+
&self.engine
391+
}
392+
}
393+
353394
/// An extension trait that provides a convenient way to type-erase an `AsyncEngine`.
354395
///
355396
/// This trait provides the `.into_any_engine()` method on any `Arc<dyn AsyncEngine<...>>`,
@@ -381,6 +422,38 @@ where
381422
}
382423
}
383424

425+
/// An extension trait for converting AsyncEngine with StreamData response to AnyAsyncEngine.
426+
/// This is used for engines whose response types don't need to be Sync.
427+
pub trait AsStreamAnyAsyncEngine {
428+
/// Converts a typed `AsyncEngine` with StreamData response into a type-erased `AnyAsyncEngine`.
429+
fn into_stream_any_engine(self) -> Arc<dyn AnyAsyncEngine>;
430+
431+
/// Converts a typed `AsyncEngine` with StreamData response into a type-erased `AnyAsyncEngine`.
432+
/// This is the non-consuming version of into_stream_any_engine.
433+
fn as_stream_any(&self) -> Option<Arc<dyn AnyAsyncEngine>>;
434+
}
435+
436+
impl<Req, Resp, E> AsStreamAnyAsyncEngine for Arc<dyn AsyncEngine<Req, Resp, E>>
437+
where
438+
Req: Data,
439+
Resp: StreamData + AsyncEngineContextProvider,
440+
E: Data,
441+
{
442+
fn into_stream_any_engine(self) -> Arc<dyn AnyAsyncEngine> {
443+
Arc::new(StreamAnyEngineWrapper {
444+
engine: self,
445+
_phantom: PhantomData,
446+
})
447+
}
448+
449+
fn as_stream_any(&self) -> Option<Arc<dyn AnyAsyncEngine>> {
450+
Some(Arc::new(StreamAnyEngineWrapper {
451+
engine: self.clone(),
452+
_phantom: PhantomData,
453+
}))
454+
}
455+
}
456+
384457
/// An extension trait that provides a convenient method to downcast an `AnyAsyncEngine`.
385458
///
386459
/// This trait provides the `.downcast<Req, Resp, E>()` method on `Arc<dyn AnyAsyncEngine>`,
@@ -410,6 +483,16 @@ pub trait DowncastAnyAsyncEngine {
410483
Req: Data,
411484
Resp: Data + AsyncEngineContextProvider,
412485
E: Data;
486+
487+
/// Attempts to downcast an `AnyAsyncEngine` to a specific `AsyncEngine` type with StreamData response.
488+
///
489+
/// Returns `Some(engine)` if the type parameters match the original engine,
490+
/// or `None` if the types don't match.
491+
fn downcast_stream<Req, Resp, E>(&self) -> Option<Arc<dyn AsyncEngine<Req, Resp, E>>>
492+
where
493+
Req: Data,
494+
Resp: StreamData + AsyncEngineContextProvider,
495+
E: Data;
413496
}
414497

415498
impl DowncastAnyAsyncEngine for Arc<dyn AnyAsyncEngine> {
@@ -430,6 +513,24 @@ impl DowncastAnyAsyncEngine for Arc<dyn AnyAsyncEngine> {
430513
None
431514
}
432515
}
516+
517+
fn downcast_stream<Req, Resp, E>(&self) -> Option<Arc<dyn AsyncEngine<Req, Resp, E>>>
518+
where
519+
Req: Data,
520+
Resp: StreamData + AsyncEngineContextProvider,
521+
E: Data,
522+
{
523+
if self.request_type_id() == TypeId::of::<Req>()
524+
&& self.response_type_id() == TypeId::of::<Resp>()
525+
&& self.error_type_id() == TypeId::of::<E>()
526+
{
527+
self.as_any()
528+
.downcast_ref::<Arc<dyn AsyncEngine<Req, Resp, E>>>()
529+
.cloned()
530+
} else {
531+
None
532+
}
533+
}
433534
}
434535

435536
#[cfg(test)]
@@ -482,7 +583,7 @@ mod tests {
482583
let typed_engine: Arc<dyn AsyncEngine<Req1, Resp1, Err1>> = Arc::new(MockEngine);
483584

484585
// 4. Use the extension trait to erase the type
485-
let any_engine = typed_engine.into_any_engine();
586+
let any_engine = AsAnyAsyncEngine::into_any_engine(typed_engine);
486587

487588
// Check type IDs are preserved
488589
assert_eq!(any_engine.request_type_id(), TypeId::of::<Req1>());

lib/runtime/src/pipeline/network/ingress/push_handler.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,23 @@ where
139139
}
140140

141141
fn as_any_engine(&self) -> Option<Arc<dyn crate::engine::AnyAsyncEngine>> {
142+
use crate::engine::AsStreamAnyAsyncEngine;
142143
use crate::pipeline::network::Ingress;
143-
// Return the pre-stored type-erased engine
144-
Ingress::any_engine(self)
144+
145+
// First try to get the pre-stored type-erased engine
146+
if let Some(engine) = Ingress::any_engine(self) {
147+
return Some(engine);
148+
}
149+
150+
// If no pre-stored engine, try to create one from the underlying engine
151+
if let Some(service_engine) = Ingress::engine(self) {
152+
// Try to convert using StreamData-compatible wrapper
153+
if let Some(stream_engine) = service_engine.as_stream_any() {
154+
return Some(stream_engine);
155+
}
156+
}
157+
158+
None
145159
}
146160

147161
async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError> {

lib/runtime/tests/local_client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async fn test_local_client_registration_and_retrieval() -> Result<(), Box<dyn st
7878
let response = response_stream.next().await.expect("Expected response");
7979

8080
assert_eq!(response.data, Some("Hello, LocalClient!".to_string()));
81-
println!("LocalClient test passed: received '{}'", response);
81+
println!("LocalClient test passed: received '{:?}'", response);
8282

8383
// Note: We don't need to start the endpoint for local client testing
8484
// The engine is registered during create() and available for local access
@@ -128,7 +128,7 @@ async fn test_local_client_with_ingress() -> Result<(), Box<dyn std::error::Erro
128128

129129
assert_eq!(response.data, Some("Test with Ingress".to_string()));
130130
println!(
131-
"LocalClient with Ingress test passed: received '{}'",
131+
"LocalClient with Ingress test passed: received '{:?}'",
132132
response
133133
);
134134

@@ -166,8 +166,8 @@ async fn test_local_client_type_mismatch() -> Result<(), Box<dyn std::error::Err
166166
println!("Created endpoint with String engine");
167167

168168
// Try to create a LocalClient with different types (this should fail)
169-
let result: Result<LocalClient<SingleIn<i32>, ManyOut<Annotated<i32>>, anyhow::Error>, _> =
170-
endpoint.local_client().await;
169+
type TestLocalClient = LocalClient<SingleIn<i32>, ManyOut<Annotated<i32>>, anyhow::Error>;
170+
let result: Result<TestLocalClient, _> = endpoint.local_client().await;
171171

172172
assert!(result.is_err(), "Expected type mismatch error");
173173
if let Err(e) = result {

0 commit comments

Comments
 (0)