From 24125b9770b61c5341863ca9c38b3084d2983e91 Mon Sep 17 00:00:00 2001 From: winlin Date: Thu, 5 Nov 2020 15:08:36 +0800 Subject: [PATCH] For #1657: Refine connection arch, remove hierachy --- trunk/src/app/srs_app_caster_flv.cpp | 57 ++++++++++++++--- trunk/src/app/srs_app_caster_flv.hpp | 31 +++++++-- trunk/src/app/srs_app_http_api.cpp | 1 + trunk/src/app/srs_app_http_conn.cpp | 91 +++++++++++++++++++++++---- trunk/src/app/srs_app_http_conn.hpp | 73 ++++++++++++++------- trunk/src/app/srs_app_http_stream.cpp | 11 +++- trunk/src/app/srs_app_rtmp_conn.cpp | 1 + 7 files changed, 213 insertions(+), 52 deletions(-) diff --git a/trunk/src/app/srs_app_caster_flv.cpp b/trunk/src/app/srs_app_caster_flv.cpp index af8062f61f..a4e6cc50db 100644 --- a/trunk/src/app/srs_app_caster_flv.cpp +++ b/trunk/src/app/srs_app_caster_flv.cpp @@ -85,7 +85,7 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd) srs_warn("empty ip for fd=%d", srs_netfd_fileno(stfd)); } - SrsHttpConn* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip, port); + ISrsStartableConneciton* conn = new SrsDynamicHttpConn(this, stfd, http_mux, ip, port); conns.push_back(conn); if ((err = conn->start()) != srs_success) { @@ -97,14 +97,14 @@ srs_error_t SrsAppCasterFlv::on_tcp_client(srs_netfd_t stfd) void SrsAppCasterFlv::remove(ISrsResource* c) { - SrsHttpConn* conn = dynamic_cast(c); + ISrsStartableConneciton* conn = dynamic_cast(c); - std::vector::iterator it; + std::vector::iterator it; if ((it = std::find(conns.begin(), conns.end(), conn)) != conns.end()) { conns.erase(it); } - // fixbug: SrsHttpConn for CasterFlv is not freed, which could cause memory leak + // fixbug: ISrsStartableConneciton for CasterFlv is not freed, which could cause memory leak // so, free conn which is not managed by SrsServer->conns; // @see: https://github.com/ossrs/srs/issues/826 manager->remove(c); @@ -141,23 +141,23 @@ srs_error_t SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessa return err; } -SrsDynamicHttpConn::SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port) +SrsDynamicHttpConn::SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int cport) { + manager = cm; sdk = NULL; pprint = SrsPithyPrint::create_caster(); + conn = new SrsHttpConn(this, fd, m, cip, cport); + ip = cip; + port = cport; } SrsDynamicHttpConn::~SrsDynamicHttpConn() { + srs_freep(conn); srs_freep(sdk); srs_freep(pprint); } -srs_error_t SrsDynamicHttpConn::on_got_http_message(ISrsHttpMessage* msg) -{ - return srs_success; -} - srs_error_t SrsDynamicHttpConn::proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o) { srs_error_t err = srs_success; @@ -249,6 +249,43 @@ srs_error_t SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecod return err; } +srs_error_t SrsDynamicHttpConn::on_http_message(ISrsHttpMessage* msg) +{ + return srs_success; +} + +void SrsDynamicHttpConn::on_conn_done() +{ + // Because we use manager to manage this object, + // not the http connection object, so we must remove it here. + manager->remove(this); +} + +std::string SrsDynamicHttpConn::desc() +{ + return "DHttpConn"; +} + +std::string SrsDynamicHttpConn::remote_ip() +{ + return conn->remote_ip(); +} + +const SrsContextId& SrsDynamicHttpConn::get_id() +{ + return conn->get_id(); +} + +srs_error_t SrsDynamicHttpConn::start() +{ + return conn->start(); +} + +void SrsDynamicHttpConn::remark(int64_t* in, int64_t* out) +{ + conn->remark(in, out); +} + SrsHttpFileReader::SrsHttpFileReader(ISrsHttpResponseReader* h) { http = h; diff --git a/trunk/src/app/srs_app_caster_flv.hpp b/trunk/src/app/srs_app_caster_flv.hpp index aee43b3828..4123eb1ef4 100644 --- a/trunk/src/app/srs_app_caster_flv.hpp +++ b/trunk/src/app/srs_app_caster_flv.hpp @@ -53,7 +53,7 @@ class SrsAppCasterFlv : virtual public ISrsTcpHandler private: std::string output; SrsHttpServeMux* http_mux; - std::vector conns; + std::vector conns; SrsResourceManager* manager; public: SrsAppCasterFlv(SrsConfDirective* c); @@ -72,21 +72,44 @@ class SrsAppCasterFlv : virtual public ISrsTcpHandler }; // The dynamic http connection, never drop the body. -class SrsDynamicHttpConn : public SrsHttpConn +class SrsDynamicHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsHttpMessageHandler { private: + // The manager object to manage the connection. + ISrsResourceManager* manager; std::string output; SrsPithyPrint* pprint; SrsSimpleRtmpClient* sdk; + SrsHttpConn* conn; +private: + // The ip and port of client. + std::string ip; + int port; public: SrsDynamicHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, std::string cip, int port); virtual ~SrsDynamicHttpConn(); -public: - virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg); public: virtual srs_error_t proxy(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, std::string o); private: virtual srs_error_t do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec); +// Extract APIs from SrsTcpConnection. +// Interface ISrsHttpMessageHandler. +public: + virtual srs_error_t on_http_message(ISrsHttpMessage* msg); + virtual void on_conn_done(); +// Interface ISrsResource. +public: + virtual std::string desc(); +// Interface ISrsConnection. +public: + virtual std::string remote_ip(); + virtual const SrsContextId& get_id(); +// Interface ISrsStartable +public: + virtual srs_error_t start(); +// Interface ISrsKbpsDelta +public: + virtual void remark(int64_t* in, int64_t* out); }; // The http wrapper for file reader, to read http post stream like a file. diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index c7f9ce21e4..85dbce4f0b 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1840,6 +1840,7 @@ srs_error_t SrsHttpApi::cycle() srs_error_t err = do_cycle(); // Notify manager to remove it. + // Note that we create this object, so we use manager to remove it. manager->remove(this); // success. diff --git a/trunk/src/app/srs_app_http_conn.cpp b/trunk/src/app/srs_app_http_conn.cpp index 978e429731..27f1e68530 100644 --- a/trunk/src/app/srs_app_http_conn.cpp +++ b/trunk/src/app/srs_app_http_conn.cpp @@ -59,14 +59,22 @@ using namespace std; #include #include -SrsHttpConn::SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int cport) +ISrsHttpMessageHandler::ISrsHttpMessageHandler() +{ +} + +ISrsHttpMessageHandler::~ISrsHttpMessageHandler() +{ +} + +SrsHttpConn::SrsHttpConn(ISrsHttpMessageHandler* handler, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int cport) { parser = new SrsHttpParser(); cors = new SrsHttpCorsMux(); http_mux = m; + handler_ = handler; skt = new SrsTcpConnection(fd); - manager = cm; ip = cip; port = cport; create_time = srsu2ms(srs_get_system_time()); @@ -150,7 +158,7 @@ srs_error_t SrsHttpConn::do_cycle() last_req = hreq->to_request(hreq->host()); // may should discard the body. - if ((err = on_got_http_message(req)) != srs_success) { + if ((err = handler_->on_http_message(req)) != srs_success) { break; } @@ -176,6 +184,16 @@ srs_error_t SrsHttpConn::do_cycle() return err; } +ISrsHttpMessageHandler* SrsHttpConn::handler() +{ + return handler_; +} + +srs_error_t SrsHttpConn::pull() +{ + return trd->pull(); +} + srs_error_t SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { srs_error_t err = srs_success; @@ -239,8 +257,8 @@ srs_error_t SrsHttpConn::cycle() { srs_error_t err = do_cycle(); - // Notify manager to remove it. - manager->remove(this); + // Notify handler to handle it. + handler_->on_conn_done(); // success. if (err == srs_success) { @@ -284,29 +302,41 @@ void SrsHttpConn::expire() trd->interrupt(); } -SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) : SrsHttpConn(cm, fd, m, cip, port) +SrsResponseOnlyHttpConn::SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, string cip, int port) { + manager = cm; + conn = new SrsHttpConn(this, fd, m, cip, port); + stfd = fd; } SrsResponseOnlyHttpConn::~SrsResponseOnlyHttpConn() { + srs_freep(conn); } srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq) { srs_error_t err = srs_success; + SrsStSocket skt; + + // We start a socket to read the stfd, which is writing by conn. + // It's ok, because conn never read it after processing the HTTP request. + if ((err = skt.initialize(stfd)) != srs_success) { + return srs_error_wrap(err, "init socket"); + } + // Check user interrupt by interval. - skt->set_recv_timeout(3 * SRS_UTIME_SECONDS); + skt.set_recv_timeout(3 * SRS_UTIME_SECONDS); // drop all request body. char body[4096]; while (true) { - if ((err = trd->pull()) != srs_success) { + if ((err = conn->pull()) != srs_success) { return srs_error_wrap(err, "timeout"); } - if ((err = skt->read(body, 4096, NULL)) != srs_success) { + if ((err = skt.read(body, 4096, NULL)) != srs_success) { // Because we use timeout to check trd state, so we should ignore any timeout. if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) { srs_freep(err); @@ -320,7 +350,7 @@ srs_error_t SrsResponseOnlyHttpConn::pop_message(ISrsHttpMessage** preq) return err; } -srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg) +srs_error_t SrsResponseOnlyHttpConn::on_http_message(ISrsHttpMessage* msg) { srs_error_t err = srs_success; @@ -343,9 +373,46 @@ srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg) return err; } -void SrsResponseOnlyHttpConn::expire() +void SrsResponseOnlyHttpConn::on_conn_done() +{ + // Because we use manager to manage this object, + // not the http connection object, so we must remove it here. + manager->remove(this); +} + +srs_error_t SrsResponseOnlyHttpConn::set_tcp_nodelay(bool v) +{ + return conn->set_tcp_nodelay(v); +} + +srs_error_t SrsResponseOnlyHttpConn::set_socket_buffer(srs_utime_t buffer_v) +{ + return conn->set_socket_buffer(buffer_v); +} + +std::string SrsResponseOnlyHttpConn::desc() +{ + return "ROHttpConn"; +} + +std::string SrsResponseOnlyHttpConn::remote_ip() +{ + return conn->remote_ip(); +} + +const SrsContextId& SrsResponseOnlyHttpConn::get_id() +{ + return conn->get_id(); +} + +srs_error_t SrsResponseOnlyHttpConn::start() +{ + return conn->start(); +} + +void SrsResponseOnlyHttpConn::remark(int64_t* in, int64_t* out) { - SrsHttpConn::expire(); + conn->remark(in, out); } SrsHttpServer::SrsHttpServer(SrsServer* svr) diff --git a/trunk/src/app/srs_app_http_conn.hpp b/trunk/src/app/srs_app_http_conn.hpp index d86593d332..af92295591 100644 --- a/trunk/src/app/srs_app_http_conn.hpp +++ b/trunk/src/app/srs_app_http_conn.hpp @@ -54,6 +54,21 @@ class SrsHttpMessage; class SrsHttpStreamServer; class SrsHttpStaticServer; +// The handler for HTTP message. +class ISrsHttpMessageHandler +{ +public: + ISrsHttpMessageHandler(); + virtual ~ISrsHttpMessageHandler(); +public: + // Handle the HTTP message msg, which may be parsed partially. + // For the static service or api, discard any body. + // For the stream caster, for instance, http flv streaming, may discard the flv header or not. + virtual srs_error_t on_http_message(ISrsHttpMessage* msg) = 0; + // When connection is destroy, should use manager to dispose it. + virtual void on_conn_done() = 0; +}; + // The http connection which request the static or stream content. class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsReloadHandler , virtual public ISrsCoroutineHandler, virtual public ISrsExpire @@ -62,6 +77,7 @@ class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsR SrsHttpParser* parser; ISrsHttpServeMux* http_mux; SrsHttpCorsMux* cors; + ISrsHttpMessageHandler* handler_; protected: SrsTcpConnection* skt; // Each connection start a green thread, @@ -71,8 +87,6 @@ class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsR std::string ip; int port; private: - // The manager object to manage the connection. - ISrsResourceManager* manager; // The connection total kbps. // not only the rtmp or http connection, all type of connection are // need to statistic the kbps of io. @@ -83,7 +97,7 @@ class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsR // for current connection to log self create time and calculate the living time. int64_t create_time; public: - SrsHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); + SrsHttpConn(ISrsHttpMessageHandler* handler, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); virtual ~SrsHttpConn(); // Interface ISrsResource. public: @@ -91,13 +105,13 @@ class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsR // Interface ISrsKbpsDelta public: virtual void remark(int64_t* in, int64_t* out); -protected: +private: virtual srs_error_t do_cycle(); -protected: - // When got http message, - // for the static service or api, discard any body. - // for the stream caster, for instance, http flv streaming, may discard the flv header or not. - virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg) = 0; +public: + // Get the HTTP message handler. + virtual ISrsHttpMessageHandler* handler(); + // Whether the connection coroutine is error or terminated. + virtual srs_error_t pull(); private: virtual srs_error_t process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r); // When the connection disconnect, call this method. @@ -115,19 +129,9 @@ class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsR virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v); // Interface ISrsStartable public: - // Start the client green thread. - // when server get a client from listener, - // 1. server will create an concrete connection(for instance, RTMP connection), - // 2. then add connection to its connection manager, - // 3. start the client thread by invoke this start() - // when client cycle thread stop, invoke the on_thread_stop(), which will use server - // To remove the client by server->remove(this). virtual srs_error_t start(); // Interface ISrsOneCycleThreadHandler public: - // The thread cycle function, - // when serve connection completed, terminate the loop which will terminate the thread, - // thread will invoke the on_thread_stop() when it terminated. virtual srs_error_t cycle(); // Interface ISrsConnection. public: @@ -139,8 +143,13 @@ class SrsHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsR }; // Drop body of request, only process the response. -class SrsResponseOnlyHttpConn : public SrsHttpConn +class SrsResponseOnlyHttpConn : virtual public ISrsStartableConneciton, virtual public ISrsHttpMessageHandler { +private: + // The manager object to manage the connection. + ISrsResourceManager* manager; + SrsHttpConn* conn; + srs_netfd_t stfd; public: SrsResponseOnlyHttpConn(ISrsResourceManager* cm, srs_netfd_t fd, ISrsHttpServeMux* m, std::string cip, int port); virtual ~SrsResponseOnlyHttpConn(); @@ -151,11 +160,29 @@ class SrsResponseOnlyHttpConn : public SrsHttpConn // @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427 // @remark Should only used in HTTP-FLV streaming connection. virtual srs_error_t pop_message(ISrsHttpMessage** preq); +// Interface ISrsHttpMessageHandler. public: - virtual srs_error_t on_got_http_message(ISrsHttpMessage* msg); + virtual srs_error_t on_http_message(ISrsHttpMessage* msg); + virtual void on_conn_done(); +// Extract APIs from SrsTcpConnection. public: - // Set connection to expired. - virtual void expire(); + // Set socket option TCP_NODELAY. + virtual srs_error_t set_tcp_nodelay(bool v); + // Set socket option SO_SNDBUF in srs_utime_t. + virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v); +// Interface ISrsResource. +public: + virtual std::string desc(); +// Interface ISrsConnection. +public: + virtual std::string remote_ip(); + virtual const SrsContextId& get_id(); +// Interface ISrsStartable +public: + virtual srs_error_t start(); +// Interface ISrsKbpsDelta +public: + virtual void remark(int64_t* in, int64_t* out); }; // The http server, use http stream or static server to serve requests. diff --git a/trunk/src/app/srs_app_http_stream.cpp b/trunk/src/app/srs_app_http_stream.cpp index 63370449cd..6170d7e16d 100755 --- a/trunk/src/app/srs_app_http_stream.cpp +++ b/trunk/src/app/srs_app_http_stream.cpp @@ -602,7 +602,7 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess // Use receive thread to accept the close event to avoid FD leak. // @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427 SrsHttpMessage* hr = dynamic_cast(r); - SrsResponseOnlyHttpConn* hc = dynamic_cast(hr->connection()); + SrsHttpConn* hc = dynamic_cast(hr->connection()); // update the statistic when source disconveried. SrsStatistic* stat = SrsStatistic::instance(); @@ -622,7 +622,8 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess return srs_error_wrap(err, "encoder dump cache"); } } - + + // Try to use fast flv encoder, remember that it maybe NULL. SrsFlvStreamEncoder* ffe = dynamic_cast(enc); // Set the socket options for transport. @@ -638,7 +639,11 @@ srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMess return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep); } - SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc); + // Note that the handler of hc now is rohc. + SrsResponseOnlyHttpConn* rohc = dynamic_cast(hc->handler()); + srs_assert(rohc); + + SrsHttpRecvThread* trd = new SrsHttpRecvThread(rohc); SrsAutoFree(SrsHttpRecvThread, trd); if ((err = trd->start()) != srs_success) { diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 8a8a00c310..aab392d5d6 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1442,6 +1442,7 @@ srs_error_t SrsRtmpConn::cycle() srs_error_t err = do_cycle(); // Notify manager to remove it. + // Note that we create this object, so we use manager to remove it. manager->remove(this); // success.