@@ -114,8 +114,7 @@ impl RetryManager {
114114 const STREAM_ERR_MSG : & str = "Stream ended before generation completed" ;
115115 if format ! ( "{:?}" , err) == STREAM_ERR_MSG {
116116 tracing:: info!( "Stream disconnected... recreating stream..." ) ;
117- // TODO: Why does generate() not implement Sync?
118- if let Err ( err) = self . new_stream_spawn ( ) . await {
117+ if let Err ( err) = self . new_stream ( ) . await {
119118 tracing:: info!( "Cannot recreate stream: {:?}" , err) ;
120119 } else {
121120 continue ;
@@ -158,46 +157,6 @@ impl RetryManager {
158157 }
159158 }
160159
161- // Same as `new_stream`, but runs `generate` on a separately spawned tokio task and awaits its handle.
162- // It could be used in place of `new_stream`; `new_stream` is kept for the initial stream
163- // creation for performance reasons. Retries on NATS "no responders" errors while `retries_left > 0`.
164- async fn new_stream_spawn ( & mut self ) -> Result < ( ) > {
165- let mut response_stream: Option < Result < ManyOut < Annotated < LLMEngineOutput > > > > = None ; // stays None only if the loop body never runs
166- while self . retries_left > 0 { // keep trying until the retry budget is used up
167- self . retries_left -= 1 ; // every attempt consumes one retry, whether or not it succeeds
168- // TODO: Does anything need to be passed from the previous request context to this new one?
169- let request = SingleIn :: new ( self . request . clone ( ) ) ;
170- let next = self . next_generate . clone ( ) ;
171- let handle = tokio:: spawn ( async move { next. generate ( request) . await } ) ; // run generate() on its own task
172- response_stream = Some ( match handle. await {
173- Ok ( response_stream) => response_stream,
174- Err ( err) => { // the spawned task itself did not complete (join error); give up immediately
175- tracing:: error!( "Failed to spawn generate stream: {:?}" , err) ;
176- return Err ( Error :: msg ( "Failed to spawn generate stream" ) ) ;
177- }
178- } ) ;
179- if let Some ( err) = response_stream. as_ref ( ) . unwrap ( ) . as_ref ( ) . err ( ) { // unwrap cannot fail: assigned Some just above
180- if let Some ( req_err) = err. downcast_ref :: < NatsRequestError > ( ) {
181- if matches ! ( req_err. kind( ) , NatsNoResponders ) {
182- tracing:: info!( "Creating new stream... retrying..." ) ;
183- continue ; // no responders: try again while retries remain
184- }
185- }
186- }
187- break ; // success, or an error that is not retried
188- }
189- match response_stream {
190- Some ( Ok ( next_stream) ) => {
191- self . next_stream = Some ( next_stream) ;
192- Ok ( ( ) )
193- }
194- Some ( Err ( err) ) => Err ( err) , // should propagate the streaming error if the stream had already started
195- None => Err ( Error :: msg ( // no attempt was made: no retries were left on entry
196- "Retries exhausted - should propagate streaming error" ,
197- ) ) ,
198- }
199- }
200-
201160 fn track_response ( & mut self , response : & Annotated < LLMEngineOutput > ) {
202161 let llm_engine_output = match response. data . as_ref ( ) {
203162 Some ( output) => output,
0 commit comments