@@ -100,10 +100,13 @@ impl RetryManager {
100100 if let Some ( response) = response_stream. next ( ) . await {
101101 if let Some ( err) = response. err ( ) {
102102 const STREAM_ERR_MSG : & str = "Stream ended before generation completed" ;
103- if format ! ( "{:?}" , err) == STREAM_ERR_MSG {
103+ if err
104+ . chain ( )
105+ . any ( |e| e. to_string ( ) . starts_with ( STREAM_ERR_MSG ) )
106+ {
104107 tracing:: warn!( "Stream disconnected... recreating stream..." ) ;
105108 if let Err ( err) = self . new_stream ( ) . await {
106- tracing:: warn!( "Cannot recreate stream: {:? }" , err) ;
109+ tracing:: warn!( "Cannot recreate stream: {:# }" , err) ;
107110 } else {
108111 continue ;
109112 }
@@ -462,6 +465,7 @@ mod tests {
462465 /// Expected behavior: All 10 responses should be received successfully.
463466 #[ tokio:: test]
464467 async fn test_retry_manager_no_migration ( ) {
468+ dynamo_runtime:: logging:: init ( ) ;
465469 let request = create_mock_request ( 10 ) ;
466470 let mock_engine = Arc :: new ( MockEngine :: new ( MockBehavior :: Success , 10 , 100 ) ) ;
467471 let next_generate: ServerStreamingEngine < PreprocessedRequest , Annotated < LLMEngineOutput > > =
@@ -493,6 +497,7 @@ mod tests {
493497 /// Expected behavior: All 10 responses should be received successfully after retry.
494498 #[ tokio:: test]
495499 async fn test_retry_manager_new_request_migration ( ) {
500+ dynamo_runtime:: logging:: init ( ) ;
496501 let request = create_mock_request ( 10 ) ;
497502 let mock_engine = Arc :: new ( MockEngine :: new ( MockBehavior :: FailThenSuccess , 10 , 100 ) ) ;
498503 let next_generate: ServerStreamingEngine < PreprocessedRequest , Annotated < LLMEngineOutput > > =
@@ -524,6 +529,8 @@ mod tests {
524529 /// Expected behavior: 5 responses from first stream + 5 responses from retry stream = 10 total.
525530 #[ tokio:: test]
526531 async fn test_retry_manager_ongoing_request_migration ( ) {
532+ dynamo_runtime:: logging:: init ( ) ;
533+
527534 let request = create_mock_request ( 10 ) ;
528535 let mock_engine = Arc :: new ( MockEngine :: new (
529536 MockBehavior :: MidStreamFail { fail_after : 5 } ,
@@ -560,6 +567,7 @@ mod tests {
560567 /// Expected behavior: Should receive an error after all retries are exhausted, with the original error.
561568 #[ tokio:: test]
562569 async fn test_retry_manager_new_request_migration_indefinite_failure ( ) {
570+ dynamo_runtime:: logging:: init ( ) ;
563571 let request = create_mock_request ( 0 ) ;
564572 let mock_engine = Arc :: new ( MockEngine :: new ( MockBehavior :: AlwaysFail , 0 , 100 ) ) ;
565573 let next_generate: ServerStreamingEngine < PreprocessedRequest , Annotated < LLMEngineOutput > > =
@@ -580,6 +588,7 @@ mod tests {
580588 /// Expected behavior: Should receive some responses from first stream, then error after retries exhausted.
581589 #[ tokio:: test]
582590 async fn test_retry_manager_ongoing_request_migration_indefinite_failure ( ) {
591+ dynamo_runtime:: logging:: init ( ) ;
583592 let request = create_mock_request ( 10 ) ;
584593 let mock_engine = Arc :: new ( MockEngine :: new (
585594 MockBehavior :: MidStreamFailAlways { fail_after : 3 } ,
@@ -627,6 +636,7 @@ mod tests {
627636 /// Expected behavior: Should receive some responses from first stream, then error after retries exhausted.
628637 #[ tokio:: test]
629638 async fn test_retry_manager_ongoing_request_migration_indefinite_failure_stream_error ( ) {
639+ dynamo_runtime:: logging:: init ( ) ;
630640 let request = create_mock_request ( 10 ) ;
631641 let mock_engine = Arc :: new ( MockEngine :: new (
632642 MockBehavior :: MidStreamFailAlwaysStreamError { fail_after : 3 } ,
0 commit comments