Skip to content

Commit

Permalink
Refine shared fast timer. 4.0.105
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed May 8, 2021
1 parent f370259 commit 2ad24b2
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 122 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ The ports used by SRS:

## V4 changes

* v4.0, 2021-05-08, Refine shared fast timer. 4.0.105
* v4.0, 2021-05-08, Refine global or thread-local variables initialize. 4.0.104
* v4.0, 2021-05-07, RTC: Support circuit breaker. 4.0.103
* v4.0, 2021-05-07, RTC: Refine play stream find track. 4.0.102
Expand Down
229 changes: 126 additions & 103 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,6 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid)

SrsRtcPlayStream::~SrsRtcPlayStream()
{
_srs_hybrid->timer1s()->unsubscribe(this);

// TODO: FIXME: Should not do callback in de-constructor?
if (_srs_rtc_hijacker) {
_srs_rtc_hijacker->on_stop_play(session_, this, req_);
Expand Down Expand Up @@ -534,10 +532,6 @@ srs_error_t SrsRtcPlayStream::start()
return srs_error_wrap(err, "rtc_sender");
}

// The timer for play, process TWCC in the future.
// @see SrsRtcPlayStream::on_timer()
_srs_hybrid->timer1s()->subscribe(this);

if ((err = pli_worker_->start()) != srs_success) {
return srs_error_wrap(err, "start pli worker");
}
Expand Down Expand Up @@ -722,17 +716,6 @@ void SrsRtcPlayStream::set_all_tracks_status(bool status)
srs_trace("RTC: Init tracks %s ok", merged_log.str().c_str());
}

srs_error_t SrsRtcPlayStream::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

if (!is_started) {
return err;
}

return err;
}

srs_error_t SrsRtcPlayStream::on_rtcp(SrsRtcpCommon* rtcp)
{
if(SrsRtcpType_rr == rtcp->type()) {
Expand Down Expand Up @@ -893,6 +876,85 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci
return err;
}

SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream* p) : p_(p)
{
_srs_hybrid->timer1s()->subscribe(this);
}

SrsRtcPublishRtcpTimer::~SrsRtcPublishRtcpTimer()
{
_srs_hybrid->timer1s()->unsubscribe(this);
}

srs_error_t SrsRtcPublishRtcpTimer::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

++_srs_pps_pub->sugar;

if (!p_->is_started) {
return err;
}

// For RR and RRTR.
++_srs_pps_rr->sugar;

if ((err = p_->send_rtcp_rr()) != srs_success) {
srs_warn("RR err %s", srs_error_desc(err).c_str());
srs_freep(err);
}

if ((err = p_->send_rtcp_xr_rrtr()) != srs_success) {
srs_warn("XR err %s", srs_error_desc(err).c_str());
srs_freep(err);
}

return err;
}

SrsRtcPublishTwccTimer::SrsRtcPublishTwccTimer(SrsRtcPublishStream* p) : p_(p)
{
_srs_hybrid->timer100ms()->subscribe(this);
}

SrsRtcPublishTwccTimer::~SrsRtcPublishTwccTimer()
{
_srs_hybrid->timer100ms()->unsubscribe(this);
}

srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

++_srs_pps_pub->sugar;

if (!p_->is_started) {
return err;
}

// For TWCC feedback.
if (!p_->twcc_enabled_) {
return err;
}

++_srs_pps_twcc->sugar;

// If circuit-breaker is dropping packet, disable TWCC.
if (_srs_circuit_breaker->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar;
return err;
}

// We should not depends on the received packet,
// instead we should send feedback every Nms.
if ((err = p_->send_periodic_twcc()) != srs_success) {
srs_warn("TWCC err %s", srs_error_desc(err).c_str());
srs_freep(err);
}

return err;
}

SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid)
{
cid_ = cid;
Expand All @@ -916,11 +978,15 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon

pli_worker_ = new SrsRtcPLIWorker(this);
last_time_send_twcc_ = 0;

timer_rtcp_ = new SrsRtcPublishRtcpTimer(this);
timer_twcc_ = new SrsRtcPublishTwccTimer(this);
}

SrsRtcPublishStream::~SrsRtcPublishStream()
{
_srs_hybrid->timer100ms()->unsubscribe(this);
srs_freep(timer_rtcp_);
srs_freep(timer_twcc_);

// TODO: FIXME: Should remove and delete source.
if (source) {
Expand Down Expand Up @@ -1056,10 +1122,6 @@ srs_error_t SrsRtcPublishStream::start()
return err;
}

// For publisher timer, such as TWCC and RR.
// @see SrsRtcPublishStream::on_timer()
_srs_hybrid->timer100ms()->subscribe(this);

if ((err = source->on_publish()) != srs_success) {
return srs_error_wrap(err, "on publish");
}
Expand Down Expand Up @@ -1529,52 +1591,6 @@ srs_error_t SrsRtcPublishStream::do_request_keyframe(uint32_t ssrc, SrsContextId
return err;
}

srs_error_t SrsRtcPublishStream::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

++_srs_pps_pub->sugar;

if (!is_started) {
return err;
}

// For RR and RRTR.
if (true) {
++_srs_pps_rr->sugar;

if ((err = send_rtcp_rr()) != srs_success) {
srs_warn("RR err %s", srs_error_desc(err).c_str());
srs_freep(err);
}

if ((err = send_rtcp_xr_rrtr()) != srs_success) {
srs_warn("XR err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
}

// For TWCC feedback.
if (twcc_enabled_) {
++_srs_pps_twcc->sugar;

// If circuit-breaker is dropping packet, disable TWCC.
if (_srs_circuit_breaker->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar;
return err;
}

// We should not depends on the received packet,
// instead we should send feedback every Nms.
if ((err = send_periodic_twcc()) != srs_success) {
srs_warn("TWCC err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
}

return err;
}

void SrsRtcPublishStream::simulate_nack_drop(int nn)
{
nn_simulate_nack_drop = nn;
Expand Down Expand Up @@ -1692,6 +1708,45 @@ ISrsRtcConnectionHijacker::~ISrsRtcConnectionHijacker()
{
}

SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(SrsRtcConnection* p) : p_(p)
{
_srs_hybrid->timer20ms()->subscribe(this);
}

SrsRtcConnectionNackTimer::~SrsRtcConnectionNackTimer()
{
_srs_hybrid->timer20ms()->unsubscribe(this);
}

srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

if (!p_->nack_enabled_) {
return err;
}

++_srs_pps_conn->sugar;

// If circuit-breaker is enabled, disable nack.
if (_srs_circuit_breaker->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar;
return err;
}

std::map<std::string, SrsRtcPublishStream*>::iterator it;
for (it = p_->publishers_.begin(); it != p_->publishers_.end(); it++) {
SrsRtcPublishStream* publisher = it->second;

if ((err = publisher->check_send_nacks()) != srs_success) {
srs_warn("ignore nack err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
}

return err;
}

SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
{
req = NULL;
Expand Down Expand Up @@ -1719,16 +1774,17 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid)
pli_epp = new SrsErrorPithyPrint();

nack_enabled_ = false;
timer_nack_ = new SrsRtcConnectionNackTimer(this);

_srs_rtc_manager->subscribe(this);
}

SrsRtcConnection::~SrsRtcConnection()
{
_srs_hybrid->timer20ms()->unsubscribe(this);

_srs_rtc_manager->unsubscribe(this);

srs_freep(timer_nack_);

// Cleanup publishers.
for(map<string, SrsRtcPublishStream*>::iterator it = publishers_.begin(); it != publishers_.end(); ++it) {
SrsRtcPublishStream* publisher = it->second;
Expand Down Expand Up @@ -1972,10 +2028,6 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st
return srs_error_wrap(err, "init");
}

// The RTC connection start a timer, handle nacks.
// @see SrsRtcConnection::on_timer()
_srs_hybrid->timer20ms()->subscribe(this);

// TODO: FIXME: Support reload.
session_timeout = _srs_config->get_rtc_stun_timeout(req->vhost);
last_stun_time = srs_get_system_time();
Expand Down Expand Up @@ -2351,35 +2403,6 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt)
sendonly_skt = addr_cache;
}

srs_error_t SrsRtcConnection::on_timer(srs_utime_t interval)
{
srs_error_t err = srs_success;

if (!nack_enabled_) {
return err;
}

++_srs_pps_conn->sugar;

// If circuit-breaker is enabled, disable nack.
if (_srs_circuit_breaker->hybrid_critical_water_level()) {
++_srs_pps_snack4->sugar;
return err;
}

std::map<std::string, SrsRtcPublishStream*>::iterator it;
for (it = publishers_.begin(); it != publishers_.end(); it++) {
SrsRtcPublishStream* publisher = it->second;

if ((err = publisher->check_send_nacks()) != srs_success) {
srs_warn("ignore nack err %s", srs_error_desc(err).c_str());
srs_freep(err);
}
}

return err;
}

srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data)
{
srs_error_t err = srs_success;
Expand Down
Loading

0 comments on commit 2ad24b2

Please sign in to comment.