Skip to content

Commit

Permalink
RTC: Support WebRTC re-publish stream. 4.0.87
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Mar 24, 2021
1 parent 0cb05a2 commit d6c16a7
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 19 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ Other documents:

## V4 changes

* v4.0, 2021-03-24, RTC: Support WebRTC re-publish stream. 4.0.87
* v4.0, 2021-03-24, RTC: Use fast parse TWCCID, ignore in packet parsing. 4.0.86
* v4.0, 2021-03-09, DTLS: Fix ARQ bug, use openssl timeout. 4.0.84
* v4.0, 2021-03-08, DTLS: Fix dead loop by duplicated Alert message. 4.0.83
Expand Down
27 changes: 27 additions & 0 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,30 @@ srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map<uint32_t, Srs
return err;
}

void SrsRtcPlayStream::on_stream_change(SrsRtcStreamDescription* desc)
{
// Refresh the relation for audio.
// TODO: FIMXE: Match by label?
if (desc->audio_track_desc_ && audio_tracks_.size() == 1) {
uint32_t ssrc = desc->audio_track_desc_->ssrc_;
SrsRtcAudioSendTrack* track = audio_tracks_.begin()->second;

audio_tracks_.clear();
audio_tracks_.insert(make_pair(ssrc, track));
}

// Refresh the relation for video.
// TODO: FIMXE: Match by label?
if (desc->video_track_descs_.size() == 1 && desc->video_track_descs_.size() == 1) {
SrsRtcTrackDescription* vdesc = desc->video_track_descs_.at(0);
uint32_t ssrc = vdesc->ssrc_;
SrsRtcVideoSendTrack* track = video_tracks_.begin()->second;

video_tracks_.clear();
video_tracks_.insert(make_pair(ssrc, track));
}
}

srs_error_t SrsRtcPlayStream::on_reload_vhost_play(string vhost)
{
if (req_->vhost != vhost) {
Expand Down Expand Up @@ -546,6 +570,9 @@ srs_error_t SrsRtcPlayStream::cycle()
return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str());
}

srs_assert(consumer);
consumer->set_handler(this);

// TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames.
if ((err = source->consumer_dumps(consumer)) != srs_success) {
return srs_error_wrap(err, "dumps consumer, url=%s", req_->get_stream_url().c_str());
Expand Down
9 changes: 6 additions & 3 deletions trunk/src/app/srs_app_rtc_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class SrsRtcPLIWorker : virtual public ISrsCoroutineHandler

// A RTC play stream, client pull and play stream from SRS.
class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler
, virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler
, virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback
{
private:
SrsContextId cid_;
Expand All @@ -235,13 +235,16 @@ class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISr
bool nack_enabled_;
bool nack_no_copy_;
private:
// Whether palyer started.
// Whether player started.
bool is_started;
public:
SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid);
virtual ~SrsRtcPlayStream();
public:
srs_error_t initialize(SrsRequest* request, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations);
// Interface ISrsRtcStreamChangeCallback
public:
void on_stream_change(SrsRtcStreamDescription* desc);
// interface ISrsReloadHandler
public:
virtual srs_error_t on_reload_vhost_play(std::string vhost);
Expand All @@ -268,7 +271,7 @@ class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISr
srs_error_t on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp);
srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp);
uint32_t get_video_publish_ssrc(uint32_t play_ssrc);
// inteface ISrsRtcPLIWorkerHandler
// Interface ISrsRtcPLIWorkerHandler
public:
virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid);
};
Expand Down
49 changes: 37 additions & 12 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,19 @@ SrsNtp SrsNtp::to_time_ms(uint64_t ntp)
return srs_ntp;
}

ISrsRtcStreamChangeCallback::ISrsRtcStreamChangeCallback()
{
}

ISrsRtcStreamChangeCallback::~ISrsRtcStreamChangeCallback()
{
}

SrsRtcConsumer::SrsRtcConsumer(SrsRtcStream* s)
{
source = s;
should_update_source_id = false;
handler_ = NULL;

mw_wait = srs_cond_new();
mw_min_msgs = 0;
Expand Down Expand Up @@ -231,6 +240,13 @@ void SrsRtcConsumer::wait(int nb_msgs)
srs_cond_wait(mw_wait);
}

void SrsRtcConsumer::on_stream_change(SrsRtcStreamDescription* desc)
{
if (handler_) {
handler_->on_stream_change(desc);
}
}

SrsRtcStreamManager::SrsRtcStreamManager()
{
lock = NULL;
Expand Down Expand Up @@ -354,24 +370,34 @@ void SrsRtcStream::update_auth(SrsRequest* r)
req->update_auth(r);
}

srs_error_t SrsRtcStream::on_source_id_changed(SrsContextId id)
srs_error_t SrsRtcStream::on_source_changed()
{
srs_error_t err = srs_success;

if (!_source_id.compare(id)) {
return err;
}
// Update context id if changed.
bool id_changed = false;
const SrsContextId& id = _srs_context->get_id();
if (_source_id.compare(id)) {
id_changed = true;

if (_pre_source_id.empty()) {
_pre_source_id = id;
if (_pre_source_id.empty()) {
_pre_source_id = id;
}
_source_id = id;
}
_source_id = id;

// notice all consumer
// Notify all consumers.
std::vector<SrsRtcConsumer*>::iterator it;
for (it = consumers.begin(); it != consumers.end(); ++it) {
SrsRtcConsumer* consumer = *it;
consumer->update_source_id();

// Notify if context id changed.
if (id_changed) {
consumer->update_source_id();
}

// Notify about stream description.
consumer->on_stream_change(stream_desc_);
}

return err;
Expand Down Expand Up @@ -456,9 +482,8 @@ srs_error_t SrsRtcStream::on_publish()
is_created_ = true;
is_delivering_packets_ = true;

// whatever, the publish thread is the source or edge source,
// save its id to srouce id.
if ((err = on_source_id_changed(_srs_context->get_id())) != srs_success) {
// Notify the consumers about stream change event.
if ((err = on_source_changed()) != srs_success) {
return srs_error_wrap(err, "source id change");
}

Expand Down
25 changes: 22 additions & 3 deletions trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ class SrsNtp
static uint64_t kMagicNtpFractionalUnit;
};

// When RTC stream publish and re-publish.
class ISrsRtcStreamChangeCallback
{
public:
ISrsRtcStreamChangeCallback();
virtual ~ISrsRtcStreamChangeCallback();
public:
virtual void on_stream_change(SrsRtcStreamDescription* desc) = 0;
};

// The RTC stream consumer, consume packets from RTC stream source.
class SrsRtcConsumer
{
private:
Expand All @@ -87,6 +98,9 @@ class SrsRtcConsumer
srs_cond_t mw_wait;
bool mw_waiting;
int mw_min_msgs;
private:
// The callback for stream change event.
ISrsRtcStreamChangeCallback* handler_;
public:
SrsRtcConsumer(SrsRtcStream* s);
virtual ~SrsRtcConsumer();
Expand All @@ -100,6 +114,9 @@ class SrsRtcConsumer
virtual srs_error_t dump_packet(SrsRtpPacket2** ppkt);
// Wait for at-least some messages incoming in queue.
virtual void wait(int nb_msgs);
public:
void set_handler(ISrsRtcStreamChangeCallback* h) { handler_ = h; } // SrsRtcConsumer::set_handler()
void on_stream_change(SrsRtcStreamDescription* desc);
};

class SrsRtcStreamManager
Expand Down Expand Up @@ -154,7 +171,7 @@ class SrsRtcStream
// For publish, it's the publish client id.
// For edge, it's the edge ingest id.
// when source id changed, for example, the edge reconnect,
// invoke the on_source_id_changed() to let all clients know.
// invoke the on_source_changed() to let all clients know.
SrsContextId _source_id;
// previous source id.
SrsContextId _pre_source_id;
Expand All @@ -180,8 +197,10 @@ class SrsRtcStream
virtual srs_error_t initialize(SrsRequest* r);
// Update the authentication information in request.
virtual void update_auth(SrsRequest* r);
// The source id changed.
virtual srs_error_t on_source_id_changed(SrsContextId id);
private:
// The stream source changed.
virtual srs_error_t on_source_changed();
public:
// Get current source id.
virtual SrsContextId source_id();
virtual SrsContextId pre_source_id();
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core_version4.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@

#define VERSION_MAJOR 4
#define VERSION_MINOR 0
#define VERSION_REVISION 86
#define VERSION_REVISION 87

#endif

0 comments on commit d6c16a7

Please sign in to comment.