Skip to content

Commit d81daec

Browse files
committed
perf: Use different new_stream method between initial and retry
1 parent c1dd5dc commit d81daec

File tree

1 file changed

+33
-3
lines changed

1 file changed

+33
-3
lines changed

lib/llm/src/migration.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ impl RetryManager {
139139
const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
140140
if format!("{:?}", err) == STREAM_ERR_MSG {
141141
tracing::info!("Stream disconnected... recreating stream...");
142-
if let Err(err) = self.new_stream().await {
142+
// TODO: Why generate() does not implement Sync?
143+
if let Err(err) = self.new_stream_spawn().await {
143144
tracing::info!("Cannot recreate stream: {:?}", err);
144145
} else {
145146
continue;
@@ -159,8 +160,38 @@ impl RetryManager {
159160
self.retries_left -= 1;
160161
// TODO: Is there anything needed to pass between context?
161162
let request = SingleIn::new(self.request.clone());
163+
response_stream = Some(self.next_generate.generate(request).await);
164+
if let Some(err) = response_stream.as_ref().unwrap().as_ref().err() {
165+
if let Some(req_err) = err.downcast_ref::<NatsRequestError>() {
166+
if matches!(req_err.kind(), NatsNoResponders) {
167+
tracing::info!("Creating new stream... retrying...");
168+
continue;
169+
}
170+
}
171+
}
172+
break;
173+
}
174+
match response_stream {
175+
Some(Ok(next_stream)) => {
176+
self.next_stream = Some(next_stream);
177+
Ok(())
178+
}
179+
Some(Err(err)) => Err(err), // should propagate streaming error if stream started
180+
None => Err(Error::msg(
181+
"Retries exhausted - should propagate streaming error",
182+
)),
183+
}
184+
}
162185

163-
// TODO: Why generate() does not implement Sync?
186+
// Same as `new_stream`, but spawns a new task to create the stream.
187+
// This can be used in place of `new_stream`, but keeping `new_stream` for the initial stream
188+
// creation due to performance reasons.
189+
async fn new_stream_spawn(&mut self) -> Result<()> {
190+
let mut response_stream: Option<Result<ManyOut<Annotated<LLMEngineOutput>>>> = None;
191+
while self.retries_left > 0 {
192+
self.retries_left -= 1;
193+
// TODO: Is there anything needed to pass between context?
194+
let request = SingleIn::new(self.request.clone());
164195
let next = self.next_generate.clone();
165196
let handle = tokio::spawn(async move { next.generate(request).await });
166197
response_stream = Some(match handle.await {
@@ -170,7 +201,6 @@ impl RetryManager {
170201
return Err(Error::msg("Failed to spawn generate stream"));
171202
}
172203
});
173-
174204
if let Some(err) = response_stream.as_ref().unwrap().as_ref().err() {
175205
if let Some(req_err) = err.downcast_ref::<NatsRequestError>() {
176206
if matches!(req_err.kind(), NatsNoResponders) {

0 commit comments

Comments
 (0)