Skip to content

Commit

Permalink
correctly close the room when the disconnection comes from the server (
Browse files Browse the repository at this point in the history
…#333)

* fix close

* reason

* avoid double calls to disconnected

* Update mod.rs
  • Loading branch information
theomonnom authored May 6, 2024
1 parent 567bbf8 commit 5085034
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 70 deletions.
8 changes: 8 additions & 0 deletions livekit/src/room/e2ee/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{self, Debug, Formatter};

use self::key_provider::KeyProvider;

pub mod key_provider;
Expand All @@ -30,3 +32,9 @@ pub struct E2eeOptions {
pub encryption_type: EncryptionType,
pub key_provider: KeyProvider,
}

impl Debug for E2eeOptions {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("E2eeOptions").field("encryption_type", &self.encryption_type).finish()
}
}
132 changes: 68 additions & 64 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,15 @@ impl Default for DataPacket {
}
}

#[derive(Clone)]
#[derive(Default, Debug, Clone)]
pub struct Transcription {
pub participant_identity: String,
pub track_id: String,
pub segments: Vec<TranscriptionSegment>,
pub language: String,
}

#[derive(Clone)]
#[derive(Default, Debug, Clone)]
pub struct TranscriptionSegment {
pub id: String,
pub text: String,
Expand All @@ -213,7 +213,7 @@ pub struct TranscriptionSegment {
pub r#final: bool,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct RoomOptions {
pub auto_subscribe: bool,
pub adaptive_stream: bool,
Expand Down Expand Up @@ -243,14 +243,8 @@ impl Default for RoomOptions {
}
}

struct RoomHandle {
session_task: JoinHandle<()>,
close_emitter: oneshot::Sender<()>,
}

pub struct Room {
inner: Arc<RoomSession>,
handle: AsyncMutex<Option<RoomHandle>>,
}

impl Debug for Room {
Expand All @@ -263,12 +257,46 @@ impl Debug for Room {
}
}

struct RoomInfo {
metadata: String,
state: ConnectionState,
}

pub(crate) struct RoomSession {
rtc_engine: Arc<RtcEngine>,
sid: RoomSid,
name: String,
info: RwLock<RoomInfo>,
dispatcher: Dispatcher<RoomEvent>,
options: RoomOptions,
active_speakers: RwLock<Vec<Participant>>,
local_participant: LocalParticipant,
participants: RwLock<(
// Keep track of participants by sid and identity
HashMap<ParticipantSid, RemoteParticipant>,
HashMap<ParticipantIdentity, RemoteParticipant>,
)>,
e2ee_manager: E2eeManager,
room_task: AsyncMutex<Option<(JoinHandle<()>, oneshot::Sender<()>)>>,
}

impl Debug for RoomSession {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SessionInner")
.field("sid", &self.sid)
.field("name", &self.name)
.field("rtc_engine", &self.rtc_engine)
.finish()
}
}

impl Room {
pub async fn connect(
url: &str,
token: &str,
options: RoomOptions,
) -> RoomResult<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
// TODO(theomonnom): move connection logic to the RoomSession
let e2ee_manager = E2eeManager::new(options.e2ee.clone());
let (rtc_engine, join_response, engine_events) = RtcEngine::connect(
url,
Expand Down Expand Up @@ -378,6 +406,7 @@ impl Room {
local_participant,
dispatcher: dispatcher.clone(),
e2ee_manager: e2ee_manager.clone(),
room_task: Default::default(),
});

e2ee_manager.on_state_changed({
Expand Down Expand Up @@ -430,28 +459,15 @@ impl Room {
inner.dispatcher.dispatch(&RoomEvent::Connected { participants_with_tracks });
inner.update_connection_state(ConnectionState::Connected);

let (close_emitter, close_receiver) = oneshot::channel();
let session_task =
livekit_runtime::spawn(inner.clone().room_task(engine_events, close_receiver));
let (close_tx, close_rx) = oneshot::channel();
let room_task = livekit_runtime::spawn(inner.clone().room_task(engine_events, close_rx));
inner.room_task.lock().await.replace((room_task, close_tx));

Ok((
Self {
inner,
handle: AsyncMutex::new(Some(RoomHandle { session_task, close_emitter })),
},
events,
))
Ok((Self { inner }, events))
}

pub async fn close(&self) -> RoomResult<()> {
if let Some(handle) = self.handle.lock().await.take() {
self.inner.close().await;
let _ = handle.close_emitter.send(());
let _ = handle.session_task.await;
Ok(())
} else {
Err(RoomError::AlreadyClosed)
}
self.inner.close(DisconnectReason::ClientInitiated).await
}

pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> {
Expand Down Expand Up @@ -495,38 +511,6 @@ impl Room {
}
}

struct RoomInfo {
metadata: String,
state: ConnectionState,
}

pub(crate) struct RoomSession {
rtc_engine: Arc<RtcEngine>,
sid: RoomSid,
name: String,
info: RwLock<RoomInfo>,
dispatcher: Dispatcher<RoomEvent>,
options: RoomOptions,
active_speakers: RwLock<Vec<Participant>>,
local_participant: LocalParticipant,
participants: RwLock<(
// Keep track of participants by sid and identity
HashMap<ParticipantSid, RemoteParticipant>,
HashMap<ParticipantIdentity, RemoteParticipant>,
)>,
e2ee_manager: E2eeManager,
}

impl Debug for RoomSession {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SessionInner")
.field("sid", &self.sid)
.field("name", &self.name)
.field("rtc_engine", &self.rtc_engine)
.finish()
}
}

impl RoomSession {
async fn room_task(
self: Arc<Self>,
Expand Down Expand Up @@ -598,9 +582,19 @@ impl RoomSession {
Ok(())
}

async fn close(&self) {
self.rtc_engine.close().await;
self.e2ee_manager.cleanup();
async fn close(&self, reason: DisconnectReason) -> RoomResult<()> {
if let Some((room_task, close_tx)) = self.room_task.lock().await.take() {
self.rtc_engine.close(reason).await;
self.e2ee_manager.cleanup();

let _ = close_tx.send(());
let _ = room_task.await;

self.dispatcher.clear();
Ok(())
} else {
Err(RoomError::AlreadyClosed)
}
}

/// Change the connection state and emit an event
Expand Down Expand Up @@ -956,11 +950,21 @@ impl RoomSession {
let _ = tx.send(());
}

fn handle_disconnected(&self, reason: DisconnectReason) {
log::debug!("disconnected from room: {:?}", reason);
fn handle_disconnected(self: &Arc<Self>, reason: DisconnectReason) {
if self.update_connection_state(ConnectionState::Disconnected) {
self.dispatcher.dispatch(&RoomEvent::Disconnected { reason });
}

if reason != DisconnectReason::ClientInitiated {
log::error!("unexpectedly disconnected from room: {:?}", reason);

livekit_runtime::spawn({
let inner = self.clone();
async move {
let _ = inner.close(reason).await;
}
});
}
}

fn handle_data(
Expand Down
12 changes: 6 additions & 6 deletions livekit/src/rtc_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ impl RtcEngine {
Ok((Self { inner }, join_response, engine_events))
}

pub async fn close(&self) {
self.inner.close(DisconnectReason::ClientInitiated).await
pub async fn close(&self, reason: DisconnectReason) {
self.inner.close(reason).await
}

pub async fn publish_data(
Expand Down Expand Up @@ -343,7 +343,7 @@ impl EngineInner {
async fn engine_task(
self: Arc<Self>,
mut session_events: SessionEvents,
mut close_receiver: oneshot::Receiver<()>,
mut close_rx: oneshot::Receiver<()>,
) {
loop {
tokio::select! {
Expand All @@ -368,7 +368,7 @@ impl EngineInner {

task.await;
},
_ = &mut close_receiver => {
_ = &mut close_rx => {
break;
}
}
Expand Down Expand Up @@ -442,8 +442,8 @@ impl EngineInner {
session.close().await;
let _ = close_tx.send(());
let _ = engine_task.await;
let _ = self.engine_tx.send(EngineEvent::Disconnected { reason });
}
let _ = self.engine_tx.send(EngineEvent::Disconnected { reason });
}

/// When waiting for reconnection, it ensures we're always using the latest session.
Expand Down Expand Up @@ -504,7 +504,7 @@ impl EngineInner {
}
res = inner.reconnect_task() => {
if res.is_err() {
log::error!("failed to reconnect");
log::error!("failed to reconnect to the livekit room");
inner.close(DisconnectReason::UnknownReason).await;
} else {
log::info!("RtcEngine successfully recovered")
Expand Down

0 comments on commit 5085034

Please sign in to comment.