Skip to content

Commit 0fcf381

Browse files
committed
perf: Use different new_stream method between initial and retry
1 parent 14d6c5b commit 0fcf381

File tree

1 file changed

+33
-3
lines changed

1 file changed

+33
-3
lines changed

lib/llm/src/migration.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ impl RetryManager {
140140
const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
141141
if format!("{:?}", err) == STREAM_ERR_MSG {
142142
tracing::info!("Stream disconnected... recreating stream...");
143-
if let Err(err) = self.new_stream().await {
143+
// TODO: Why generate() does not implement Sync?
144+
if let Err(err) = self.new_stream_spawn().await {
144145
tracing::info!("Cannot recreate stream: {:?}", err);
145146
} else {
146147
continue;
@@ -160,8 +161,38 @@ impl RetryManager {
160161
self.retries_left -= 1;
161162
// TODO: Is there anything needed to pass between context?
162163
let request = SingleIn::new(self.request.clone());
164+
response_stream = Some(self.next_generate.generate(request).await);
165+
if let Some(err) = response_stream.as_ref().unwrap().as_ref().err() {
166+
if let Some(req_err) = err.downcast_ref::<NatsRequestError>() {
167+
if matches!(req_err.kind(), NatsNoResponders) {
168+
tracing::info!("Creating new stream... retrying...");
169+
continue;
170+
}
171+
}
172+
}
173+
break;
174+
}
175+
match response_stream {
176+
Some(Ok(next_stream)) => {
177+
self.next_stream = Some(next_stream);
178+
Ok(())
179+
}
180+
Some(Err(err)) => Err(err), // should propagate streaming error if stream started
181+
None => Err(Error::msg(
182+
"Retries exhausted - should propagate streaming error",
183+
)),
184+
}
185+
}
163186

164-
// TODO: Why generate() does not implement Sync?
187+
// Same as `new_stream`, but spawns a new task to create the stream.
188+
// This can be used in place of `new_stream`, but keeping `new_stream` for the initial stream
189+
// creation due to performance reasons.
190+
async fn new_stream_spawn(&mut self) -> Result<()> {
191+
let mut response_stream: Option<Result<ManyOut<Annotated<LLMEngineOutput>>>> = None;
192+
while self.retries_left > 0 {
193+
self.retries_left -= 1;
194+
// TODO: Is there anything needed to pass between context?
195+
let request = SingleIn::new(self.request.clone());
165196
let next = self.next_generate.clone();
166197
let handle = tokio::spawn(async move { next.generate(request).await });
167198
response_stream = Some(match handle.await {
@@ -171,7 +202,6 @@ impl RetryManager {
171202
return Err(Error::msg("Failed to spawn generate stream"));
172203
}
173204
});
174-
175205
if let Some(err) = response_stream.as_ref().unwrap().as_ref().err() {
176206
if let Some(req_err) = err.downcast_ref::<NatsRequestError>() {
177207
if matches!(req_err.kind(), NatsNoResponders) {

0 commit comments

Comments
 (0)