From 770d9591487dcd138b1a7859c40b7c912f02be93 Mon Sep 17 00:00:00 2001 From: winlin Date: Fri, 2 Sep 2022 22:39:19 +0800 Subject: [PATCH] WebRTC: Support config, listener and SDP for TCP transport. --- trunk/conf/full.conf | 21 ++ trunk/configure | 2 +- trunk/src/app/srs_app_config.cpp | 71 ++++- trunk/src/app/srs_app_config.hpp | 3 + trunk/src/app/srs_app_rtc_conn.cpp | 283 +++---------------- trunk/src/app/srs_app_rtc_conn.hpp | 74 +---- trunk/src/app/srs_app_rtc_network.cpp | 388 ++++++++++++++++++++++++++ trunk/src/app/srs_app_rtc_network.hpp | 172 ++++++++++++ trunk/src/app/srs_app_rtc_sdp.cpp | 16 +- trunk/src/app/srs_app_rtc_sdp.hpp | 3 +- trunk/src/app/srs_app_rtc_server.cpp | 61 +--- trunk/src/app/srs_app_rtc_server.hpp | 27 -- trunk/src/app/srs_app_server.cpp | 30 ++ trunk/src/app/srs_app_server.hpp | 2 + 14 files changed, 767 insertions(+), 386 deletions(-) create mode 100644 trunk/src/app/srs_app_rtc_network.cpp create mode 100644 trunk/src/app/srs_app_rtc_network.hpp diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 56afd5c27c..f7e5ee8023 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -302,6 +302,27 @@ rtc_server { # The udp listen port, we will reuse it for connections. # default: 8000 listen 8000; + # For WebRTC over TCP directly, not TURN, see https://github.com/ossrs/srs/issues/2852 + # Some network does not support UDP, or not very well, so we use TCP like HTTP/80 port for firewall traversing. + tcp { + # Whether enable WebRTC over TCP. + # Overwrite by env SRS_RTC_SERVER_TCP_ENABLED + # Default: off + enabled off; + # The TCP listen port for WebRTC. Highly recommend is some normally used ports, such as TCP/80, TCP/443, + # TCP/8000, TCP/8080 etc. However SRS default to TCP/8000 corresponding to UDP/8000. + # Overwrite by env SRS_RTC_SERVER_TCP_LISTEN + # Default: 8000 + listen 8000; + } + # The protocol for candidate to use, it can be: + # udp Generate UDP candidates. Note that UDP server is always enabled for WebRTC. + # tcp Generate TCP candidates. Fail if rtc_server.tcp(WebRTC over TCP) is disabled. + # all Generate UDP+TCP candidates. Ignore if rtc_server.tcp(WebRTC over TCP) is disabled. + # Note that if both are connected, we will use the first connected(DTLS done) one. + # Overwrite by env SRS_RTC_SERVER_PROTOCOL + # Default: udp + protocol udp; # The exposed candidate IPs, response in SDP candidate line. It can be: # * Retrieve server IP automatically, from all network interfaces. # $CANDIDATE Read the IP from ENV variable, use * if not set. diff --git a/trunk/configure b/trunk/configure index f2ba0495e6..317640733b 100755 --- a/trunk/configure +++ b/trunk/configure @@ -260,7 +260,7 @@ MODULE_FILES=("srs_app_server" "srs_app_conn" "srs_app_rtmp_conn" "srs_app_sourc "srs_app_ingest" "srs_app_ffmpeg" "srs_app_utility" "srs_app_edge" "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds" - "srs_app_mpegts_udp" "srs_app_listener" "srs_app_async_call" + "srs_app_mpegts_udp" "srs_app_listener" "srs_app_async_call" "srs_app_rtc_network" "srs_app_caster_flv" "srs_app_latest_version" "srs_app_uuid" "srs_app_process" "srs_app_ng_exec" "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" "srs_app_coworkers" "srs_app_hybrid" "srs_app_threads") diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index fa4899c445..7aac3ad53e 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2289,8 +2289,8 @@ srs_error_t SrsConfig::check_normal_config() SrsConfDirective* conf = root->get("rtc_server"); for (int i = 0; conf && i < (int)conf->directives.size(); i++) { string n = conf->at(i)->name; - if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa" - && n != "encrypt" && n != "reuseport" && n != "merge_nalus" && n != "black_hole" + if (n != "enabled" && n != "listen" && n != "dir" && n != "candidate" && n != "ecdsa" && n != "tcp" + && n != "encrypt" && n != "reuseport" && n != "merge_nalus" && n != "black_hole" && n != "protocol" && n != "ip_family" && n != "api_as_candidates" && n != "resolve_api_domain" && n != "keep_api_domain" && n != "use_auto_detect_network_ip") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal rtc_server.%s", n.c_str()); @@ -3701,6 +3701,73 @@ bool SrsConfig::get_use_auto_detect_network_ip() return SRS_CONF_PERFER_TRUE(conf->arg0()); } +bool SrsConfig::get_rtc_server_tcp_enabled() +{ + SRS_OVERWRITE_BY_ENV_BOOL("SRS_RTC_SERVER_TCP_ENABLED"); + + static bool DEFAULT = false; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("tcp"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("enabled"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +int SrsConfig::get_rtc_server_tcp_listen() +{ + SRS_OVERWRITE_BY_ENV_INT("SRS_RTC_SERVER_TCP_LISTEN"); + + static int DEFAULT = 8000; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("tcp"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("listen"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +std::string SrsConfig::get_rtc_server_protocol() +{ + SRS_OVERWRITE_BY_ENV_STRING("SRS_RTC_SERVER_PROTOCOL"); + + static string DEFAULT = "udp"; + + SrsConfDirective* conf = root->get("rtc_server"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("protocol"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return conf->arg0(); +} + std::string SrsConfig::get_rtc_server_ip_family() { static string DEFAULT = "ipv4"; diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 3f2e6d2866..bb8920a1b6 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -498,6 +498,9 @@ class SrsConfig virtual bool get_resolve_api_domain(); virtual bool get_keep_api_domain(); virtual bool get_use_auto_detect_network_ip(); + virtual bool get_rtc_server_tcp_enabled(); + virtual int get_rtc_server_tcp_listen(); + virtual std::string get_rtc_server_protocol(); virtual std::string get_rtc_server_ip_family(); virtual bool get_rtc_server_ecdsa(); virtual bool get_rtc_server_encrypt(); diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index e861d47eff..f14d156d92 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -47,6 +47,7 @@ using namespace std; #include #include #include +#include SrsPps* _srs_pps_sstuns = NULL; SrsPps* _srs_pps_srtcps = NULL; @@ -75,9 +76,9 @@ ISrsRtcTransport::~ISrsRtcTransport() { } -SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s) +SrsSecurityTransport::SrsSecurityTransport(ISrsRtcNetwork* s) { - session_ = s; + network_ = s; dtls_ = new SrsDtls((ISrsDtlsCallback*)this); srtp_ = new SrsSRTP(); @@ -111,7 +112,7 @@ srs_error_t SrsSecurityTransport::write_dtls_data(void* data, int size) ++_srs_pps_sstuns->sugar; - if ((err = session_->sendonly_skt->sendto(data, size, 0)) != srs_success) { + if ((err = network_->write(data, size, NULL)) != srs_success) { return srs_error_wrap(err, "send dtls packet"); } @@ -129,7 +130,7 @@ srs_error_t SrsSecurityTransport::on_dtls(char* data, int nb_data) srs_error_t SrsSecurityTransport::on_dtls_alert(std::string type, std::string desc) { - return session_->on_dtls_alert(type, desc); + return network_->on_dtls_alert(type, desc); } srs_error_t SrsSecurityTransport::on_dtls_handshake_done() @@ -148,7 +149,7 @@ srs_error_t SrsSecurityTransport::on_dtls_handshake_done() return srs_error_wrap(err, "srtp init"); } - return session_->on_connection_established(); + return network_->on_connection_established(); } srs_error_t SrsSecurityTransport::on_dtls_application_data(const char* buf, const int nb_buf) @@ -198,7 +199,7 @@ srs_error_t SrsSecurityTransport::unprotect_rtcp(void* packet, int* nb_plaintext return srtp_->unprotect_rtcp(packet, nb_plaintext); } -SrsSemiSecurityTransport::SrsSemiSecurityTransport(SrsRtcConnection* s) : SrsSecurityTransport(s) +SrsSemiSecurityTransport::SrsSemiSecurityTransport(ISrsRtcNetwork* s) : SrsSecurityTransport(s) { } @@ -216,9 +217,9 @@ srs_error_t SrsSemiSecurityTransport::protect_rtcp(void* packet, int* nb_cipher) return srs_success; } -SrsPlaintextTransport::SrsPlaintextTransport(SrsRtcConnection* s) +SrsPlaintextTransport::SrsPlaintextTransport(ISrsRtcNetwork* s) { - session_ = s; + network_ = s; } SrsPlaintextTransport::~SrsPlaintextTransport() @@ -248,7 +249,7 @@ srs_error_t SrsPlaintextTransport::on_dtls_alert(std::string type, std::string d srs_error_t SrsPlaintextTransport::on_dtls_handshake_done() { srs_trace("RTC: DTLS handshake done."); - return session_->on_connection_established(); + return network_->on_connection_established(); } srs_error_t SrsPlaintextTransport::on_dtls_application_data(const char* data, const int len) @@ -434,11 +435,6 @@ SrsRtcPlayStream::~SrsRtcPlayStream() session_->server_->exec_async_work(new SrsRtcAsyncCallOnStop(cid_, req_)); } - // TODO: FIXME: Should not do callback in de-constructor? - if (_srs_rtc_hijacker) { - _srs_rtc_hijacker->on_stop_play(session_, this, req_); - } - _srs_config->unsubscribe(this); srs_freep(nack_epp); @@ -600,12 +596,6 @@ srs_error_t SrsRtcPlayStream::start() return srs_error_wrap(err, "start pli worker"); } - if (_srs_rtc_hijacker) { - if ((err = _srs_rtc_hijacker->on_start_play(session_, this, req_)) != srs_success) { - return srs_error_wrap(err, "on start play"); - } - } - is_started = true; return err; @@ -649,12 +639,6 @@ srs_error_t SrsRtcPlayStream::cycle() SrsErrorPithyPrint* epp = new SrsErrorPithyPrint(); SrsAutoFree(SrsErrorPithyPrint, epp); - if (_srs_rtc_hijacker) { - if ((err = _srs_rtc_hijacker->on_start_consume(session_, this, req_, consumer)) != srs_success) { - return srs_error_wrap(err, "on start consuming"); - } - } - while (true) { if ((err = trd_->pull()) != srs_success) { return srs_error_wrap(err, "rtc sender thread"); @@ -1105,14 +1089,6 @@ SrsRtcPublishStream::~SrsRtcPublishStream() source->on_unpublish(); } - // TODO: FIXME: Should not do callback in de-constructor? - // NOTE: on_stop_publish lead to switch io, - // it must be called after source stream unpublish (set source stream is_created=false). - // if not, it lead to republish failed. - if (_srs_rtc_hijacker) { - _srs_rtc_hijacker->on_stop_publish(session_, this, req_); - } - for (int i = 0; i < (int)video_tracks_.size(); ++i) { SrsRtcVideoRecvTrack* track = video_tracks_.at(i); srs_freep(track); @@ -1248,12 +1224,6 @@ srs_error_t SrsRtcPublishStream::start() return srs_error_wrap(err, "start pli worker"); } - if (_srs_rtc_hijacker) { - if ((err = _srs_rtc_hijacker->on_start_publish(session_, this, req_)) != srs_success) { - return srs_error_wrap(err, "on start publish"); - } - } - is_started = true; return err; @@ -1383,7 +1353,7 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data) // Decrypt the cipher to plaintext RTP data. char* plaintext = data; int nb_plaintext = nb_data; - if ((err = session_->transport_->unprotect_rtp(plaintext, &nb_plaintext)) != srs_success) { + if ((err = session_->network_->unprotect_rtp(plaintext, &nb_plaintext)) != srs_success) { // We try to decode the RTP header for more detail error informations. SrsBuffer b(data, nb_data); SrsRtpHeader h; h.ignore_padding(true); srs_error_t r0 = h.decode(&b); srs_freep(r0); // Ignore any error for header decoding. @@ -1467,12 +1437,6 @@ srs_error_t SrsRtcPublishStream::do_on_rtp_plaintext(SrsRtpPacket*& pkt, SrsBuff return srs_error_new(ERROR_RTC_RTP, "unknown ssrc=%u", ssrc); } - if (_srs_rtc_hijacker) { - if ((err = _srs_rtc_hijacker->on_rtp_packet(session_, this, req_, pkt)) != srs_success) { - return srs_error_wrap(err, "on rtp packet"); - } - } - // If circuit-breaker is enabled, disable nack. if (_srs_circuit_breaker->hybrid_critical_water_level()) { ++_srs_pps_snack4->sugar; @@ -1760,14 +1724,6 @@ void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp& n } } -ISrsRtcConnectionHijacker::ISrsRtcConnectionHijacker() -{ -} - -ISrsRtcConnectionHijacker::~ISrsRtcConnectionHijacker() -{ -} - SrsRtcConnectionNackTimer::SrsRtcConnectionNackTimer(SrsRtcConnection* p) : p_(p) { _srs_hybrid->timer20ms()->subscribe(this); @@ -1811,11 +1767,9 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) { req_ = NULL; cid_ = cid; - hijacker_ = NULL; - sendonly_skt = NULL; server_ = s; - transport_ = new SrsSecurityTransport(this); + network_ = new SrsRtcNetwork(this); cache_iov_ = new iovec(); cache_iov_->iov_base = new char[kRtpPacketSize]; @@ -1829,9 +1783,7 @@ SrsRtcConnection::SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid) twcc_id_ = 0; nn_simulate_player_nack_drop = 0; - pp_address_change = new SrsErrorPithyPrint(); pli_epp = new SrsErrorPithyPrint(); - delta_ = new SrsEphemeralDelta(); nack_enabled_ = false; timer_nack_ = new SrsRtcConnectionNackTimer(this); @@ -1861,13 +1813,8 @@ SrsRtcConnection::~SrsRtcConnection() players_.clear(); players_ssrc_map_.clear(); - // Note that we should never delete the sendonly_skt, - // it's just point to the object in peer_addresses_. - map::iterator it; - for (it = peer_addresses_.begin(); it != peer_addresses_.end(); ++it) { - SrsUdpMuxSocket* addr = it->second; - srs_freep(addr); - } + // Free network over UDP or TCP. + srs_freep(network_); if (true) { char* iov_base = (char*)cache_iov_->iov_base; @@ -1876,11 +1823,8 @@ SrsRtcConnection::~SrsRtcConnection() } srs_freep(cache_buffer_); - srs_freep(transport_); srs_freep(req_); - srs_freep(pp_address_change); srs_freep(pli_epp); - srs_freep(delta_); } void SrsRtcConnection::on_before_dispose(ISrsResource* c) @@ -1943,22 +1887,9 @@ string SrsRtcConnection::username() return username_; } -vector SrsRtcConnection::peer_addresses() -{ - vector addresses; - - map::iterator it; - for (it = peer_addresses_.begin(); it != peer_addresses_.end(); ++it) { - SrsUdpMuxSocket* addr = it->second; - addresses.push_back(addr); - } - - return addresses; -} - ISrsKbpsDelta* SrsRtcConnection::delta() { - return delta_; + return network_->delta(); } const SrsContextId& SrsRtcConnection::get_id() @@ -2034,12 +1965,6 @@ srs_error_t SrsRtcConnection::add_player(SrsRtcUserConfig* ruc, SrsSdp& local_sd SrsRequest* req = ruc->req_; - if (_srs_rtc_hijacker) { - if ((err = _srs_rtc_hijacker->on_before_play(this, req)) != srs_success) { - return srs_error_wrap(err, "before play"); - } - } - std::map play_sub_relations; if ((err = negotiate_play_capability(ruc, play_sub_relations)) != srs_success) { return srs_error_wrap(err, "play negotiate"); @@ -2084,17 +2009,8 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st username_ = username; req_ = r->copy(); - if (!srtp) { - srs_freep(transport_); - if (dtls) { - transport_ = new SrsSemiSecurityTransport(this); - } else { - transport_ = new SrsPlaintextTransport(this); - } - } - SrsSessionConfig* cfg = &local_sdp.session_negotiate_; - if ((err = transport_->initialize(cfg)) != srs_success) { + if ((err = network_->initialize(cfg, dtls, srtp)) != srs_success) { return srs_error_wrap(err, "init"); } @@ -2111,26 +2027,19 @@ srs_error_t SrsRtcConnection::initialize(SrsRequest* r, bool dtls, bool srtp, st return err; } -srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r) +srs_error_t SrsRtcConnection::on_stun(SrsStunPacket* r, char* data, int nb_data) { srs_error_t err = srs_success; - // Update stat when we received data. - delta_->add_delta(skt->size(), 0); + // Write STUN messages to blackhole. + if (_srs_blackhole->blackhole) { + _srs_blackhole->sendto(data, nb_data); + } if (!r->is_binding_request()) { return err; } - // We are running in the ice-lite(server) mode. If client have multi network interface, - // we only choose one candidate pair which is determined by client. - update_sendonly_socket(skt); - - // Write STUN messages to blackhole. - if (_srs_blackhole->blackhole) { - _srs_blackhole->sendto(skt->data(), skt->size()); - } - if ((err = on_binding_request(r)) != srs_success) { return srs_error_wrap(err, "stun binding request failed"); } @@ -2140,21 +2049,15 @@ srs_error_t SrsRtcConnection::on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r) srs_error_t SrsRtcConnection::on_dtls(char* data, int nb_data) { - // Update stat when we received data. - delta_->add_delta(nb_data, 0); - - return transport_->on_dtls(data, nb_data); + return network_->on_dtls(data, nb_data); } srs_error_t SrsRtcConnection::on_rtcp(char* data, int nb_data) { srs_error_t err = srs_success; - // Update stat when we received data. - delta_->add_delta(nb_data, 0); - int nb_unprotected_buf = nb_data; - if ((err = transport_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) { + if ((err = network_->unprotect_rtcp(data, &nb_unprotected_buf)) != srs_success) { return srs_error_wrap(err, "rtcp unprotect"); } @@ -2282,18 +2185,10 @@ srs_error_t SrsRtcConnection::on_rtcp_feedback_remb(SrsRtcpPsfbCommon *rtcp) return srs_success; } -void SrsRtcConnection::set_hijacker(ISrsRtcConnectionHijacker* h) -{ - hijacker_ = h; -} - srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data) { srs_error_t err = srs_success; - // Update stat when we received data. - delta_->add_delta(nb_data, 0); - SrsRtcPublishStream* publisher = NULL; if ((err = find_publisher(data, nb_data, &publisher)) != srs_success) { return srs_error_wrap(err, "find"); @@ -2368,12 +2263,6 @@ srs_error_t SrsRtcConnection::on_connection_established() } } - if (hijacker_) { - if ((err = hijacker_->on_dtls_done()) != srs_success) { - return srs_error_wrap(err, "hijack on dtls done"); - } - } - return err; } @@ -2393,39 +2282,6 @@ srs_error_t SrsRtcConnection::on_dtls_alert(std::string type, std::string desc) return err; } -srs_error_t SrsRtcConnection::start_play(string stream_uri) -{ - srs_error_t err = srs_success; - - map::iterator it = players_.find(stream_uri); - if(it == players_.end()) { - return srs_error_new(ERROR_RTC_NO_PLAYER, "not subscribe %s", stream_uri.c_str()); - } - - SrsRtcPlayStream* player = it->second; - if ((err = player->start()) != srs_success) { - return srs_error_wrap(err, "start"); - } - - return err; -} - -srs_error_t SrsRtcConnection::start_publish(std::string stream_uri) -{ - srs_error_t err = srs_success; - - map::iterator it = publishers_.find(stream_uri); - if(it == publishers_.end()) { - return srs_error_new(ERROR_RTC_NO_PUBLISHER, "no %s publisher", stream_uri.c_str()); - } - - if ((err = it->second->start()) != srs_success) { - return srs_error_wrap(err, "start"); - } - - return err; -} - bool SrsRtcConnection::is_alive() { return last_stun_time + session_timeout > srs_get_system_time(); @@ -2436,52 +2292,9 @@ void SrsRtcConnection::alive() last_stun_time = srs_get_system_time(); } -void SrsRtcConnection::update_sendonly_socket(SrsUdpMuxSocket* skt) +SrsRtcUdpNetwork* SrsRtcConnection::udp() { - // TODO: FIXME: Refine performance. - string prev_peer_id, peer_id = skt->peer_id(); - if (sendonly_skt) { - prev_peer_id = sendonly_skt->peer_id(); - } - - // Ignore if same address. - if (prev_peer_id == peer_id) { - return; - } - - // Find object from cache. - SrsUdpMuxSocket* addr_cache = NULL; - if (true) { - map::iterator it = peer_addresses_.find(peer_id); - if (it != peer_addresses_.end()) { - addr_cache = it->second; - } - } - - // Show address change log. - if (prev_peer_id.empty()) { - srs_trace("RTC: session address init %s", peer_id.c_str()); - } else { - uint32_t nn = 0; - if (pp_address_change->can_print(skt->get_peer_port(), &nn)) { - srs_trace("RTC: session address change %s -> %s, cached=%d, nn_change=%u/%u, nn_address=%u", prev_peer_id.c_str(), - peer_id.c_str(), (addr_cache? 1:0), pp_address_change->nn_count, nn, peer_addresses_.size()); - } - } - - // If no cache, build cache and setup the relations in connection. - if (!addr_cache) { - peer_addresses_[peer_id] = addr_cache = skt->copy_sendonly(); - _srs_rtc_manager->add_with_id(peer_id, this); - - uint64_t fast_id = skt->fast_id(); - if (fast_id) { - _srs_rtc_manager->add_with_fast_id(fast_id, this); - } - } - - // Update the transport. - sendonly_skt = addr_cache; + return network_->udp(); } srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data) @@ -2490,15 +2303,12 @@ srs_error_t SrsRtcConnection::send_rtcp(char *data, int nb_data) ++_srs_pps_srtcps->sugar; - // Update stat when we sending data. - delta_->add_delta(0, nb_data); - int nb_buf = nb_data; - if ((err = transport_->protect_rtcp(data, &nb_buf)) != srs_success) { + if ((err = network_->protect_rtcp(data, &nb_buf)) != srs_success) { return srs_error_wrap(err, "protect rtcp"); } - if ((err = sendonly_skt->sendto(data, nb_buf, 0)) != srs_success) { + if ((err = network_->write(data, nb_buf, NULL)) != srs_success) { return srs_error_wrap(err, "send"); } @@ -2682,7 +2492,7 @@ srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket* pkt) // Cipher RTP to SRTP packet. if (true) { int nn_encrypt = (int)iov->iov_len; - if ((err = transport_->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) { + if ((err = network_->protect_rtp(iov->iov_base, &nn_encrypt)) != srs_success) { return srs_error_wrap(err, "srtp protect"); } iov->iov_len = (size_t)nn_encrypt; @@ -2697,11 +2507,11 @@ srs_error_t SrsRtcConnection::do_send_packet(SrsRtpPacket* pkt) ++_srs_pps_srtps->sugar; - // Update stat when we sending data. - delta_->add_delta(0, iov->iov_len); - - // TODO: FIXME: Handle error. - sendonly_skt->sendto(iov->iov_base, iov->iov_len, 0); + if ((err = network_->write(iov->iov_base, iov->iov_len, NULL)) != srs_success) { + srs_warn("RTC: Write %d bytes err %s", iov->iov_len, srs_error_desc(err).c_str()); + srs_freep(err); + return err; + } // Detail log, should disable it in release version. srs_info("RTC: SEND PT=%u, SSRC=%#x, SEQ=%u, Time=%u, %u/%u bytes", pkt->header.get_payload_type(), pkt->header.get_ssrc(), @@ -2764,17 +2574,14 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) stun_binding_response.set_remote_ufrag(r->get_local_ufrag()); stun_binding_response.set_transcation_id(r->get_transcation_id()); // FIXME: inet_addr is deprecated, IPV6 support - stun_binding_response.set_mapped_address(be32toh(inet_addr(sendonly_skt->get_peer_ip().c_str()))); - stun_binding_response.set_mapped_port(sendonly_skt->get_peer_port()); + stun_binding_response.set_mapped_address(be32toh(inet_addr(network_->get_peer_ip().c_str()))); + stun_binding_response.set_mapped_port(network_->get_peer_port()); if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) { return srs_error_wrap(err, "stun binding response encode failed"); } - // Update stat when we sending data. - delta_->add_delta(0, stream->pos()); - - if ((err = sendonly_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) { + if ((err = network_->write(stream->data(), stream->pos(), NULL)) != srs_success) { return srs_error_wrap(err, "stun binding response send failed"); } @@ -2783,7 +2590,7 @@ srs_error_t SrsRtcConnection::on_binding_request(SrsStunPacket* r) // TODO: FIXME: Add cost. srs_trace("RTC: session STUN done, waiting DTLS handshake."); - if((err = transport_->start_active_handshake()) != srs_success) { + if((err = network_->start_active_handshake()) != srs_success) { return srs_error_wrap(err, "fail to dtls handshake"); } } @@ -3646,12 +3453,6 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcSourceDesc } } - if (_srs_rtc_hijacker) { - if ((err = _srs_rtc_hijacker->on_create_publish(this, publisher, req)) != srs_success) { - return srs_error_wrap(err, "on create publish"); - } - } - // If DTLS done, start the publisher. Because maybe create some publishers after DTLS done. // For example, for single PC, we maybe start publisher when create it, because DTLS is done. if(ESTABLISHED == state()) { @@ -3663,13 +3464,3 @@ srs_error_t SrsRtcConnection::create_publisher(SrsRequest* req, SrsRtcSourceDesc return err; } -ISrsRtcHijacker::ISrsRtcHijacker() -{ -} - -ISrsRtcHijacker::~ISrsRtcHijacker() -{ -} - -ISrsRtcHijacker* _srs_rtc_hijacker = NULL; - diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 425016d704..1d69709483 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -52,6 +52,9 @@ class SrsRtcUserConfig; class SrsRtcSendTrack; class SrsRtcPublishStream; class SrsEphemeralDelta; +class SrsRtcNetwork; +class SrsRtcUdpNetwork; +class ISrsRtcNetwork; const uint8_t kSR = 200; const uint8_t kRR = 201; @@ -101,12 +104,12 @@ class ISrsRtcTransport : public ISrsDtlsCallback class SrsSecurityTransport : public ISrsRtcTransport { private: - SrsRtcConnection* session_; + ISrsRtcNetwork* network_; SrsDtls* dtls_; SrsSRTP* srtp_; bool handshake_done; public: - SrsSecurityTransport(SrsRtcConnection* s); + SrsSecurityTransport(ISrsRtcNetwork* s); virtual ~SrsSecurityTransport(); srs_error_t initialize(SrsSessionConfig* cfg); @@ -136,7 +139,7 @@ class SrsSecurityTransport : public ISrsRtcTransport class SrsSemiSecurityTransport : public SrsSecurityTransport { public: - SrsSemiSecurityTransport(SrsRtcConnection* s); + SrsSemiSecurityTransport(ISrsRtcNetwork* s); virtual ~SrsSemiSecurityTransport(); public: srs_error_t protect_rtp(void* packet, int* nb_cipher); @@ -147,9 +150,9 @@ class SrsSemiSecurityTransport : public SrsSecurityTransport class SrsPlaintextTransport : public ISrsRtcTransport { private: - SrsRtcConnection* session_; + ISrsRtcNetwork* network_; public: - SrsPlaintextTransport(SrsRtcConnection* s); + SrsPlaintextTransport(ISrsRtcNetwork* s); virtual ~SrsPlaintextTransport(); public: virtual srs_error_t initialize(SrsSessionConfig* cfg); @@ -407,16 +410,6 @@ class SrsRtcPublishStream : public ISrsRtspPacketDecodeHandler void update_send_report_time(uint32_t ssrc, const SrsNtp& ntp, uint32_t rtp_time); }; -// Callback for RTC connection. -class ISrsRtcConnectionHijacker -{ -public: - ISrsRtcConnectionHijacker(); - virtual ~ISrsRtcConnectionHijacker(); -public: - virtual srs_error_t on_dtls_done() = 0; -}; - // A fast timer for conntion, for NACK feedback. class SrsRtcConnectionNackTimer : public ISrsFastTimer { @@ -444,11 +437,9 @@ class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, publi SrsRtcConnectionNackTimer* timer_nack_; public: bool disposing_; - ISrsRtcConnectionHijacker* hijacker_; private: SrsRtcServer* server_; SrsRtcConnectionStateType state_; - ISrsRtcTransport* transport_; private: iovec* cache_iov_; SrsBuffer* cache_buffer_; @@ -464,10 +455,8 @@ class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, publi private: // The local:remote username, such as m5x0n128:jvOm where local name is m5x0n128. std::string username_; - // The peer address, client maybe use more than one address, it's the current selected one. - SrsUdpMuxSocket* sendonly_skt; - // The address list, client may use multiple addresses. - std::map peer_addresses_; + // Use one UDP network and one TCP network. + SrsRtcNetwork* network_; private: // TODO: FIXME: Rename it. // The timeout of session, keep alive by STUN ping pong. @@ -485,14 +474,10 @@ class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, publi int twcc_id_; // Simulators. int nn_simulate_player_nack_drop; - // Pithy print for address change, use port as error code. - SrsErrorPithyPrint* pp_address_change; // Pithy print for PLI request. SrsErrorPithyPrint* pli_epp; private: bool nack_enabled_; -private: - SrsEphemeralDelta* delta_; public: SrsRtcConnection(SrsRtcServer* s, const SrsContextId& cid); virtual ~SrsRtcConnection(); @@ -511,8 +496,6 @@ class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, publi void set_state(SrsRtcConnectionStateType state); // Get username pair for this connection, used as ID of session. std::string username(); - // Get all addresses client used. - std::vector peer_addresses(); public: virtual ISrsKbpsDelta* delta(); // Interface ISrsResource. @@ -532,7 +515,7 @@ class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, publi // Before initialize, user must set the local SDP, which is used to inititlize DTLS. srs_error_t initialize(SrsRequest* r, bool dtls, bool srtp, std::string username); // The peer address may change, we can identify that by STUN messages. - srs_error_t on_stun(SrsUdpMuxSocket* skt, SrsStunPacket* r); + srs_error_t on_stun(SrsStunPacket* r, char* data, int nb_data); srs_error_t on_dtls(char* data, int nb_data); srs_error_t on_rtp(char* data, int nb_data); private: @@ -545,16 +528,13 @@ class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, publi public: srs_error_t on_rtcp_feedback_twcc(char* buf, int nb_buf); srs_error_t on_rtcp_feedback_remb(SrsRtcpPsfbCommon *rtcp); -public: - void set_hijacker(ISrsRtcConnectionHijacker* h); public: srs_error_t on_connection_established(); srs_error_t on_dtls_alert(std::string type, std::string desc); - srs_error_t start_play(std::string stream_uri); - srs_error_t start_publish(std::string stream_uri); bool is_alive(); void alive(); - void update_sendonly_socket(SrsUdpMuxSocket* skt); +public: + SrsRtcUdpNetwork* udp(); public: // send rtcp srs_error_t send_rtcp(char *data, int nb_data); @@ -582,33 +562,5 @@ class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, publi srs_error_t create_publisher(SrsRequest* request, SrsRtcSourceDescription* stream_desc); }; -class ISrsRtcHijacker -{ -public: - ISrsRtcHijacker(); - virtual ~ISrsRtcHijacker(); -public: - // Initialize the hijacker. - virtual srs_error_t initialize() = 0; - // When create publisher, SDP is done, DTLS is not ready. - virtual srs_error_t on_create_publish(SrsRtcConnection* session, SrsRtcPublishStream* publisher, SrsRequest* req) = 0; - // When start publisher by RTC, SDP and DTLS are done. - virtual srs_error_t on_start_publish(SrsRtcConnection* session, SrsRtcPublishStream* publisher, SrsRequest* req) = 0; - // When stop publish by RTC. - virtual void on_stop_publish(SrsRtcConnection* session, SrsRtcPublishStream* publisher, SrsRequest* req) = 0; - // When got RTP plaintext packet. - virtual srs_error_t on_rtp_packet(SrsRtcConnection* session, SrsRtcPublishStream* publisher, SrsRequest* req, SrsRtpPacket* pkt) = 0; - // When before play by RTC. (wait source to ready in cascade scenario) - virtual srs_error_t on_before_play(SrsRtcConnection* session, SrsRequest* req) = 0; - // When start player by RTC. - virtual srs_error_t on_start_play(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req) = 0; - // When stop player by RTC. - virtual void on_stop_play(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req) = 0; - // When start consuming for player for RTC. - virtual srs_error_t on_start_consume(SrsRtcConnection* session, SrsRtcPlayStream* player, SrsRequest* req, SrsRtcConsumer* consumer) = 0; -}; - -extern ISrsRtcHijacker* _srs_rtc_hijacker; - #endif diff --git a/trunk/src/app/srs_app_rtc_network.cpp b/trunk/src/app/srs_app_rtc_network.cpp new file mode 100644 index 0000000000..9537ad6283 --- /dev/null +++ b/trunk/src/app/srs_app_rtc_network.cpp @@ -0,0 +1,388 @@ +// +// Copyright (c) 2013-2022 The SRS Authors +// +// SPDX-License-Identifier: MIT or MulanPSL-2.0 +// + +#include + +#include +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +ISrsRtcNetwork::ISrsRtcNetwork() +{ +} + +ISrsRtcNetwork::~ISrsRtcNetwork() +{ +} + +SrsRtcNetwork::SrsRtcNetwork(SrsRtcConnection* conn) +{ + conn_ = conn; + udp_ = new SrsRtcUdpNetwork(this); + delta_ = new SrsEphemeralDelta(); +} + +SrsRtcNetwork::~SrsRtcNetwork() +{ + // Free the UDP network after transport deleted. + srs_freep(udp_); + + srs_freep(delta_); +} + +srs_error_t SrsRtcNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool srtp) +{ + srs_error_t err = srs_success; + + if ((err = udp_->initialize(cfg, dtls, srtp)) != srs_success) { + return srs_error_wrap(err, "init"); + } + + return err; +} + +srs_error_t SrsRtcNetwork::start_active_handshake() +{ + return udp_->start_active_handshake(); +} + +srs_error_t SrsRtcNetwork::on_dtls(char* data, int nb_data) +{ + return udp_->on_dtls(data, nb_data); +} + +srs_error_t SrsRtcNetwork::on_dtls_alert(std::string type, std::string desc) +{ + return conn_->on_dtls_alert(type, desc); +} + +srs_error_t SrsRtcNetwork::on_connection_established() +{ + return conn_->on_connection_established(); +} + +srs_error_t SrsRtcNetwork::protect_rtp(void* packet, int* nb_cipher) +{ + return udp_->protect_rtp(packet, nb_cipher); +} + +srs_error_t SrsRtcNetwork::protect_rtcp(void* packet, int* nb_cipher) +{ + return udp_->protect_rtcp(packet, nb_cipher); +} + +srs_error_t SrsRtcNetwork::unprotect_rtp(void* packet, int* nb_plaintext) +{ + return udp_->unprotect_rtp(packet, nb_plaintext); +} + +srs_error_t SrsRtcNetwork::unprotect_rtcp(void* packet, int* nb_plaintext) +{ + return udp_->unprotect_rtcp(packet, nb_plaintext); +} + +srs_error_t SrsRtcNetwork::on_rtcp(char* data, int nb_data) +{ + // Update stat when we received data. + delta_->add_delta(nb_data, 0); + + return conn_->on_rtcp(data, nb_data); +} + +srs_error_t SrsRtcNetwork::on_rtp(char* data, int nb_data) +{ + // Update stat when we received data. + delta_->add_delta(nb_data, 0); + + return conn_->on_rtp(data, nb_data); +} + +string SrsRtcNetwork::get_peer_ip() +{ + return udp_->get_peer_ip(); +} + +int SrsRtcNetwork::get_peer_port() +{ + return udp_->get_peer_port(); +} + +SrsRtcUdpNetwork* SrsRtcNetwork::udp() +{ + return udp_; +} + +ISrsKbpsDelta* SrsRtcNetwork::delta() +{ + return delta_; +} + +srs_error_t SrsRtcNetwork::write(void* buf, size_t size, ssize_t* nwrite) +{ + return udp_->write(buf, size, nwrite); +} + +SrsRtcUdpNetwork::SrsRtcUdpNetwork(SrsRtcNetwork* network) +{ + network_ = network; + sendonly_skt = NULL; + pp_address_change_ = new SrsErrorPithyPrint(); + transport_ = new SrsSecurityTransport(this); +} + +SrsRtcUdpNetwork::~SrsRtcUdpNetwork() +{ + // Free transport first, which depends on socket. + srs_freep(transport_); + + // Note that we should never delete the sendonly_skt, + // it's just point to the object in peer_addresses_. + map::iterator it; + for (it = peer_addresses_.begin(); it != peer_addresses_.end(); ++it) { + SrsUdpMuxSocket* addr = it->second; + srs_freep(addr); + } + + srs_freep(pp_address_change_); +} + +srs_error_t SrsRtcUdpNetwork::initialize(SrsSessionConfig* cfg, bool dtls, bool srtp) +{ + srs_error_t err = srs_success; + + if (!srtp) { + srs_freep(transport_); + if (dtls) { + transport_ = new SrsSemiSecurityTransport(this); + } else { + transport_ = new SrsPlaintextTransport(this); + } + } + + if ((err = transport_->initialize(cfg)) != srs_success) { + return srs_error_wrap(err, "init"); + } + + return err; +} + +srs_error_t SrsRtcUdpNetwork::start_active_handshake() +{ + return transport_->start_active_handshake(); +} + +srs_error_t SrsRtcUdpNetwork::on_dtls(char* data, int nb_data) +{ + // Update stat when we received data. + network_->delta_->add_delta(nb_data, 0); + + return transport_->on_dtls(data, nb_data); +} + +srs_error_t SrsRtcUdpNetwork::on_dtls_alert(std::string type, std::string desc) +{ + return network_->conn_->on_dtls_alert(type, desc); +} + +srs_error_t SrsRtcUdpNetwork::on_connection_established() +{ + return network_->conn_->on_connection_established(); +} + +srs_error_t SrsRtcUdpNetwork::protect_rtp(void* packet, int* nb_cipher) +{ + return transport_->protect_rtp(packet, nb_cipher); +} + +srs_error_t SrsRtcUdpNetwork::protect_rtcp(void* packet, int* nb_cipher) +{ + return transport_->protect_rtcp(packet, nb_cipher); +} + +srs_error_t SrsRtcUdpNetwork::unprotect_rtp(void* packet, int* nb_plaintext) +{ + return transport_->unprotect_rtp(packet, nb_plaintext); +} + +srs_error_t SrsRtcUdpNetwork::unprotect_rtcp(void* packet, int* nb_plaintext) +{ + // Update stat when we received data. + network_->delta_->add_delta(*nb_plaintext, 0); + + return transport_->unprotect_rtcp(packet, nb_plaintext); +} + +srs_error_t SrsRtcUdpNetwork::on_rtcp(char* data, int nb_data) +{ + // Update stat when we received data. + network_->delta_->add_delta(nb_data, 0); + + return network_->conn_->on_rtcp(data, nb_data); +} + +srs_error_t SrsRtcUdpNetwork::on_rtp(char* data, int nb_data) +{ + // Update stat when we received data. + network_->delta_->add_delta(nb_data, 0); + + return network_->conn_->on_rtp(data, nb_data); +} + +string SrsRtcUdpNetwork::get_peer_ip() +{ + srs_assert(sendonly_skt); + return sendonly_skt->get_peer_ip(); +} + +int SrsRtcUdpNetwork::get_peer_port() +{ + srs_assert(sendonly_skt); + return sendonly_skt->get_peer_port(); +} + +void SrsRtcUdpNetwork::update_sendonly_socket(SrsUdpMuxSocket* skt) +{ + // TODO: FIXME: Refine performance. + string prev_peer_id, peer_id = skt->peer_id(); + if (sendonly_skt) { + prev_peer_id = sendonly_skt->peer_id(); + } + + // Ignore if same address. + if (prev_peer_id == peer_id) { + return; + } + + // Find object from cache. + SrsUdpMuxSocket* addr_cache = NULL; + if (true) { + map::iterator it = peer_addresses_.find(peer_id); + if (it != peer_addresses_.end()) { + addr_cache = it->second; + } + } + + // Show address change log. + if (prev_peer_id.empty()) { + srs_trace("RTC: session address init %s", peer_id.c_str()); + } else { + uint32_t nn = 0; + if (pp_address_change_->can_print(skt->get_peer_port(), &nn)) { + srs_trace("RTC: session address change %s -> %s, cached=%d, nn_change=%u/%u, nn_address=%u", prev_peer_id.c_str(), + peer_id.c_str(), (addr_cache? 1:0), pp_address_change_->nn_count, nn, peer_addresses_.size()); + } + } + + // If no cache, build cache and setup the relations in connection. + if (!addr_cache) { + peer_addresses_[peer_id] = addr_cache = skt->copy_sendonly(); + _srs_rtc_manager->add_with_id(peer_id, network_->conn_); + + uint64_t fast_id = skt->fast_id(); + if (fast_id) { + _srs_rtc_manager->add_with_fast_id(fast_id, network_->conn_); + } + } + + // Update the transport. + sendonly_skt = addr_cache; +} + +srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite) +{ + // Update stat when we sending data. + network_->delta_->add_delta(0, size); + + if (nwrite) *nwrite = size; + return sendonly_skt->sendto(buf, size, SRS_UTIME_NO_TIMEOUT); +} + +SrsRtcTcpConn::SrsRtcTcpConn(srs_netfd_t fd, std::string cip, int port, ISrsResourceManager* cm) +{ + manager_ = cm; + ip_ = cip; + port_ = port; + skt_ = new SrsTcpConnection(fd); + delta_ = new SrsNetworkDelta(); + delta_->set_io(skt_, skt_); + trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id()); +} + +SrsRtcTcpConn::~SrsRtcTcpConn() +{ + trd_->interrupt(); + srs_freep(trd_); + + srs_freep(delta_); + srs_freep(skt_); +} + +ISrsKbpsDelta* SrsRtcTcpConn::delta() +{ + return delta_; +} + +std::string SrsRtcTcpConn::desc() +{ + return "Tcp"; +} + +const SrsContextId& SrsRtcTcpConn::get_id() +{ + return trd_->cid(); +} + +std::string SrsRtcTcpConn::remote_ip() +{ + return ip_; +} + +srs_error_t SrsRtcTcpConn::start() +{ + return trd_->start(); +} + +srs_error_t SrsRtcTcpConn::cycle() +{ + srs_error_t err = do_cycle(); + + // Only stat the HTTP streaming clients, ignore all API clients. + SrsStatistic::instance()->on_disconnect(get_id().c_str()); + SrsStatistic::instance()->kbps_add_delta(get_id().c_str(), delta_); + + // Because we use manager to manage this object, + // not the http connection object, so we must remove it here. + manager_->remove(this); + + // For HTTP-API timeout, we think it's done successfully, + // because there may be no request or response for HTTP-API. + if (srs_error_code(err) == ERROR_SOCKET_TIMEOUT) { + srs_freep(err); + return srs_success; + } + + return err; +} + +srs_error_t SrsRtcTcpConn::do_cycle() +{ + srs_error_t err = srs_success; + + // TODO: FIXME: Handle all bytes of TCP Connection. + + return err; +} + diff --git a/trunk/src/app/srs_app_rtc_network.hpp b/trunk/src/app/srs_app_rtc_network.hpp new file mode 100644 index 0000000000..b883701e55 --- /dev/null +++ b/trunk/src/app/srs_app_rtc_network.hpp @@ -0,0 +1,172 @@ +// +// Copyright (c) 2013-2022 The SRS Authors +// +// SPDX-License-Identifier: MIT or MulanPSL-2.0 +// + +#ifndef SRS_APP_RTC_NETWORK_HPP +#define SRS_APP_RTC_NETWORK_HPP + +#include + +#include +#include + +#include +#include +#include +#include + +class ISrsResourceManager; +class SrsCoroutine; +class SrsNetworkDelta; +class SrsTcpConnection; +class ISrsKbpsDelta; +class SrsUdpMuxSocket; +class SrsErrorPithyPrint; +class ISrsRtcTransport; +class SrsEphemeralDelta; +class ISrsKbpsDelta; +class SrsRtcUdpNetwork; + +// For DTLS to call network service. +class ISrsRtcNetwork : public ISrsStreamWriter +{ +public: + ISrsRtcNetwork(); + virtual ~ISrsRtcNetwork(); +public: + // Callback when DTLS connected. + virtual srs_error_t on_connection_established() = 0; + // Callback when DTLS disconnected. + virtual srs_error_t on_dtls_alert(std::string type, std::string desc) = 0; +}; + +// The UDP network, default for WebRTC. +class SrsRtcNetwork : public ISrsRtcNetwork +{ +private: + friend class SrsRtcUdpNetwork; +private: + // WebRTC session object. + SrsRtcConnection* conn_; + // Network over UDP. + SrsRtcUdpNetwork* udp_; + // Delta object for statistics. + SrsEphemeralDelta* delta_; +public: + SrsRtcNetwork(SrsRtcConnection* conn); + virtual ~SrsRtcNetwork(); +// DTLS transport functions. +public: + srs_error_t initialize(SrsSessionConfig* cfg, bool dtls, bool srtp); + virtual srs_error_t start_active_handshake(); + virtual srs_error_t on_dtls(char* data, int nb_data); + virtual srs_error_t on_dtls_alert(std::string type, std::string desc); + srs_error_t on_connection_established(); + srs_error_t protect_rtp(void* packet, int* nb_cipher); + srs_error_t protect_rtcp(void* packet, int* nb_cipher); + srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); + srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); +// When got data from socket. +public: + srs_error_t on_rtcp(char* data, int nb_data); + srs_error_t on_rtp(char* data, int nb_data); +// Other functions. +public: + // ICE reflexive address functions. + std::string get_peer_ip(); + int get_peer_port(); + // Get the UDP network object. + SrsRtcUdpNetwork* udp(); + // Get the delta object for statistics. + virtual ISrsKbpsDelta* delta(); +// Interface ISrsStreamWriter. +public: + virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); +}; + +// The WebRTC over UDP network. +class SrsRtcUdpNetwork : public ISrsRtcNetwork +{ +private: + SrsRtcNetwork* network_; +private: + // Pithy print for address change, use port as error code. + SrsErrorPithyPrint* pp_address_change_; + // The peer address, client maybe use more than one address, it's the current selected one. + SrsUdpMuxSocket* sendonly_skt; + // The address list, client may use multiple addresses. + std::map peer_addresses_; + // The DTLS transport over this network. + ISrsRtcTransport* transport_; +public: + SrsRtcUdpNetwork(SrsRtcNetwork* network); + virtual ~SrsRtcUdpNetwork(); +public: + // Update the UDP connection. + void update_sendonly_socket(SrsUdpMuxSocket* skt); +// DTLS transport functions. +public: + srs_error_t initialize(SrsSessionConfig* cfg, bool dtls, bool srtp); + virtual srs_error_t start_active_handshake(); + virtual srs_error_t on_dtls(char* data, int nb_data); + virtual srs_error_t on_dtls_alert(std::string type, std::string desc); + srs_error_t on_connection_established(); + srs_error_t protect_rtp(void* packet, int* nb_cipher); + srs_error_t protect_rtcp(void* packet, int* nb_cipher); + srs_error_t unprotect_rtp(void* packet, int* nb_plaintext); + srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext); +// When got data from socket. +public: + srs_error_t on_rtcp(char* data, int nb_data); + srs_error_t on_rtp(char* data, int nb_data); +// Other functions. +public: + // ICE reflexive address functions. + std::string get_peer_ip(); + int get_peer_port(); +// Interface ISrsStreamWriter. +public: + virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite); +}; + +// For WebRTC over TCP. +class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCoroutineHandler +{ +private: + // The manager object to manage the connection. + ISrsResourceManager* manager_; + // Use a coroutine to serve the TCP connection. + SrsCoroutine* trd_; + // The ip and port of client. + std::string ip_; + int port_; + // The delta for statistic. + SrsNetworkDelta* delta_; + // TCP Transport object. + SrsTcpConnection* skt_; +public: + SrsRtcTcpConn(srs_netfd_t fd, std::string cip, int port, ISrsResourceManager* cm); + virtual ~SrsRtcTcpConn(); +public: + ISrsKbpsDelta* delta(); +// Interface ISrsResource. +public: + virtual std::string desc(); + virtual const SrsContextId& get_id(); +// Interface ISrsConnection. +public: + virtual std::string remote_ip(); +// Interface ISrsStartable +public: + virtual srs_error_t start(); +// Interface ISrsCoroutineHandler +public: + virtual srs_error_t cycle(); +private: + virtual srs_error_t do_cycle(); +}; + +#endif + diff --git a/trunk/src/app/srs_app_rtc_sdp.cpp b/trunk/src/app/srs_app_rtc_sdp.cpp index 09d4a53b1f..c952b62a7c 100644 --- a/trunk/src/app/srs_app_rtc_sdp.cpp +++ b/trunk/src/app/srs_app_rtc_sdp.cpp @@ -434,11 +434,22 @@ srs_error_t SrsMediaDesc::encode(std::ostringstream& os) // @see: https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-00#section-4.2 uint32_t priority = (1<<24)*(126) + (1<<8)*(65535) + (1)*(256 - component_id); + // See ICE TCP at https://www.rfc-editor.org/rfc/rfc6544 + if (iter->protocol_ == "tcp") { + os << "a=candidate:" << foundation++ << " " + << component_id << " tcp " << priority << " " + << iter->ip_ << " " << iter->port_ + << " typ " << iter->type_ + << " tcptype passive" + << kCRLF; + continue; + } + // @see: https://tools.ietf.org/id/draft-ietf-mmusic-ice-sip-sdp-14.html#rfc.section.5.1 os << "a=candidate:" << foundation++ << " " << component_id << " udp " << priority << " " << iter->ip_ << " " << iter->port_ - << " typ " << iter->type_ + << " typ " << iter->type_ << " generation 0" << kCRLF; srs_verbose("local SDP candidate line=%s", os.str().c_str()); @@ -885,10 +896,11 @@ void SrsSdp::set_fingerprint(const std::string& fingerprint) } } -void SrsSdp::add_candidate(const std::string& ip, const int& port, const std::string& type) +void SrsSdp::add_candidate(const std::string& protocol, const std::string& ip, const int& port, const std::string& type) { // @see: https://tools.ietf.org/id/draft-ietf-mmusic-ice-sip-sdp-14.html#rfc.section.5.1 SrsCandidate candidate; + candidate.protocol_ = protocol; candidate.ip_ = ip; candidate.port_ = port; candidate.type_ = type; diff --git a/trunk/src/app/srs_app_rtc_sdp.hpp b/trunk/src/app/srs_app_rtc_sdp.hpp index 1569e7ba15..63658d3f4a 100644 --- a/trunk/src/app/srs_app_rtc_sdp.hpp +++ b/trunk/src/app/srs_app_rtc_sdp.hpp @@ -112,6 +112,7 @@ class SrsMediaPayloadType struct SrsCandidate { + std::string protocol_; std::string ip_; int port_; std::string type_; @@ -187,7 +188,7 @@ class SrsSdp void set_dtls_role(const std::string& dtls_role); void set_fingerprint_algo(const std::string& algo); void set_fingerprint(const std::string& fingerprint); - void add_candidate(const std::string& ip, const int& port, const std::string& type); + void add_candidate(const std::string& protocol, const std::string& ip, const int& port, const std::string& type); std::string get_ice_ufrag() const; std::string get_ice_pwd() const; diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index de89eefb28..879d148af4 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -29,6 +29,7 @@ using namespace std; #include #include #include +#include extern SrsPps* _srs_pps_rpkts; SrsPps* _srs_pps_rstuns = NULL; @@ -269,22 +270,6 @@ static set discover_candidates(SrsRtcUserConfig* ruc) return candidate_ips; } -ISrsRtcServerHandler::ISrsRtcServerHandler() -{ -} - -ISrsRtcServerHandler::~ISrsRtcServerHandler() -{ -} - -ISrsRtcServerHijacker::ISrsRtcServerHijacker() -{ -} - -ISrsRtcServerHijacker::~ISrsRtcServerHijacker() -{ -} - SrsRtcUserConfig::SrsRtcUserConfig() { req_ = new SrsRequest(); @@ -299,8 +284,6 @@ SrsRtcUserConfig::~SrsRtcUserConfig() SrsRtcServer::SrsRtcServer() { - handler = NULL; - hijacker = NULL; async = new SrsAsyncCallWorker(); _srs_config->subscribe(this); @@ -345,16 +328,6 @@ srs_error_t SrsRtcServer::on_reload_rtc_server() return srs_success; } -void SrsRtcServer::set_handler(ISrsRtcServerHandler* h) -{ - handler = h; -} - -void SrsRtcServer::set_hijacker(ISrsRtcServerHijacker* h) -{ - hijacker = h; -} - srs_error_t SrsRtcServer::exec_async_work(ISrsAsyncCallTask * t) { return async->execute(t); @@ -416,21 +389,6 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) session->alive(); } - // Notify hijack to handle the UDP packet. - if (hijacker && is_rtp_or_rtcp && is_rtcp) { - bool consumed = false; - if (session) { - session->switch_to_context(); - } - if ((err = hijacker->on_udp_packet(skt, session, &consumed)) != srs_success) { - return srs_error_wrap(err, "hijack consumed=%u", consumed); - } - - if (consumed) { - return err; - } - } - // For STUN, the peer address may change. if (!is_rtp_or_rtcp && srs_is_stun((uint8_t*)data, size)) { ++_srs_pps_rstuns->sugar; @@ -456,7 +414,12 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) ping.get_username().c_str(), peer_id.c_str(), fast_id); } - return session->on_stun(skt, &ping); + // For each binding request, update the UDP socket. + if (ping.is_binding_request()) { + session->udp()->update_sendonly_socket(skt); + } + + return session->on_stun(&ping, data, size); } // For DTLS, RTCP or RTP, which does not support peer address changing. @@ -584,15 +547,21 @@ srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig* ruc, SrsSdp& local // We allows to mock the eip of server. if (true) { int listen_port = _srs_config->get_rtc_server_listen(); + string protocol = _srs_config->get_rtc_server_protocol(); set candidates = discover_candidates(ruc); for (set::iterator it = candidates.begin(); it != candidates.end(); ++it) { string hostname; int port = listen_port; srs_parse_hostport(*it, hostname, port); - local_sdp.add_candidate(hostname, port, "host"); + if (protocol == "udp" || protocol == "tcp") { + local_sdp.add_candidate(protocol, hostname, port, "host"); + } else { + local_sdp.add_candidate("udp", hostname, port, "host"); + local_sdp.add_candidate("tcp", hostname, port, "host"); + } } vector v = vector(candidates.begin(), candidates.end()); - srs_trace("RTC: Use candidates %s", srs_join_vector_string(v, ", ").c_str()); + srs_trace("RTC: Use candidates %s, protocol=%s", srs_join_vector_string(v, ", ").c_str(), protocol.c_str()); } // Setup the negotiate DTLS by config. diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 498ae58757..35f6572235 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -48,28 +48,6 @@ class SrsRtcBlackhole extern SrsRtcBlackhole* _srs_blackhole; -// The handler for RTC server to call. -class ISrsRtcServerHandler -{ -public: - ISrsRtcServerHandler(); - virtual ~ISrsRtcServerHandler(); -public: - // When server detect the timeout for session object. - virtual void on_timeout(SrsRtcConnection* session) = 0; -}; - -// The hijacker to hook server. -class ISrsRtcServerHijacker -{ -public: - ISrsRtcServerHijacker(); - virtual ~ISrsRtcServerHijacker(); -public: - // If consumed set to true, server will ignore the packet. - virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt, SrsRtcConnection* session, bool* pconsumed) = 0; -}; - // The user config for RTC publish or play. class SrsRtcUserConfig { @@ -95,8 +73,6 @@ class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrs { private: std::vector listeners; - ISrsRtcServerHandler* handler; - ISrsRtcServerHijacker* hijacker; SrsAsyncCallWorker* async; public: SrsRtcServer(); @@ -107,9 +83,6 @@ class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrs public: virtual srs_error_t on_reload_rtc_server(); public: - // Set the handler for server events. - void set_handler(ISrsRtcServerHandler* h); - void set_hijacker(ISrsRtcServerHijacker* h); srs_error_t exec_async_work(ISrsAsyncCallTask* t); public: // TODO: FIXME: Support gracefully quit. diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 0b10bd5e0f..94f2890b00 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -36,6 +36,7 @@ using namespace std; #include #include #include +#include std::string srs_listener_type2string(SrsListenerType type) { @@ -54,6 +55,8 @@ std::string srs_listener_type2string(SrsListenerType type) return "MPEG-TS over UDP"; case SrsListenerFlv: return "HTTP-FLV"; + case SrsListenerTcp: + return "TCP"; default: return "UNKONWN"; } @@ -583,6 +586,7 @@ void SrsServer::dispose() close_listeners(SrsListenerHttpsStream); close_listeners(SrsListenerMpegTsOverUdp); close_listeners(SrsListenerFlv); + close_listeners(SrsListenerTcp); // Fast stop to notify FFMPEG to quit, wait for a while then fast kill. ingester->dispose(); @@ -609,6 +613,7 @@ void SrsServer::gracefully_dispose() close_listeners(SrsListenerHttpsStream); close_listeners(SrsListenerMpegTsOverUdp); close_listeners(SrsListenerFlv); + close_listeners(SrsListenerTcp); srs_trace("listeners closed"); // Fast stop to notify FFMPEG to quit, wait for a while then fast kill. @@ -738,6 +743,23 @@ srs_error_t SrsServer::listen() if ((err = listen_stream_caster()) != srs_success) { return srs_error_wrap(err, "stream caster listen"); } + + // TODO: FIXME: Refine the listeners. + close_listeners(SrsListenerTcp); + if (_srs_config->get_rtc_server_tcp_enabled()) { + SrsListener* listener = new SrsBufferListener(this, SrsListenerTcp); + listeners.push_back(listener); + + std::string ep = srs_int2str(_srs_config->get_rtc_server_tcp_listen()); + + std::string ip; + int port; + srs_parse_endpoint(ep, ip, port); + + if ((err = listener->listen(ip, port)) != srs_success) { + return srs_error_wrap(err, "tcp listen %s:%d", ip.c_str(), port); + } + } if ((err = conn_manager->start()) != srs_success) { return srs_error_wrap(err, "connection manager"); @@ -1354,6 +1376,12 @@ void SrsServer::resample_kbps() continue; } + SrsRtcTcpConn* tcp = dynamic_cast(c); + if (tcp) { + stat->kbps_add_delta(c->get_id().c_str(), tcp->delta()); + continue; + } + // Impossible path, because we only create these connections above. srs_assert(false); } @@ -1447,6 +1475,8 @@ srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, IS *pr = new SrsHttpxConn(false, this, stfd, http_server, ip, port); } else if (type == SrsListenerHttpsStream) { *pr = new SrsHttpxConn(true, this, stfd, http_server, ip, port); + } else if (type == SrsListenerTcp) { + *pr = new SrsRtcTcpConn(stfd, ip, port, this); } else { srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port); srs_close_stfd(stfd); diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 36e6dae69b..723f84bd39 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -56,6 +56,8 @@ enum SrsListenerType SrsListenerHttpsApi = 8, // HTTPS stream, SrsListenerHttpsStream = 9, + // WebRTC over TCP, + SrsListenerTcp = 10, }; // A common tcp listener, for RTMP/HTTP server.