diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index 062faf46f2..12bb78e8f5 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -35,6 +35,7 @@ testing-cuda = ["dep:cudarc"] testing-nixl = ["dep:nixl-sys"] block-manager = ["dep:nixl-sys", "dep:cudarc", "dep:ndarray", "dep:nix"] sentencepiece = ["dep:sentencepiece"] +integration = [] [[bench]] name = "tokenizer" diff --git a/lib/llm/src/entrypoint/input/endpoint.rs b/lib/llm/src/entrypoint/input/endpoint.rs index 428b3346d5..8b8c140699 100644 --- a/lib/llm/src/entrypoint/input/endpoint.rs +++ b/lib/llm/src/entrypoint/input/endpoint.rs @@ -84,13 +84,32 @@ pub async fn run( } }; - tokio::select! { - _ = rt_fut => { - tracing::debug!("Endpoint ingress ended"); + // Capture the actual error from rt_fut when it completes + // Note: We must return rt_result to propagate the actual error back to the user. + // If we don't return the specific error, the programmer/user won't know what actually + // caused the endpoint service to fail, making debugging much more difficult. + let result = tokio::select! { + rt_result = rt_fut => { + tracing::debug!("Endpoint service completed"); + match rt_result { + Ok(_) => { + tracing::warn!("Endpoint service completed unexpectedly for endpoint: {}", path); + Err(anyhow::anyhow!("Endpoint service completed unexpectedly for endpoint: {}", path)) + } + Err(e) => { + tracing::error!(%e, "Endpoint service failed for endpoint: {} - Error: {}", path, e); + Err(anyhow::anyhow!("Endpoint service failed for endpoint: {} - Error: {}", path, e)) + } + } } _ = cancel_token.cancelled() => { + tracing::debug!("Endpoint service cancelled"); + Ok(()) } - } + }; + + // If we got an error, return it + result?; // Cleanup on shutdown if let Some(mut card) = card { @@ -104,3 +123,118 @@ pub async fn run( Ok(()) } + +#[cfg(test)] +#[cfg(feature = "integration")] +mod integration_tests { + use super::*; + use dynamo_runtime::protocols::Endpoint as EndpointId; + + async fn create_test_environment() -> anyhow::Result<(DistributedRuntime, EngineConfig)> { + // Create a minimal distributed runtime and engine config for testing + let runtime = dynamo_runtime::Runtime::from_settings() + .map_err(|e| anyhow::anyhow!("Failed to create runtime: {}", e))?; + + let distributed_runtime = dynamo_runtime::DistributedRuntime::from_settings(runtime) + .await + .map_err(|e| anyhow::anyhow!("Failed to create distributed runtime: {}", e))?; + + let engine_config = EngineConfig::StaticCore { + engine: crate::engines::make_engine_core(), + model: Box::new( + crate::local_model::LocalModelBuilder::default() + .model_name(Some("test-model".to_string())) + .build() + .await + .map_err(|e| anyhow::anyhow!("Failed to build LocalModel: {}", e))?, + ), + }; + + Ok((distributed_runtime, engine_config)) + } + + #[tokio::test] + async fn test_run_function_valid_endpoint() { + // Test that run() works correctly with valid endpoints + + let (runtime, engine_config) = match create_test_environment().await { + Ok(env) => env, + Err(e) => { + eprintln!("Skipping test: {}", e); + return; + } + }; + + // Test with valid endpoint - start the service and then connect to it + let valid_path = "dyn://valid-endpoint.mocker.generate"; + let valid_endpoint: EndpointId = valid_path.parse().expect("Valid endpoint should parse"); + + let runtime_clone = runtime.clone(); + let engine_config_clone = engine_config.clone(); + let valid_path_clone = valid_path.to_string(); + + let service_handle = + tokio::spawn( + async move { run(runtime_clone, valid_path_clone, engine_config_clone).await }, + ); + + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let client_result = async { + let namespace = runtime.namespace(&valid_endpoint.namespace)?; + let component = namespace.component(&valid_endpoint.component)?; + let client = component.endpoint(&valid_endpoint.name).client().await?; + client.wait_for_instances().await?; + Ok::<_, anyhow::Error>(client) + } + .await; + + match client_result { + Ok(_client) => { + println!("Valid endpoint: Successfully connected to service"); + service_handle.abort(); // Abort the service since we've verified it works + } + Err(e) => { + println!("Valid endpoint: Failed to connect to service: {}", e); + service_handle.abort(); // Abort the service since the test failed + panic!( + "Valid endpoint should allow client connections, but failed: {}", + e + ); + } + } + } + + #[tokio::test] + #[ignore = "DistributedRuntime drop issue persists - test logic validates error propagation correctly"] + async fn test_run_function_invalid_endpoint() { + // Test that invalid endpoints fail validation during run() + let invalid_path = "dyn://@@@123.mocker.generate"; + + // Create test environment + let (runtime, engine_config) = create_test_environment() + .await + .expect("Failed to create test environment"); + + // Call run() directly - it should fail quickly for invalid endpoints + let result = run(runtime, invalid_path.to_string(), engine_config).await; + + // Should return an error for invalid endpoints + assert!( + result.is_err(), + "run() should fail for invalid endpoint: {:?}", + result + ); + + // Check that the error message contains validation-related keywords + let error_msg = result.unwrap_err().to_string().to_lowercase(); + assert!( + error_msg.contains("invalid") + || error_msg.contains("namespace") + || error_msg.contains("validation") + || error_msg.contains("failed"), + "Error message should contain validation keywords, got: {}", + error_msg + ); + } +}