Skip to content

Commit

Permalink
For #1568, extract SrsSourceManager from SrsSource.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jan 16, 2020
1 parent fea293d commit 9dbd049
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 23 deletions.
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ srs_error_t SrsHttpStreamServer::hijack(ISrsHttpMessage* request, ISrsHttpHandle
}

SrsSource* s = NULL;
if ((err = SrsSource::fetch_or_create(r, server, &s)) != srs_success) {
if ((err = _srs_sources->fetch_or_create(r, server, &s)) != srs_success) {
return srs_error_wrap(err, "source create");
}
srs_assert(s != NULL);
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ srs_error_t SrsRtmpConn::stream_service_cycle()

// find a source to serve.
SrsSource* source = NULL;
if ((err = SrsSource::fetch_or_create(req, server, &source)) != srs_success) {
if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {
return srs_error_wrap(err, "rtmp: fetch source");
}
srs_assert(source != NULL);
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ void SrsServer::dispose()
// @remark don't dispose ingesters, for too slow.

// dispose the source for hls and dvr.
SrsSource::dispose_all();
_srs_sources->dispose();

// @remark don't dispose all connections, for too slow.

Expand Down Expand Up @@ -952,7 +952,7 @@ srs_error_t SrsServer::do_cycle()
}

// notice the stream sources to cycle.
if ((err = SrsSource::cycle_all()) != srs_success) {
if ((err = _srs_sources->cycle()) != srs_success) {
return srs_error_wrap(err, "source cycle");
}

Expand Down
31 changes: 22 additions & 9 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1635,9 +1635,17 @@ srs_error_t SrsMetaCache::update_vsh(SrsSharedPtrMessage* msg)
return vformat->on_video(msg);
}

std::map<std::string, SrsSource*> SrsSource::pool;
SrsSourceManager* _srs_sources = new SrsSourceManager();

srs_error_t SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
SrsSourceManager::SrsSourceManager()
{
}

SrsSourceManager::~SrsSourceManager()
{
}

srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -1665,7 +1673,7 @@ srs_error_t SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsS
return err;
}

SrsSource* SrsSource::fetch(SrsRequest* r)
SrsSource* SrsSourceManager::fetch(SrsRequest* r)
{
SrsSource* source = NULL;

Expand All @@ -1679,12 +1687,12 @@ SrsSource* SrsSource::fetch(SrsRequest* r)
// we always update the request of resource,
// for origin auth is on, the token in request maybe invalid,
// and we only need to update the token of request, it's simple.
source->req->update_auth(r);
source->update_auth(r);

return source;
}

void SrsSource::dispose_all()
void SrsSourceManager::dispose()
{
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
Expand All @@ -1694,16 +1702,16 @@ void SrsSource::dispose_all()
return;
}

srs_error_t SrsSource::cycle_all()
srs_error_t SrsSourceManager::cycle()
{
int cid = _srs_context->get_id();
srs_error_t err = do_cycle_all();
srs_error_t err = do_cycle();
_srs_context->set_id(cid);

return err;
}

srs_error_t SrsSource::do_cycle_all()
srs_error_t SrsSourceManager::do_cycle()
{
srs_error_t err = srs_success;

Expand Down Expand Up @@ -1744,7 +1752,7 @@ srs_error_t SrsSource::do_cycle_all()
return err;
}

void SrsSource::destroy()
void SrsSourceManager::destroy()
{
std::map<std::string, SrsSource*>::iterator it;
for (it = pool.begin(); it != pool.end(); ++it) {
Expand Down Expand Up @@ -1994,6 +2002,11 @@ bool SrsSource::inactive()
return _can_publish;
}

void SrsSource::update_auth(SrsRequest* r)
{
req->update_auth(r);
}

bool SrsSource::can_publish(bool is_edge)
{
if (is_edge) {
Expand Down
33 changes: 23 additions & 10 deletions trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,32 +438,43 @@ class SrsMetaCache
virtual srs_error_t update_vsh(SrsSharedPtrMessage* msg);
};

// live streaming source.
class SrsSource : public ISrsReloadHandler
// The source manager to create and refresh all stream sources.
class SrsSourceManager
{
friend class SrsOriginHub;
private:
static std::map<std::string, SrsSource*> pool;
std::map<std::string, SrsSource*> pool;
public:
SrsSourceManager();
virtual ~SrsSourceManager();
public:
// create source when fetch from cache failed.
// @param r the client request.
// @param h the event handler for source.
// @param pps the matched source, if success never be NULL.
static srs_error_t fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps);
virtual srs_error_t fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps);
private:
// Get the exists source, NULL when not exists.
// update the request and return the exists source.
static SrsSource* fetch(SrsRequest* r);
virtual SrsSource* fetch(SrsRequest* r);
public:
// dispose and cycle all sources.
static void dispose_all();
static srs_error_t cycle_all();
virtual void dispose();
virtual srs_error_t cycle();
private:
static srs_error_t do_cycle_all();
virtual srs_error_t do_cycle();
public:
// when system exit, destroy the sources,
// For gmc to analysis mem leaks.
static void destroy();
virtual void destroy();
};

// Global singleton instance.
extern SrsSourceManager* _srs_sources;

// live streaming source.
class SrsSource : public ISrsReloadHandler
{
friend class SrsOriginHub;
private:
// For publish, it's the publish client id.
// For edge, it's the edge ingest id.
Expand Down Expand Up @@ -531,6 +542,8 @@ class SrsSource : public ISrsReloadHandler
// Whether source is inactive, which means there is no publishing stream source.
// @remark For edge, it's inactive util stream has been pulled from origin.
virtual bool inactive();
// Update the authentication information in request.
virtual void update_auth(SrsRequest* r);
public:
virtual bool can_publish(bool is_edge);
virtual srs_error_t on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
Expand Down

0 comments on commit 9dbd049

Please sign in to comment.