diff --git a/README.md b/README.md index 5de795541b..3bf5f90ac0 100755 --- a/README.md +++ b/README.md @@ -176,6 +176,7 @@ The ports used by SRS: ## V4 changes +* v4.0, 2021-05-08, Refine shared fast timer. 4.0.105 * v4.0, 2021-05-08, Refine global or thread-local variables initialize. 4.0.104 * v4.0, 2021-05-07, RTC: Support circuit breaker. 4.0.103 * v4.0, 2021-05-07, RTC: Refine play stream find track. 4.0.102 diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index f6801a77f2..0c00142354 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -392,8 +392,6 @@ SrsRtcPlayStream::SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid) SrsRtcPlayStream::~SrsRtcPlayStream() { - _srs_hybrid->timer1s()->unsubscribe(this); - // TODO: FIXME: Should not do callback in de-constructor? if (_srs_rtc_hijacker) { _srs_rtc_hijacker->on_stop_play(session_, this, req_); @@ -534,10 +532,6 @@ srs_error_t SrsRtcPlayStream::start() return srs_error_wrap(err, "rtc_sender"); } - // The timer for play, process TWCC in the future. - // @see SrsRtcPlayStream::on_timer() - _srs_hybrid->timer1s()->subscribe(this); - if ((err = pli_worker_->start()) != srs_success) { return srs_error_wrap(err, "start pli worker"); } @@ -722,17 +716,6 @@ void SrsRtcPlayStream::set_all_tracks_status(bool status) srs_trace("RTC: Init tracks %s ok", merged_log.str().c_str()); } -srs_error_t SrsRtcPlayStream::on_timer(srs_utime_t interval) -{ - srs_error_t err = srs_success; - - if (!is_started) { - return err; - } - - return err; -} - srs_error_t SrsRtcPlayStream::on_rtcp(SrsRtcpCommon* rtcp) { if(SrsRtcpType_rr == rtcp->type()) { @@ -893,6 +876,85 @@ srs_error_t SrsRtcPlayStream::do_request_keyframe(uint32_t ssrc, SrsContextId ci return err; } +SrsRtcPublishRtcpTimer::SrsRtcPublishRtcpTimer(SrsRtcPublishStream* p) : p_(p) +{ + _srs_hybrid->timer1s()->subscribe(this); +} + +SrsRtcPublishRtcpTimer::~SrsRtcPublishRtcpTimer() +{ + _srs_hybrid->timer1s()->unsubscribe(this); +} + +srs_error_t SrsRtcPublishRtcpTimer::on_timer(srs_utime_t interval) +{ + srs_error_t err = srs_success; + + ++_srs_pps_pub->sugar; + + if (!p_->is_started) { + return err; + } + + // For RR and RRTR. + ++_srs_pps_rr->sugar; + + if ((err = p_->send_rtcp_rr()) != srs_success) { + srs_warn("RR err %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + + if ((err = p_->send_rtcp_xr_rrtr()) != srs_success) { + srs_warn("XR err %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + + return err; +} + +SrsRtcPublishTwccTimer::SrsRtcPublishTwccTimer(SrsRtcPublishStream* p) : p_(p) +{ + _srs_hybrid->timer100ms()->subscribe(this); +} + +SrsRtcPublishTwccTimer::~SrsRtcPublishTwccTimer() +{ + _srs_hybrid->timer100ms()->unsubscribe(this); +} + +srs_error_t SrsRtcPublishTwccTimer::on_timer(srs_utime_t interval) +{ + srs_error_t err = srs_success; + + ++_srs_pps_pub->sugar; + + if (!p_->is_started) { + return err; + } + + // For TWCC feedback. + if (!p_->twcc_enabled_) { + return err; + } + + ++_srs_pps_twcc->sugar; + + // If circuit-breaker is dropping packet, disable TWCC. + if (_srs_circuit_breaker->hybrid_critical_water_level()) { + ++_srs_pps_snack4->sugar; + return err; + } + + // We should not depends on the received packet, + // instead we should send feedback every Nms. + if ((err = p_->send_periodic_twcc()) != srs_success) { + srs_warn("TWCC err %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + + return err; +} + SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsContextId& cid) { cid_ = cid; @@ -916,11 +978,15 @@ SrsRtcPublishStream::SrsRtcPublishStream(SrsRtcConnection* session, const SrsCon pli_worker_ = new SrsRtcPLIWorker(this); last_time_send_twcc_ = 0; + + timer_rtcp_ = new SrsRtcPublishRtcpTimer(this); + timer_twcc_ = new SrsRtcPublishTwccTimer(this); } SrsRtcPublishStream::~SrsRtcPublishStream() { - _srs_hybrid->timer100ms()->unsubscribe(this); + srs_freep(timer_rtcp_); + srs_freep(timer_twcc_); // TODO: FIXME: Should remove and delete source. if (source) { @@ -1056,10 +1122,6 @@ srs_error_t SrsRtcPublishStream::start() return err; } - // For publisher timer, such as TWCC and RR. - // @see SrsRtcPublishStream::on_timer() - _srs_hybrid->timer100ms()->subscribe(this); - if ((err = source->on_publish()) != srs_success) { return srs_error_wrap(err, "on publish"); } @@ -1529,52 +1591,6 @@ srs_error_t SrsRtcPublishStream::do_request_keyframe(uint32_t ssrc, SrsContextId return err; } -srs_error_t SrsRtcPublishStream::on_timer(srs_utime_t interval) -{ - srs_error_t err = srs_success; - - ++_srs_pps_pub->sugar; - - if (!is_started) { - return err; - } - - // For RR and RRTR. - if (true) { - ++_srs_pps_rr->sugar; - - if ((err = send_rtcp_rr()) != srs_success) { - srs_warn("RR err %s", srs_error_desc(err).c_str()); - srs_freep(err); - } - - if ((err = send_rtcp_xr_rrtr()) != srs_success) { - srs_warn("XR err %s", srs_error_desc(err).c_str()); - srs_freep(err); - } - } - - // For TWCC feedback. - if (twcc_enabled_) { - ++_srs_pps_twcc->sugar; - - // If circuit-breaker is dropping packet, disable TWCC. - if (_srs_circuit_breaker->hybrid_critical_water_level()) { - ++_srs_pps_snack4->sugar; - return err; - } - - // We should not depends on the received packet, - // instead we should send feedback every Nms. - if ((err = send_periodic_twcc()) != srs_success) { - srs_warn("TWCC err %s", srs_error_desc(err).c_str()); - srs_freep(err); - } - } - - return err; -} - void SrsRtcPublishStream::simulate_nack_drop(int nn) { nn_simulate_nack_drop = nn; @@ -1692,6 +1708,45 @@ ISrsRtcConnectionHijacker::~ISrsRtcConnectionHijacker() { } +SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(SrsRtcConnection* p) : p_(p) +{ + _srs_hybrid->timer20ms()->subscribe(this); +} + +SrsRtcConnectionNackTimer::~SrsRtcConnectionNackTimer() +{ + _srs_hybrid->timer20ms()->unsubscribe(this); +} + +srs_error_t SrsRtcConnectionNackTimer::on_timer(srs_utime_t interval) +{ + srs_error_t err = srs_success; + + if (!p_->nack_enabled_) { + return err; + } + + ++_srs_pps_conn->sugar; + + // If circuit-breaker is enabled, disable nack. + if (_srs_circuit_breaker->hybrid_critical_water_level()) { + ++_srs_pps_snack4->sugar; + return err; + } + + std::map::iterator it; + for (it = p_->publishers_.begin(); it != p_->publishers_.end(); it++) { + SrsRtcPublishStream* publisher = it->second; + + if ((err = publisher->check_send_nacks()) != srs_success) { + srs_warn("ignore nack err %s", srs_error_desc(err).c_str()); + srs_freep(err); + } + } + + return err; +} + SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) { req = NULL; @@ -1719,16 +1774,17 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) pli_epp = new SrsErrorPithyPrint(); nack_enabled_ = false; + timer_nack_ = new SrsRtcConnectionNackTimer(this); _srs_rtc_manager->subscribe(this); } SrsRtcConnection::~SrsRtcConnection() { - _srs_hybrid->timer20ms()->unsubscribe(this); - _srs_rtc_manager->unsubscribe(this); + srs_freep(timer_nack_); + // Cleanup publishers. for(map::iterator it = publishers_.begin(); it != publishers_.end(); ++it) { SrsRtcPublishStream* publisher = it->second; @@ -1972,10 +2028,6 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st return srs_error_wrap(err, "init"); } - // The RTC connection start a timer, handle nacks. - // @see SrsRtcConnection::on_timer() - _srs_hybrid->timer20ms()->subscribe(this); - // TODO: FIXME: Support reload. session_timeout = _srs_config->get_rtc_stun_timeout(req->vhost); last_stun_time = srs_get_system_time(); @@ -2351,35 +2403,6 @@ void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt) sendonly_skt = addr_cache; } -srs_error_t SrsRtcConnection::on_timer(srs_utime_t interval) -{ - srs_error_t err = srs_success; - - if (!nack_enabled_) { - return err; - } - - ++_srs_pps_conn->sugar; - - // If circuit-breaker is enabled, disable nack. - if (_srs_circuit_breaker->hybrid_critical_water_level()) { - ++_srs_pps_snack4->sugar; - return err; - } - - std::map::iterator it; - for (it = publishers_.begin(); it != publishers_.end(); it++) { - SrsRtcPublishStream* publisher = it->second; - - if ((err = publisher->check_send_nacks()) != srs_success) { - srs_warn("ignore nack err %s", srs_error_desc(err).c_str()); - srs_freep(err); - } - } - - return err; -} - srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data) { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 66eaada694..b57ef5b341 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -66,6 +66,7 @@ class SrsPithyPrint; class SrsStatistic; class SrsRtcUserConfig; class SrsRtcSendTrack; +class SrsRtcPublishStream; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -213,7 +214,7 @@ class SrsRtcPLIWorker : virtual public ISrsCoroutineHandler // A RTC play stream, client pull and play stream from SRS. class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler - , public ISrsFastTimer, public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback + , public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback { private: SrsContextId cid_; @@ -269,9 +270,6 @@ class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler public: // Directly set the status of track, generally for init to set the default value. void set_all_tracks_status(bool status); -// interface ISrsFastTimer -private: - srs_error_t on_timer(srs_utime_t interval); public: srs_error_t on_rtcp(SrsRtcpCommon* rtcp); private: @@ -285,10 +283,41 @@ class SrsRtcPlayStream : public ISrsCoroutineHandler, public ISrsReloadHandler virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); }; +// A fast timer for publish stream, for RTCP feedback. +class SrsRtcPublishRtcpTimer : public ISrsFastTimer +{ +private: + SrsRtcPublishStream* p_; +public: + SrsRtcPublishRtcpTimer(SrsRtcPublishStream* p); + virtual ~SrsRtcPublishRtcpTimer(); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval); +}; + +// A fast timer for publish stream, for TWCC feedback. +class SrsRtcPublishTwccTimer : public ISrsFastTimer +{ +private: + SrsRtcPublishStream* p_; +public: + SrsRtcPublishTwccTimer(SrsRtcPublishStream* p); + virtual ~SrsRtcPublishTwccTimer(); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval); +}; + // A RTC publish stream, client push and publish stream to SRS. -class SrsRtcPublishStream : public ISrsFastTimer, public ISrsRtpPacketDecodeHandler +class SrsRtcPublishStream : public ISrsRtpPacketDecodeHandler , public ISrsRtcPublishStream, public ISrsRtcPLIWorkerHandler { +private: + friend class SrsRtcPublishRtcpTimer; + friend class SrsRtcPublishTwccTimer; + SrsRtcPublishRtcpTimer* timer_rtcp_; + SrsRtcPublishTwccTimer* timer_twcc_; private: SrsContextId cid_; uint64_t nn_audio_frames; @@ -353,9 +382,6 @@ class SrsRtcPublishStream : public ISrsFastTimer, public ISrsRtpPacketDecodeHand public: void request_keyframe(uint32_t ssrc); virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); -// interface ISrsFastTimer -private: - srs_error_t on_timer(srs_utime_t interval); public: void simulate_nack_drop(int nn); private: @@ -396,15 +422,31 @@ class ISrsRtcConnectionHijacker virtual srs_error_t on_dtls_done() = 0; }; +// A fast timer for conntion, for NACK feedback. +class SrsRtcConnectionNackTimer : public ISrsFastTimer +{ +private: + SrsRtcConnection* p_; +public: + SrsRtcConnectionNackTimer(SrsRtcConnection* p); + virtual ~SrsRtcConnectionNackTimer(); +// interface ISrsFastTimer +private: + srs_error_t on_timer(srs_utime_t interval); +}; + // A RTC Peer Connection, SDP level object. // // For performance, we use non-virtual public from resource, // see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a -class SrsRtcConnection : public ISrsResource, public ISrsFastTimer, public ISrsDisposingHandler +class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler { friend class SrsSecurityTransport; friend class SrsRtcPlayStream; friend class SrsRtcPublishStream; +private: + friend class SrsRtcConnectionNackTimer; + SrsRtcConnectionNackTimer* timer_nack_; public: bool disposing_; SrsRtcConnectionStatistic* stat_; @@ -513,9 +555,6 @@ class SrsRtcConnection : public ISrsResource, public ISrsFastTimer, public ISrsD bool is_alive(); void alive(); void update_sendonly_socket(SrsUdpMuxSocket* skt); -// interface ISrsFastTimer -private: - srs_error_t on_timer(srs_utime_t interval); public: // send rtcp srs_error_t send_rtcp(char *data, int nb_data); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index de2be5499f..1948b47981 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -644,13 +644,14 @@ srs_error_t SrsRtcStream::on_timer(srs_utime_t interval) return err; } - pli_elapsed_ += interval; - if (pli_elapsed_ < pli_for_rtmp_) { - return err; - } - // Request PLI and reset the timer. - pli_elapsed_ = 0; + if (true) { + pli_elapsed_ += interval; + if (pli_elapsed_ < pli_for_rtmp_) { + return err; + } + pli_elapsed_ = 0; + } for (int i = 0; i < (int)stream_desc_->video_track_descs_.size(); i++) { SrsRtcTrackDescription* desc = stream_desc_->video_track_descs_.at(i); diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index 56062eeec4..511b1fcad6 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -26,6 +26,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 104 +#define VERSION_REVISION 105 #endif