Skip to content

Commit

Permalink
RTC: Support statistic for HTTP-API, HTTP-Callback and Security (#2483)
Browse files Browse the repository at this point in the history
* commit message for your changes. Lines starting

* Update srs_app_rtc_api.cpp

* add SrsRtcConnPlay and SrsRtcConnPublish, in enum SrsRtmpConnType

* Update srs_rtmp_stack.cpp

* Update srs_app_rtc_conn.cpp

* Update srs_app_rtc_api.cpp

* update utest

* Update srs_utest_app.cpp
  • Loading branch information
duiniuluantanqin authored Jul 24, 2021
1 parent 20931dd commit 0efd7b1
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 51 deletions.
78 changes: 33 additions & 45 deletions trunk/src/app/srs_app_rtc_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
if ((prop = req->ensure_property_string("clientip")) != NULL) {
clientip = prop->to_str();
}
if (clientip.empty()) {
clientip = dynamic_cast<SrsHttpMessage*>(r)->connection()->remote_ip();
}

string api;
if ((prop = req->ensure_property_string("api")) != NULL) {
Expand All @@ -105,17 +108,19 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
tid = prop->to_str();
}

// TODO: FIXME: Parse vhost.
// Parse app and stream from streamurl.
string app;
string stream_name;
if (true) {
string tcUrl;
srs_parse_rtmp_url(streamurl, tcUrl, stream_name);
// The RTC user config object.
SrsRtcUserConfig ruc;
ruc.req_->ip = clientip;

srs_parse_rtmp_url(streamurl, ruc.req_->tcUrl, ruc.req_->stream);

int port;
string schema, host, vhost, param;
srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);
srs_discovery_tc_url(ruc.req_->tcUrl, ruc.req_->schema, ruc.req_->host, ruc.req_->vhost,
ruc.req_->app, ruc.req_->stream, ruc.req_->port, ruc.req_->param);

// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(ruc.req_->vhost);
if (parsed_vhost) {
ruc.req_->vhost = parsed_vhost->arg0();
}

// For client to specifies the candidate(EIP) of server.
Expand All @@ -129,12 +134,10 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
string dtls = r->query_get("dtls");

srs_trace("RTC play %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, codec=%s, srtp=%s, dtls=%s",
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length(),
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), ruc.req_->app.c_str(), ruc.req_->stream.c_str(), remote_sdp_str.length(),
eip.c_str(), codec.c_str(), srtp.c_str(), dtls.c_str()
);

// The RTC user config object.
SrsRtcUserConfig ruc;
ruc.eip_ = eip;
ruc.codec_ = codec;
ruc.publish_ = false;
Expand All @@ -155,16 +158,6 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
return srs_error_wrap(err, "remote sdp check failed");
}

ruc.req_->app = app;
ruc.req_->stream = stream_name;

// TODO: FIXME: Parse vhost.
// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost("");
if (parsed_vhost) {
ruc.req_->vhost = parsed_vhost->arg0();
}

SrsSdp local_sdp;

// Config for SDP and session.
Expand Down Expand Up @@ -288,6 +281,7 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt

// Parse req, the request json object, from body.
SrsJsonObject* req = NULL;
SrsAutoFree(SrsJsonObject, req);
if (true) {
string req_json;
if ((err = r->body_read_all(req_json)) != srs_success) {
Expand Down Expand Up @@ -318,6 +312,9 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
if ((prop = req->ensure_property_string("clientip")) != NULL) {
clientip = prop->to_str();
}
if (clientip.empty()){
clientip = dynamic_cast<SrsHttpMessage*>(r)->connection()->remote_ip();
}

string api;
if ((prop = req->ensure_property_string("api")) != NULL) {
Expand All @@ -329,16 +326,19 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
tid = prop->to_str();
}

// Parse app and stream from streamurl.
string app;
string stream_name;
if (true) {
string tcUrl;
srs_parse_rtmp_url(streamurl, tcUrl, stream_name);
// The RTC user config object.
SrsRtcUserConfig ruc;
ruc.req_->ip = clientip;

srs_parse_rtmp_url(streamurl, ruc.req_->tcUrl, ruc.req_->stream);

int port;
string schema, host, vhost, param;
srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);
srs_discovery_tc_url(ruc.req_->tcUrl, ruc.req_->schema, ruc.req_->host, ruc.req_->vhost,
ruc.req_->app, ruc.req_->stream, ruc.req_->port, ruc.req_->param);

// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(ruc.req_->vhost);
if (parsed_vhost) {
ruc.req_->vhost = parsed_vhost->arg0();
}

// For client to specifies the candidate(EIP) of server.
Expand All @@ -349,12 +349,10 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
string codec = r->query_get("codec");

srs_trace("RTC publish %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, codec=%s",
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(),
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), ruc.req_->app.c_str(), ruc.req_->stream.c_str(),
remote_sdp_str.length(), eip.c_str(), codec.c_str()
);

// The RTC user config object.
SrsRtcUserConfig ruc;
ruc.eip_ = eip;
ruc.codec_ = codec;
ruc.publish_ = true;
Expand All @@ -369,16 +367,6 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
return srs_error_wrap(err, "remote sdp check failed");
}

ruc.req_->app = app;
ruc.req_->stream = stream_name;

// TODO: FIXME: Parse vhost.
// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost("");
if (parsed_vhost) {
ruc.req_->vhost = parsed_vhost->arg0();
}

SrsSdp local_sdp;

// TODO: FIXME: move to create_session.
Expand Down
25 changes: 25 additions & 0 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ SrsRtcPlayStream::~SrsRtcPlayStream()
srs_freep(it->second);
}
}

// update the statistic when client coveried.
SrsStatistic* stat = SrsStatistic::instance();
stat->on_disconnect(cid_.c_str());
}

srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations)
Expand Down Expand Up @@ -529,6 +533,12 @@ srs_error_t SrsRtcPlayStream::start()
return srs_error_wrap(err, "on start play");
}
}

// update the statistic when client discoveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(cid_.c_str(), req_, session_, SrsRtcConnPlay)) != srs_success) {
return srs_error_wrap(err, "rtc: stat client");
}

is_started = true;

Expand Down Expand Up @@ -998,6 +1008,10 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
srs_freep(twcc_epp_);
srs_freep(pli_epp);
srs_freep(req);

// update the statistic when client coveried.
SrsStatistic* stat = SrsStatistic::instance();
stat->on_disconnect(cid_.c_str());
}

srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescription* stream_desc)
Expand Down Expand Up @@ -1112,6 +1126,12 @@ srs_error_t SrsRtcPublishStream::start()
return srs_error_wrap(err, "on start publish");
}
}

// update the statistic when client discoveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(cid_.c_str(), req, session_, SrsRtcConnPublish)) != srs_success) {
return srs_error_wrap(err, "rtc: stat client");
}

is_started = true;

Expand Down Expand Up @@ -1823,6 +1843,11 @@ std::string SrsRtcConnection::desc()
return "RtcConn";
}

void SrsRtcConnection::expire()
{
_srs_rtc_manager->remove(this);
}

void SrsRtcConnection::switch_to_context()
{
_srs_context->set_id(cid_);
Expand Down
5 changes: 4 additions & 1 deletion trunk/src/app/srs_app_rtc_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ class SrsRtcConnectionNackTimer : public ISrsFastTimer
//
// For performance, we use non-public from resource,
// see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a
class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler
class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, public ISrsExpire
{
friend class SrsSecurityTransport;
friend class SrsRtcPlayStream;
Expand Down Expand Up @@ -486,6 +486,9 @@ class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler
public:
virtual const SrsContextId& get_id();
virtual std::string desc();
// Interface ISrsExpire.
public:
virtual void expire();
public:
void switch_to_context();
const SrsContextId& context_id();
Expand Down
7 changes: 5 additions & 2 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <srs_app_pithy_print.hpp>
#include <srs_app_log.hpp>
#include <srs_app_threads.hpp>
#include <srs_app_statistic.hpp>

#ifdef SRS_FFMPEG_FIT
#include <srs_app_rtc_codec.hpp>
Expand Down Expand Up @@ -540,7 +541,8 @@ srs_error_t SrsRtcSource::on_publish()
_srs_hybrid->timer100ms()->subscribe(this);
}

// TODO: FIXME: Handle by statistic.
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_publish(req, _source_id.c_str());

return err;
}
Expand Down Expand Up @@ -576,7 +578,8 @@ void SrsRtcSource::on_unpublish()
srs_freep(bridger_);
}

// TODO: FIXME: Handle by statistic.
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_close(req);
}

void SrsRtcSource::subscribe(ISrsRtcSourceEventHandler* h)
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_security.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ srs_error_t SrsSecurity::allow_check(SrsConfDirective* rules, SrsRtmpConnType ty

switch (type) {
case SrsRtmpConnPlay:
case SrsRtcConnPlay:
if (rule->arg0() != "play") {
break;
}
Expand All @@ -82,6 +83,7 @@ srs_error_t SrsSecurity::allow_check(SrsConfDirective* rules, SrsRtmpConnType ty
case SrsRtmpConnFMLEPublish:
case SrsRtmpConnFlashPublish:
case SrsRtmpConnHaivisionPublish:
case SrsRtcConnPublish:
if (rule->arg0() != "publish") {
break;
}
Expand Down Expand Up @@ -112,6 +114,7 @@ srs_error_t SrsSecurity::deny_check(SrsConfDirective* rules, SrsRtmpConnType typ

switch (type) {
case SrsRtmpConnPlay:
case SrsRtcConnPlay:
if (rule->arg0() != "play") {
break;
}
Expand All @@ -122,6 +125,7 @@ srs_error_t SrsSecurity::deny_check(SrsConfDirective* rules, SrsRtmpConnType typ
case SrsRtmpConnFMLEPublish:
case SrsRtmpConnFlashPublish:
case SrsRtmpConnHaivisionPublish:
case SrsRtcConnPublish:
if (rule->arg0() != "publish") {
break;
}
Expand Down
6 changes: 4 additions & 2 deletions trunk/src/protocol/srs_rtmp_stack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1628,17 +1628,19 @@ SrsResponse::~SrsResponse()
string srs_client_type_string(SrsRtmpConnType type)
{
switch (type) {
case SrsRtmpConnPlay: return "Play";
case SrsRtmpConnPlay: return "rtmp-play";
case SrsRtcConnPlay: return "rtc-play";
case SrsRtmpConnFlashPublish: return "flash-publish";
case SrsRtmpConnFMLEPublish: return "fmle-publish";
case SrsRtmpConnHaivisionPublish: return "haivision-publish";
case SrsRtcConnPublish: return "rtc-publish";
default: return "Unknown";
}
}

bool srs_client_type_is_publish(SrsRtmpConnType type)
{
return type != SrsRtmpConnPlay;
return ((type != SrsRtmpConnPlay) && (type != SrsRtcConnPlay));
}

SrsHandshakeBytes::SrsHandshakeBytes()
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/protocol/srs_rtmp_stack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ enum SrsRtmpConnType
SrsRtmpConnFMLEPublish,
SrsRtmpConnFlashPublish,
SrsRtmpConnHaivisionPublish,
SrsRtcConnPlay,
SrsRtcConnPublish,
};
std::string srs_client_type_string(SrsRtmpConnType type);
bool srs_client_type_is_publish(SrsRtmpConnType type);
Expand Down
Loading

0 comments on commit 0efd7b1

Please sign in to comment.