Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Subscribed streams are associated with subscriptions. (#488)
Browse files Browse the repository at this point in the history
When subscribe a stream in conference mode, the subscribed stream no
longer associated with RemoteStream. It allows a RemoteStream to be
subscribed multiple times. It also allows a subscription has audio and
video track from different remote streams.

Since the signaling protocol defined by server side does not provide an
ID for a track, the SDK usually use stream ID + track kind to identify a
track. The stream ID and track ID mentioned in conference mode indicate
the ID assigned by conference sever, they could be different from
MediaStream ID and MediaStreamTrack ID.
  • Loading branch information
jianjunz authored Apr 25, 2021
1 parent d7ab0bd commit 7d31677
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 50 deletions.
2 changes: 2 additions & 0 deletions docs/mdfiles/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
Change Log
==========
# 5.1
* When subscribe a stream in conference mode, the subscribed MediaStream or BidirectionalStream is associated with a `Owt.Conference.Subscription` instead of a `Owt.Base.RemoteStream`. The `stream` property of a RemoteStream in conference mode is always undefined, while a new property `stream` is added to `Subscription`. It allows a RemoteStream to be subscribed multiple times, as well as subscribing audio and video tracks from different streams.
# 5.0
* Add WebTransport support for conference mode, see [this design doc](../../design/webtransport.md) for detailed information.
* All publications and subscriptions for the same conference use the same `PeerConnection`.
Expand Down
4 changes: 2 additions & 2 deletions src/samples/conference/public/scripts/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ const runSocketIOSample = function() {
}).then((
subscription) => {
subscirptionLocal = subscription;
$(`#${stream.id}`).get(0).srcObject = stream.mediaStream;
$(`#${stream.id}`).get(0).srcObject = subscription.stream;
});
}
let $p = createResolutionButtons(stream, subscribeDifferentResolution);
conference.subscribe(stream)
.then((subscription)=>{
subscirptionLocal = subscription;
let $video = $(`<video controls autoplay id=${stream.id} style="display:block" >this browser does not supported video tag</video>`);
$video.get(0).srcObject = stream.mediaStream;
$video.get(0).srcObject = subscription.stream;
$p.append($video);
}, (err)=>{ console.log('subscribe failed', err);
});
Expand Down
4 changes: 3 additions & 1 deletion src/sdk/base/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ export class LocalStream extends Stream {
}
/**
* @class RemoteStream
* @classDesc Stream sent from a remote endpoint.
* @classDesc Stream sent from a remote endpoint. In conference mode,
* RemoteStream's stream is always undefined. Please get MediaStream or
* ReadableStream from subscription's stream property.
* Events:
*
* | Event Name | Argument Type | Fired when |
Expand Down
75 changes: 29 additions & 46 deletions src/sdk/conference/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
this._pendingCandidates = [];
this._subscribePromises = new Map(); // internalId => { resolve, reject }
this._publishPromises = new Map(); // internalId => { resolve, reject }
this._subscribedStreams = new Map(); // intenalId => RemoteStream
this._publications = new Map(); // PublicationId => Publication
this._subscriptions = new Map(); // SubscriptionId => Subscription
this._publishTransceivers = new Map(); // internalId => { id, transceivers: [Transceiver] }
Expand All @@ -52,7 +51,6 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
// Timer for PeerConnection disconnected. Will stop connection after timer.
this._disconnectTimer = null;
this._ended = false;
this._stopped = false;
// Channel ID assigned by conference
this._id = undefined;
// Used to create internal ID for publication/subscription
Expand All @@ -61,6 +59,7 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
this._sdpResolverMap = new Map(); // internalId => {finish, resolve, reject}
this._sdpResolvers = []; // [{finish, resolve, reject}]
this._sdpResolveNum = 0;
this._remoteMediaStreams = new Map(); // Key is subscription ID, value is MediaStream.
}

/**
Expand Down Expand Up @@ -483,7 +482,6 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
offerOptions.offerToReceiveVideo = !!options.video;
}
this._subscribeTransceivers.set(internalId, {transceivers});
this._subscribedStreams.set(internalId, stream);

let localDesc;
this._pc.createOffer(offerOptions).then((desc) => {
Expand Down Expand Up @@ -674,11 +672,6 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
new ConferenceError('Failed to subscribe'));
}
}
// Clear media stream
if (this._subscribedStreams.has(internalId)) {
this._subscribedStreams.get(internalId).mediaStream = null;
this._subscribedStreams.delete(internalId);
}
if (this._sdpResolverMap.has(internalId)) {
const resolver = this._sdpResolverMap.get(internalId);
if (!resolver.finish) {
Expand Down Expand Up @@ -732,31 +725,24 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {

_onRemoteStreamAdded(event) {
Logger.debug('Remote stream added.');
let find = false;
for (const [internalId, sub] of this._subscribeTransceivers) {
const subscriptionId = sub.id;
if (sub.transceivers.find((t) => t.transceiver === event.transceiver)) {
find = true;
const subscribedStream = this._subscribedStreams.get(internalId);
if (!subscribedStream.mediaStream) {
this._subscribedStreams.get(internalId).mediaStream =
event.streams[0];
// Resolve subscription if ready handler has been called
const subscription = this._subscriptions.get(subscriptionId);
if (subscription) {
if (this._subscriptions.has(sub.id)) {
const subscription = this._subscriptions.get(sub.id);
subscription.stream = event.streams[0];
if (this._subscribePromises.has(internalId)) {
this._subscribePromises.get(internalId).resolve(subscription);
this._subscribePromises.delete(internalId);
}
} else {
// Add track to the existing stream
subscribedStream.mediaStream.addTrack(event.track);
this._remoteMediaStreams.set(sub.id, event.streams[0]);
}
return;
}
}
if (!find) {
// This is not expected path. However, this is going to happen on Safari
// because it does not support setting direction of transceiver.
Logger.warning('Received remote stream without subscription.');
}
// This is not expected path. However, this is going to happen on Safari
// because it does not support setting direction of transceiver.
Logger.warning('Received remote stream without subscription.');
}

_onLocalIceCandidate(event) {
Expand Down Expand Up @@ -889,23 +875,18 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
_readyHandler(sessionId) {
const internalId = this._reverseIdMap.get(sessionId);
if (this._subscribePromises.has(internalId)) {
const subscription = new Subscription(sessionId, () => {
const mediaStream = this._remoteMediaStreams.get(sessionId);
const subscription = new Subscription(sessionId, mediaStream, () => {
this._unsubscribe(internalId);
}, () => this._getStats(),
(trackKind) => this._muteOrUnmute(sessionId, true, false, trackKind),
(trackKind) => this._muteOrUnmute(sessionId, false, false, trackKind),
(options) => this._applyOptions(sessionId, options));
this._subscriptions.set(sessionId, subscription);
// Fire subscription's ended event when associated stream is ended.
this._subscribedStreams.get(internalId).addEventListener('ended', () => {
if (this._subscriptions.has(sessionId)) {
this._subscriptions.get(sessionId).dispatchEvent(
'ended', new OwtEvent('ended'));
}
});
// Resolve subscription if mediaStream is ready
if (this._subscribedStreams.get(internalId).mediaStream) {
// Resolve subscription if mediaStream is ready.
if (this._subscriptions.get(sessionId).stream) {
this._subscribePromises.get(internalId).resolve(subscription);
this._subscribePromises.delete(internalId);
}
} else if (this._publishPromises.has(internalId)) {
const publication = new Publication(sessionId, () => {
Expand Down Expand Up @@ -1056,21 +1037,23 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
return sdp;
}

// Handle stream event sent from MCU. Some stream events should be publication
// event or subscription event. It will be handled here.
// Handle stream event sent from MCU. Some stream update events sent from
// server, more specifically audio.status and video.status events should be
// publication event or subscription events. They don't change MediaStream's
// status. See
// https://github.com/open-webrtc-toolkit/owt-server/blob/master/doc/Client-Portal%20Protocol.md#339-participant-is-notified-on-streams-update-in-room
// for more information.
_onStreamEvent(message) {
const eventTargets = [];
if (this._publications.has(message.id)) {
eventTargets.push(this._publications.get(message.id));
}
for (const [internalId, subscribedStream] of this._subscribedStreams) {
if (message.id === subscribedStream.id) {
const subscriptionId = this._subscribeTransceivers.get(internalId).id;
eventTargets.push(this._subscriptions.get(subscriptionId));
break;
for (const subscription of this._subscriptions) {
if (message.id === subscription._audioTrackId ||
message.id === subscription._videoTrackId) {
eventTargets.push(subscription);
}
}

if (!eventTargets.length) {
return;
}
Expand Down Expand Up @@ -1100,9 +1083,9 @@ export class ConferencePeerConnectionChannel extends EventDispatcher {
// Only check the first one.
const param = obj[0];
return !!(
param.codecPayloadType || param.dtx || param.active || param.ptime ||
param.maxFramerate || param.scaleResolutionDownBy || param.rid ||
param.scalabilityMode);
param.codecPayloadType || param.dtx || param.active || param.ptime ||
param.maxFramerate || param.scaleResolutionDownBy || param.rid ||
param.scalabilityMode);
}

_isOwtEncodingParameters(obj) {
Expand Down
20 changes: 19 additions & 1 deletion src/sdk/conference/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ export class SubscriptionUpdateOptions {
*/
export class Subscription extends EventDispatcher {
// eslint-disable-next-line require-jsdoc
constructor(id, stop, getStats, mute, unmute, applyOptions) {
constructor(id, stream, stop, getStats, mute, unmute, applyOptions) {
super();
if (!id) {
throw new TypeError('ID cannot be null or undefined.');
Expand All @@ -285,6 +285,19 @@ export class Subscription extends EventDispatcher {
writable: false,
value: id,
});
/**
* @member {MediaStream | BidirectionalStream} stream
* @instance
* @memberof Owt.Conference.Subscription
*/
Object.defineProperty(this, 'stream', {
configurable: false,
// TODO: It should be a readonly property, but current implementation
// creates Subscription after receiving 'ready' from server. At this time,
// MediaStream may not be available.
writable: true,
value: stream,
});
/**
* @function stop
* @instance
Expand Down Expand Up @@ -328,5 +341,10 @@ export class Subscription extends EventDispatcher {
* @returns {Promise<undefined, Error>}
*/
this.applyOptions = applyOptions;

// Track is not defined in server protocol. So these IDs are equal to their
// stream's ID at this time.
this._audioTrackId = undefined;
this._videoTrackId = undefined;
}
}

0 comments on commit 7d31677

Please sign in to comment.