Skip to content

Commit

Permalink
Fix #908, use empty coroutine to avoid NULL pointer.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jun 4, 2017
1 parent 0e9e179 commit 9ca3697
Show file tree
Hide file tree
Showing 17 changed files with 115 additions and 41 deletions.
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_async_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask()

SrsAsyncCallWorker::SrsAsyncCallWorker()
{
trd = NULL;
trd = new SrsDummyCoroutine();
wait = srs_cond_new();
}

Expand Down Expand Up @@ -74,7 +74,7 @@ int SrsAsyncCallWorker::count()
int SrsAsyncCallWorker::start()
{
srs_freep(trd);
trd = new SrsCoroutine("async", this, _srs_context->get_id());
trd = new SrsSTCoroutine("async", this, _srs_context->get_id());
return trd->start();
}

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ SrsConnection::SrsConnection(IConnectionManager* cm, srs_netfd_t c, string cip)
kbps = new SrsKbps();
kbps->set_io(skt, skt);

trd = new SrsCoroutine("conn", this);
trd = new SrsSTCoroutine("conn", this);
}

SrsConnection::~SrsConnection()
Expand Down
9 changes: 5 additions & 4 deletions trunk/src/app/srs_app_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ SrsEdgeIngester::SrsEdgeIngester()

upstream = new SrsEdgeRtmpUpstream(redirect);
lb = new SrsLbRoundRobin();
trd = NULL;
trd = new SrsDummyCoroutine();
}

SrsEdgeIngester::~SrsEdgeIngester()
Expand Down Expand Up @@ -199,7 +199,7 @@ int SrsEdgeIngester::start()
}

srs_freep(trd);
trd = new SrsCoroutine("edge-igs", this);
trd = new SrsSTCoroutine("edge-igs", this);
return trd->start();
}

Expand Down Expand Up @@ -423,7 +423,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()

sdk = NULL;
lb = new SrsLbRoundRobin();
trd = NULL;
trd = new SrsDummyCoroutine();
queue = new SrsMessageQueue();
}

Expand Down Expand Up @@ -493,7 +493,8 @@ int SrsEdgeForwarder::start()
return ret;
}

trd = new SrsCoroutine("edge-fwr", this);
srs_freep(trd);
trd = new SrsSTCoroutine("edge-fwr", this);
return trd->start();
}

Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static std::vector<std::string> _transcoded_url;

SrsEncoder::SrsEncoder()
{
trd = NULL;
trd = new SrsDummyCoroutine();
pprint = SrsPithyPrint::create_encoder();
}

Expand Down Expand Up @@ -74,7 +74,7 @@ int SrsEncoder::on_publish(SrsRequest* req)

// start thread to run all encoding engines.
srs_freep(trd);
trd = new SrsCoroutine("encoder", this, _srs_context->get_id());
trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ SrsForwarder::SrsForwarder(SrsOriginHub* h)
sh_video = sh_audio = NULL;

sdk = NULL;
trd = NULL;
trd = new SrsDummyCoroutine();
queue = new SrsMessageQueue();
jitter = new SrsRtmpJitter();
}
Expand Down Expand Up @@ -136,7 +136,7 @@ int SrsForwarder::on_publish()
req->stream.c_str());

srs_freep(trd);
trd = new SrsCoroutine("forward", this);
trd = new SrsSTCoroutine("forward", this);
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("start srs thread failed. ret=%d", ret);
return ret;
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ SrsBufferCache::SrsBufferCache(SrsSource* s, SrsRequest* r)
req = r->copy();
source = s;
queue = new SrsMessageQueue(true);
trd = new SrsCoroutine("http-stream", this);
trd = new SrsSTCoroutine("http-stream", this);

// TODO: FIXME: support reload.
fast_cache = _srs_config->get_vhost_http_remux_fast_cache(req->vhost);
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ SrsIngester::SrsIngester()

expired = false;

trd = NULL;
trd = new SrsDummyCoroutine();
pprint = SrsPithyPrint::create_ingester();
}

Expand Down Expand Up @@ -141,7 +141,7 @@ int SrsIngester::start()

// start thread to run all encoding engines.
srs_freep(trd);
trd = new SrsCoroutine("ingest", this, _srs_context->get_id());
trd = new SrsSTCoroutine("ingest", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ SrsKafkaProducer::SrsKafkaProducer()
metadata_expired = srs_cond_new();

lock = srs_mutex_new();
trd = NULL;
trd = new SrsDummyCoroutine();
worker = new SrsAsyncCallWorker();
cache = new SrsKafkaCache();

Expand Down Expand Up @@ -410,7 +410,7 @@ int SrsKafkaProducer::start()
}

srs_freep(trd);
trd = new SrsCoroutine("kafka", this, _srs_context->get_id());
trd = new SrsSTCoroutine("kafka", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("start kafka thread failed. ret=%d", ret);
}
Expand Down
8 changes: 4 additions & 4 deletions trunk/src/app/srs_app_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ SrsUdpListener::SrsUdpListener(ISrsUdpHandler* h, string i, int p)
nb_buf = SRS_UDP_MAX_PACKET_SIZE;
buf = new char[nb_buf];

trd = NULL;
trd = new SrsDummyCoroutine();
}

SrsUdpListener::~SrsUdpListener()
Expand Down Expand Up @@ -140,7 +140,7 @@ int SrsUdpListener::listen()
srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);

srs_freep(trd);
trd = new SrsCoroutine("udp", this);
trd = new SrsSTCoroutine("udp", this);
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
Expand Down Expand Up @@ -187,7 +187,7 @@ SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
_fd = -1;
_stfd = NULL;

trd = NULL;
trd = new SrsDummyCoroutine();
}

SrsTcpListener::~SrsTcpListener()
Expand Down Expand Up @@ -242,7 +242,7 @@ int SrsTcpListener::listen()
srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);

srs_freep(trd);
trd = new SrsCoroutine("tcp", this);
trd = new SrsSTCoroutine("tcp", this);
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_ng_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ using namespace std;

SrsNgExec::SrsNgExec()
{
trd = NULL;
trd = new SrsDummyCoroutine();
pprint = SrsPithyPrint::create_exec();
}

Expand All @@ -61,7 +61,7 @@ int SrsNgExec::on_publish(SrsRequest* req)

// start thread to run all processes.
srs_freep(trd);
trd = new SrsCoroutine("encoder", this, _srs_context->get_id());
trd = new SrsSTCoroutine("encoder", this, _srs_context->get_id());
if ((ret = trd->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
Expand Down
6 changes: 3 additions & 3 deletions trunk/src/app/srs_app_recv_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ SrsRecvThread::SrsRecvThread(ISrsMessagePumper* p, SrsRtmpServer* r, int tm)
rtmp = r;
pumper = p;
timeout = tm;
trd = NULL;
trd = new SrsDummyCoroutine();
}

SrsRecvThread::~SrsRecvThread()
Expand All @@ -77,7 +77,7 @@ int SrsRecvThread::cid()
int SrsRecvThread::start()
{
srs_freep(trd);
trd = new SrsCoroutine("recv", this);
trd = new SrsSTCoroutine("recv", this);
return trd->start();
}

Expand Down Expand Up @@ -535,7 +535,7 @@ SrsHttpRecvThread::SrsHttpRecvThread(SrsResponseOnlyHttpConn* c)
{
conn = c;
error = ERROR_SUCCESS;
trd = new SrsCoroutine("http-receive", this, _srs_context->get_id());
trd = new SrsSTCoroutine("http-receive", this, _srs_context->get_id());
}

SrsHttpRecvThread::~SrsHttpRecvThread()
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ SrsRtspConn::SrsRtspConn(SrsRtspCaster* c, srs_netfd_t fd, std::string o)
stfd = fd;
skt = new SrsStSocket();
rtsp = new SrsRtspStack(skt);
trd = new SrsCoroutine("rtsp", this);
trd = new SrsSTCoroutine("rtsp", this);

req = NULL;
sdk = NULL;
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ SrsSignalManager::SrsSignalManager(SrsServer* s)

server = s;
sig_pipe[0] = sig_pipe[1] = -1;
trd = new SrsCoroutine("signal", this);
trd = new SrsSTCoroutine("signal", this);
signal_read_stfd = NULL;
}

Expand Down
59 changes: 49 additions & 10 deletions trunk/src/app/srs_app_st.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,46 @@ ISrsCoroutineHandler::~ISrsCoroutineHandler()
{
}

SrsCoroutine::SrsCoroutine(const string& n, ISrsCoroutineHandler* h, int cid)
SrsCoroutine::SrsCoroutine()
{
}

SrsCoroutine::~SrsCoroutine()
{
}

SrsDummyCoroutine::SrsDummyCoroutine()
{
}

SrsDummyCoroutine::~SrsDummyCoroutine()
{
}

int SrsDummyCoroutine::start()
{
return ERROR_THREAD_DUMMY;
}

void SrsDummyCoroutine::stop()
{
}

void SrsDummyCoroutine::interrupt()
{
}

int SrsDummyCoroutine::pull()
{
return ERROR_THREAD_DUMMY;
}

int SrsDummyCoroutine::cid()
{
return 0;
}

SrsSTCoroutine::SrsSTCoroutine(const string& n, ISrsCoroutineHandler* h, int cid)
{
name = n;
handler = h;
Expand All @@ -50,12 +89,12 @@ SrsCoroutine::SrsCoroutine(const string& n, ISrsCoroutineHandler* h, int cid)
started = interrupted = disposed = false;
}

SrsCoroutine::~SrsCoroutine()
SrsSTCoroutine::~SrsSTCoroutine()
{
stop();
}

int SrsCoroutine::start()
int SrsSTCoroutine::start()
{
int ret = ERROR_SUCCESS;

Expand All @@ -77,7 +116,7 @@ int SrsCoroutine::start()
return ret;
}

void SrsCoroutine::stop()
void SrsSTCoroutine::stop()
{
if (!started || disposed) {
return;
Expand All @@ -101,7 +140,7 @@ void SrsCoroutine::stop()
return;
}

void SrsCoroutine::interrupt()
void SrsSTCoroutine::interrupt()
{
if (!started || interrupted) {
return;
Expand All @@ -113,17 +152,17 @@ void SrsCoroutine::interrupt()
st_thread_interrupt((st_thread_t)trd);
}

int SrsCoroutine::pull()
int SrsSTCoroutine::pull()
{
return err;
}

int SrsCoroutine::cid()
int SrsSTCoroutine::cid()
{
return context;
}

int SrsCoroutine::cycle()
int SrsSTCoroutine::cycle()
{
if (_srs_context) {
if (context) {
Expand All @@ -139,9 +178,9 @@ int SrsCoroutine::cycle()
return ret;
}

void* SrsCoroutine::pfn(void* arg)
void* SrsSTCoroutine::pfn(void* arg)
{
SrsCoroutine* p = (SrsCoroutine*)arg;
SrsSTCoroutine* p = (SrsSTCoroutine*)arg;
void*res = (void*)(uint64_t)p->cycle();
return res;
}
Expand Down
Loading

0 comments on commit 9ca3697

Please sign in to comment.