Skip to content

Commit

Permalink
Only reject connection promise if it was triggered by a call to conne…
Browse files Browse the repository at this point in the history
…ct() (#359)

* Only reject connection promise if it was triggered by a call to connect()

* changeset

* immediately return promise from connect function, clean up failed connection attempts

* address comments

* only reject promise in disconnect if it's a reconnection
  • Loading branch information
lukasIO authored Jul 28, 2022
1 parent 72f7abb commit 31e3883
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 150 deletions.
5 changes: 5 additions & 0 deletions .changeset/breezy-chefs-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Only reject connection promise if it was triggered by a call to connect()
288 changes: 143 additions & 145 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,12 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
/** used for aborting pending connections to a LiveKit server */
private abortController?: AbortController;

/** future holding client initiated connection attempt */
private connectFuture?: Future<void>;

/** future holding sdk initiated reconnection attempt */
private reconnectFuture?: Future<void>;

/**
* Creates a new Room, the primary construct for a LiveKit session.
* @param options
Expand Down Expand Up @@ -156,12 +160,17 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
.on(EngineEvent.ActiveSpeakersUpdate, this.handleActiveSpeakersUpdate)
.on(EngineEvent.DataPacketReceived, this.handleDataPacket)
.on(EngineEvent.Resuming, () => {
if (!this.reconnectFuture) {
this.reconnectFuture = new Future();
}
if (this.setAndEmitConnectionState(ConnectionState.Reconnecting)) {
this.emit(RoomEvent.Reconnecting);
}
})
.on(EngineEvent.Resumed, () => {
this.setAndEmitConnectionState(ConnectionState.Connected);
this.reconnectFuture?.resolve();
this.reconnectFuture = undefined;
this.emit(RoomEvent.Reconnected);
this.updateSubscriptions();
})
Expand Down Expand Up @@ -193,157 +202,159 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
return DeviceManager.getInstance().getDevices(kind, requestPermissions);
}

connect = async (url: string, token: string, opts?: RoomConnectOptions): Promise<void> => {
connect = (url: string, token: string, opts?: RoomConnectOptions): Promise<void> => {
if (this.state === ConnectionState.Connected) {
// when the state is reconnecting or connected, this function returns immediately
log.warn(`already connected to room ${this.name}`);
return;
return Promise.resolve();
}

if (this.connectFuture) {
return this.connectFuture.promise;
} else if (this.reconnectFuture) {
this.connectFuture = this.reconnectFuture;
return this.connectFuture.promise;
}
this.setAndEmitConnectionState(ConnectionState.Connecting);
const connectPromise = new Promise<void>(async (resolve, reject) => {
this.setAndEmitConnectionState(ConnectionState.Connecting);
if (!this.abortController || this.abortController.signal.aborted) {
this.abortController = new AbortController();
}

if (!this.abortController || this.abortController.signal.aborted) {
this.abortController = new AbortController();
}
// recreate engine if previously disconnected
this.createEngine();

// recreate engine if previously disconnected
this.createEngine();
this.acquireAudioContext();

this.acquireAudioContext();
if (opts?.rtcConfig) {
this.engine.rtcConfig = opts.rtcConfig;
}

if (opts?.rtcConfig) {
this.engine.rtcConfig = opts.rtcConfig;
}
this.connOptions = opts;

try {
const joinResponse = await this.engine.join(
url,
token,
{
autoSubscribe: opts?.autoSubscribe,
publishOnly: opts?.publishOnly,
adaptiveStream:
typeof this.options?.adaptiveStream === 'object'
? true
: this.options?.adaptiveStream,
},
this.abortController.signal,
);
log.debug(
`connected to Livekit Server version: ${joinResponse.serverVersion}, region: ${joinResponse.serverRegion}`,
);

this.connOptions = opts;
if (!joinResponse.serverVersion) {
throw new UnsupportedServer('unknown server version');
}

try {
const joinResponse = await this.engine.join(
url,
token,
{
autoSubscribe: opts?.autoSubscribe,
publishOnly: opts?.publishOnly,
adaptiveStream:
typeof this.options?.adaptiveStream === 'object' ? true : this.options?.adaptiveStream,
},
this.abortController.signal,
);
log.debug(
`connected to Livekit Server version: ${joinResponse.serverVersion}, region: ${joinResponse.serverRegion}`,
);
if (joinResponse.serverVersion === '0.15.1' && this.options.dynacast) {
log.debug('disabling dynacast due to server version');
// dynacast has a bug in 0.15.1, so we cannot use it then
this.options.dynacast = false;
}

if (!joinResponse.serverVersion) {
throw new UnsupportedServer('unknown server version');
}
const pi = joinResponse.participant!;

this.localParticipant.sid = pi.sid;
this.localParticipant.identity = pi.identity;

this.localParticipant.updateInfo(pi);
// forward metadata changed for the local participant
this.localParticipant
.on(ParticipantEvent.ParticipantMetadataChanged, this.onLocalParticipantMetadataChanged)
.on(ParticipantEvent.TrackMuted, this.onLocalTrackMuted)
.on(ParticipantEvent.TrackUnmuted, this.onLocalTrackUnmuted)
.on(ParticipantEvent.LocalTrackPublished, this.onLocalTrackPublished)
.on(ParticipantEvent.LocalTrackUnpublished, this.onLocalTrackUnpublished)
.on(ParticipantEvent.ConnectionQualityChanged, this.onLocalConnectionQualityChanged)
.on(ParticipantEvent.MediaDevicesError, this.onMediaDevicesError)
.on(
ParticipantEvent.ParticipantPermissionsChanged,
this.onLocalParticipantPermissionsChanged,
);

if (joinResponse.serverVersion === '0.15.1' && this.options.dynacast) {
log.debug('disabling dynacast due to server version');
// dynacast has a bug in 0.15.1, so we cannot use it then
this.options.dynacast = false;
}
// populate remote participants, these should not trigger new events
joinResponse.otherParticipants.forEach((info) => {
if (
info.sid !== this.localParticipant.sid &&
info.identity !== this.localParticipant.identity
) {
this.getOrCreateParticipant(info.sid, info);
} else {
log.warn('received info to create local participant as remote participant', {
info,
localParticipant: this.localParticipant,
});
}
});

const pi = joinResponse.participant!;

this.localParticipant.sid = pi.sid;
this.localParticipant.identity = pi.identity;

this.localParticipant.updateInfo(pi);
// forward metadata changed for the local participant
this.localParticipant
.on(ParticipantEvent.ParticipantMetadataChanged, this.onLocalParticipantMetadataChanged)
.on(ParticipantEvent.TrackMuted, this.onLocalTrackMuted)
.on(ParticipantEvent.TrackUnmuted, this.onLocalTrackUnmuted)
.on(ParticipantEvent.LocalTrackPublished, this.onLocalTrackPublished)
.on(ParticipantEvent.LocalTrackUnpublished, this.onLocalTrackUnpublished)
.on(ParticipantEvent.ConnectionQualityChanged, this.onLocalConnectionQualityChanged)
.on(ParticipantEvent.MediaDevicesError, this.onMediaDevicesError)
.on(
ParticipantEvent.ParticipantPermissionsChanged,
this.onLocalParticipantPermissionsChanged,
);
this.name = joinResponse.room!.name;
this.sid = joinResponse.room!.sid;
this.metadata = joinResponse.room!.metadata;
this.emit(RoomEvent.SignalConnected);
} catch (err) {
this.recreateEngine();
this.handleDisconnect(this.options.stopLocalTrackOnUnpublish);
reject(new ConnectionError('could not establish signal connection'));
}

// populate remote participants, these should not trigger new events
joinResponse.otherParticipants.forEach((info) => {
if (
info.sid !== this.localParticipant.sid &&
info.identity !== this.localParticipant.identity
) {
this.getOrCreateParticipant(info.sid, info);
} else {
log.warn('received info to create local participant as remote participant', {
info,
localParticipant: this.localParticipant,
});
// don't return until ICE connected
const connectTimeout = setTimeout(() => {
// timeout
this.recreateEngine();
this.handleDisconnect(this.options.stopLocalTrackOnUnpublish);
reject(new ConnectionError('could not connect PeerConnection after timeout'));
}, maxICEConnectTimeout);
const abortHandler = () => {
log.warn('closing engine');
clearTimeout(connectTimeout);
this.recreateEngine();
this.handleDisconnect(this.options.stopLocalTrackOnUnpublish);
reject(new ConnectionError('room connection has been cancelled'));
};
if (this.abortController?.signal.aborted) {
abortHandler();
}
this.abortController?.signal.addEventListener('abort', abortHandler);

this.engine.once(EngineEvent.Connected, () => {
clearTimeout(connectTimeout);
this.abortController?.signal.removeEventListener('abort', abortHandler);
// also hook unload event
if (isWeb()) {
window.addEventListener('beforeunload', this.onBeforeUnload);
navigator.mediaDevices?.addEventListener('devicechange', this.handleDeviceChange);
}
this.setAndEmitConnectionState(ConnectionState.Connected);
resolve();
});

this.name = joinResponse.room!.name;
this.sid = joinResponse.room!.sid;
this.metadata = joinResponse.room!.metadata;
this.emit(RoomEvent.SignalConnected);
} catch (err) {
this.recreateEngine();
this.setAndEmitConnectionState(
ConnectionState.Disconnected,
new ConnectionError('could not establish signal connection'),
);
throw err;
}

// don't return until ICE connected
const connectTimeout = setTimeout(() => {
// timeout
this.recreateEngine();
this.setAndEmitConnectionState(
ConnectionState.Disconnected,
new ConnectionError('could not connect PeerConnection after timeout'),
);
}, maxICEConnectTimeout);
const abortHandler = () => {
log.warn('closing engine');
clearTimeout(connectTimeout);
this.recreateEngine();
this.setAndEmitConnectionState(
ConnectionState.Disconnected,
new ConnectionError('room connection has been cancelled'),
);
};
if (this.abortController?.signal.aborted) {
abortHandler();
}
this.abortController?.signal.addEventListener('abort', abortHandler);

this.engine.once(EngineEvent.Connected, () => {
clearTimeout(connectTimeout);
this.abortController?.signal.removeEventListener('abort', abortHandler);
// also hook unload event
if (isWeb()) {
window.addEventListener('beforeunload', this.onBeforeUnload);
navigator.mediaDevices?.addEventListener('devicechange', this.handleDeviceChange);
}
this.setAndEmitConnectionState(ConnectionState.Connected);
});
this.connectFuture = new Future(connectPromise);

if (this.connectFuture) {
/** @ts-ignore */
return this.connectFuture.promise;
}
this.connectFuture.promise.finally(() => (this.connectFuture = undefined));

return this.connectFuture.promise;
};

/**
* disconnects the room, emits [[RoomEvent.Disconnected]]
*/
disconnect = async (stopTracks = true) => {
log.info('disconnect from room', { identity: this.localParticipant.identity });
if (this.state === ConnectionState.Connecting) {
if (this.state === ConnectionState.Connecting || this.state === ConnectionState.Reconnecting) {
// try aborting pending connection attempt
log.warn('abort connection attempt');
this.abortController?.abort();
return;
// in case the abort controller didn't manage to cancel the connection attempt, reject the connect promise explicitly
this.connectFuture?.reject(new ConnectionError('Client initiated disconnect'));
this.connectFuture = undefined;
}
// send leave
if (this.engine?.client.isConnected) {
Expand All @@ -353,7 +364,6 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
if (this.engine) {
this.engine.close();
}

this.handleDisconnect(stopTracks, DisconnectReason.CLIENT_INITIATED);
/* @ts-ignore */
this.engine = undefined;
Expand Down Expand Up @@ -571,6 +581,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}

private handleRestarting = () => {
if (!this.reconnectFuture) {
this.reconnectFuture = new Future();
}
// also unwind existing participants & existing subscriptions
for (const p of this.participants.values()) {
this.handleParticipantDisconnected(p.sid, p);
Expand All @@ -587,6 +600,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
this.setAndEmitConnectionState(ConnectionState.Connected);
this.emit(RoomEvent.Reconnected);
this.reconnectFuture?.resolve();
this.reconnectFuture = undefined;

// rehydrate participants
if (joinResponse.participant) {
Expand Down Expand Up @@ -630,6 +645,13 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
if (this.state === ConnectionState.Disconnected) {
return;
}
// reject potentially ongoing reconnection attempt
if (this.connectFuture === this.reconnectFuture) {
this.connectFuture?.reject(undefined);
this.connectFuture = undefined;
this.reconnectFuture = undefined;
}

this.participants.forEach((p) => {
p.tracks.forEach((pub) => {
p.unpublishTrack(pub.trackSid);
Expand Down Expand Up @@ -1030,35 +1052,11 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
}

private setAndEmitConnectionState(state: ConnectionState, error?: Error): boolean {
private setAndEmitConnectionState(state: ConnectionState): boolean {
if (state === this.state) {
// unchanged
return false;
}
switch (state) {
case ConnectionState.Connecting:
case ConnectionState.Reconnecting:
if (!this.connectFuture) {
// reuse existing connect future if possible
this.connectFuture = new Future<void>();
}
break;
case ConnectionState.Connected:
if (this.connectFuture) {
this.connectFuture.resolve();
this.connectFuture = undefined;
}
break;
case ConnectionState.Disconnected:
if (this.connectFuture) {
error ??= new Error('disconnected from Room');
this.connectFuture.reject(error);
this.connectFuture = undefined;
}
break;
default:
// nothing
}
this.state = state;
this.emit(RoomEvent.ConnectionStateChanged, this.state);
return true;
Expand Down
Loading

0 comments on commit 31e3883

Please sign in to comment.