Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RTC: Support statistic for HTTP-API, HTTP-Callback and Security #2483

Merged
merged 8 commits into from
Jul 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 33 additions & 45 deletions trunk/src/app/srs_app_rtc_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
if ((prop = req->ensure_property_string("clientip")) != NULL) {
clientip = prop->to_str();
}
if (clientip.empty()) {
clientip = dynamic_cast<SrsHttpMessage*>(r)->connection()->remote_ip();
}

string api;
if ((prop = req->ensure_property_string("api")) != NULL) {
Expand All @@ -105,17 +108,19 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
tid = prop->to_str();
}

// TODO: FIXME: Parse vhost.
// Parse app and stream from streamurl.
string app;
string stream_name;
if (true) {
string tcUrl;
srs_parse_rtmp_url(streamurl, tcUrl, stream_name);
// The RTC user config object.
SrsRtcUserConfig ruc;
ruc.req_->ip = clientip;

srs_parse_rtmp_url(streamurl, ruc.req_->tcUrl, ruc.req_->stream);

int port;
string schema, host, vhost, param;
srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);
srs_discovery_tc_url(ruc.req_->tcUrl, ruc.req_->schema, ruc.req_->host, ruc.req_->vhost,
ruc.req_->app, ruc.req_->stream, ruc.req_->port, ruc.req_->param);

// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(ruc.req_->vhost);
if (parsed_vhost) {
ruc.req_->vhost = parsed_vhost->arg0();
}

// For client to specifies the candidate(EIP) of server.
Expand All @@ -129,12 +134,10 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
string dtls = r->query_get("dtls");

srs_trace("RTC play %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, codec=%s, srtp=%s, dtls=%s",
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(), remote_sdp_str.length(),
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), ruc.req_->app.c_str(), ruc.req_->stream.c_str(), remote_sdp_str.length(),
eip.c_str(), codec.c_str(), srtp.c_str(), dtls.c_str()
);

// The RTC user config object.
SrsRtcUserConfig ruc;
ruc.eip_ = eip;
ruc.codec_ = codec;
ruc.publish_ = false;
Expand All @@ -155,16 +158,6 @@ srs_error_t SrsGoApiRtcPlay::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMe
return srs_error_wrap(err, "remote sdp check failed");
}

ruc.req_->app = app;
ruc.req_->stream = stream_name;

// TODO: FIXME: Parse vhost.
// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost("");
if (parsed_vhost) {
ruc.req_->vhost = parsed_vhost->arg0();
}

SrsSdp local_sdp;

// Config for SDP and session.
Expand Down Expand Up @@ -288,6 +281,7 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt

// Parse req, the request json object, from body.
SrsJsonObject* req = NULL;
SrsAutoFree(SrsJsonObject, req);
if (true) {
string req_json;
if ((err = r->body_read_all(req_json)) != srs_success) {
Expand Down Expand Up @@ -318,6 +312,9 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
if ((prop = req->ensure_property_string("clientip")) != NULL) {
clientip = prop->to_str();
}
if (clientip.empty()){
clientip = dynamic_cast<SrsHttpMessage*>(r)->connection()->remote_ip();
}

string api;
if ((prop = req->ensure_property_string("api")) != NULL) {
Expand All @@ -329,16 +326,19 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
tid = prop->to_str();
}

// Parse app and stream from streamurl.
string app;
string stream_name;
if (true) {
string tcUrl;
srs_parse_rtmp_url(streamurl, tcUrl, stream_name);
// The RTC user config object.
SrsRtcUserConfig ruc;
ruc.req_->ip = clientip;

srs_parse_rtmp_url(streamurl, ruc.req_->tcUrl, ruc.req_->stream);

int port;
string schema, host, vhost, param;
srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);
srs_discovery_tc_url(ruc.req_->tcUrl, ruc.req_->schema, ruc.req_->host, ruc.req_->vhost,
ruc.req_->app, ruc.req_->stream, ruc.req_->port, ruc.req_->param);

// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(ruc.req_->vhost);
if (parsed_vhost) {
ruc.req_->vhost = parsed_vhost->arg0();
}

// For client to specifies the candidate(EIP) of server.
Expand All @@ -349,12 +349,10 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
string codec = r->query_get("codec");

srs_trace("RTC publish %s, api=%s, tid=%s, clientip=%s, app=%s, stream=%s, offer=%dB, eip=%s, codec=%s",
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), app.c_str(), stream_name.c_str(),
streamurl.c_str(), api.c_str(), tid.c_str(), clientip.c_str(), ruc.req_->app.c_str(), ruc.req_->stream.c_str(),
remote_sdp_str.length(), eip.c_str(), codec.c_str()
);

// The RTC user config object.
SrsRtcUserConfig ruc;
ruc.eip_ = eip;
ruc.codec_ = codec;
ruc.publish_ = true;
Expand All @@ -369,16 +367,6 @@ srs_error_t SrsGoApiRtcPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHtt
return srs_error_wrap(err, "remote sdp check failed");
}

ruc.req_->app = app;
ruc.req_->stream = stream_name;

// TODO: FIXME: Parse vhost.
// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost("");
if (parsed_vhost) {
ruc.req_->vhost = parsed_vhost->arg0();
}

SrsSdp local_sdp;

// TODO: FIXME: move to create_session.
Expand Down
25 changes: 25 additions & 0 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ SrsRtcPlayStream::~SrsRtcPlayStream()
srs_freep(it->second);
}
}

// update the statistic when client coveried.
SrsStatistic* stat = SrsStatistic::instance();
stat->on_disconnect(cid_.c_str());
}

srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::map<uint32_t, SrsRtcTrackDescription*> sub_relations)
Expand Down Expand Up @@ -529,6 +533,12 @@ srs_error_t SrsRtcPlayStream::start()
return srs_error_wrap(err, "on start play");
}
}

// update the statistic when client discoveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(cid_.c_str(), req_, session_, SrsRtcConnPlay)) != srs_success) {
return srs_error_wrap(err, "rtc: stat client");
}

is_started = true;

Expand Down Expand Up @@ -998,6 +1008,10 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
srs_freep(twcc_epp_);
srs_freep(pli_epp);
srs_freep(req);

// update the statistic when client coveried.
SrsStatistic* stat = SrsStatistic::instance();
stat->on_disconnect(cid_.c_str());
}

srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescription* stream_desc)
Expand Down Expand Up @@ -1112,6 +1126,12 @@ srs_error_t SrsRtcPublishStream::start()
return srs_error_wrap(err, "on start publish");
}
}

// update the statistic when client discoveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(cid_.c_str(), req, session_, SrsRtcConnPublish)) != srs_success) {
return srs_error_wrap(err, "rtc: stat client");
}

is_started = true;

Expand Down Expand Up @@ -1823,6 +1843,11 @@ std::string SrsRtcConnection::desc()
return "RtcConn";
}

void SrsRtcConnection::expire()
{
_srs_rtc_manager->remove(this);
}

void SrsRtcConnection::switch_to_context()
{
_srs_context->set_id(cid_);
Expand Down
5 changes: 4 additions & 1 deletion trunk/src/app/srs_app_rtc_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ class SrsRtcConnectionNackTimer : public ISrsFastTimer
//
// For performance, we use non-public from resource,
// see https://stackoverflow.com/questions/3747066/c-cannot-convert-from-base-a-to-derived-type-b-via-virtual-base-a
class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler
class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler, public ISrsExpire
{
friend class SrsSecurityTransport;
friend class SrsRtcPlayStream;
Expand Down Expand Up @@ -486,6 +486,9 @@ class SrsRtcConnection : public ISrsResource, public ISrsDisposingHandler
public:
virtual const SrsContextId& get_id();
virtual std::string desc();
// Interface ISrsExpire.
public:
virtual void expire();
Copy link
Member

@winlinvip winlinvip Jul 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface supports kickoff. We need to test if there will be any issues.

        client->conn->expire();
        srs_warn("kickoff client id=%s ok", client_id.c_str());

TRANS_BY_GPT3

Copy link
Member Author

@duiniuluantanqin duiniuluantanqin Jul 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is used for the kickoff feature. I have tested it locally and it passed.

TRANS_BY_GPT3

public:
void switch_to_context();
const SrsContextId& context_id();
Expand Down
7 changes: 5 additions & 2 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <srs_app_pithy_print.hpp>
#include <srs_app_log.hpp>
#include <srs_app_threads.hpp>
#include <srs_app_statistic.hpp>

#ifdef SRS_FFMPEG_FIT
#include <srs_app_rtc_codec.hpp>
Expand Down Expand Up @@ -540,7 +541,8 @@ srs_error_t SrsRtcSource::on_publish()
_srs_hybrid->timer100ms()->subscribe(this);
}

// TODO: FIXME: Handle by statistic.
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_publish(req, _source_id.c_str());

return err;
}
Expand Down Expand Up @@ -576,7 +578,8 @@ void SrsRtcSource::on_unpublish()
srs_freep(bridger_);
}

// TODO: FIXME: Handle by statistic.
SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_close(req);
}

void SrsRtcSource::subscribe(ISrsRtcSourceEventHandler* h)
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_security.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ srs_error_t SrsSecurity::allow_check(SrsConfDirective* rules, SrsRtmpConnType ty

switch (type) {
case SrsRtmpConnPlay:
case SrsRtcConnPlay:
if (rule->arg0() != "play") {
break;
}
Expand All @@ -82,6 +83,7 @@ srs_error_t SrsSecurity::allow_check(SrsConfDirective* rules, SrsRtmpConnType ty
case SrsRtmpConnFMLEPublish:
case SrsRtmpConnFlashPublish:
case SrsRtmpConnHaivisionPublish:
case SrsRtcConnPublish:
if (rule->arg0() != "publish") {
break;
}
Expand Down Expand Up @@ -112,6 +114,7 @@ srs_error_t SrsSecurity::deny_check(SrsConfDirective* rules, SrsRtmpConnType typ

switch (type) {
case SrsRtmpConnPlay:
case SrsRtcConnPlay:
if (rule->arg0() != "play") {
break;
}
Expand All @@ -122,6 +125,7 @@ srs_error_t SrsSecurity::deny_check(SrsConfDirective* rules, SrsRtmpConnType typ
case SrsRtmpConnFMLEPublish:
case SrsRtmpConnFlashPublish:
case SrsRtmpConnHaivisionPublish:
case SrsRtcConnPublish:
if (rule->arg0() != "publish") {
break;
}
Expand Down
6 changes: 4 additions & 2 deletions trunk/src/protocol/srs_rtmp_stack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1628,17 +1628,19 @@ SrsResponse::~SrsResponse()
string srs_client_type_string(SrsRtmpConnType type)
{
switch (type) {
case SrsRtmpConnPlay: return "Play";
case SrsRtmpConnPlay: return "rtmp-play";
case SrsRtcConnPlay: return "rtc-play";
case SrsRtmpConnFlashPublish: return "flash-publish";
case SrsRtmpConnFMLEPublish: return "fmle-publish";
case SrsRtmpConnHaivisionPublish: return "haivision-publish";
case SrsRtcConnPublish: return "rtc-publish";
default: return "Unknown";
}
}

bool srs_client_type_is_publish(SrsRtmpConnType type)
{
return type != SrsRtmpConnPlay;
return ((type != SrsRtmpConnPlay) && (type != SrsRtcConnPlay));
}

SrsHandshakeBytes::SrsHandshakeBytes()
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/protocol/srs_rtmp_stack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ enum SrsRtmpConnType
SrsRtmpConnFMLEPublish,
SrsRtmpConnFlashPublish,
SrsRtmpConnHaivisionPublish,
SrsRtcConnPlay,
SrsRtcConnPublish,
};
std::string srs_client_type_string(SrsRtmpConnType type);
bool srs_client_type_is_publish(SrsRtmpConnType type);
Expand Down
Loading