diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 988d982fa..cb3e390d5 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -69,6 +69,8 @@ pub enum SignalError { SendError, #[error("failed to retrieve region info: {0}")] RegionError(String), + #[error("server sent leave during reconnect: reason={reason:?}, action={action:?}")] + LeaveRequest { reason: proto::DisconnectReason, action: proto::leave_request::Action }, } #[derive(Debug, Clone)] @@ -531,11 +533,33 @@ get_async_message!( proto::JoinResponse ); -get_async_message!( - get_reconnect_response, - proto::signal_response::Message::Reconnect(msg) => msg, - proto::ReconnectResponse -); +async fn get_reconnect_response( + receiver: &mut mpsc::UnboundedReceiver>, +) -> SignalResult { + let join = async { + while let Some(event) = receiver.recv().await { + match *event { + proto::signal_response::Message::Reconnect(msg) => return Ok(msg), + proto::signal_response::Message::Leave(leave) => { + return Err(SignalError::LeaveRequest { + reason: leave.reason(), + action: leave.action(), + }); + } + _ => {} + } + } + + Err(WsError::ConnectionClosed)? + }; + + livekit_runtime::timeout(JOIN_RESPONSE_TIMEOUT, join).await.map_err(|_| { + SignalError::Timeout(format!( + "failed to receive {}", + std::any::type_name::() + )) + })? +} #[cfg(test)] mod tests { diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index c14f8d0a6..191601482 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -663,7 +663,12 @@ impl EngineInner { // If we're already reconnecting just update the interval to restart a new attempt // ASAP - running_handle.full_reconnect = full_reconnect; + // Only escalate to full reconnect, never downgrade. Stale signal-close + // events (which request resume) must not override a full reconnect decision + // made by the reconnect loop after a failed resume attempt. + if full_reconnect { + running_handle.full_reconnect = true; + } if retry_now { let inner = self.clone();