diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 3ad0c32175..437e4d913c 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 6.0 Changelog +* v6.0, 2024-06-13, Merge [#4083](https://github.com/ossrs/srs/pull/4083): SmartPtr: Use shared ptr in RTC TCP connection. v6.0.127 (#4083) * v6.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080) * v6.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v6.0.125 (#4057) * v6.0, 2024-04-26, Merge [#4044](https://github.com/ossrs/srs/pull/4044): fix: correct SRS_ERRNO_MAP_HTTP duplicate error code. v6.0.124 (#4044) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 243dba6ef9..0b3e9c0d41 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1924,6 +1924,7 @@ std::string SrsRtcConnection::desc() void SrsRtcConnection::expire() { + // TODO: FIXME: Should set session to expired and remove it by heartbeat checking. Should not remove it directly. _srs_rtc_manager->remove(this); } diff --git a/trunk/src/app/srs_app_rtc_network.cpp b/trunk/src/app/srs_app_rtc_network.cpp index d0554999ee..3725afaf10 100644 --- a/trunk/src/app/srs_app_rtc_network.cpp +++ b/trunk/src/app/srs_app_rtc_network.cpp @@ -436,7 +436,7 @@ srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite) return sendonly_skt_->sendto(buf, size, SRS_UTIME_NO_TIMEOUT); } -SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta) +SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta) : owner_(new SrsRtcTcpConn()) { conn_ = conn; delta_ = delta; @@ -444,11 +444,11 @@ SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* de transport_ = new SrsSecurityTransport(this); peer_port_ = 0; state_ = SrsRtcNetworkStateInit; - owner_ = NULL; } SrsRtcTcpNetwork::~SrsRtcTcpNetwork() { + owner_->interrupt(); srs_freep(transport_); } @@ -694,36 +694,54 @@ void SrsRtcTcpNetwork::dispose() #define SRS_RTC_TCP_PACKET_MAX 1500 -SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm) +SrsRtcTcpConn::SrsRtcTcpConn() +{ + wrapper_ = NULL; + owner_coroutine_ = NULL; + owner_cid_ = NULL; + cid_ = _srs_context->get_id(); + + pkt_ = NULL; + delta_ = NULL; + skt_ = NULL; +} + +SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port) : SrsRtcTcpConn() { - manager_ = cm; ip_ = cip; port_ = port; skt_ = skt; delta_ = new SrsNetworkDelta(); delta_->set_io(skt_, skt_); - trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id()); session_ = NULL; pkt_ = new char[SRS_RTC_TCP_PACKET_MAX]; - _srs_rtc_manager->subscribe(this); } SrsRtcTcpConn::~SrsRtcTcpConn() { - _srs_rtc_manager->unsubscribe(this); - trd_->interrupt(); - srs_freep(trd_); - srs_freepa(pkt_); srs_freep(delta_); srs_freep(skt_); } +void SrsRtcTcpConn::setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid) +{ + wrapper_ = wrapper; + owner_coroutine_ = owner_coroutine; + owner_cid_ = owner_cid; +} + ISrsKbpsDelta* SrsRtcTcpConn::delta() { return delta_; } +void SrsRtcTcpConn::interrupt() +{ + session_ = NULL; + if (owner_coroutine_) owner_coroutine_->interrupt(); +} + std::string SrsRtcTcpConn::desc() { return "Tcp"; @@ -731,7 +749,7 @@ std::string SrsRtcTcpConn::desc() const SrsContextId& SrsRtcTcpConn::get_id() { - return trd_->cid(); + return cid_; } std::string SrsRtcTcpConn::remote_ip() @@ -739,9 +757,9 @@ std::string SrsRtcTcpConn::remote_ip() return ip_; } -srs_error_t SrsRtcTcpConn::start() +void SrsRtcTcpConn::on_executor_done(ISrsInterruptable* executor) { - return trd_->start(); + owner_coroutine_ = NULL; } srs_error_t SrsRtcTcpConn::cycle() @@ -752,15 +770,10 @@ srs_error_t SrsRtcTcpConn::cycle() SrsStatistic::instance()->on_disconnect(get_id().c_str(), err); SrsStatistic::instance()->kbps_add_delta(get_id().c_str(), delta_); - // TODO: FIXME: Should manage RTC TCP connection by _srs_rtc_manager. - // Because we use manager to manage this object, not the http connection object, so we must remove it here. - manager_->remove(this); - - // TODO: FIXME: When TCP connection(transport) closed, should notify session to dispose, should not free them simultaneously. // Only remove session when network is established, because client might use other UDP network. if(session_ && session_->tcp()->is_establelished()) { session_->tcp()->set_state(SrsRtcNetworkStateClosed); - _srs_rtc_manager->remove(session_); + session_->expire(); } // For HTTP-API timeout, we think it's done successfully, @@ -801,13 +814,18 @@ srs_error_t SrsRtcTcpConn::do_cycle() { srs_error_t err = srs_success; + // Update all context id to cid of session. + _srs_context->set_id(cid_); + owner_cid_->set_cid(cid_); + if((err = handshake()) != srs_success) { return srs_error_wrap(err, "process rtc tcp pkt"); } // TODO: FIXME: Handle all bytes of TCP Connection. while(true) { - if((err = trd_->pull()) != srs_success) { + if (!owner_coroutine_) return err; + if ((err = owner_coroutine_->pull()) != srs_success) { return srs_error_wrap(err, "rtc tcp conn"); } @@ -859,11 +877,11 @@ srs_error_t SrsRtcTcpConn::handshake() // Should support only one TCP candidate. SrsRtcTcpNetwork* network = dynamic_cast(session->tcp()); - if (!network->owner()) { - network->set_owner(this); + if (network->owner().get() != this) { + network->set_owner(*wrapper_); session_ = session; } - if (network->owner() != this) { + if (network->owner().get() != this) { return srs_error_new(ERROR_RTC_TCP_UNIQUE, "only support one network"); } @@ -939,21 +957,3 @@ srs_error_t SrsRtcTcpConn::on_tcp_pkt(char* pkt, int nb_pkt) return srs_error_new(ERROR_RTC_UDP, "unknown packet"); } -void SrsRtcTcpConn::on_before_dispose(ISrsResource* c) -{ - if (!session_) return; - - SrsRtcConnection* conn = dynamic_cast(c); - if(conn == session_) { - session_ = NULL; - // the related rtc connection will be disposed - srs_trace("RTC: tcp conn diposing, because of rtc connection"); - trd_->interrupt(); - } -} - -void SrsRtcTcpConn::on_disposing(ISrsResource* c) -{ - return; -} - diff --git a/trunk/src/app/srs_app_rtc_network.hpp b/trunk/src/app/srs_app_rtc_network.hpp index 309dba4b48..82282347fe 100644 --- a/trunk/src/app/srs_app_rtc_network.hpp +++ b/trunk/src/app/srs_app_rtc_network.hpp @@ -180,7 +180,7 @@ class SrsRtcTcpNetwork: public ISrsRtcNetwork private: // The DTLS transport over this network. ISrsRtcTransport* transport_; - SrsRtcTcpConn* owner_; + SrsSharedResource owner_; private: std::string peer_ip_; int peer_port_; @@ -189,8 +189,8 @@ class SrsRtcTcpNetwork: public ISrsRtcNetwork SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta); virtual ~SrsRtcTcpNetwork(); public: - void set_owner(SrsRtcTcpConn* v) { owner_ = v; } - SrsRtcTcpConn* owner() { return owner_; } + void set_owner(SrsSharedResource v) { owner_ = v; } + SrsSharedResource owner() { return owner_; } void update_sendonly_socket(ISrsProtocolReadWriter* skt); //ISrsRtcNetwork public: @@ -232,13 +232,9 @@ class SrsRtcTcpNetwork: public ISrsRtcNetwork }; // For WebRTC over TCP. -class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCoroutineHandler, public ISrsDisposingHandler +class SrsRtcTcpConn : public ISrsConnection, public ISrsCoroutineHandler, public ISrsExecutorHandler { private: - // The manager object to manage the connection. - ISrsResourceManager* manager_; - // Use a coroutine to serve the TCP connection. - SrsCoroutine* trd_; // The ip and port of client. std::string ip_; int port_; @@ -249,11 +245,24 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo ISrsProtocolReadWriter* skt_; // Packet cache. char* pkt_; -public: - SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm); +private: + // The shared resource which own this object, we should never free it because it's managed by shared ptr. + SrsSharedResource* wrapper_; + // The owner coroutine, allow user to interrupt the loop. + ISrsInterruptable* owner_coroutine_; + ISrsContextIdSetter* owner_cid_; + SrsContextId cid_; +public: + SrsRtcTcpConn(); + SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port); virtual ~SrsRtcTcpConn(); +public: + // Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id. + void setup_owner(SrsSharedResource* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid); public: ISrsKbpsDelta* delta(); + // Interrupt transport by session. + void interrupt(); // Interface ISrsResource. public: virtual std::string desc(); @@ -261,9 +270,9 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo // Interface ISrsConnection. public: virtual std::string remote_ip(); -// Interface ISrsStartable +// Interface ISrsExecutorHandler public: - virtual srs_error_t start(); + virtual void on_executor_done(ISrsInterruptable* executor); // Interface ISrsCoroutineHandler public: virtual srs_error_t cycle(); @@ -273,10 +282,6 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo srs_error_t read_packet(char* pkt, int* nb_pkt); srs_error_t on_stun(char* pkt, int nb_pkt); srs_error_t on_tcp_pkt(char* pkt, int nb_pkt); -// Interface of ISrsDisposingHandler -public: - virtual void on_before_dispose(ISrsResource* c); - virtual void on_disposing(ISrsResource* c); }; #endif diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 9e3e466f0e..9e70ea7a69 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -59,6 +59,7 @@ const int kVideoSamplerate = 90000; using namespace std; +#ifdef SRS_FFMPEG_FIT // The RTP payload max size, reserved some paddings for SRTP as such: // kRtpPacketSize = kRtpMaxPayloadSize + paddings // For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400, @@ -68,6 +69,7 @@ using namespace std; // so we set kRtpMaxPayloadSize = 1200. // see @doc https://groups.google.com/g/discuss-webrtc/c/gH5ysR3SoZI const int kRtpMaxPayloadSize = kRtpPacketSize - 300; +#endif // TODO: Add this function into SrsRtpMux class. srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf) diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index c5f6500751..af962f38b1 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -39,6 +39,7 @@ using namespace std; #include #ifdef SRS_RTC #include +#include #endif #ifdef SRS_GB28181 #include @@ -1193,8 +1194,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stf if (nn == 10 && b[0] == 0 && b[2] == 0 && b[3] == 1 && b[1] - b[5] == 20 && b[6] == 0x21 && b[7] == 0x12 && b[8] == 0xa4 && b[9] == 0x42 ) { - // TODO: FIXME: Should manage this connection by _srs_rtc_manager - resource = new SrsRtcTcpConn(io, ip, port, this); + resource = new SrsRtcTcpConn(io, ip, port); } else { resource = new SrsHttpxConn(listener == http_listener_, this, io, http_server, ip, port); } @@ -1213,8 +1213,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stf resource = new SrsHttpxConn(is_https, this, new SrsTcpConnection(stfd2), http_server, ip, port); #ifdef SRS_RTC } else if (listener == webrtc_listener_) { - // TODO: FIXME: Should manage this connection by _srs_rtc_manager - resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port, this); + resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port); #endif } else if (listener == exporter_listener_) { // TODO: FIXME: Maybe should support https metrics. @@ -1227,11 +1226,28 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stf } } +#ifdef SRS_RTC + // For RTC TCP connection, use resource executor to manage the resource. + SrsRtcTcpConn* raw_conn = dynamic_cast(resource); + if (raw_conn) { + SrsSharedResource* conn = new SrsSharedResource(raw_conn); + SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_rtc_manager, conn, raw_conn, raw_conn); + raw_conn->setup_owner(conn, executor, executor); + if ((err = executor->start()) != srs_success) { + srs_freep(executor); + return srs_error_wrap(err, "start executor"); + } + return err; + } +#endif + // Use connection manager to manage all the resources. + srs_assert(resource); conn_manager->add(resource); // If connection is a resource to start, start a coroutine to handle it. ISrsStartable* conn = dynamic_cast(resource); + srs_assert(conn); if ((err = conn->start()) != srs_success) { return srs_error_wrap(err, "start conn coroutine"); } diff --git a/trunk/src/app/srs_app_stream_bridge.cpp b/trunk/src/app/srs_app_stream_bridge.cpp index 1c391dd83e..543cd91f37 100644 --- a/trunk/src/app/srs_app_stream_bridge.cpp +++ b/trunk/src/app/srs_app_stream_bridge.cpp @@ -8,7 +8,6 @@ #include #include -#include #include #include #include @@ -63,13 +62,12 @@ srs_error_t SrsFrameToRtmpBridge::on_frame(SrsSharedPtrMessage* frame) return source_->on_frame(frame); } +#ifdef SRS_RTC SrsFrameToRtcBridge::SrsFrameToRtcBridge(SrsRtcSource* source) { -#ifdef SRS_RTC source_ = source; -#endif -#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) +#if defined(SRS_FFMPEG_FIT) uint32_t audio_ssrc = 0; uint8_t audio_payload_type = 0; uint32_t video_ssrc = 0; @@ -119,12 +117,10 @@ srs_error_t SrsFrameToRtcBridge::on_publish() { srs_error_t err = srs_success; -#ifdef SRS_RTC // TODO: FIXME: Should sync with bridge? if ((err = source_->on_publish()) != srs_success) { return srs_error_wrap(err, "source publish"); } -#endif #ifdef SRS_FFMPEG_FIT if ((err = rtp_builder_->on_publish()) != srs_success) { @@ -141,11 +137,9 @@ void SrsFrameToRtcBridge::on_unpublish() rtp_builder_->on_unpublish(); #endif -#ifdef SRS_RTC // @remark This bridge might be disposed here, so never use it. // TODO: FIXME: Should sync with bridge? source_->on_unpublish(); -#endif } srs_error_t SrsFrameToRtcBridge::on_frame(SrsSharedPtrMessage* frame) @@ -159,12 +153,9 @@ srs_error_t SrsFrameToRtcBridge::on_frame(SrsSharedPtrMessage* frame) srs_error_t SrsFrameToRtcBridge::on_rtp(SrsRtpPacket* pkt) { -#ifdef SRS_RTC return source_->on_rtp(pkt); -#else - return srs_success; -#endif } +#endif SrsCompositeBridge::SrsCompositeBridge() { diff --git a/trunk/src/app/srs_app_stream_bridge.hpp b/trunk/src/app/srs_app_stream_bridge.hpp index 6b85bb7a55..bbf42d3295 100644 --- a/trunk/src/app/srs_app_stream_bridge.hpp +++ b/trunk/src/app/srs_app_stream_bridge.hpp @@ -19,7 +19,6 @@ class SrsLiveSource; class SrsRtcSource; class SrsRtmpFormat; class SrsMetaCache; -class SrsAudioTranscoder; class SrsRtpPacket; class SrsRtcRtpBuilder; @@ -55,12 +54,16 @@ class SrsFrameToRtmpBridge : public ISrsStreamBridge virtual srs_error_t on_frame(SrsSharedPtrMessage* frame); }; +#ifdef SRS_RTC // A bridge to covert AV frame to WebRTC stream. class SrsFrameToRtcBridge : public ISrsStreamBridge { private: SrsRtcSource* source_; +private: +#if defined(SRS_FFMPEG_FIT) SrsRtcRtpBuilder* rtp_builder_; +#endif public: SrsFrameToRtcBridge(SrsRtcSource* source); virtual ~SrsFrameToRtcBridge(); @@ -71,6 +74,7 @@ class SrsFrameToRtcBridge : public ISrsStreamBridge virtual srs_error_t on_frame(SrsSharedPtrMessage* frame); srs_error_t on_rtp(SrsRtpPacket* pkt); }; +#endif // A bridge chain, a set of bridges. class SrsCompositeBridge : public ISrsStreamBridge diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index d5c6e42297..3aa39d124d 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -268,10 +268,9 @@ srs_error_t SrsCircuitBreaker::on_timer(srs_utime_t interval) // The hybrid thread cpu and memory. float thread_percent = stat->percent * 100; - static char buf[128]; - string snk_desc; #ifdef SRS_RTC + static char buf[128]; if (_srs_pps_snack2->r10s()) { snprintf(buf, sizeof(buf), ", snk=%d,%d,%d", _srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), _srs_pps_snack4->r10s() // NACK packet,seqs sent. diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index e8bac6484f..8232d45f71 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 126 +#define VERSION_REVISION 127 #endif