Skip to content

Commit

Permalink
Presence of Track subscription to take priority over allowed status. (#…
Browse files Browse the repository at this point in the history
…306)

* Presence of Track subscription to take priority over allowed status.

Client may end up with a copy of incorrect RemoteTrackPublication.allowed
status, if rapid successions of subscribe/unsubscribe/permission updates
occur.

* removed unused function

* fix disconnect failure due to request queue flushing

* improve event handling for Track subscriptions

* changeset

* improved disconnection handling

* mark internal
  • Loading branch information
davidzhao authored Jul 3, 2022
1 parent e1ddb16 commit c786143
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 38 deletions.
5 changes: 5 additions & 0 deletions .changeset/lucky-icons-punch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Determine track allowed status primarily by precense of Track
5 changes: 5 additions & 0 deletions .changeset/olive-maps-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Improved Track event handling for permission changed
5 changes: 5 additions & 0 deletions .changeset/violet-hats-scream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Room.disconnect is now an async function
2 changes: 1 addition & 1 deletion src/api/SignalClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import 'webrtc-adapter';
import Queue from 'async-await-queue';
import 'webrtc-adapter';
import log from '../logger';
import {
ClientInfo,
Expand Down
36 changes: 20 additions & 16 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
/**
* disconnects the room, emits [[RoomEvent.Disconnected]]
*/
disconnect = (stopTracks = true) => {
disconnect = async (stopTracks = true) => {
log.info('disconnect from room', { identity: this.localParticipant.identity });
if (this.state === ConnectionState.Connecting) {
// try aborting pending connection attempt
log.warn('abort connection attempt');
Expand All @@ -367,14 +368,14 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
// send leave
if (this.engine?.client.isConnected) {
this.engine.client.sendLeave();
await this.engine.client.sendLeave();
}
// close engine (also closes client)
if (this.engine) {
this.engine.close();
}

this.handleDisconnect(stopTracks);
this.handleDisconnect(stopTracks, DisconnectReason.CLIENT_INITIATED);
/* @ts-ignore */
this.engine = undefined;
};
Expand Down Expand Up @@ -647,6 +648,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
};

private handleDisconnect(shouldStopTracks = true, reason?: DisconnectReason) {
if (this.state === ConnectionState.Disconnected) {
return;
}
this.participants.forEach((p) => {
p.tracks.forEach((pub) => {
p.unpublishTrack(pub.trackSid);
Expand All @@ -662,6 +666,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
pub.track?.stop();
}
});
this.localParticipant.tracks.clear();
this.localParticipant.videoTracks.clear();
this.localParticipant.audioTracks.clear();

this.participants.clear();
this.activeSpeakers = [];
Expand Down Expand Up @@ -823,18 +830,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
return;
}

pub._allowed = update.allowed;
participant.emit(
ParticipantEvent.TrackSubscriptionPermissionChanged,
pub,
pub.subscriptionStatus,
);
this.emitWhenConnected(
RoomEvent.TrackSubscriptionPermissionChanged,
pub,
pub.subscriptionStatus,
participant,
);
pub.setAllowed(update.allowed);
};

private handleDataPacket = (userPacket: UserPacket, kind: DataPacket_Kind) => {
Expand Down Expand Up @@ -971,7 +967,15 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
participant,
);
},
);
)
.on(ParticipantEvent.TrackSubscriptionPermissionChanged, (pub, status) => {
this.emitWhenConnected(
RoomEvent.TrackSubscriptionPermissionChanged,
pub,
status,
participant,
);
});

// update info at the end after callbacks have been set up
if (info) {
Expand Down
7 changes: 7 additions & 0 deletions src/room/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ export enum TrackEvent {
Muted = 'muted',
Unmuted = 'unmuted',
Ended = 'ended',
Subscribed = 'subscribed',
Unsubscribed = 'unsubscribed',
/** @internal */
UpdateSettings = 'updateSettings',
/** @internal */
Expand Down Expand Up @@ -433,4 +435,9 @@ export enum TrackEvent {
* Only fires on LocalTracks
*/
UpstreamResumed = 'upstreamResumed',
/**
* @internal
* Fires on RemoteTrackPublication
*/
SubscriptionPermissionChanged = 'subscriptionPermissionChanged',
}
22 changes: 12 additions & 10 deletions src/room/participant/RemoteParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import RemoteAudioTrack from '../track/RemoteAudioTrack';
import RemoteTrackPublication from '../track/RemoteTrackPublication';
import RemoteVideoTrack from '../track/RemoteVideoTrack';
import { Track } from '../track/Track';
import { TrackPublication } from '../track/TrackPublication';
import { AdaptiveStreamSettings, RemoteTrack } from '../track/types';
import Participant, { ParticipantEventCallbacks } from './Participant';

Expand Down Expand Up @@ -49,8 +50,17 @@ export default class RemoteParticipant extends Participant {
});
this.signalClient.sendUpdateSubscription(sub);
});
publication.on(TrackEvent.Ended, (track: RemoteTrack) => {
this.emit(ParticipantEvent.TrackUnsubscribed, track, publication);
publication.on(
TrackEvent.SubscriptionPermissionChanged,
(status: TrackPublication.SubscriptionStatus) => {
this.emit(ParticipantEvent.TrackSubscriptionPermissionChanged, publication, status);
},
);
publication.on(TrackEvent.Subscribed, (track: RemoteTrack) => {
this.emit(ParticipantEvent.TrackSubscribed, track, publication);
});
publication.on(TrackEvent.Unsubscribed, (previousTrack: RemoteTrack) => {
this.emit(ParticipantEvent.TrackUnsubscribed, previousTrack, publication);
});
}

Expand Down Expand Up @@ -156,8 +166,6 @@ export default class RemoteParticipant extends Participant {
track.start();

publication.setTrack(track);
// subscription means participant has permissions to subscribe
publication._allowed = true;
// set participant volume on new microphone tracks
if (
this.volume !== undefined &&
Expand All @@ -166,7 +174,6 @@ export default class RemoteParticipant extends Participant {
) {
track.setVolume(this.volume);
}
this.emit(ParticipantEvent.TrackSubscribed, track, publication);

return publication;
}
Expand Down Expand Up @@ -251,13 +258,8 @@ export default class RemoteParticipant extends Participant {
// also send unsubscribe, if track is actively subscribed
const { track } = publication;
if (track) {
const { isSubscribed } = publication;
track.stop();
publication.setTrack(undefined);
// always send unsubscribed, since apps may rely on this
if (isSubscribed) {
this.emit(ParticipantEvent.TrackUnsubscribed, track, publication);
}
}
if (sendUnpublish) {
this.emit(ParticipantEvent.TrackUnpublished, publication);
Expand Down
48 changes: 37 additions & 11 deletions src/room/track/RemoteTrackPublication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export default class RemoteTrackPublication extends TrackPublication {
track?: RemoteTrack;

/** @internal */
_allowed = true;
protected allowed = true;

// keeps track of client's desire to subscribe to a track
protected subscribed?: boolean;
Expand All @@ -28,6 +28,9 @@ export default class RemoteTrackPublication extends TrackPublication {
*/
setSubscribed(subscribed: boolean) {
this.subscribed = subscribed;
// reset allowed status when desired subscription state changes
// server will notify client via signal message if it's not allowed
this.allowed = true;

const sub: UpdateSubscription = {
trackSids: [this.trackSid],
Expand All @@ -46,11 +49,11 @@ export default class RemoteTrackPublication extends TrackPublication {

get subscriptionStatus(): TrackPublication.SubscriptionStatus {
if (this.subscribed === false || !super.isSubscribed) {
if (!this.allowed) {
return TrackPublication.SubscriptionStatus.NotAllowed;
}
return TrackPublication.SubscriptionStatus.Unsubscribed;
}
if (!this._allowed) {
return TrackPublication.SubscriptionStatus.NotAllowed;
}
return TrackPublication.SubscriptionStatus.Subscribed;
}

Expand All @@ -61,9 +64,6 @@ export default class RemoteTrackPublication extends TrackPublication {
if (this.subscribed === false) {
return false;
}
if (!this._allowed) {
return false;
}
return super.isSubscribed;
}

Expand Down Expand Up @@ -127,11 +127,13 @@ export default class RemoteTrackPublication extends TrackPublication {

/** @internal */
setTrack(track?: Track) {
if (this.track) {
const prevStatus = this.subscriptionStatus;
const prevTrack = this.track;
if (prevTrack) {
// unregister listener
this.track.off(TrackEvent.VideoDimensionsChanged, this.handleVideoDimensionsChange);
this.track.off(TrackEvent.VisibilityChanged, this.handleVisibilityChange);
this.track.off(TrackEvent.Ended, this.handleEnded);
prevTrack.off(TrackEvent.VideoDimensionsChanged, this.handleVideoDimensionsChange);
prevTrack.off(TrackEvent.VisibilityChanged, this.handleVisibilityChange);
prevTrack.off(TrackEvent.Ended, this.handleEnded);
}
super.setTrack(track);
if (track) {
Expand All @@ -140,6 +142,22 @@ export default class RemoteTrackPublication extends TrackPublication {
track.on(TrackEvent.VisibilityChanged, this.handleVisibilityChange);
track.on(TrackEvent.Ended, this.handleEnded);
}
this.emitSubscriptionUpdateIfChanged(prevStatus);
if (!!track !== !!prevTrack) {
// when undefined status changes, there's a subscription changed event
if (track) {
this.emit(TrackEvent.Subscribed, track);
} else {
this.emit(TrackEvent.Unsubscribed, prevTrack);
}
}
}

/** @internal */
setAllowed(allowed: boolean) {
const prevStatus = this.subscriptionStatus;
this.allowed = allowed;
this.emitSubscriptionUpdateIfChanged(prevStatus);
}

/** @internal */
Expand All @@ -149,6 +167,14 @@ export default class RemoteTrackPublication extends TrackPublication {
this.track?.setMuted(info.muted);
}

private emitSubscriptionUpdateIfChanged(previousStatus: TrackPublication.SubscriptionStatus) {
const currentStatus = this.subscriptionStatus;
if (previousStatus === currentStatus) {
return;
}
this.emit(TrackEvent.SubscriptionPermissionChanged, currentStatus, previousStatus);
}

private isManualOperationAllowed(): boolean {
if (this.isAdaptiveStream) {
log.warn('adaptive stream is enabled, cannot change track settings', {
Expand Down

0 comments on commit c786143

Please sign in to comment.