@@ -84,13 +84,32 @@ pub async fn run(
8484 }
8585 } ;
8686
87- tokio:: select! {
88- _ = rt_fut => {
89- tracing:: debug!( "Endpoint ingress ended" ) ;
87+ // Capture the actual error from rt_fut when it completes
88+ // Note: We must return rt_result to propagate the actual error back to the user.
89+ // If we don't return the specific error, the programmer/user won't know what actually
90+ // caused the endpoint service to fail, making debugging much more difficult.
91+ let result = tokio:: select! {
92+ rt_result = rt_fut => {
93+ tracing:: debug!( "Endpoint service completed" ) ;
94+ match rt_result {
95+ Ok ( _) => {
96+ tracing:: warn!( "Endpoint service completed unexpectedly for endpoint: {}" , path) ;
97+ Err ( anyhow:: anyhow!( "Endpoint service completed unexpectedly for endpoint: {}" , path) )
98+ }
99+ Err ( e) => {
100+ tracing:: error!( %e, "Endpoint service failed for endpoint: {} - Error: {}" , path, e) ;
101+ Err ( anyhow:: anyhow!( "Endpoint service failed for endpoint: {} - Error: {}" , path, e) )
102+ }
103+ }
90104 }
91105 _ = cancel_token. cancelled( ) => {
106+ tracing:: debug!( "Endpoint service cancelled" ) ;
107+ Ok ( ( ) )
92108 }
93- }
109+ } ;
110+
111+ // If we got an error, return it
112+ result?;
94113
95114 // Cleanup on shutdown
96115 if let Some ( mut card) = card {
@@ -104,3 +123,118 @@ pub async fn run(
104123
105124 Ok ( ( ) )
106125}
126+
127+ #[ cfg( test) ]
128+ #[ cfg( feature = "integration" ) ]
129+ mod integration_tests {
130+ use super :: * ;
131+ use dynamo_runtime:: protocols:: Endpoint as EndpointId ;
132+
133+ async fn create_test_environment ( ) -> anyhow:: Result < ( DistributedRuntime , EngineConfig ) > {
134+ // Create a minimal distributed runtime and engine config for testing
135+ let runtime = dynamo_runtime:: Runtime :: from_settings ( )
136+ . map_err ( |e| anyhow:: anyhow!( "Failed to create runtime: {}" , e) ) ?;
137+
138+ let distributed_runtime = dynamo_runtime:: DistributedRuntime :: from_settings ( runtime)
139+ . await
140+ . map_err ( |e| anyhow:: anyhow!( "Failed to create distributed runtime: {}" , e) ) ?;
141+
142+ let engine_config = EngineConfig :: StaticCore {
143+ engine : crate :: engines:: make_engine_core ( ) ,
144+ model : Box :: new (
145+ crate :: local_model:: LocalModelBuilder :: default ( )
146+ . model_name ( Some ( "test-model" . to_string ( ) ) )
147+ . build ( )
148+ . await
149+ . map_err ( |e| anyhow:: anyhow!( "Failed to build LocalModel: {}" , e) ) ?,
150+ ) ,
151+ } ;
152+
153+ Ok ( ( distributed_runtime, engine_config) )
154+ }
155+
156+ #[ tokio:: test]
157+ async fn test_run_function_valid_endpoint ( ) {
158+ // Test that run() works correctly with valid endpoints
159+
160+ let ( runtime, engine_config) = match create_test_environment ( ) . await {
161+ Ok ( env) => env,
162+ Err ( e) => {
163+ eprintln ! ( "Skipping test: {}" , e) ;
164+ return ;
165+ }
166+ } ;
167+
168+ // Test with valid endpoint - start the service and then connect to it
169+ let valid_path = "dyn://valid-endpoint.mocker.generate" ;
170+ let valid_endpoint: EndpointId = valid_path. parse ( ) . expect ( "Valid endpoint should parse" ) ;
171+
172+ let runtime_clone = runtime. clone ( ) ;
173+ let engine_config_clone = engine_config. clone ( ) ;
174+ let valid_path_clone = valid_path. to_string ( ) ;
175+
176+ let service_handle =
177+ tokio:: spawn (
178+ async move { run ( runtime_clone, valid_path_clone, engine_config_clone) . await } ,
179+ ) ;
180+
181+ tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 500 ) ) . await ;
182+
183+ let client_result = async {
184+ let namespace = runtime. namespace ( & valid_endpoint. namespace ) ?;
185+ let component = namespace. component ( & valid_endpoint. component ) ?;
186+ let client = component. endpoint ( & valid_endpoint. name ) . client ( ) . await ?;
187+ client. wait_for_instances ( ) . await ?;
188+ Ok :: < _ , anyhow:: Error > ( client)
189+ }
190+ . await ;
191+
192+ match client_result {
193+ Ok ( _client) => {
194+ println ! ( "Valid endpoint: Successfully connected to service" ) ;
195+ service_handle. abort ( ) ; // Abort the service since we've verified it works
196+ }
197+ Err ( e) => {
198+ println ! ( "Valid endpoint: Failed to connect to service: {}" , e) ;
199+ service_handle. abort ( ) ; // Abort the service since the test failed
200+ panic ! (
201+ "Valid endpoint should allow client connections, but failed: {}" ,
202+ e
203+ ) ;
204+ }
205+ }
206+ }
207+
208+ #[ tokio:: test]
209+ #[ ignore = "DistributedRuntime drop issue persists - test logic validates error propagation correctly" ]
210+ async fn test_run_function_invalid_endpoint ( ) {
211+ // Test that invalid endpoints fail validation during run()
212+ let invalid_path = "dyn://@@@123.mocker.generate" ;
213+
214+ // Create test environment
215+ let ( runtime, engine_config) = create_test_environment ( )
216+ . await
217+ . expect ( "Failed to create test environment" ) ;
218+
219+ // Call run() directly - it should fail quickly for invalid endpoints
220+ let result = run ( runtime, invalid_path. to_string ( ) , engine_config) . await ;
221+
222+ // Should return an error for invalid endpoints
223+ assert ! (
224+ result. is_err( ) ,
225+ "run() should fail for invalid endpoint: {:?}" ,
226+ result
227+ ) ;
228+
229+ // Check that the error message contains validation-related keywords
230+ let error_msg = result. unwrap_err ( ) . to_string ( ) . to_lowercase ( ) ;
231+ assert ! (
232+ error_msg. contains( "invalid" )
233+ || error_msg. contains( "namespace" )
234+ || error_msg. contains( "validation" )
235+ || error_msg. contains( "failed" ) ,
236+ "Error message should contain validation keywords, got: {}" ,
237+ error_msg
238+ ) ;
239+ }
240+ }
0 commit comments