From 4f45f1ed46cac999b8359bacd10b91376fbf84ad Mon Sep 17 00:00:00 2001 From: wanglei Date: Tue, 11 Jan 2022 09:39:36 +0800 Subject: [PATCH 1/2] bugfix: SrsMetaCache memleak; getaddrinfo use delete. --- trunk/src/app/srs_app_source.cpp | 660 ++++++++++++------------ trunk/src/kernel/srs_kernel_utility.cpp | 305 +++++------ trunk/src/protocol/srs_service_st.cpp | 109 ++-- 3 files changed, 546 insertions(+), 528 deletions(-) diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index d297f0f4aa..623e5162b2 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -73,14 +73,14 @@ SrsRtmpJitter::~SrsRtmpJitter() srs_error_t SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag) { srs_error_t err = srs_success; - + // for performance issue if (ag != SrsRtmpJitterAlgorithmFULL) { // all jitter correct features is disabled, ignore. if (ag == SrsRtmpJitterAlgorithmOFF) { return err; } - + // start at zero, but donot ensure monotonically increasing. if (ag == SrsRtmpJitterAlgorithmZERO) { // for the first time, last_pkt_correct_time is -1. @@ -90,18 +90,18 @@ srs_error_t SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgori msg->timestamp -= last_pkt_correct_time; return err; } - + // other algorithm, ignore. return err; } - + // full jitter algorithm, do jitter correct. // set to 0 for metadata. if (!msg->is_av()) { msg->timestamp = 0; return err; } - + /** * we use a very simple time jitter detect/correct algorithm: * 1. delta: ensure the delta is positive and valid, @@ -114,19 +114,19 @@ srs_error_t SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgori */ int64_t time = msg->timestamp; int64_t delta = time - last_pkt_time; - + // if jitter detected, reset the delta. if (delta < CONST_MAX_JITTER_MS_NEG || delta > CONST_MAX_JITTER_MS) { // use default 10ms to notice the problem of stream. // @see https://github.com/ossrs/srs/issues/425 delta = DEFAULT_FRAME_TIME_MS; } - + last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta); - + msg->timestamp = last_pkt_correct_time; last_pkt_time = time; - + return err; } @@ -183,12 +183,12 @@ void SrsFastVector::clear() void SrsFastVector::erase(int _begin, int _end) { srs_assert(_begin < _end); - + // move all erased to previous. for (int i = 0; i < count - _end; i++) { msgs[_begin + i] = msgs[_end + i]; } - + // update the count. count -= _end - _begin; } @@ -203,13 +203,13 @@ void SrsFastVector::push_back(SrsSharedPtrMessage* msg) buf[i] = msgs[i]; } srs_info("fast vector incrase %d=>%d", nb_msgs, size); - + // use new array. srs_freepa(msgs); msgs = buf; nb_msgs = size; } - + msgs[count++] = msg; } @@ -263,7 +263,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow if (av_start_time == -1) { av_start_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS); } - + av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS); } @@ -276,25 +276,25 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow if (is_overflow) { *is_overflow = true; } - + shrink(); } - + return err; } srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** pmsgs, int& count) { srs_error_t err = srs_success; - + int nb_msgs = (int)msgs.size(); if (nb_msgs <= 0) { return err; } - + srs_assert(max_count > 0); count = srs_min(max_count, nb_msgs); - + SrsSharedPtrMessage** omsgs = msgs.data(); memcpy(pmsgs, omsgs, count * sizeof(SrsSharedPtrMessage*)); @@ -311,19 +311,19 @@ srs_error_t SrsMessageQueue::dump_packets(int max_count, SrsSharedPtrMessage** p // the rtmp play client will get 128msgs once, so this branch rarely execute. msgs.erase(msgs.begin(), msgs.begin() + count); } - + return err; } srs_error_t SrsMessageQueue::dump_packets(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag) { srs_error_t err = srs_success; - + int nb_msgs = (int)msgs.size(); if (nb_msgs <= 0) { return err; } - + SrsSharedPtrMessage** omsgs = msgs.data(); for (int i = 0; i < nb_msgs; i++) { SrsSharedPtrMessage* msg = omsgs[i]; @@ -331,7 +331,7 @@ srs_error_t SrsMessageQueue::dump_packets(SrsLiveConsumer* consumer, bool atc, S return srs_error_wrap(err, "consume message"); } } - + return err; } @@ -340,11 +340,11 @@ void SrsMessageQueue::shrink() SrsSharedPtrMessage* video_sh = NULL; SrsSharedPtrMessage* audio_sh = NULL; int msgs_size = (int)msgs.size(); - + // Remove all msgs, mark the sequence headers. for (int i = 0; i < (int)msgs.size(); i++) { SrsSharedPtrMessage* msg = msgs.at(i); - + if (msg->is_video() && SrsFlvVideo::sh(msg->payload, msg->size)) { srs_freep(video_sh); video_sh = msg; @@ -355,11 +355,11 @@ void SrsMessageQueue::shrink() audio_sh = msg; continue; } - + srs_freep(msg); } msgs.clear(); - + // Update av_start_time, the start time of queue. av_start_time = av_end_time; @@ -372,7 +372,7 @@ void SrsMessageQueue::shrink() audio_sh->timestamp = srsu2ms(av_end_time); msgs.push_back(audio_sh); } - + if (!_ignore_shrink) { srs_trace("shrinking, size=%d, removed=%d, max=%dms", (int)msgs.size(), msgs_size - (int)msgs.size(), srsu2msi(max_queue_size)); } @@ -382,7 +382,7 @@ void SrsMessageQueue::clear() { #ifndef SRS_PERF_QUEUE_FAST_VECTOR std::vector::iterator it; - + for (it = msgs.begin(); it != msgs.end(); ++it) { SrsSharedPtrMessage* msg = *it; srs_freep(msg); @@ -390,9 +390,9 @@ void SrsMessageQueue::clear() #else msgs.free(); #endif - + msgs.clear(); - + av_start_time = av_end_time = -1; } @@ -411,7 +411,7 @@ SrsLiveConsumer::SrsLiveConsumer(SrsLiveSource* s) jitter = new SrsRtmpJitter(); queue = new SrsMessageQueue(); should_update_source_id = false; - + #ifdef SRS_PERF_QUEUE_COND_WAIT mw_wait = srs_cond_new(); mw_min_msgs = 0; @@ -425,7 +425,7 @@ SrsLiveConsumer::~SrsLiveConsumer() source->on_consumer_destroy(this); srs_freep(jitter); srs_freep(queue); - + #ifdef SRS_PERF_QUEUE_COND_WAIT srs_cond_destroy(mw_wait); #endif @@ -449,7 +449,7 @@ int64_t SrsLiveConsumer::get_time() srs_error_t SrsLiveConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag) { srs_error_t err = srs_success; - + SrsSharedPtrMessage* msg = shared_msg->copy(); if (!atc) { @@ -461,14 +461,14 @@ srs_error_t SrsLiveConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, if ((err = queue->enqueue(msg, NULL)) != srs_success) { return srs_error_wrap(err, "enqueue message"); } - + #ifdef SRS_PERF_QUEUE_COND_WAIT // fire the mw when msgs is enough. if (mw_waiting) { // For RTMP, we wait for messages and duration. srs_utime_t duration = queue->duration(); bool match_min_msgs = queue->size() > mw_min_msgs; - + // For ATC, maybe the SH timestamp bigger than A/V packet, // when encoder republish or overflow. // @see https://github.com/ossrs/srs/pull/749 @@ -477,7 +477,7 @@ srs_error_t SrsLiveConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, mw_waiting = false; return err; } - + // when duration ok, signal to flush. if (match_min_msgs && duration > mw_duration) { srs_cond_signal(mw_wait); @@ -486,39 +486,39 @@ srs_error_t SrsLiveConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, } } #endif - + return err; } srs_error_t SrsLiveConsumer::dump_packets(SrsMessageArray* msgs, int& count) { srs_error_t err = srs_success; - + srs_assert(count >= 0); srs_assert(msgs->max > 0); - + // the count used as input to reset the max if positive. int max = count? srs_min(count, msgs->max) : msgs->max; - + // the count specifies the max acceptable count, // here maybe 1+, and we must set to 0 when got nothing. count = 0; - + if (should_update_source_id) { srs_trace("update source_id=%s/%s", source->source_id().c_str(), source->pre_source_id().c_str()); should_update_source_id = false; } - + // paused, return nothing. if (paused) { return err; } - + // pump msgs from queue. if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) { return srs_error_wrap(err, "dump packets"); } - + return err; } @@ -529,21 +529,21 @@ void SrsLiveConsumer::wait(int nb_msgs, srs_utime_t msgs_duration) srs_usleep(SRS_CONSTS_RTMP_PULSE); return; } - + mw_min_msgs = nb_msgs; mw_duration = msgs_duration; - + srs_utime_t duration = queue->duration(); bool match_min_msgs = queue->size() > mw_min_msgs; - + // when duration ok, signal to flush. if (match_min_msgs && duration > mw_duration) { return; } - + // the enqueue will notify this cond. mw_waiting = true; - + // use cond block wait for high performance mode. srs_cond_wait(mw_wait); } @@ -552,10 +552,10 @@ void SrsLiveConsumer::wait(int nb_msgs, srs_utime_t msgs_duration) srs_error_t SrsLiveConsumer::on_play_client_pause(bool is_pause) { srs_error_t err = srs_success; - + srs_trace("stream consumer change pause state %d=>%d", paused, is_pause); paused = is_pause; - + return err; } @@ -589,7 +589,7 @@ void SrsGopCache::dispose() void SrsGopCache::set(bool v) { enable_gop_cache = v; - + if (!v) { clear(); return; @@ -604,53 +604,53 @@ bool SrsGopCache::enabled() srs_error_t SrsGopCache::cache(SrsSharedPtrMessage* shared_msg) { srs_error_t err = srs_success; - + if (!enable_gop_cache) { return err; } - + // the gop cache know when to gop it. SrsSharedPtrMessage* msg = shared_msg; - + // got video, update the video count if acceptable if (msg->is_video()) { // drop video when not h.264 if (!SrsFlvVideo::h264(msg->payload, msg->size)) { return err; } - + cached_video_count++; audio_after_last_video_count = 0; } - + // no acceptable video or pure audio, disable the cache. if (pure_audio()) { return err; } - + // ok, gop cache enabled, and got an audio. if (msg->is_audio()) { audio_after_last_video_count++; } - + // clear gop cache when pure audio count overflow if (audio_after_last_video_count > SRS_PURE_AUDIO_GUESS_COUNT) { srs_warn("clear gop cache for guess pure audio overflow"); clear(); return err; } - + // clear gop cache when got key frame if (msg->is_video() && SrsFlvVideo::keyframe(msg->payload, msg->size)) { clear(); - + // curent msg is video frame, so we set to 1. cached_video_count = 1; } - + // cache the frame. gop_cache.push_back(msg->copy()); - + return err; } @@ -662,7 +662,7 @@ void SrsGopCache::clear() srs_freep(msg); } gop_cache.clear(); - + cached_video_count = 0; audio_after_last_video_count = 0; } @@ -670,7 +670,7 @@ void SrsGopCache::clear() srs_error_t SrsGopCache::dump(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm jitter_algorithm) { srs_error_t err = srs_success; - + std::vector::iterator it; for (it = gop_cache.begin(); it != gop_cache.end(); ++it) { SrsSharedPtrMessage* msg = *it; @@ -679,7 +679,7 @@ srs_error_t SrsGopCache::dump(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitter } } srs_trace("dispatch cached gop success. count=%d, duration=%d", (int)gop_cache.size(), consumer->get_time()); - + return err; } @@ -693,10 +693,10 @@ srs_utime_t SrsGopCache::start_time() if (empty()) { return 0; } - + SrsSharedPtrMessage* msg = gop_cache[0]; srs_assert(msg); - + return srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS); } @@ -720,7 +720,7 @@ bool srs_hls_can_continue(int ret, SrsSharedPtrMessage* sh, SrsSharedPtrMessage* if (ret != ERROR_HLS_DECODE_ERROR) { return false; } - + // when video size equals to sequence header, // the video actually maybe a sequence header, // continue to make ffmpeg happy. @@ -728,7 +728,7 @@ bool srs_hls_can_continue(int ret, SrsSharedPtrMessage* sh, SrsSharedPtrMessage* srs_warn("the msg is actually a sequence header, ignore this packet."); return true; } - + return false; } @@ -751,7 +751,7 @@ void SrsMixQueue::clear() srs_freep(msg); } msgs.clear(); - + nb_videos = 0; nb_audios = 0; } @@ -759,7 +759,7 @@ void SrsMixQueue::clear() void SrsMixQueue::push(SrsSharedPtrMessage* msg) { msgs.insert(std::make_pair(msg->timestamp, msg)); - + if (msg->is_video()) { nb_videos++; } else { @@ -770,37 +770,37 @@ void SrsMixQueue::push(SrsSharedPtrMessage* msg) SrsSharedPtrMessage* SrsMixQueue::pop() { bool mix_ok = false; - + // pure video if (nb_videos >= SRS_MIX_CORRECT_PURE_AV && nb_audios == 0) { mix_ok = true; } - + // pure audio if (nb_audios >= SRS_MIX_CORRECT_PURE_AV && nb_videos == 0) { mix_ok = true; } - + // got 1 video and 1 audio, mix ok. if (nb_videos >= 1 && nb_audios >= 1) { mix_ok = true; } - + if (!mix_ok) { return NULL; } - + // pop the first msg. std::multimap::iterator it = msgs.begin(); SrsSharedPtrMessage* msg = it->second; msgs.erase(it); - + if (msg->is_video()) { nb_videos--; } else { nb_audios--; } - + return msg; } @@ -809,7 +809,7 @@ SrsOriginHub::SrsOriginHub() source = NULL; req = NULL; is_active = false; - + hls = new SrsHls(); dash = new SrsDash(); dvr = new SrsDvr(); @@ -819,14 +819,14 @@ SrsOriginHub::SrsOriginHub() #endif ng_exec = new SrsNgExec(); format = new SrsRtmpFormat(); - + _srs_config->subscribe(this); } SrsOriginHub::~SrsOriginHub() { _srs_config->unsubscribe(this); - + if (true) { std::vector::iterator it; for (it = forwarders.begin(); it != forwarders.end(); ++it) { @@ -836,7 +836,7 @@ SrsOriginHub::~SrsOriginHub() forwarders.clear(); } srs_freep(ng_exec); - + srs_freep(format); srs_freep(hls); srs_freep(dash); @@ -850,46 +850,46 @@ SrsOriginHub::~SrsOriginHub() srs_error_t SrsOriginHub::initialize(SrsLiveSource* s, SrsRequest* r) { srs_error_t err = srs_success; - + req = r; source = s; - + if ((err = format->initialize()) != srs_success) { return srs_error_wrap(err, "format initialize"); } - + if ((err = hls->initialize(this, req)) != srs_success) { return srs_error_wrap(err, "hls initialize"); } - + if ((err = dash->initialize(this, req)) != srs_success) { return srs_error_wrap(err, "dash initialize"); } - + if ((err = dvr->initialize(this, req)) != srs_success) { return srs_error_wrap(err, "dvr initialize"); } - + return err; } void SrsOriginHub::dispose() { hls->dispose(); - + // TODO: Support dispose DASH. } srs_error_t SrsOriginHub::cycle() { srs_error_t err = srs_success; - + if ((err = hls->cycle()) != srs_success) { return srs_error_wrap(err, "hls cycle"); } - + // TODO: Support cycle DASH. - + return err; } @@ -901,11 +901,11 @@ bool SrsOriginHub::active() srs_error_t SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDataPacket* packet) { srs_error_t err = srs_success; - + if ((err = format->on_metadata(packet)) != srs_success) { return srs_error_wrap(err, "Format parse metadata"); } - + // copy to all forwarders if (true) { std::vector::iterator it; @@ -916,18 +916,18 @@ srs_error_t SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata, Srs } } } - + if ((err = dvr->on_meta_data(shared_metadata)) != srs_success) { return srs_error_wrap(err, "DVR consume metadata"); } - + return err; } srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) { srs_error_t err = srs_success; - + SrsSharedPtrMessage* msg = shared_audio; // TODO: FIXME: Support parsing OPUS for RTC. @@ -940,29 +940,29 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) if (!format->acodec) { return err; } - + // cache the sequence header if aac // donot cache the sequence header to gop_cache, return here. if (format->is_aac_sequence_header()) { srs_assert(format->acodec); SrsAudioCodecConfig* c = format->acodec; - + static int flv_sample_sizes[] = {8, 16, 0}; static int flv_sound_types[] = {1, 2, 0}; - + // when got audio stream info. SrsStatistic* stat = SrsStatistic::instance(); if ((err = stat->on_audio_info(req, SrsAudioCodecIdAAC, c->sound_rate, c->sound_type, c->aac_object)) != srs_success) { return srs_error_wrap(err, "stat audio"); } - + srs_trace("%dB audio sh, codec(%d, profile=%s, %dchannels, %dkbps, %dHZ), flv(%dbits, %dchannels, %dHZ)", msg->size, c->id, srs_aac_object2str(c->aac_object).c_str(), c->aac_channels, c->audio_data_rate / 1000, srs_aac_srates[c->aac_sample_rate], flv_sample_sizes[c->sound_size], flv_sound_types[c->sound_type], srs_flv_srates[c->sound_rate]); } - + if ((err = hls->on_audio(msg, format)) != srs_success) { // apply the error strategy for hls. // @see https://github.com/ossrs/srs/issues/264 @@ -981,19 +981,19 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) return srs_error_wrap(err, "hls: audio"); } } - + if ((err = dash->on_audio(msg, format)) != srs_success) { srs_warn("dash: ignore audio error %s", srs_error_desc(err).c_str()); srs_error_reset(err); dash->on_unpublish(); } - + if ((err = dvr->on_audio(msg, format)) != srs_success) { srs_warn("dvr: ignore audio error %s", srs_error_desc(err).c_str()); srs_error_reset(err); dvr->on_unpublish(); } - + #ifdef SRS_HDS if ((err = hds->on_audio(msg)) != srs_success) { srs_warn("hds: ignore audio error %s", srs_error_desc(err).c_str()); @@ -1001,7 +1001,7 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) hds->on_unpublish(); } #endif - + // copy to all forwarders. if (true) { std::vector::iterator it; @@ -1012,44 +1012,44 @@ srs_error_t SrsOriginHub::on_audio(SrsSharedPtrMessage* shared_audio) } } } - + return err; } srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_sequence_header) { srs_error_t err = srs_success; - + SrsSharedPtrMessage* msg = shared_video; - + // user can disable the sps parse to workaround when parse sps failed. // @see https://github.com/ossrs/srs/issues/474 if (is_sequence_header) { format->avc_parse_sps = _srs_config->get_parse_sps(req->vhost); } - + if ((err = format->on_video(msg)) != srs_success) { return srs_error_wrap(err, "format consume video"); } - + // Ignore if no format->vcodec, it means the codec is not parsed, or unsupport/unknown codec // such as H.263 codec if (!format->vcodec) { return err; } - + // cache the sequence header if h264 // donot cache the sequence header to gop_cache, return here. if (format->is_avc_sequence_header()) { SrsVideoCodecConfig* c = format->vcodec; srs_assert(c); - + // when got video stream info. SrsStatistic* stat = SrsStatistic::instance(); if ((err = stat->on_video_info(req, SrsVideoCodecIdAVC, c->avc_profile, c->avc_level, c->width, c->height)) != srs_success) { return srs_error_wrap(err, "stat video"); } - + srs_trace("%dB video sh, codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %.1ffps, %.1fs)", msg->size, c->id, srs_avc_profile2str(c->avc_profile).c_str(), srs_avc_level2str(c->avc_level).c_str(), c->width, c->height, @@ -1061,7 +1061,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se if (format->vcodec && !format->vcodec->is_avc_codec_ok()) { return err; } - + if ((err = hls->on_video(msg, format)) != srs_success) { // TODO: We should support more strategies. // apply the error strategy for hls. @@ -1081,19 +1081,19 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se return srs_error_wrap(err, "hls: video"); } } - + if ((err = dash->on_video(msg, format)) != srs_success) { srs_warn("dash: ignore video error %s", srs_error_desc(err).c_str()); srs_error_reset(err); dash->on_unpublish(); } - + if ((err = dvr->on_video(msg, format)) != srs_success) { srs_warn("dvr: ignore video error %s", srs_error_desc(err).c_str()); srs_error_reset(err); dvr->on_unpublish(); } - + #ifdef SRS_HDS if ((err = hds->on_video(msg)) != srs_success) { srs_warn("hds: ignore video error %s", srs_error_desc(err).c_str()); @@ -1101,7 +1101,7 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se hds->on_unpublish(); } #endif - + // copy to all forwarders. if (!forwarders.empty()) { std::vector::iterator it; @@ -1112,19 +1112,19 @@ srs_error_t SrsOriginHub::on_video(SrsSharedPtrMessage* shared_video, bool is_se } } } - + return err; } srs_error_t SrsOriginHub::on_publish() { srs_error_t err = srs_success; - + // create forwarders if ((err = create_forwarders()) != srs_success) { return srs_error_wrap(err, "create forwarders"); } - + // TODO: FIXME: use initialize to set req. if ((err = encoder->on_publish(req)) != srs_success) { return srs_error_wrap(err, "encoder publish"); @@ -1133,60 +1133,60 @@ srs_error_t SrsOriginHub::on_publish() if ((err = hls->on_publish()) != srs_success) { return srs_error_wrap(err, "hls publish"); } - + if ((err = dash->on_publish()) != srs_success) { return srs_error_wrap(err, "dash publish"); } - + // @see https://github.com/ossrs/srs/issues/1613#issuecomment-961657927 if ((err = dvr->on_publish(req)) != srs_success) { return srs_error_wrap(err, "dvr publish"); } - + // TODO: FIXME: use initialize to set req. #ifdef SRS_HDS if ((err = hds->on_publish(req)) != srs_success) { return srs_error_wrap(err, "hds publish"); } #endif - + // TODO: FIXME: use initialize to set req. if ((err = ng_exec->on_publish(req)) != srs_success) { return srs_error_wrap(err, "exec publish"); } - + is_active = true; - + return err; } void SrsOriginHub::on_unpublish() { is_active = false; - + // destroy all forwarders destroy_forwarders(); - + encoder->on_unpublish(); hls->on_unpublish(); dash->on_unpublish(); dvr->on_unpublish(); - + #ifdef SRS_HDS hds->on_unpublish(); #endif - + ng_exec->on_unpublish(); } srs_error_t SrsOriginHub::on_forwarder_start(SrsForwarder* forwarder) { srs_error_t err = srs_success; - + SrsSharedPtrMessage* cache_metadata = source->meta->data(); SrsSharedPtrMessage* cache_sh_video = source->meta->vsh(); SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); - + // feed the forwarder the metadata/sequence header, // when reload to enable the forwarder. if (cache_metadata && (err = forwarder->on_meta_data(cache_metadata)) != srs_success) { @@ -1198,86 +1198,86 @@ srs_error_t SrsOriginHub::on_forwarder_start(SrsForwarder* forwarder) if (cache_sh_audio && (err = forwarder->on_audio(cache_sh_audio)) != srs_success) { return srs_error_wrap(err, "forward audio sh"); } - + return err; } srs_error_t SrsOriginHub::on_dvr_request_sh() { srs_error_t err = srs_success; - + SrsSharedPtrMessage* cache_metadata = source->meta->data(); SrsSharedPtrMessage* cache_sh_video = source->meta->vsh(); SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); - + // feed the dvr the metadata/sequence header, // when reload to start dvr, dvr will never get the sequence header in stream, // use the SrsLiveSource.on_dvr_request_sh to push the sequence header to DVR. if (cache_metadata && (err = dvr->on_meta_data(cache_metadata)) != srs_success) { return srs_error_wrap(err, "dvr metadata"); } - + if (cache_sh_video) { if ((err = dvr->on_video(cache_sh_video, source->meta->vsh_format())) != srs_success) { return srs_error_wrap(err, "dvr video"); } } - + if (cache_sh_audio) { if ((err = dvr->on_audio(cache_sh_audio, source->meta->ash_format())) != srs_success) { return srs_error_wrap(err, "dvr audio"); } } - + return err; } srs_error_t SrsOriginHub::on_reload_vhost_forward(string vhost) { srs_error_t err = srs_success; - + if (req->vhost != vhost) { return err; } - + // TODO: FIXME: maybe should ignore when publish already stopped? - + // forwarders destroy_forwarders(); - + // Don't start forwarders when source is not active. if (!is_active) { return err; } - + if ((err = create_forwarders()) != srs_success) { return srs_error_wrap(err, "create forwarders"); } - + srs_trace("vhost %s forwarders reload success", vhost.c_str()); - + return err; } srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost) { srs_error_t err = srs_success; - + if (req->vhost != vhost) { return err; } - + dash->on_unpublish(); - + // Don't start DASH when source is not active. if (!is_active) { return err; } - + if ((err = dash->on_publish()) != srs_success) { return srs_error_wrap(err, "dash start publish"); } - + SrsSharedPtrMessage* cache_sh_video = source->meta->vsh(); if (cache_sh_video) { if ((err = format->on_video(cache_sh_video)) != srs_success) { @@ -1287,7 +1287,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost) return srs_error_wrap(err, "dash on_video"); } } - + SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); if (cache_sh_audio) { if ((err = format->on_audio(cache_sh_audio)) != srs_success) { @@ -1297,32 +1297,32 @@ srs_error_t SrsOriginHub::on_reload_vhost_dash(string vhost) return srs_error_wrap(err, "dash on_audio"); } } - + return err; } srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost) { srs_error_t err = srs_success; - + if (req->vhost != vhost) { return err; } - + // TODO: FIXME: maybe should ignore when publish already stopped? - + hls->on_unpublish(); - + // Don't start HLS when source is not active. if (!is_active) { return err; } - + if ((err = hls->on_publish()) != srs_success) { return srs_error_wrap(err, "hls publish failed"); } srs_trace("vhost %s hls reload success", vhost.c_str()); - + // when publish, don't need to fetch sequence header, which is old and maybe corrupt. // when reload, we must fetch the sequence header from source cache. // notice the source to get the cached sequence header. @@ -1337,7 +1337,7 @@ srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost) return srs_error_wrap(err, "hls on_video"); } } - + SrsSharedPtrMessage* cache_sh_audio = source->meta->ash(); if (cache_sh_audio) { if ((err = format->on_audio(cache_sh_audio)) != srs_success) { @@ -1347,139 +1347,139 @@ srs_error_t SrsOriginHub::on_reload_vhost_hls(string vhost) return srs_error_wrap(err, "hls on_audio"); } } - + return err; } srs_error_t SrsOriginHub::on_reload_vhost_hds(string vhost) { srs_error_t err = srs_success; - + if (req->vhost != vhost) { return err; } - + // TODO: FIXME: maybe should ignore when publish already stopped? - + #ifdef SRS_HDS hds->on_unpublish(); - + // Don't start HDS when source is not active. if (!is_active) { return err; } - + if ((err = hds->on_publish(req)) != srs_success) { return srs_error_wrap(err, "hds publish failed"); } srs_trace("vhost %s hds reload success", vhost.c_str()); #endif - + return err; } srs_error_t SrsOriginHub::on_reload_vhost_dvr(string vhost) { srs_error_t err = srs_success; - + if (req->vhost != vhost) { return err; } - + // TODO: FIXME: maybe should ignore when publish already stopped? - + // cleanup dvr dvr->on_unpublish(); - + // Don't start DVR when source is not active. if (!is_active) { return err; } - + // reinitialize the dvr, update plan. if ((err = dvr->initialize(this, req)) != srs_success) { return srs_error_wrap(err, "reload dvr"); } - + // start to publish by new plan. if ((err = dvr->on_publish(req)) != srs_success) { return srs_error_wrap(err, "dvr publish failed"); } - + if ((err = on_dvr_request_sh()) != srs_success) { return srs_error_wrap(err, "request sh"); } - + srs_trace("vhost %s dvr reload success", vhost.c_str()); - + return err; } srs_error_t SrsOriginHub::on_reload_vhost_transcode(string vhost) { srs_error_t err = srs_success; - + if (req->vhost != vhost) { return err; } - + // TODO: FIXME: maybe should ignore when publish already stopped? - + encoder->on_unpublish(); - + // Don't start transcode when source is not active. if (!is_active) { return err; } - + if ((err = encoder->on_publish(req)) != srs_success) { return srs_error_wrap(err, "start encoder failed"); } srs_trace("vhost %s transcode reload success", vhost.c_str()); - + return err; } srs_error_t SrsOriginHub::on_reload_vhost_exec(string vhost) { srs_error_t err = srs_success; - + if (req->vhost != vhost) { return err; } - + // TODO: FIXME: maybe should ignore when publish already stopped? - + ng_exec->on_unpublish(); - + // Don't start exec when source is not active. if (!is_active) { return err; } - + if ((err = ng_exec->on_publish(req)) != srs_success) { return srs_error_wrap(err, "start exec failed"); } srs_trace("vhost %s exec reload success", vhost.c_str()); - + return err; } srs_error_t SrsOriginHub::create_forwarders() { srs_error_t err = srs_success; - + if (!_srs_config->get_forward_enabled(req->vhost)) { return err; } - + SrsConfDirective* conf = _srs_config->get_forwards(req->vhost); for (int i = 0; conf && i < (int)conf->args.size(); i++) { std::string forward_server = conf->args.at(i); - + SrsForwarder* forwarder = new SrsForwarder(this); forwarders.push_back(forwarder); - + // initialize the forwarder with request. if ((err = forwarder->initialize(req, forward_server)) != srs_success) { return srs_error_wrap(err, "init forwarder"); @@ -1487,13 +1487,13 @@ srs_error_t SrsOriginHub::create_forwarders() srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); forwarder->set_queue_size(queue_size); - + if ((err = forwarder->on_publish()) != srs_success) { return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s", req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), forward_server.c_str()); } } - + return err; } @@ -1526,6 +1526,8 @@ void SrsMetaCache::dispose() clear(); srs_freep(previous_video); srs_freep(previous_audio); + srs_freep(previous_video); + srs_freep(previous_audio); } void SrsMetaCache::clear() @@ -1563,23 +1565,23 @@ SrsFormat* SrsMetaCache::ash_format() srs_error_t SrsMetaCache::dumps(SrsLiveConsumer* consumer, bool atc, SrsRtmpJitterAlgorithm ag, bool dm, bool ds) { srs_error_t err = srs_success; - + // copy metadata. if (dm && meta && (err = consumer->enqueue(meta, atc, ag)) != srs_success) { return srs_error_wrap(err, "enqueue metadata"); } - + // copy sequence header // copy audio sequence first, for hls to fast parse the "right" audio codec. // @see https://github.com/ossrs/srs/issues/301 if (ds && audio && (err = consumer->enqueue(audio, atc, ag)) != srs_success) { return srs_error_wrap(err, "enqueue audio sh"); } - + if (ds && video && (err = consumer->enqueue(video, atc, ag)) != srs_success) { return srs_error_wrap(err, "enqueue video sh"); } - + return err; } @@ -1608,16 +1610,16 @@ void SrsMetaCache::update_previous_ash() srs_error_t SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPacket* metadata, bool& updated) { updated = false; - + srs_error_t err = srs_success; - + SrsAmf0Any* prop = NULL; - + // when exists the duration, remove it to make ExoPlayer happy. if (metadata->metadata->get_property("duration") != NULL) { metadata->metadata->remove("duration"); } - + // generate metadata info to print std::stringstream ss; if ((prop = metadata->metadata->ensure_property_number("width")) != NULL) { @@ -1633,37 +1635,37 @@ srs_error_t SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPac ss << ", acodec=" << (int)prop->to_number(); } srs_trace("got metadata%s", ss.str().c_str()); - + // add server info to metadata metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER)); // version, for example, 1.0.0 // add version to metadata, please donot remove it, for debug. metadata->metadata->set("server_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION)); - + // encode the metadata to payload int size = 0; char* payload = NULL; if ((err = metadata->encode(size, payload)) != srs_success) { return srs_error_wrap(err, "encode metadata"); } - + if (size <= 0) { srs_warn("ignore the invalid metadata. size=%d", size); return err; } - + // create a shared ptr message. srs_freep(meta); meta = new SrsSharedPtrMessage(); updated = true; - + // dump message to shared ptr message. // the payload/size managed by cache_metadata, user should not free it. if ((err = meta->create(header, payload, size)) != srs_success) { return srs_error_wrap(err, "create metadata"); } - + return err; } @@ -1710,16 +1712,16 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH // @bug https://github.com/ossrs/srs/issues/1230 // TODO: FIXME: Use smaller lock. SrsLocker(lock); - + SrsLiveSource* source = NULL; if ((source = fetch(r)) != NULL) { *pps = source; return err; } - + string stream_url = r->get_stream_url(); string vhost = r->vhost; - + // should always not exists for create a source. srs_assert (pool.find(stream_url) == pool.end()); @@ -1730,7 +1732,7 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); goto failed; } - + pool[stream_url] = source; *pps = source; return err; @@ -1743,19 +1745,19 @@ srs_error_t SrsLiveSourceManager::fetch_or_create(SrsRequest* r, ISrsLiveSourceH SrsLiveSource* SrsLiveSourceManager::fetch(SrsRequest* r) { SrsLiveSource* source = NULL; - + string stream_url = r->get_stream_url(); if (pool.find(stream_url) == pool.end()) { return NULL; } - + source = pool[stream_url]; - + // we always update the request of resource, // for origin auth is on, the token in request maybe invalid, // and we only need to update the token of request, it's simple. source->update_auth(r); - + return source; } @@ -1849,22 +1851,22 @@ SrsLiveSource::SrsLiveSource() jitter_algorithm = SrsRtmpJitterAlgorithmOFF; mix_correct = false; mix_queue = new SrsMixQueue(); - + _can_publish = true; die_at = 0; handler = NULL; bridger_ = NULL; - + play_edge = new SrsPlayEdge(); publish_edge = new SrsPublishEdge(); gop_cache = new SrsGopCache(); hub = new SrsOriginHub(); meta = new SrsMetaCache(); - + is_monotonically_increase = false; last_packet_time = 0; - + _srs_config->subscribe(this); atc = false; } @@ -1872,19 +1874,19 @@ SrsLiveSource::SrsLiveSource() SrsLiveSource::~SrsLiveSource() { _srs_config->unsubscribe(this); - + // never free the consumers, // for all consumers are auto free. consumers.clear(); - + srs_freep(hub); srs_freep(meta); srs_freep(mix_queue); - + srs_freep(play_edge); srs_freep(publish_edge); srs_freep(gop_cache); - + srs_freep(req); srs_freep(bridger_); } @@ -1902,7 +1904,7 @@ srs_error_t SrsLiveSource::cycle() if (err != srs_success) { return srs_error_wrap(err, "hub cycle"); } - + return srs_success; } @@ -1912,53 +1914,53 @@ bool SrsLiveSource::expired() if (die_at == 0) { return false; } - + // still publishing? if (!_can_publish || !publish_edge->can_publish()) { return false; } - + // has any consumers? if (!consumers.empty()) { return false; } - + srs_utime_t now = srs_get_system_time(); if (now > die_at + SRS_SOURCE_CLEANUP) { return true; } - + return false; } srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h) { srs_error_t err = srs_success; - + srs_assert(h); srs_assert(!req); - + handler = h; req = r->copy(); atc = _srs_config->get_atc(req->vhost); - + if ((err = hub->initialize(this, req)) != srs_success) { return srs_error_wrap(err, "hub"); } - + if ((err = play_edge->initialize(this, req)) != srs_success) { return srs_error_wrap(err, "edge(play)"); } if ((err = publish_edge->initialize(this, req)) != srs_success) { return srs_error_wrap(err, "edge(publish)"); } - + srs_utime_t queue_size = _srs_config->get_queue_length(req->vhost); publish_edge->set_queue_size(queue_size); - + jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost); mix_correct = _srs_config->get_mix_correct(req->vhost); - + return err; } @@ -1971,90 +1973,90 @@ void SrsLiveSource::set_bridger(ISrsLiveSourceBridger* v) srs_error_t SrsLiveSource::on_reload_vhost_play(string vhost) { srs_error_t err = srs_success; - + if (req->vhost != vhost) { return err; } - + // time_jitter jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(req->vhost); - + // mix_correct if (true) { bool v = _srs_config->get_mix_correct(req->vhost); - + // when changed, clear the mix queue. if (v != mix_correct) { mix_queue->clear(); } mix_correct = v; } - + // atc changed. if (true) { bool v = _srs_config->get_atc(vhost); - + if (v != atc) { srs_warn("vhost %s atc changed to %d, connected client may corrupt.", vhost.c_str(), v); gop_cache->clear(); } atc = v; } - + // gop cache changed. if (true) { bool v = _srs_config->get_gop_cache(vhost); - + if (v != gop_cache->enabled()) { string url = req->get_stream_url(); srs_trace("vhost %s gop_cache changed to %d, source url=%s", vhost.c_str(), v, url.c_str()); gop_cache->set(v); } } - + // queue length if (true) { srs_utime_t v = _srs_config->get_queue_length(req->vhost); - + if (true) { std::vector::iterator it; - + for (it = consumers.begin(); it != consumers.end(); ++it) { SrsLiveConsumer* consumer = *it; consumer->set_queue_size(v); } - + srs_trace("consumers reload queue size success."); } - + // TODO: FIXME: https://github.com/ossrs/srs/issues/742#issuecomment-273656897 // TODO: FIXME: support queue size. #if 0 if (true) { std::vector::iterator it; - + for (it = forwarders.begin(); it != forwarders.end(); ++it) { SrsForwarder* forwarder = *it; forwarder->set_queue_size(v); } - + srs_trace("forwarders reload queue size success."); } - + if (true) { publish_edge->set_queue_size(v); srs_trace("publish_edge reload queue size success."); } #endif } - + return err; } srs_error_t SrsLiveSource::on_source_id_changed(SrsContextId id) { srs_error_t err = srs_success; - + if (!_source_id.compare(id)) { return err; } @@ -2063,14 +2065,14 @@ srs_error_t SrsLiveSource::on_source_id_changed(SrsContextId id) _pre_source_id = id; } _source_id = id; - + // notice all consumer std::vector::iterator it; for (it = consumers.begin(); it != consumers.end(); ++it) { SrsLiveConsumer* consumer = *it; consumer->update_source_id(); } - + return err; } @@ -2101,14 +2103,14 @@ bool SrsLiveSource::can_publish(bool is_edge) if (is_edge) { return publish_edge->can_publish(); } - + return _can_publish; } srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata) { srs_error_t err = srs_success; - + // if allow atc_auto and bravo-atc detected, open atc for vhost. SrsAmf0Any* prop = NULL; atc = _srs_config->get_atc(req->vhost); @@ -2119,7 +2121,7 @@ srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPack } } } - + // Update the meta cache. bool updated = false; if ((err = meta->update_data(&msg->header, metadata, updated)) != srs_success) { @@ -2128,14 +2130,14 @@ srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPack if (!updated) { return err; } - + // when already got metadata, drop when reduce sequence header. bool drop_for_reduce = false; if (meta->data() && _srs_config->get_reduce_sequence_header(req->vhost)) { drop_for_reduce = true; srs_warn("drop for reduce sh metadata, size=%d", msg->size); } - + // copy to all consumer if (!drop_for_reduce) { std::vector::iterator it; @@ -2146,7 +2148,7 @@ srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPack } } } - + // Copy to hub to all utilities. return hub->on_meta_data(meta->data(), metadata); } @@ -2154,7 +2156,7 @@ srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPack srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio) { srs_error_t err = srs_success; - + // monotically increase detect. if (!mix_correct && is_monotonically_increase) { if (last_packet_time > 0 && shared_audio->header.timestamp < last_packet_time) { @@ -2163,28 +2165,28 @@ srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio) } } last_packet_time = shared_audio->header.timestamp; - + // convert shared_audio to msg, user should not use shared_audio again. // the payload is transfer to msg, and set to NULL in shared_audio. SrsSharedPtrMessage msg; if ((err = msg.create(shared_audio)) != srs_success) { return srs_error_wrap(err, "create message"); } - + // directly process the audio message. if (!mix_correct) { return on_audio_imp(&msg); } - + // insert msg to the queue. mix_queue->push(msg.copy()); - + // fetch someone from mix queue. SrsSharedPtrMessage* m = mix_queue->pop(); if (!m) { return err; } - + // consume the monotonically increase message. if (m->is_audio()) { err = on_audio_imp(m); @@ -2192,17 +2194,17 @@ srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio) err = on_video_imp(m); } srs_freep(m); - + return err; } srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg) { srs_error_t err = srs_success; - + bool is_aac_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size); bool is_sequence_header = is_aac_sequence_header; - + // whether consumer should drop for the duplicated sequence header. bool drop_for_reduce = false; if (is_sequence_header && meta->previous_ash() && _srs_config->get_reduce_sequence_header(req->vhost)) { @@ -2211,7 +2213,7 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg) srs_warn("drop for reduce sh audio, size=%d", msg->size); } } - + // Copy to hub to all utilities. if ((err = hub->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "consume audio"); @@ -2231,7 +2233,7 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg) } } } - + // cache the sequence header of aac, or first packet of mp3. // for example, the mp3 is used for hls to write the "right" audio codec. // TODO: FIXME: to refine the stream info system. @@ -2240,12 +2242,12 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg) return srs_error_wrap(err, "meta consume audio"); } } - + // when sequence header, donot push to gop cache and adjust the timestamp. if (is_sequence_header) { return err; } - + // cache the last gop packets if ((err = gop_cache->cache(msg)) != srs_success) { return srs_error_wrap(err, "gop cache consume audio"); @@ -2260,14 +2262,14 @@ srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg) meta->data()->timestamp = msg->timestamp; } } - + return err; } srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video) { srs_error_t err = srs_success; - + // monotically increase detect. if (!mix_correct && is_monotonically_increase) { if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) { @@ -2276,7 +2278,7 @@ srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video) } } last_packet_time = shared_video->header.timestamp; - + // drop any unknown header video. // @see https://github.com/ossrs/srs/issues/421 if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) { @@ -2284,32 +2286,32 @@ srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video) if (shared_video->size > 0) { b0 = shared_video->payload[0]; } - + srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0); return err; } - + // convert shared_video to msg, user should not use shared_video again. // the payload is transfer to msg, and set to NULL in shared_video. SrsSharedPtrMessage msg; if ((err = msg.create(shared_video)) != srs_success) { return srs_error_wrap(err, "create message"); } - + // directly process the video message. if (!mix_correct) { return on_video_imp(&msg); } - + // insert msg to the queue. mix_queue->push(msg.copy()); - + // fetch someone from mix queue. SrsSharedPtrMessage* m = mix_queue->pop(); if (!m) { return err; } - + // consume the monotonically increase message. if (m->is_audio()) { err = on_audio_imp(m); @@ -2317,16 +2319,16 @@ srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video) err = on_video_imp(m); } srs_freep(m); - + return err; } srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg) { srs_error_t err = srs_success; - + bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size); - + // whether consumer should drop for the duplicated sequence header. bool drop_for_reduce = false; if (is_sequence_header && meta->previous_vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) { @@ -2335,13 +2337,13 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg) srs_warn("drop for reduce sh video, size=%d", msg->size); } } - + // cache the sequence header if h264 // donot cache the sequence header to gop_cache, return here. if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) { return srs_error_wrap(err, "meta update video"); } - + // Copy to hub to all utilities. if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) { return srs_error_wrap(err, "hub consume video"); @@ -2361,17 +2363,17 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg) } } } - + // when sequence header, donot push to gop cache and adjust the timestamp. if (is_sequence_header) { return err; } - + // cache the last gop packets if ((err = gop_cache->cache(msg)) != srs_success) { return srs_error_wrap(err, "gop cache consume vdieo"); } - + // if atc, update the sequence header to abs time. if (atc) { if (meta->vsh()) { @@ -2381,85 +2383,85 @@ srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg) meta->data()->timestamp = msg->timestamp; } } - + return err; } srs_error_t SrsLiveSource::on_aggregate(SrsCommonMessage* msg) { srs_error_t err = srs_success; - + SrsBuffer* stream = new SrsBuffer(msg->payload, msg->size); SrsAutoFree(SrsBuffer, stream); - + // the aggregate message always use abs time. int delta = -1; - + while (!stream->empty()) { if (!stream->require(1)) { return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate"); } int8_t type = stream->read_1bytes(); - + if (!stream->require(3)) { return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate"); } int32_t data_size = stream->read_3bytes(); - + if (data_size < 0) { return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate size"); } - + if (!stream->require(3)) { return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate time"); } int32_t timestamp = stream->read_3bytes(); - + if (!stream->require(1)) { return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate time(high bits)"); } int32_t time_h = stream->read_1bytes(); - + timestamp |= time_h<<24; timestamp &= 0x7FFFFFFF; - + // adjust abs timestamp in aggregate msg. // only -1 means uninitialized delta. if (delta == -1) { delta = (int)msg->header.timestamp - (int)timestamp; } timestamp += delta; - + if (!stream->require(3)) { return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate stream id"); } int32_t stream_id = stream->read_3bytes(); - + if (data_size > 0 && !stream->require(data_size)) { return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate data"); } - + // to common message. SrsCommonMessage o; - + o.header.message_type = type; o.header.payload_length = data_size; o.header.timestamp_delta = timestamp; o.header.timestamp = timestamp; o.header.stream_id = stream_id; o.header.perfer_cid = msg->header.perfer_cid; - + if (data_size > 0) { o.size = data_size; o.payload = new char[o.size]; stream->read_bytes(o.payload, o.size); } - + if (!stream->require(4)) { return srs_error_new(ERROR_RTMP_AGGREGATE, "aggregate previous tag size"); } stream->read_4bytes(); - + // process parsed message if (o.header.is_audio()) { if ((err = on_audio(&o)) != srs_success) { @@ -2471,41 +2473,41 @@ srs_error_t SrsLiveSource::on_aggregate(SrsCommonMessage* msg) } } } - + return err; } srs_error_t SrsLiveSource::on_publish() { srs_error_t err = srs_success; - + // update the request object. srs_assert(req); - + _can_publish = false; - + // whatever, the publish thread is the source or edge source, // save its id to srouce id. if ((err = on_source_id_changed(_srs_context->get_id())) != srs_success) { return srs_error_wrap(err, "source id change"); } - + // reset the mix queue. mix_queue->clear(); // Reset the metadata cache, to make VLC happy when disable/enable stream. // @see https://github.com/ossrs/srs/issues/1630#issuecomment-597979448 meta->clear(); - + // detect the monotonically again. is_monotonically_increase = true; last_packet_time = 0; - + // Notify the hub about the publish event. if ((err = hub->on_publish()) != srs_success) { return srs_error_wrap(err, "hub publish"); } - + // notify the handler. srs_assert(handler); if ((err = handler->on_publish(this, req)) != srs_success) { @@ -2518,7 +2520,7 @@ srs_error_t SrsLiveSource::on_publish() SrsStatistic* stat = SrsStatistic::instance(); stat->on_stream_publish(req, _source_id.c_str()); - + return err; } @@ -2528,10 +2530,10 @@ void SrsLiveSource::on_unpublish() if (_can_publish) { return; } - + // Notify the hub about the unpublish event. hub->on_unpublish(); - + // only clear the gop cache, // donot clear the sequence header, for it maybe not changed, // when drop dup sequence header, drop the metadata also. @@ -2543,7 +2545,7 @@ void SrsLiveSource::on_unpublish() meta->update_previous_ash(); srs_trace("cleanup when unpublish"); - + _can_publish = true; if (!_source_id.empty()) { _pre_source_id = _source_id; @@ -2561,7 +2563,7 @@ void SrsLiveSource::on_unpublish() bridger_->on_unpublish(); srs_freep(bridger_); } - + // no consumer, stream is die. if (consumers.empty()) { die_at = srs_get_system_time(); @@ -2571,10 +2573,10 @@ void SrsLiveSource::on_unpublish() srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer) { srs_error_t err = srs_success; - + consumer = new SrsLiveConsumer(this); consumers.push_back(consumer); - + // for edge, when play edge stream, check the state if (_srs_config->get_vhost_is_edge(req->vhost)) { // notice edge to start for the first client. @@ -2582,7 +2584,7 @@ srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer) return srs_error_wrap(err, "play edge"); } } - + return err; } @@ -2636,7 +2638,7 @@ void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer) if (it != consumers.end()) { consumers.erase(it); } - + if (consumers.empty()) { play_edge->on_all_client_stop(); die_at = srs_get_system_time(); diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp index 8d59cbc9d4..b7fcb80ff2 100644 --- a/trunk/src/kernel/srs_kernel_utility.cpp +++ b/trunk/src/kernel/srs_kernel_utility.cpp @@ -36,11 +36,11 @@ using namespace std; srs_error_t srs_avc_nalu_read_uev(SrsBitBuffer* stream, int32_t& v) { srs_error_t err = srs_success; - + if (stream->empty()) { return srs_error_new(ERROR_AVC_NALU_UEV, "empty stream"); } - + // ue(v) in 9.1 Parsing process for Exp-Golomb codes // ISO_IEC_14496-10-AVC-2012.pdf, page 227. // Syntax elements coded as ue(v), me(v), or se(v) are Exp-Golomb-coded. @@ -53,34 +53,34 @@ srs_error_t srs_avc_nalu_read_uev(SrsBitBuffer* stream, int32_t& v) for (int8_t b = 0; !b && !stream->empty(); leadingZeroBits++) { b = stream->read_bit(); } - + if (leadingZeroBits >= 31) { return srs_error_new(ERROR_AVC_NALU_UEV, "%dbits overflow 31bits", leadingZeroBits); } - + v = (1 << leadingZeroBits) - 1; for (int i = 0; i < (int)leadingZeroBits; i++) { if (stream->empty()) { return srs_error_new(ERROR_AVC_NALU_UEV, "no bytes for leadingZeroBits=%d", leadingZeroBits); } - + int32_t b = stream->read_bit(); v += b << (leadingZeroBits - 1 - i); } - + return err; } srs_error_t srs_avc_nalu_read_bit(SrsBitBuffer* stream, int8_t& v) { srs_error_t err = srs_success; - + if (stream->empty()) { return srs_error_new(ERROR_AVC_NALU_UEV, "empty stream"); } - + v = stream->read_bit(); - + return err; } @@ -92,7 +92,7 @@ srs_utime_t srs_get_system_time() if (_srs_system_time_us_cache <= 0) { srs_update_system_time(); } - + return _srs_system_time_us_cache; } @@ -113,15 +113,15 @@ srs_gettimeofday_t _srs_gettimeofday = (srs_gettimeofday_t)::gettimeofday; srs_utime_t srs_update_system_time() { timeval now; - + if (_srs_gettimeofday(&now, NULL) < 0) { srs_warn("gettimeofday failed, ignore"); return -1; } - + // we must convert the tv_sec/tv_usec to int64_t. int64_t now_us = ((int64_t)now.tv_sec) * 1000 * 1000 + (int64_t)now.tv_usec; - + // for some ARM os, the starttime maybe invalid, // for example, on the cubieboard2, the srs_startup_time is 1262304014640, // while now is 1403842979210 in ms, diff is 141538964570 ms, 1638 days @@ -132,7 +132,7 @@ srs_utime_t srs_update_system_time() _srs_system_time_startup_time = _srs_system_time_us_cache = now_us; return _srs_system_time_us_cache; } - + // use relative time. int64_t diff = now_us - _srs_system_time_us_cache; diff = srs_max(0, diff); @@ -140,10 +140,10 @@ srs_utime_t srs_update_system_time() srs_warn("clock jump, history=%" PRId64 "us, now=%" PRId64 "us, diff=%" PRId64 "us", _srs_system_time_us_cache, now_us, diff); _srs_system_time_startup_time += diff; } - + _srs_system_time_us_cache = now_us; srs_info("clock updated, startup=%" PRId64 "us, now=%" PRId64 "us", _srs_system_time_startup_time, _srs_system_time_us_cache); - + return _srs_system_time_us_cache; } @@ -153,21 +153,24 @@ string srs_dns_resolve(string host, int& family) addrinfo hints; memset(&hints, 0, sizeof(hints)); hints.ai_family = family; - + addrinfo* r = NULL; - SrsAutoFree(addrinfo, r); + // bugfix: r is alloc by getaddrinfo, cannot call delete to free it, must use freeaddrinfo(r) + //SrsAutoFree(addrinfo, r); if(getaddrinfo(host.c_str(), NULL, &hints, &r)) { return ""; } - + char shost[64]; memset(shost, 0, sizeof(shost)); if (getnameinfo(r->ai_addr, r->ai_addrlen, shost, sizeof(shost), NULL, 0, NI_NUMERICHOST)) { + free(r); return ""; } - family = r->ai_family; - return string(shost); + freeaddrinfo(r); + family = r->ai_family; + return string(shost); } void srs_parse_hostport(string hostport, string& host, int& port) @@ -246,7 +249,7 @@ void srs_parse_endpoint(string hostport, string& ip, int& port) // Handle IP address ip = hostport.substr(0, pos); } - + const string sport = hostport.substr(pos + 1); port = ::atoi(sport.c_str()); } else { @@ -264,12 +267,12 @@ bool srs_check_ip_addr_valid(string ip) if (ret > 0) { return true; } - + ret = inet_pton(AF_INET6, ip.data(), buf); if (ret > 0) { return true; } - + return false; } @@ -298,84 +301,84 @@ bool srs_is_little_endian() // convert to network(big-endian) order, if not equals, // the system is little-endian, so need to convert the int64 static int little_endian_check = -1; - + if(little_endian_check == -1) { union { int32_t i; int8_t c; } little_check_union; - + little_check_union.i = 0x01; little_endian_check = little_check_union.c; } - + return (little_endian_check == 1); } string srs_string_replace(string str, string old_str, string new_str) { std::string ret = str; - + if (old_str == new_str) { return ret; } - + size_t pos = 0; while ((pos = ret.find(old_str, pos)) != std::string::npos) { ret = ret.replace(pos, old_str.length(), new_str); pos += new_str.length(); } - + return ret; } string srs_string_trim_end(string str, string trim_chars) { std::string ret = str; - + for (int i = 0; i < (int)trim_chars.length(); i++) { char ch = trim_chars.at(i); - + while (!ret.empty() && ret.at(ret.length() - 1) == ch) { ret.erase(ret.end() - 1); - + // ok, matched, should reset the search i = -1; } } - + return ret; } string srs_string_trim_start(string str, string trim_chars) { std::string ret = str; - + for (int i = 0; i < (int)trim_chars.length(); i++) { char ch = trim_chars.at(i); - + while (!ret.empty() && ret.at(0) == ch) { ret.erase(ret.begin()); - + // ok, matched, should reset the search i = -1; } } - + return ret; } string srs_string_remove(string str, string remove_chars) { std::string ret = str; - + for (int i = 0; i < (int)remove_chars.length(); i++) { char ch = remove_chars.at(i); - + for (std::string::iterator it = ret.begin(); it != ret.end();) { if (ch == *it) { it = ret.erase(it); - + // ok, matched, should reset the search i = -1; } else { @@ -383,7 +386,7 @@ string srs_string_remove(string str, string remove_chars) } } } - + return ret; } @@ -396,7 +399,7 @@ string srs_erase_first_substr(string str, string erase_string) if (pos != std::string::npos) { ret.erase(pos, erase_string.length()); } - + return ret; } @@ -409,7 +412,7 @@ string srs_erase_last_substr(string str, string erase_string) if (pos != std::string::npos) { ret.erase(pos, erase_string.length()); } - + return ret; } @@ -487,7 +490,7 @@ vector srs_string_split(string s, string seperator) result.push_back(s); return result; } - + size_t posBegin = 0; size_t posSeperator = s.find(seperator); while (posSeperator != string::npos) { @@ -503,42 +506,42 @@ vector srs_string_split(string s, string seperator) string srs_string_min_match(string str, vector seperators) { string match; - + if (seperators.empty()) { return str; } - + size_t min_pos = string::npos; for (vector::iterator it = seperators.begin(); it != seperators.end(); ++it) { string seperator = *it; - + size_t pos = str.find(seperator); if (pos == string::npos) { continue; } - + if (min_pos == string::npos || pos < min_pos) { min_pos = pos; match = seperator; } } - + return match; } vector srs_string_split(string str, vector seperators) { vector arr; - + size_t pos = string::npos; string s = str; - + while (true) { string seperator = srs_string_min_match(s, seperators); if (seperator.empty()) { break; } - + if ((pos = s.find(seperator)) == string::npos) { break; } @@ -546,23 +549,23 @@ vector srs_string_split(string str, vector seperators) arr.push_back(s.substr(0, pos)); s = s.substr(pos + seperator.length()); } - + if (!s.empty()) { arr.push_back(s); } - + return arr; } int srs_do_create_dir_recursively(string dir) { int ret = ERROR_SUCCESS; - + // stat current dir, if exists, return error. if (srs_path_exists(dir)) { return ERROR_SYSTEM_DIR_EXISTS; } - + // create parent first. size_t pos; if ((pos = dir.rfind("/")) != std::string::npos) { @@ -575,7 +578,7 @@ int srs_do_create_dir_recursively(string dir) // parent exists, set to ok. ret = ERROR_SUCCESS; } - + // create curren dir. #ifdef _WIN32 if (::_mkdir(dir.c_str()) < 0) { @@ -586,59 +589,59 @@ int srs_do_create_dir_recursively(string dir) if (errno == EEXIST) { return ERROR_SYSTEM_DIR_EXISTS; } - + ret = ERROR_SYSTEM_CREATE_DIR; srs_error("create dir %s failed. ret=%d", dir.c_str(), ret); return ret; } - + srs_info("create dir %s success.", dir.c_str()); - + return ret; } - + bool srs_bytes_equals(void* pa, void* pb, int size) { uint8_t* a = (uint8_t*)pa; uint8_t* b = (uint8_t*)pb; - + if (!a && !b) { return true; } - + if (!a || !b) { return false; } - + for(int i = 0; i < size; i++){ if(a[i] != b[i]){ return false; } } - + return true; } srs_error_t srs_create_dir_recursively(string dir) { int ret = srs_do_create_dir_recursively(dir); - + if (ret == ERROR_SYSTEM_DIR_EXISTS || ret == ERROR_SUCCESS) { return srs_success; } - + return srs_error_new(ret, "create dir %s", dir.c_str()); } bool srs_path_exists(std::string path) { struct stat st; - + // stat current dir, if exists, return error. if (stat(path.c_str(), &st) == 0) { return true; } - + return false; } @@ -666,7 +669,7 @@ string srs_path_basename(string path) { std::string dirname = path; size_t pos = string::npos; - + if ((pos = dirname.rfind("/")) != string::npos) { // the basename("/") is "/" if (dirname.length() == 1) { @@ -674,7 +677,7 @@ string srs_path_basename(string path) } dirname = dirname.substr(pos + 1); } - + return dirname; } @@ -682,22 +685,22 @@ string srs_path_filename(string path) { std::string filename = path; size_t pos = string::npos; - + if ((pos = filename.rfind(".")) != string::npos) { return filename.substr(0, pos); } - + return filename; } string srs_path_filext(string path) { size_t pos = string::npos; - + if ((pos = path.rfind(".")) != string::npos) { return path.substr(pos); } - + return ""; } @@ -706,20 +709,20 @@ bool srs_avc_startswith_annexb(SrsBuffer* stream, int* pnb_start_code) if (!stream) { return false; } - + char* bytes = stream->data() + stream->pos(); char* p = bytes; - + for (;;) { if (!stream->require((int)(p - bytes + 3))) { return false; } - + // not match if (p[0] != (char)0x00 || p[1] != (char)0x00) { return false; } - + // match N[00] 00 00 01, where N>=0 if (p[2] == (char)0x01) { if (pnb_start_code) { @@ -727,10 +730,10 @@ bool srs_avc_startswith_annexb(SrsBuffer* stream, int* pnb_start_code) } return true; } - + p++; } - + return false; } @@ -739,53 +742,53 @@ bool srs_aac_startswith_adts(SrsBuffer* stream) if (!stream) { return false; } - + char* bytes = stream->data() + stream->pos(); char* p = bytes; - + if (!stream->require((int)(p - bytes) + 2)) { return false; } - + // matched 12bits 0xFFF, // @remark, we must cast the 0xff to char to compare. if (p[0] != (char)0xff || (char)(p[1] & 0xf0) != (char)0xf0) { return false; } - + return true; } - + // @see pycrc reflect at https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L107 uint64_t __crc32_reflect(uint64_t data, int width) { uint64_t res = data & 0x01; - + for (int i = 0; i < (int)width - 1; i++) { data >>= 1; res = (res << 1) | (data & 0x01); } - + return res; } - + // @see pycrc gen_table at https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L178 void __crc32_make_table(uint32_t t[256], uint32_t poly, bool reflect_in) { int width = 32; // 32bits checksum. uint64_t msb_mask = (uint32_t)(0x01 << (width - 1)); uint64_t mask = (uint32_t)(((msb_mask - 1) << 1) | 1); - + int tbl_idx_width = 8; // table index size. int tbl_width = 0x01 << tbl_idx_width; // table size: 256 - + for (int i = 0; i < (int)tbl_width; i++) { uint64_t reg = uint64_t(i); - + if (reflect_in) { reg = __crc32_reflect(reg, tbl_idx_width); } - + reg = reg << (width - tbl_idx_width); for (int j = 0; j < tbl_idx_width; j++) { if ((reg&msb_mask) != 0) { @@ -794,53 +797,53 @@ void __crc32_make_table(uint32_t t[256], uint32_t poly, bool reflect_in) reg = reg << 1; } } - + if (reflect_in) { reg = __crc32_reflect(reg, width); } - + t[i] = (uint32_t)(reg & mask); } } - + // @see pycrc table_driven at https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L207 uint32_t __crc32_table_driven(uint32_t* t, const void* buf, int size, uint32_t previous, bool reflect_in, uint32_t xor_in, bool reflect_out, uint32_t xor_out) { int width = 32; // 32bits checksum. uint64_t msb_mask = (uint32_t)(0x01 << (width - 1)); uint64_t mask = (uint32_t)(((msb_mask - 1) << 1) | 1); - + int tbl_idx_width = 8; // table index size. - + uint8_t* p = (uint8_t*)buf; uint64_t reg = 0; - + if (!reflect_in) { reg = xor_in; - + for (int i = 0; i < (int)size; i++) { uint8_t tblidx = (uint8_t)((reg >> (width - tbl_idx_width)) ^ p[i]); reg = t[tblidx] ^ (reg << tbl_idx_width); } } else { reg = previous ^ __crc32_reflect(xor_in, width); - + for (int i = 0; i < (int)size; i++) { uint8_t tblidx = (uint8_t)(reg ^ p[i]); reg = t[tblidx] ^ (reg >> tbl_idx_width); } - + reg = __crc32_reflect(reg, width); } - + if (reflect_out) { reg = __crc32_reflect(reg, width); } - + reg ^= xor_out; return (uint32_t)(reg & mask); } - + // @see pycrc https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L207 // IEEETable is the table for the IEEE polynomial. static uint32_t __crc32_IEEE_table[256]; @@ -865,20 +868,20 @@ uint32_t srs_crc32_ieee(const void* buf, int size, uint32_t previous) // @remark The poly of CRC32 IEEE is 0x04C11DB7, its reverse is 0xEDB88320, // please read https://en.wikipedia.org/wiki/Cyclic_redundancy_check uint32_t poly = 0x04C11DB7; - + bool reflect_in = true; uint32_t xor_in = 0xffffffff; bool reflect_out = true; uint32_t xor_out = 0xffffffff; - + if (!__crc32_IEEE_table_initialized) { __crc32_make_table(__crc32_IEEE_table, poly, reflect_in); __crc32_IEEE_table_initialized = true; } - + return __crc32_table_driven(__crc32_IEEE_table, buf, size, previous, reflect_in, xor_in, reflect_out, xor_out); } - + // @see pycrc https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L238 // IEEETable is the table for the MPEG polynomial. static uint32_t __crc32_MPEG_table[256]; @@ -903,17 +906,17 @@ uint32_t srs_crc32_mpegts(const void* buf, int size) // @remark The poly of CRC32 IEEE is 0x04C11DB7, its reverse is 0xEDB88320, // please read https://en.wikipedia.org/wiki/Cyclic_redundancy_check uint32_t poly = 0x04C11DB7; - + bool reflect_in = false; uint32_t xor_in = 0xffffffff; bool reflect_out = false; uint32_t xor_out = 0x0; - + if (!__crc32_MPEG_table_initialized) { __crc32_make_table(__crc32_MPEG_table, poly, reflect_in); __crc32_MPEG_table_initialized = true; } - + return __crc32_table_driven(__crc32_MPEG_table, buf, size, 0x00, reflect_in, xor_in, reflect_out, xor_out); } @@ -928,51 +931,51 @@ namespace { srs_error_t srs_av_base64_decode(string cipher, string& plaintext) { srs_error_t err = srs_success; - + uint8_t decodeMap[256]; memset(decodeMap, 0xff, sizeof(decodeMap)); - + for (int i = 0; i < (int)encoder.length(); i++) { decodeMap[(uint8_t)encoder.at(i)] = uint8_t(i); } - + // decode is like Decode but returns an additional 'end' value, which // indicates if end-of-message padding or a partial quantum was encountered // and thus any additional data is an error. int si = 0; - + // skip over newlines for (; si < (int)cipher.length() && (cipher.at(si) == '\n' || cipher.at(si) == '\r'); si++) { } - + for (bool end = false; si < (int)cipher.length() && !end;) { // Decode quantum using the base64 alphabet uint8_t dbuf[4]; memset(dbuf, 0x00, sizeof(dbuf)); - + int dinc = 3; int dlen = 4; srs_assert(dinc > 0); - + for (int j = 0; j < (int)sizeof(dbuf); j++) { if (si == (int)cipher.length()) { if (padding != -1 || j < 2) { return srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); } - + dinc = j - 1; dlen = j; end = true; break; } - + char in = cipher.at(si); - + si++; // skip over newlines for (; si < (int)cipher.length() && (cipher.at(si) == '\n' || cipher.at(si) == '\r'); si++) { } - + if (in == padding) { // We've reached the end and there's padding switch (j) { @@ -989,13 +992,13 @@ srs_error_t srs_av_base64_decode(string cipher, string& plaintext) // incorrect padding return srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); } - + si++; // skip over newlines for (; si < (int)cipher.length() && (cipher.at(si) == '\n' || cipher.at(si) == '\r'); si++) { } } - + if (si < (int)cipher.length()) { // trailing garbage err = srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); @@ -1005,13 +1008,13 @@ srs_error_t srs_av_base64_decode(string cipher, string& plaintext) end = true; break; } - + dbuf[j] = decodeMap[(uint8_t)in]; if (dbuf[j] == 0xff) { return srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); } } - + // Convert 4x 6bit source bytes into 3 bytes uint32_t val = uint32_t(dbuf[0])<<18 | uint32_t(dbuf[1])<<12 | uint32_t(dbuf[2])<<6 | uint32_t(dbuf[3]); if (dlen >= 2) { @@ -1024,7 +1027,7 @@ srs_error_t srs_av_base64_decode(string cipher, string& plaintext) plaintext.append(1, char(val)); } } - + return err; } @@ -1034,7 +1037,7 @@ srs_error_t srs_av_base64_encode(std::string plaintext, std::string& cipher) srs_error_t err = srs_success; uint8_t decodeMap[256]; memset(decodeMap, 0xff, sizeof(decodeMap)); - + for (int i = 0; i < (int)encoder.length(); i++) { decodeMap[(uint8_t)encoder.at(i)] = uint8_t(i); } @@ -1095,7 +1098,7 @@ int av_toupper(int c) } return c; } - + // fromHexChar converts a hex character into its value and a success flag. uint8_t srs_from_hex_char(uint8_t c) { @@ -1119,11 +1122,11 @@ char* srs_data_to_hex(char* des, const u_int8_t* src, int len) } const char *hex_table = "0123456789ABCDEF"; - + for (int i=0; i> 4]; des[i * 2 + 1] = hex_table[src[i] & 0x0F]; - } + } return des; } @@ -1149,21 +1152,21 @@ int srs_hex_to_data(uint8_t* data, const char* p, int size) if (size <= 0 || (size%2) == 1) { return -1; } - + for (int i = 0; i < (int)size / 2; i++) { uint8_t a = srs_from_hex_char(p[i*2]); if (a == (uint8_t)-1) { return -1; } - + uint8_t b = srs_from_hex_char(p[i*2 + 1]); if (b == (uint8_t)-1) { return -1; } - + data[i] = (a << 4) | b; } - + return size / 2; } @@ -1171,18 +1174,18 @@ int srs_chunk_header_c0(int perfer_cid, uint32_t timestamp, int32_t payload_leng { // to directly set the field. char* pp = NULL; - + // generate the header. char* p = cache; - + // no header. if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) { return 0; } - + // write new chunk stream header, fmt is 0 *p++ = 0x00 | (perfer_cid & 0x3F); - + // chunk message header, 11 bytes // timestamp, 3bytes, big-endian if (timestamp < RTMP_EXTENDED_TIMESTAMP) { @@ -1195,23 +1198,23 @@ int srs_chunk_header_c0(int perfer_cid, uint32_t timestamp, int32_t payload_leng *p++ = (char)0xFF; *p++ = (char)0xFF; } - + // message_length, 3bytes, big-endian pp = (char*)&payload_length; *p++ = pp[2]; *p++ = pp[1]; *p++ = pp[0]; - + // message_type, 1bytes *p++ = message_type; - + // stream_id, 4bytes, little-endian pp = (char*)&stream_id; *p++ = pp[0]; *p++ = pp[1]; *p++ = pp[2]; *p++ = pp[3]; - + // for c0 // chunk extended timestamp header, 0 or 4 bytes, big-endian // @@ -1237,7 +1240,7 @@ int srs_chunk_header_c0(int perfer_cid, uint32_t timestamp, int32_t payload_leng *p++ = pp[1]; *p++ = pp[0]; } - + // always has header return (int)(p - cache); } @@ -1246,20 +1249,20 @@ int srs_chunk_header_c3(int perfer_cid, uint32_t timestamp, char* cache, int nb_ { // to directly set the field. char* pp = NULL; - + // generate the header. char* p = cache; - + // no header. if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE) { return 0; } - + // write no message header chunk stream, fmt is 3 // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, // SRS will rollback to 1B chunk header. *p++ = 0xC0 | (perfer_cid & 0x3F); - + // for c0 // chunk extended timestamp header, 0 or 4 bytes, big-endian // @@ -1285,7 +1288,7 @@ int srs_chunk_header_c3(int perfer_cid, uint32_t timestamp, char* cache, int nb_ *p++ = pp[1]; *p++ = pp[0]; } - + // always has header return (int)(p - cache); } diff --git a/trunk/src/protocol/srs_service_st.cpp b/trunk/src/protocol/srs_service_st.cpp index b4e295377a..b5c12cdfb3 100644 --- a/trunk/src/protocol/srs_service_st.cpp +++ b/trunk/src/protocol/srs_service_st.cpp @@ -27,12 +27,12 @@ using namespace std; bool srs_st_epoll_is_supported(void) { struct epoll_event ev; - + ev.events = EPOLLIN; ev.data.ptr = NULL; /* Guaranteed to fail */ epoll_ctl(-1, EPOLL_CTL_ADD, -1, &ev); - + return (errno != ENOSYS); } #endif @@ -45,7 +45,7 @@ srs_error_t srs_st_init() return srs_error_new(ERROR_ST_SET_EPOLL, "linux epoll disabled"); } #endif - + // Select the best event system available on the OS. In Linux this is // epoll(). On BSD it will be kqueue. if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) { @@ -57,7 +57,7 @@ srs_error_t srs_st_init() if (cid.empty()) { cid = _srs_context->generate_id(); } - + int r0 = 0; if((r0 = st_init()) != 0){ return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0); @@ -66,7 +66,7 @@ srs_error_t srs_st_init() // Switch to the background cid. _srs_context->set_id(cid); srs_info("st_init success, use %s", st_get_eventsys_name()); - + return srs_success; } @@ -158,41 +158,46 @@ srs_error_t srs_tcp_connect(string server, int port, srs_utime_t tm, srs_netfd_t if (tm != SRS_UTIME_NO_TIMEOUT) { timeout = tm; } - + *pstfd = NULL; srs_netfd_t stfd = NULL; char sport[8]; snprintf(sport, sizeof(sport), "%d", port); - + addrinfo hints; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; - + addrinfo* r = NULL; - SrsAutoFree(addrinfo, r); + // bugfix: r is alloc by getaddrinfo, cannot call delete to free it, must use freeaddrinfo(r) + //SrsAutoFree(addrinfo, r); if(getaddrinfo(server.c_str(), sport, (const addrinfo*)&hints, &r)) { return srs_error_new(ERROR_SYSTEM_IP_INVALID, "get address info"); } - + int sock = socket(r->ai_family, r->ai_socktype, r->ai_protocol); if(sock == -1){ + freeaddrinfo(r); return srs_error_new(ERROR_SOCKET_CREATE, "create socket"); } - + srs_assert(!stfd); stfd = st_netfd_open_socket(sock); if(stfd == NULL){ ::close(sock); + freeaddrinfo(r); return srs_error_new(ERROR_ST_OPEN_SOCKET, "open socket"); } - + if (st_connect((st_netfd_t)stfd, r->ai_addr, r->ai_addrlen, timeout) == -1){ srs_close_stfd(stfd); + freeaddrinfo(r); return srs_error_new(ERROR_ST_CONNECT, "connect to %s:%d", server.c_str(), port); } - + + freeaddrinfo(r); *pstfd = stfd; return srs_success; } @@ -248,7 +253,8 @@ srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd) hints.ai_flags = AI_NUMERICHOST; addrinfo* r = NULL; - SrsAutoFreeF(addrinfo, r); + // bugfix: r is alloc by getaddrinfo, cannot call delete to free it, must use freeaddrinfo(r) + //SrsAutoFreeF(addrinfo, r); if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) { return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)", hints.ai_family, hints.ai_socktype, hints.ai_flags); @@ -256,15 +262,18 @@ srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd) int fd = 0; if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) { + freeaddrinfo(r); return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d", r->ai_family, r->ai_socktype, r->ai_protocol); } if ((err = do_srs_tcp_listen(fd, r, pfd)) != srs_success) { ::close(fd); + freeaddrinfo(r); return srs_error_wrap(err, "fd=%d", fd); } + freeaddrinfo(r); return err; } @@ -309,7 +318,8 @@ srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd) hints.ai_flags = AI_NUMERICHOST; addrinfo* r = NULL; - SrsAutoFree(addrinfo, r); + // bugfix: r is alloc by getaddrinfo, cannot call delete to free it, must use freeaddrinfo(r) + //SrsAutoFree(addrinfo, r); if(getaddrinfo(ip.c_str(), sport, (const addrinfo*)&hints, &r)) { return srs_error_new(ERROR_SYSTEM_IP_INVALID, "getaddrinfo hints=(%d,%d,%d)", hints.ai_family, hints.ai_socktype, hints.ai_flags); @@ -317,15 +327,18 @@ srs_error_t srs_udp_listen(std::string ip, int port, srs_netfd_t* pfd) int fd = 0; if ((fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol)) == -1) { + freeaddrinfo(r); return srs_error_new(ERROR_SOCKET_CREATE, "socket domain=%d, type=%d, protocol=%d", r->ai_family, r->ai_socktype, r->ai_protocol); } if ((err = do_srs_udp_listen(fd, r, pfd)) != srs_success) { ::close(fd); + freeaddrinfo(r); return srs_error_wrap(err, "fd=%d", fd); } + freeaddrinfo(r); return err; } @@ -487,18 +500,18 @@ int64_t SrsStSocket::get_send_bytes() srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; - + ssize_t nb_read; if (rtm == SRS_UTIME_NO_TIMEOUT) { nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_read = st_read((st_netfd_t)stfd, buf, size, rtm); } - + if (nread) { *nread = nb_read; } - + // On success a non-negative integer indicating the number of bytes actually read is returned // (a value of 0 means the network connection is closed or end of file is reached). // Otherwise, a value of -1 is returned and errno is set to indicate the error. @@ -506,34 +519,34 @@ srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) if (nb_read < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm)); } - + if (nb_read == 0) { errno = ECONNRESET; } - + return srs_error_new(ERROR_SOCKET_READ, "read"); } - + rbytes += nb_read; - + return err; } srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; - + ssize_t nb_read; if (rtm == SRS_UTIME_NO_TIMEOUT) { nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm); } - + if (nread) { *nread = nb_read; } - + // On success a non-negative integer indicating the number of bytes actually read is returned // (a value less than nbyte means the network connection is closed or end of file is reached) // Otherwise, a value of -1 is returned and errno is set to indicate the error. @@ -541,76 +554,76 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) if (nb_read < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm)); } - + if (nb_read >= 0) { errno = ECONNRESET; } - + return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully"); } - + rbytes += nb_read; - + return err; } srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) { srs_error_t err = srs_success; - + ssize_t nb_write; if (stm == SRS_UTIME_NO_TIMEOUT) { nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_write = st_write((st_netfd_t)stfd, buf, size, stm); } - + if (nwrite) { *nwrite = nb_write; } - + // On success a non-negative integer equal to nbyte is returned. // Otherwise, a value of -1 is returned and errno is set to indicate the error. if (nb_write <= 0) { if (nb_write < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", srsu2msi(stm)); } - + return srs_error_new(ERROR_SOCKET_WRITE, "write"); } - + sbytes += nb_write; - + return err; } srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) { srs_error_t err = srs_success; - + ssize_t nb_write; if (stm == SRS_UTIME_NO_TIMEOUT) { nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); } else { nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm); } - + if (nwrite) { *nwrite = nb_write; } - + // On success a non-negative integer equal to nbyte is returned. // Otherwise, a value of -1 is returned and errno is set to indicate the error. if (nb_write <= 0) { if (nb_write < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", srsu2msi(stm)); } - + return srs_error_new(ERROR_SOCKET_WRITE, "writev"); } - + sbytes += nb_write; - + return err; } @@ -618,7 +631,7 @@ SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) { stfd = NULL; io = new SrsStSocket(); - + host = h; port = p; timeout = tm; @@ -627,25 +640,25 @@ SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) SrsTcpClient::~SrsTcpClient() { close(); - + srs_freep(io); } srs_error_t SrsTcpClient::connect() { srs_error_t err = srs_success; - + close(); - + srs_assert(stfd == NULL); if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) { return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout)); } - + if ((err = io->initialize(stfd)) != srs_success) { return srs_error_wrap(err, "tcp: init socket object"); } - + return err; } @@ -655,7 +668,7 @@ void SrsTcpClient::close() if (!io) { return; } - + srs_close_stfd(stfd); } From 4356ff64198dc6f3a45d34c407cb9d01937e4677 Mon Sep 17 00:00:00 2001 From: wanglei Date: Tue, 11 Jan 2022 16:14:10 +0800 Subject: [PATCH 2/2] bugfix: SrsMetaCache memleak; getaddrinfo use delete. --- trunk/src/app/srs_app_source.cpp | 4 +- trunk/src/kernel/srs_kernel_utility.cpp | 292 ++++++++++++------------ trunk/src/protocol/srs_service_st.cpp | 77 ++++--- 3 files changed, 187 insertions(+), 186 deletions(-) diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 623e5162b2..fd42b95449 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1526,8 +1526,8 @@ void SrsMetaCache::dispose() clear(); srs_freep(previous_video); srs_freep(previous_audio); - srs_freep(previous_video); - srs_freep(previous_audio); + srs_freep(vformat); + srs_freep(aformat); } void SrsMetaCache::clear() diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp index b7fcb80ff2..3298970ab2 100644 --- a/trunk/src/kernel/srs_kernel_utility.cpp +++ b/trunk/src/kernel/srs_kernel_utility.cpp @@ -36,11 +36,11 @@ using namespace std; srs_error_t srs_avc_nalu_read_uev(SrsBitBuffer* stream, int32_t& v) { srs_error_t err = srs_success; - + if (stream->empty()) { return srs_error_new(ERROR_AVC_NALU_UEV, "empty stream"); } - + // ue(v) in 9.1 Parsing process for Exp-Golomb codes // ISO_IEC_14496-10-AVC-2012.pdf, page 227. // Syntax elements coded as ue(v), me(v), or se(v) are Exp-Golomb-coded. @@ -53,34 +53,34 @@ srs_error_t srs_avc_nalu_read_uev(SrsBitBuffer* stream, int32_t& v) for (int8_t b = 0; !b && !stream->empty(); leadingZeroBits++) { b = stream->read_bit(); } - + if (leadingZeroBits >= 31) { return srs_error_new(ERROR_AVC_NALU_UEV, "%dbits overflow 31bits", leadingZeroBits); } - + v = (1 << leadingZeroBits) - 1; for (int i = 0; i < (int)leadingZeroBits; i++) { if (stream->empty()) { return srs_error_new(ERROR_AVC_NALU_UEV, "no bytes for leadingZeroBits=%d", leadingZeroBits); } - + int32_t b = stream->read_bit(); v += b << (leadingZeroBits - 1 - i); } - + return err; } srs_error_t srs_avc_nalu_read_bit(SrsBitBuffer* stream, int8_t& v) { srs_error_t err = srs_success; - + if (stream->empty()) { return srs_error_new(ERROR_AVC_NALU_UEV, "empty stream"); } - + v = stream->read_bit(); - + return err; } @@ -92,7 +92,7 @@ srs_utime_t srs_get_system_time() if (_srs_system_time_us_cache <= 0) { srs_update_system_time(); } - + return _srs_system_time_us_cache; } @@ -113,15 +113,15 @@ srs_gettimeofday_t _srs_gettimeofday = (srs_gettimeofday_t)::gettimeofday; srs_utime_t srs_update_system_time() { timeval now; - + if (_srs_gettimeofday(&now, NULL) < 0) { srs_warn("gettimeofday failed, ignore"); return -1; } - + // we must convert the tv_sec/tv_usec to int64_t. int64_t now_us = ((int64_t)now.tv_sec) * 1000 * 1000 + (int64_t)now.tv_usec; - + // for some ARM os, the starttime maybe invalid, // for example, on the cubieboard2, the srs_startup_time is 1262304014640, // while now is 1403842979210 in ms, diff is 141538964570 ms, 1638 days @@ -132,7 +132,7 @@ srs_utime_t srs_update_system_time() _srs_system_time_startup_time = _srs_system_time_us_cache = now_us; return _srs_system_time_us_cache; } - + // use relative time. int64_t diff = now_us - _srs_system_time_us_cache; diff = srs_max(0, diff); @@ -140,10 +140,10 @@ srs_utime_t srs_update_system_time() srs_warn("clock jump, history=%" PRId64 "us, now=%" PRId64 "us, diff=%" PRId64 "us", _srs_system_time_us_cache, now_us, diff); _srs_system_time_startup_time += diff; } - + _srs_system_time_us_cache = now_us; srs_info("clock updated, startup=%" PRId64 "us, now=%" PRId64 "us", _srs_system_time_startup_time, _srs_system_time_us_cache); - + return _srs_system_time_us_cache; } @@ -249,7 +249,7 @@ void srs_parse_endpoint(string hostport, string& ip, int& port) // Handle IP address ip = hostport.substr(0, pos); } - + const string sport = hostport.substr(pos + 1); port = ::atoi(sport.c_str()); } else { @@ -267,12 +267,12 @@ bool srs_check_ip_addr_valid(string ip) if (ret > 0) { return true; } - + ret = inet_pton(AF_INET6, ip.data(), buf); if (ret > 0) { return true; } - + return false; } @@ -301,84 +301,84 @@ bool srs_is_little_endian() // convert to network(big-endian) order, if not equals, // the system is little-endian, so need to convert the int64 static int little_endian_check = -1; - + if(little_endian_check == -1) { union { int32_t i; int8_t c; } little_check_union; - + little_check_union.i = 0x01; little_endian_check = little_check_union.c; } - + return (little_endian_check == 1); } string srs_string_replace(string str, string old_str, string new_str) { std::string ret = str; - + if (old_str == new_str) { return ret; } - + size_t pos = 0; while ((pos = ret.find(old_str, pos)) != std::string::npos) { ret = ret.replace(pos, old_str.length(), new_str); pos += new_str.length(); } - + return ret; } string srs_string_trim_end(string str, string trim_chars) { std::string ret = str; - + for (int i = 0; i < (int)trim_chars.length(); i++) { char ch = trim_chars.at(i); - + while (!ret.empty() && ret.at(ret.length() - 1) == ch) { ret.erase(ret.end() - 1); - + // ok, matched, should reset the search i = -1; } } - + return ret; } string srs_string_trim_start(string str, string trim_chars) { std::string ret = str; - + for (int i = 0; i < (int)trim_chars.length(); i++) { char ch = trim_chars.at(i); - + while (!ret.empty() && ret.at(0) == ch) { ret.erase(ret.begin()); - + // ok, matched, should reset the search i = -1; } } - + return ret; } string srs_string_remove(string str, string remove_chars) { std::string ret = str; - + for (int i = 0; i < (int)remove_chars.length(); i++) { char ch = remove_chars.at(i); - + for (std::string::iterator it = ret.begin(); it != ret.end();) { if (ch == *it) { it = ret.erase(it); - + // ok, matched, should reset the search i = -1; } else { @@ -386,7 +386,7 @@ string srs_string_remove(string str, string remove_chars) } } } - + return ret; } @@ -399,7 +399,7 @@ string srs_erase_first_substr(string str, string erase_string) if (pos != std::string::npos) { ret.erase(pos, erase_string.length()); } - + return ret; } @@ -412,7 +412,7 @@ string srs_erase_last_substr(string str, string erase_string) if (pos != std::string::npos) { ret.erase(pos, erase_string.length()); } - + return ret; } @@ -490,7 +490,7 @@ vector srs_string_split(string s, string seperator) result.push_back(s); return result; } - + size_t posBegin = 0; size_t posSeperator = s.find(seperator); while (posSeperator != string::npos) { @@ -506,42 +506,42 @@ vector srs_string_split(string s, string seperator) string srs_string_min_match(string str, vector seperators) { string match; - + if (seperators.empty()) { return str; } - + size_t min_pos = string::npos; for (vector::iterator it = seperators.begin(); it != seperators.end(); ++it) { string seperator = *it; - + size_t pos = str.find(seperator); if (pos == string::npos) { continue; } - + if (min_pos == string::npos || pos < min_pos) { min_pos = pos; match = seperator; } } - + return match; } vector srs_string_split(string str, vector seperators) { vector arr; - + size_t pos = string::npos; string s = str; - + while (true) { string seperator = srs_string_min_match(s, seperators); if (seperator.empty()) { break; } - + if ((pos = s.find(seperator)) == string::npos) { break; } @@ -549,23 +549,23 @@ vector srs_string_split(string str, vector seperators) arr.push_back(s.substr(0, pos)); s = s.substr(pos + seperator.length()); } - + if (!s.empty()) { arr.push_back(s); } - + return arr; } int srs_do_create_dir_recursively(string dir) { int ret = ERROR_SUCCESS; - + // stat current dir, if exists, return error. if (srs_path_exists(dir)) { return ERROR_SYSTEM_DIR_EXISTS; } - + // create parent first. size_t pos; if ((pos = dir.rfind("/")) != std::string::npos) { @@ -578,7 +578,7 @@ int srs_do_create_dir_recursively(string dir) // parent exists, set to ok. ret = ERROR_SUCCESS; } - + // create curren dir. #ifdef _WIN32 if (::_mkdir(dir.c_str()) < 0) { @@ -589,59 +589,59 @@ int srs_do_create_dir_recursively(string dir) if (errno == EEXIST) { return ERROR_SYSTEM_DIR_EXISTS; } - + ret = ERROR_SYSTEM_CREATE_DIR; srs_error("create dir %s failed. ret=%d", dir.c_str(), ret); return ret; } - + srs_info("create dir %s success.", dir.c_str()); - + return ret; } - + bool srs_bytes_equals(void* pa, void* pb, int size) { uint8_t* a = (uint8_t*)pa; uint8_t* b = (uint8_t*)pb; - + if (!a && !b) { return true; } - + if (!a || !b) { return false; } - + for(int i = 0; i < size; i++){ if(a[i] != b[i]){ return false; } } - + return true; } srs_error_t srs_create_dir_recursively(string dir) { int ret = srs_do_create_dir_recursively(dir); - + if (ret == ERROR_SYSTEM_DIR_EXISTS || ret == ERROR_SUCCESS) { return srs_success; } - + return srs_error_new(ret, "create dir %s", dir.c_str()); } bool srs_path_exists(std::string path) { struct stat st; - + // stat current dir, if exists, return error. if (stat(path.c_str(), &st) == 0) { return true; } - + return false; } @@ -669,7 +669,7 @@ string srs_path_basename(string path) { std::string dirname = path; size_t pos = string::npos; - + if ((pos = dirname.rfind("/")) != string::npos) { // the basename("/") is "/" if (dirname.length() == 1) { @@ -677,7 +677,7 @@ string srs_path_basename(string path) } dirname = dirname.substr(pos + 1); } - + return dirname; } @@ -685,22 +685,22 @@ string srs_path_filename(string path) { std::string filename = path; size_t pos = string::npos; - + if ((pos = filename.rfind(".")) != string::npos) { return filename.substr(0, pos); } - + return filename; } string srs_path_filext(string path) { size_t pos = string::npos; - + if ((pos = path.rfind(".")) != string::npos) { return path.substr(pos); } - + return ""; } @@ -709,20 +709,20 @@ bool srs_avc_startswith_annexb(SrsBuffer* stream, int* pnb_start_code) if (!stream) { return false; } - + char* bytes = stream->data() + stream->pos(); char* p = bytes; - + for (;;) { if (!stream->require((int)(p - bytes + 3))) { return false; } - + // not match if (p[0] != (char)0x00 || p[1] != (char)0x00) { return false; } - + // match N[00] 00 00 01, where N>=0 if (p[2] == (char)0x01) { if (pnb_start_code) { @@ -730,10 +730,10 @@ bool srs_avc_startswith_annexb(SrsBuffer* stream, int* pnb_start_code) } return true; } - + p++; } - + return false; } @@ -742,53 +742,53 @@ bool srs_aac_startswith_adts(SrsBuffer* stream) if (!stream) { return false; } - + char* bytes = stream->data() + stream->pos(); char* p = bytes; - + if (!stream->require((int)(p - bytes) + 2)) { return false; } - + // matched 12bits 0xFFF, // @remark, we must cast the 0xff to char to compare. if (p[0] != (char)0xff || (char)(p[1] & 0xf0) != (char)0xf0) { return false; } - + return true; } - + // @see pycrc reflect at https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L107 uint64_t __crc32_reflect(uint64_t data, int width) { uint64_t res = data & 0x01; - + for (int i = 0; i < (int)width - 1; i++) { data >>= 1; res = (res << 1) | (data & 0x01); } - + return res; } - + // @see pycrc gen_table at https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L178 void __crc32_make_table(uint32_t t[256], uint32_t poly, bool reflect_in) { int width = 32; // 32bits checksum. uint64_t msb_mask = (uint32_t)(0x01 << (width - 1)); uint64_t mask = (uint32_t)(((msb_mask - 1) << 1) | 1); - + int tbl_idx_width = 8; // table index size. int tbl_width = 0x01 << tbl_idx_width; // table size: 256 - + for (int i = 0; i < (int)tbl_width; i++) { uint64_t reg = uint64_t(i); - + if (reflect_in) { reg = __crc32_reflect(reg, tbl_idx_width); } - + reg = reg << (width - tbl_idx_width); for (int j = 0; j < tbl_idx_width; j++) { if ((reg&msb_mask) != 0) { @@ -797,53 +797,53 @@ void __crc32_make_table(uint32_t t[256], uint32_t poly, bool reflect_in) reg = reg << 1; } } - + if (reflect_in) { reg = __crc32_reflect(reg, width); } - + t[i] = (uint32_t)(reg & mask); } } - + // @see pycrc table_driven at https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L207 uint32_t __crc32_table_driven(uint32_t* t, const void* buf, int size, uint32_t previous, bool reflect_in, uint32_t xor_in, bool reflect_out, uint32_t xor_out) { int width = 32; // 32bits checksum. uint64_t msb_mask = (uint32_t)(0x01 << (width - 1)); uint64_t mask = (uint32_t)(((msb_mask - 1) << 1) | 1); - + int tbl_idx_width = 8; // table index size. - + uint8_t* p = (uint8_t*)buf; uint64_t reg = 0; - + if (!reflect_in) { reg = xor_in; - + for (int i = 0; i < (int)size; i++) { uint8_t tblidx = (uint8_t)((reg >> (width - tbl_idx_width)) ^ p[i]); reg = t[tblidx] ^ (reg << tbl_idx_width); } } else { reg = previous ^ __crc32_reflect(xor_in, width); - + for (int i = 0; i < (int)size; i++) { uint8_t tblidx = (uint8_t)(reg ^ p[i]); reg = t[tblidx] ^ (reg >> tbl_idx_width); } - + reg = __crc32_reflect(reg, width); } - + if (reflect_out) { reg = __crc32_reflect(reg, width); } - + reg ^= xor_out; return (uint32_t)(reg & mask); } - + // @see pycrc https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L207 // IEEETable is the table for the IEEE polynomial. static uint32_t __crc32_IEEE_table[256]; @@ -868,20 +868,20 @@ uint32_t srs_crc32_ieee(const void* buf, int size, uint32_t previous) // @remark The poly of CRC32 IEEE is 0x04C11DB7, its reverse is 0xEDB88320, // please read https://en.wikipedia.org/wiki/Cyclic_redundancy_check uint32_t poly = 0x04C11DB7; - + bool reflect_in = true; uint32_t xor_in = 0xffffffff; bool reflect_out = true; uint32_t xor_out = 0xffffffff; - + if (!__crc32_IEEE_table_initialized) { __crc32_make_table(__crc32_IEEE_table, poly, reflect_in); __crc32_IEEE_table_initialized = true; } - + return __crc32_table_driven(__crc32_IEEE_table, buf, size, previous, reflect_in, xor_in, reflect_out, xor_out); } - + // @see pycrc https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L238 // IEEETable is the table for the MPEG polynomial. static uint32_t __crc32_MPEG_table[256]; @@ -906,17 +906,17 @@ uint32_t srs_crc32_mpegts(const void* buf, int size) // @remark The poly of CRC32 IEEE is 0x04C11DB7, its reverse is 0xEDB88320, // please read https://en.wikipedia.org/wiki/Cyclic_redundancy_check uint32_t poly = 0x04C11DB7; - + bool reflect_in = false; uint32_t xor_in = 0xffffffff; bool reflect_out = false; uint32_t xor_out = 0x0; - + if (!__crc32_MPEG_table_initialized) { __crc32_make_table(__crc32_MPEG_table, poly, reflect_in); __crc32_MPEG_table_initialized = true; } - + return __crc32_table_driven(__crc32_MPEG_table, buf, size, 0x00, reflect_in, xor_in, reflect_out, xor_out); } @@ -931,51 +931,51 @@ namespace { srs_error_t srs_av_base64_decode(string cipher, string& plaintext) { srs_error_t err = srs_success; - + uint8_t decodeMap[256]; memset(decodeMap, 0xff, sizeof(decodeMap)); - + for (int i = 0; i < (int)encoder.length(); i++) { decodeMap[(uint8_t)encoder.at(i)] = uint8_t(i); } - + // decode is like Decode but returns an additional 'end' value, which // indicates if end-of-message padding or a partial quantum was encountered // and thus any additional data is an error. int si = 0; - + // skip over newlines for (; si < (int)cipher.length() && (cipher.at(si) == '\n' || cipher.at(si) == '\r'); si++) { } - + for (bool end = false; si < (int)cipher.length() && !end;) { // Decode quantum using the base64 alphabet uint8_t dbuf[4]; memset(dbuf, 0x00, sizeof(dbuf)); - + int dinc = 3; int dlen = 4; srs_assert(dinc > 0); - + for (int j = 0; j < (int)sizeof(dbuf); j++) { if (si == (int)cipher.length()) { if (padding != -1 || j < 2) { return srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); } - + dinc = j - 1; dlen = j; end = true; break; } - + char in = cipher.at(si); - + si++; // skip over newlines for (; si < (int)cipher.length() && (cipher.at(si) == '\n' || cipher.at(si) == '\r'); si++) { } - + if (in == padding) { // We've reached the end and there's padding switch (j) { @@ -992,13 +992,13 @@ srs_error_t srs_av_base64_decode(string cipher, string& plaintext) // incorrect padding return srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); } - + si++; // skip over newlines for (; si < (int)cipher.length() && (cipher.at(si) == '\n' || cipher.at(si) == '\r'); si++) { } } - + if (si < (int)cipher.length()) { // trailing garbage err = srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); @@ -1008,13 +1008,13 @@ srs_error_t srs_av_base64_decode(string cipher, string& plaintext) end = true; break; } - + dbuf[j] = decodeMap[(uint8_t)in]; if (dbuf[j] == 0xff) { return srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); } } - + // Convert 4x 6bit source bytes into 3 bytes uint32_t val = uint32_t(dbuf[0])<<18 | uint32_t(dbuf[1])<<12 | uint32_t(dbuf[2])<<6 | uint32_t(dbuf[3]); if (dlen >= 2) { @@ -1027,7 +1027,7 @@ srs_error_t srs_av_base64_decode(string cipher, string& plaintext) plaintext.append(1, char(val)); } } - + return err; } @@ -1037,7 +1037,7 @@ srs_error_t srs_av_base64_encode(std::string plaintext, std::string& cipher) srs_error_t err = srs_success; uint8_t decodeMap[256]; memset(decodeMap, 0xff, sizeof(decodeMap)); - + for (int i = 0; i < (int)encoder.length(); i++) { decodeMap[(uint8_t)encoder.at(i)] = uint8_t(i); } @@ -1098,7 +1098,7 @@ int av_toupper(int c) } return c; } - + // fromHexChar converts a hex character into its value and a success flag. uint8_t srs_from_hex_char(uint8_t c) { @@ -1122,11 +1122,11 @@ char* srs_data_to_hex(char* des, const u_int8_t* src, int len) } const char *hex_table = "0123456789ABCDEF"; - + for (int i=0; i> 4]; des[i * 2 + 1] = hex_table[src[i] & 0x0F]; - } + } return des; } @@ -1152,21 +1152,21 @@ int srs_hex_to_data(uint8_t* data, const char* p, int size) if (size <= 0 || (size%2) == 1) { return -1; } - + for (int i = 0; i < (int)size / 2; i++) { uint8_t a = srs_from_hex_char(p[i*2]); if (a == (uint8_t)-1) { return -1; } - + uint8_t b = srs_from_hex_char(p[i*2 + 1]); if (b == (uint8_t)-1) { return -1; } - + data[i] = (a << 4) | b; } - + return size / 2; } @@ -1174,18 +1174,18 @@ int srs_chunk_header_c0(int perfer_cid, uint32_t timestamp, int32_t payload_leng { // to directly set the field. char* pp = NULL; - + // generate the header. char* p = cache; - + // no header. if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) { return 0; } - + // write new chunk stream header, fmt is 0 *p++ = 0x00 | (perfer_cid & 0x3F); - + // chunk message header, 11 bytes // timestamp, 3bytes, big-endian if (timestamp < RTMP_EXTENDED_TIMESTAMP) { @@ -1198,23 +1198,23 @@ int srs_chunk_header_c0(int perfer_cid, uint32_t timestamp, int32_t payload_leng *p++ = (char)0xFF; *p++ = (char)0xFF; } - + // message_length, 3bytes, big-endian pp = (char*)&payload_length; *p++ = pp[2]; *p++ = pp[1]; *p++ = pp[0]; - + // message_type, 1bytes *p++ = message_type; - + // stream_id, 4bytes, little-endian pp = (char*)&stream_id; *p++ = pp[0]; *p++ = pp[1]; *p++ = pp[2]; *p++ = pp[3]; - + // for c0 // chunk extended timestamp header, 0 or 4 bytes, big-endian // @@ -1240,7 +1240,7 @@ int srs_chunk_header_c0(int perfer_cid, uint32_t timestamp, int32_t payload_leng *p++ = pp[1]; *p++ = pp[0]; } - + // always has header return (int)(p - cache); } @@ -1249,20 +1249,20 @@ int srs_chunk_header_c3(int perfer_cid, uint32_t timestamp, char* cache, int nb_ { // to directly set the field. char* pp = NULL; - + // generate the header. char* p = cache; - + // no header. if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE) { return 0; } - + // write no message header chunk stream, fmt is 3 // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, // SRS will rollback to 1B chunk header. *p++ = 0xC0 | (perfer_cid & 0x3F); - + // for c0 // chunk extended timestamp header, 0 or 4 bytes, big-endian // @@ -1288,7 +1288,7 @@ int srs_chunk_header_c3(int perfer_cid, uint32_t timestamp, char* cache, int nb_ *p++ = pp[1]; *p++ = pp[0]; } - + // always has header return (int)(p - cache); } diff --git a/trunk/src/protocol/srs_service_st.cpp b/trunk/src/protocol/srs_service_st.cpp index b5c12cdfb3..a7edeb1ea8 100644 --- a/trunk/src/protocol/srs_service_st.cpp +++ b/trunk/src/protocol/srs_service_st.cpp @@ -27,12 +27,12 @@ using namespace std; bool srs_st_epoll_is_supported(void) { struct epoll_event ev; - + ev.events = EPOLLIN; ev.data.ptr = NULL; /* Guaranteed to fail */ epoll_ctl(-1, EPOLL_CTL_ADD, -1, &ev); - + return (errno != ENOSYS); } #endif @@ -45,7 +45,7 @@ srs_error_t srs_st_init() return srs_error_new(ERROR_ST_SET_EPOLL, "linux epoll disabled"); } #endif - + // Select the best event system available on the OS. In Linux this is // epoll(). On BSD it will be kqueue. if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) { @@ -57,7 +57,7 @@ srs_error_t srs_st_init() if (cid.empty()) { cid = _srs_context->generate_id(); } - + int r0 = 0; if((r0 = st_init()) != 0){ return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0); @@ -66,7 +66,7 @@ srs_error_t srs_st_init() // Switch to the background cid. _srs_context->set_id(cid); srs_info("st_init success, use %s", st_get_eventsys_name()); - + return srs_success; } @@ -277,6 +277,7 @@ srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd) return err; } + srs_error_t do_srs_udp_listen(int fd, addrinfo* r, srs_netfd_t* pfd) { srs_error_t err = srs_success; @@ -500,18 +501,18 @@ int64_t SrsStSocket::get_send_bytes() srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; - + ssize_t nb_read; if (rtm == SRS_UTIME_NO_TIMEOUT) { nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_read = st_read((st_netfd_t)stfd, buf, size, rtm); } - + if (nread) { *nread = nb_read; } - + // On success a non-negative integer indicating the number of bytes actually read is returned // (a value of 0 means the network connection is closed or end of file is reached). // Otherwise, a value of -1 is returned and errno is set to indicate the error. @@ -519,34 +520,34 @@ srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) if (nb_read < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm)); } - + if (nb_read == 0) { errno = ECONNRESET; } - + return srs_error_new(ERROR_SOCKET_READ, "read"); } - + rbytes += nb_read; - + return err; } srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; - + ssize_t nb_read; if (rtm == SRS_UTIME_NO_TIMEOUT) { nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm); } - + if (nread) { *nread = nb_read; } - + // On success a non-negative integer indicating the number of bytes actually read is returned // (a value less than nbyte means the network connection is closed or end of file is reached) // Otherwise, a value of -1 is returned and errno is set to indicate the error. @@ -554,76 +555,76 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) if (nb_read < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm)); } - + if (nb_read >= 0) { errno = ECONNRESET; } - + return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully"); } - + rbytes += nb_read; - + return err; } srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) { srs_error_t err = srs_success; - + ssize_t nb_write; if (stm == SRS_UTIME_NO_TIMEOUT) { nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_write = st_write((st_netfd_t)stfd, buf, size, stm); } - + if (nwrite) { *nwrite = nb_write; } - + // On success a non-negative integer equal to nbyte is returned. // Otherwise, a value of -1 is returned and errno is set to indicate the error. if (nb_write <= 0) { if (nb_write < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", srsu2msi(stm)); } - + return srs_error_new(ERROR_SOCKET_WRITE, "write"); } - + sbytes += nb_write; - + return err; } srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) { srs_error_t err = srs_success; - + ssize_t nb_write; if (stm == SRS_UTIME_NO_TIMEOUT) { nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); } else { nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm); } - + if (nwrite) { *nwrite = nb_write; } - + // On success a non-negative integer equal to nbyte is returned. // Otherwise, a value of -1 is returned and errno is set to indicate the error. if (nb_write <= 0) { if (nb_write < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", srsu2msi(stm)); } - + return srs_error_new(ERROR_SOCKET_WRITE, "writev"); } - + sbytes += nb_write; - + return err; } @@ -631,7 +632,7 @@ SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) { stfd = NULL; io = new SrsStSocket(); - + host = h; port = p; timeout = tm; @@ -640,25 +641,25 @@ SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) SrsTcpClient::~SrsTcpClient() { close(); - + srs_freep(io); } srs_error_t SrsTcpClient::connect() { srs_error_t err = srs_success; - + close(); - + srs_assert(stfd == NULL); if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) { return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout)); } - + if ((err = io->initialize(stfd)) != srs_success) { return srs_error_wrap(err, "tcp: init socket object"); } - + return err; } @@ -668,7 +669,7 @@ void SrsTcpClient::close() if (!io) { return; } - + srs_close_stfd(stfd); }