Skip to content

Commit

Permalink
For #2136: API: Cleanup no active streams for statistics. v5.0.42
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Aug 19, 2022
1 parent 4edf333 commit e9d6601
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 43 deletions.
2 changes: 2 additions & 0 deletions trunk/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,5 @@ bug
/research/thread-model/thread-local
*.gcp
*.svg
/3rdparty/st-srs/srs
/3rdparty/st-srs/.circleci
24 changes: 0 additions & 24 deletions trunk/3rdparty/st-srs/.circleci/config.yml

This file was deleted.

1 change: 0 additions & 1 deletion trunk/3rdparty/st-srs/srs

This file was deleted.

2 changes: 2 additions & 0 deletions trunk/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# Changelog

The changelog for SRS.
Expand All @@ -6,6 +7,7 @@ The changelog for SRS.

## SRS 5.0 Changelog

* v5.0, 2022-08-19, For [#2136](https://github.com/ossrs/srs/issues/2136): API: Cleanup no active streams for statistics. v5.0.42
* v5.0, 2022-08-14, Fix [#2747](https://github.com/ossrs/srs/issues/2747): Support Apple Silicon M1(aarch64). v5.0.41
* v5.0, 2022-08-12, Support crossbuild for hisiv500. v5.0.40
* v5.0, 2022-08-10, Build: Detect OS by packager. v5.0.39
Expand Down
86 changes: 69 additions & 17 deletions trunk/src/app/srs_app_statistic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj)
obj->set("publish", publish);

publish->set("active", SrsJsonAny::boolean(active));
publish->set("cid", SrsJsonAny::str(publisher_id.c_str()));
if (!publisher_id.empty()) {
publish->set("cid", SrsJsonAny::str(publisher_id.c_str()));
}

if (!has_video) {
obj->set("video", SrsJsonAny::null());
Expand Down Expand Up @@ -164,6 +166,11 @@ srs_error_t SrsStatisticStream::dumps(SrsJsonObject* obj)

void SrsStatisticStream::publish(std::string id)
{
// To prevent duplicated publish event by bridger.
if (active) {
return;
}

publisher_id = id;
active = true;

Expand All @@ -172,6 +179,11 @@ void SrsStatisticStream::publish(std::string id)

void SrsStatisticStream::close()
{
// To prevent duplicated close event.
if (!active) {
return;
}

has_video = false;
has_audio = false;
active = false;
Expand All @@ -186,11 +198,17 @@ SrsStatisticClient::SrsStatisticClient()
req = NULL;
type = SrsRtmpConnUnknown;
create = srs_get_system_time();

clk = new SrsWallClock();
kbps = new SrsKbps(clk);
kbps->set_io(NULL, NULL);
}

SrsStatisticClient::~SrsStatisticClient()
{
srs_freep(req);
srs_freep(kbps);
srs_freep(clk);
}

srs_error_t SrsStatisticClient::dumps(SrsJsonObject* obj)
Expand All @@ -208,6 +226,14 @@ srs_error_t SrsStatisticClient::dumps(SrsJsonObject* obj)
obj->set("type", SrsJsonAny::str(srs_client_type_string(type).c_str()));
obj->set("publish", SrsJsonAny::boolean(srs_client_type_is_publish(type)));
obj->set("alive", SrsJsonAny::number(srsu2ms(srs_get_system_time() - create) / 1000.0));
obj->set("send_bytes", SrsJsonAny::integer(kbps->get_send_bytes()));
obj->set("recv_bytes", SrsJsonAny::integer(kbps->get_recv_bytes()));

SrsJsonObject* okbps = SrsJsonAny::object();
obj->set("kbps", okbps);

okbps->set("recv_30s", SrsJsonAny::integer(kbps->get_recv_kbps_30s()));
okbps->set("send_30s", SrsJsonAny::integer(kbps->get_send_kbps_30s()));

return err;
}
Expand Down Expand Up @@ -363,22 +389,6 @@ void SrsStatistic::on_stream_close(SrsRequest* req)
SrsStatisticVhost* vhost = create_vhost(req);
SrsStatisticStream* stream = create_stream(vhost, req);
stream->close();

// TODO: FIXME: Should fix https://github.com/ossrs/srs/issues/803
if (true) {
std::map<std::string, SrsStatisticStream*>::iterator it;
if ((it=streams.find(stream->id)) != streams.end()) {
streams.erase(it);
}
}

// TODO: FIXME: Should fix https://github.com/ossrs/srs/issues/803
if (true) {
std::map<std::string, SrsStatisticStream*>::iterator it;
if ((it = rstreams.find(stream->url)) != rstreams.end()) {
rstreams.erase(it);
}
}
}

srs_error_t SrsStatistic::on_client(std::string id, SrsRequest* req, ISrsExpire* conn, SrsRtmpConnType type)
Expand Down Expand Up @@ -429,6 +439,40 @@ void SrsStatistic::on_disconnect(std::string id)

stream->nb_clients--;
vhost->nb_clients--;

cleanup_stream(stream);
}

void SrsStatistic::cleanup_stream(SrsStatisticStream* stream)
{
// If stream has publisher(not active) or player(clients), never cleanup it.
if (stream->active || stream->nb_clients > 0) {
return;
}

// There should not be any clients referring to the stream.
for (std::map<std::string, SrsStatisticClient*>::iterator it = clients.begin(); it != clients.end(); ++it) {
SrsStatisticClient* client = it->second;
srs_assert(client->stream != stream);
}

// Do cleanup streams.
if (true) {
std::map<std::string, SrsStatisticStream *>::iterator it;
if ((it = streams.find(stream->id)) != streams.end()) {
streams.erase(it);
}
}

if (true) {
std::map<std::string, SrsStatisticStream *>::iterator it;
if ((it = rstreams.find(stream->url)) != rstreams.end()) {
rstreams.erase(it);
}
}

// It's safe to delete the stream now.
srs_freep(stream);
}

void SrsStatistic::kbps_add_delta(std::string id, ISrsKbpsDelta* delta)
Expand All @@ -446,6 +490,7 @@ void SrsStatistic::kbps_add_delta(std::string id, ISrsKbpsDelta* delta)
// add delta of connection to kbps.
// for next sample() of server kbps can get the stat.
kbps->add_delta(in, out);
client->kbps->add_delta(in, out);
client->stream->kbps->add_delta(in, out);
client->stream->vhost->kbps->add_delta(in, out);
}
Expand All @@ -467,6 +512,13 @@ SrsKbps* SrsStatistic::kbps_sample()
stream->kbps->sample();
}
}
if (true) {
std::map<std::string, SrsStatisticClient*>::iterator it;
for (it = clients.begin(); it != clients.end(); it++) {
SrsStatisticClient* client = it->second;
client->kbps->sample();
}
}

return kbps;
}
Expand Down
8 changes: 8 additions & 0 deletions trunk/src/app/srs_app_statistic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ struct SrsStatisticClient
SrsRtmpConnType type;
std::string id;
srs_utime_t create;
public:
// The stream total kbps.
SrsKbps* kbps;
SrsWallClock* clk;
public:
SrsStatisticClient();
virtual ~SrsStatisticClient();
Expand Down Expand Up @@ -169,6 +173,10 @@ class SrsStatistic
// only got the request object, so the client specified by id maybe not
// exists in stat.
virtual void on_disconnect(std::string id);
private:
// Cleanup the stream if stream is not active and for the last client.
void cleanup_stream(SrsStatisticStream* stream);
public:
// Sample the kbps, add delta bytes of conn.
// Use kbps_sample() to get all result of kbps stat.
virtual void kbps_add_delta(std::string id, ISrsKbpsDelta* delta);
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core_version5.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@

#define VERSION_MAJOR 5
#define VERSION_MINOR 0
#define VERSION_REVISION 41
#define VERSION_REVISION 42

#endif

0 comments on commit e9d6601

Please sign in to comment.