From 9d716b82fe2332f0d73808783841f605b9d0b919 Mon Sep 17 00:00:00 2001 From: johzzy Date: Thu, 6 Jan 2022 19:41:18 +0800 Subject: [PATCH] Support Simulcast To RTMP (#2792) * add SimulcastBridgerAdapter, support simulcast to rtmp. * shared rtmp packet for dispatch_audio. --- trunk/src/app/srs_app_rtc_conn.cpp | 60 ++++++-- trunk/src/app/srs_app_rtc_source.cpp | 211 +++++++++++++++++++++++++++ trunk/src/app/srs_app_rtc_source.hpp | 8 +- trunk/src/app/srs_app_source.cpp | 2 + 4 files changed, 265 insertions(+), 16 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 48005215342..a07b660fe58 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1123,6 +1123,51 @@ SrsRtcPublishStream::~SrsRtcPublishStream() stat->on_disconnect(cid_.c_str()); } +#ifdef SRS_FFMPEG_FIT +inline SrsRtmpFromRtcBridger* create_bridger(SrsLiveSource *rtmp) { + return new SrsRtmpFromRtcBridger(rtmp); +} + +extern SrsRtmpFromRtcBridger* create_bridger(std::vector& video_tracks); + +srs_error_t do_bridger_for_simulcast(SrsRtmpFromRtcBridger *&bridger, SrsRequest* r, std::vector& video_tracks) { + srs_error_t err = srs_success; + bridger = create_bridger(video_tracks); + if ((err = bridger->initialize(r)) != srs_success) { + srs_freep(bridger); + return srs_error_wrap(err, "create bridger"); + } + return err; +} + +srs_error_t do_bridger_for_single(SrsRtmpFromRtcBridger *&bridger, SrsRequest *r) { + srs_error_t err = srs_success; + SrsLiveSource *rtmp = NULL; + if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + // Disable GOP cache for RTC2RTMP bridger, to keep the streams in sync, + // especially for stream merging. + rtmp->set_cache(false); + + bridger = create_bridger(rtmp); + if ((err = bridger->initialize(r)) != srs_success) { + srs_freep(bridger); + return srs_error_wrap(err, "create bridger"); + } + return err; +} + +srs_error_t do_bridger(SrsRtmpFromRtcBridger *&bridger, SrsRequest* r, std::vector& video_tracks) { + if (video_tracks.size() > 1) { + return do_bridger_for_simulcast(bridger, r, video_tracks); + } else { + return do_bridger_for_single(bridger, r); + } +} +#endif + srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescription* stream_desc) { srs_error_t err = srs_success; @@ -1200,18 +1245,9 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti #if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost); if (rtc_to_rtmp) { - if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) { - return srs_error_wrap(err, "create source"); - } - - // Disable GOP cache for RTC2RTMP bridger, to keep the streams in sync, - // especially for stream merging. - rtmp->set_cache(false); - - SrsRtmpFromRtcBridger *bridger = new SrsRtmpFromRtcBridger(rtmp); - if ((err = bridger->initialize(r)) != srs_success) { - srs_freep(bridger); - return srs_error_wrap(err, "create bridger"); + SrsRtmpFromRtcBridger *bridger = NULL; + if ((err = do_bridger(bridger, r, video_tracks_)) != srs_success) { + return srs_error_wrap(err, "do_bridger"); } source->set_bridger(bridger); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index e68178cfa83..5f1d02c8ae5 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -1774,6 +1774,217 @@ bool SrsRtmpFromRtcBridger::check_frame_complete(const uint16_t start, const uin return fu_s_c == fu_e_c; } + +extern srs_error_t do_bridger_for_single(SrsRtmpFromRtcBridger *&bridger, SrsRequest *r); + +// NOTE: One simulcast(low,mid,high) stream to Multiple rtmp streams (rtmp://xxx_{low,mid,high}) +std::string update_stream(size_t i, const SrsRtcTrackDescription* track_desc, const SrsRequest* rr) { + std::stringstream name; + name << rr->stream << '_'; + const std::string& ext = track_desc->rid_.rid; + if (ext.empty()) { + name << i; + } else { + name << ext; + } + return name.str(); +} + +struct BridgerContext { + SrsRtcTrackDescription* track_desc; + SrsRequest* r; + SrsRtmpFromRtcBridger* bridger; + + srs_error_t create_bridger(size_t i, SrsRequest* rr) { + srs_assert(!bridger); + srs_assert(!r); + srs_assert(rr); + r = rr->copy(); + r->stream = update_stream(i, track_desc, rr); + srs_error_t err = do_bridger_for_single(bridger, r); + if (err != srs_success) { + return srs_error_wrap(err, "do_bridger_for_single"); + } + err = bridger->on_publish(); + if (err != srs_success) { + return srs_error_wrap(err, "bridger on publish"); + } + return err; + } + + void clear() { + if (bridger) { + bridger->on_unpublish(); + srs_freep(bridger); + } + srs_freep(r); + } +}; + +class SimulcastBridgerAdapter: public SrsRtmpFromRtcBridger { + SrsRequest* r; + std::vector contexts_; + std::map bridgers_; + std::vector sources_; + + srs_error_t dispatch_audio(SrsCommonMessage* rtmp) { + srs_error_t err = srs_success; + if (!rtmp->payload) { + srs_error("rtmp audio packet is nullptr, skip it"); + return err; + } + + SrsCommonMessage copy; + for (size_t i=0; ipayload); + SrsLiveSource* source = sources_.at(i); + + // backup + // @see SrsSharedPtrMessage::create(SrsCommonMessage* msg) + copy.create_payload(rtmp->size); + copy.size = rtmp->size; + memcpy(copy.payload, rtmp->payload, rtmp->size); + + if ((err = source->on_audio(rtmp)) != srs_success) { + err = srs_error_wrap(err, "source[%p] on audio", source); + break; + } + + // restore + std::swap(rtmp->payload, copy.payload); + std::swap(rtmp->size, copy.size); + } + return err; + } + + srs_error_t dispatch_video(SrsRtpPacket* pkt) { + srs_error_t err = srs_success; + uint32_t ssrc = pkt->header.get_ssrc(); + if (bridgers_.end() != bridgers_.find(ssrc)) { + return bridgers_[ssrc]->packet_video(pkt); + } + + for (size_t i =0; ihas_ssrc(ssrc)) { + continue; + } + if (!ctx.bridger && (err = ctx.create_bridger(i, r)) != srs_success) { + return srs_error_wrap(err, "BridgerContext::create_bridger"); + } + srs_assert(ctx.bridger); + bridgers_[ssrc] = ctx.bridger; + SimulcastBridgerAdapter* b = (SimulcastBridgerAdapter*)ctx.bridger; + sources_.push_back(b->source_); + return ctx.bridger->packet_video(pkt); + } + return err; + } + +public: + SimulcastBridgerAdapter(std::vector& video_tracks): SrsRtmpFromRtcBridger(NULL), r(NULL) { + for (size_t i=0; itrack_desc_, NULL, NULL}); + } + } + + srs_error_t transcode_audio(SrsRtpPacket *pkt) { + srs_error_t err = srs_success; + + // to common message. + uint32_t ts = pkt->get_avsync_time(); + if (is_first_audio) { + int header_len = 0; + uint8_t* header = NULL; + codec_->aac_codec_header(&header, &header_len); + + SrsCommonMessage out_rtmp; + packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio); + + if ((err = dispatch_audio(&out_rtmp)) != srs_success) { + return srs_error_wrap(err, "source on audio"); + } + + is_first_audio = false; + } + + std::vector out_pkts; + SrsRtpRawPayload *payload = dynamic_cast(pkt->payload()); + + SrsAudioFrame frame; + frame.add_sample(payload->payload, payload->nn_payload); + frame.dts = ts; + frame.cts = 0; + + err = codec_->transcode(&frame, out_pkts); + if (err != srs_success) { + return err; + } + + for (std::vector::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) { + SrsCommonMessage out_rtmp; + out_rtmp.header.timestamp = (*it)->dts; + packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio); + + if ((err = dispatch_audio(&out_rtmp)) != srs_success) { + err = srs_error_wrap(err, "source on audio"); + break; + } + } + codec_->free_frames(out_pkts); + + return err; + } + + srs_error_t on_rtp(SrsRtpPacket *pkt) override { + srs_error_t err = srs_success; + + if (!pkt->payload()) { + return err; + } + + // Have no received any sender report, can't calculate avsync_time, + // discard it to avoid timestamp problem in live source + if (pkt->get_avsync_time() <= 0) { + return err; + } + + if (pkt->is_audio()) { + err = transcode_audio(pkt); + } else { + err = dispatch_video(pkt); + } + + return err; + } + + void on_unpublish() override { + for (size_t i=0;icopy(); + return SrsRtmpFromRtcBridger::initialize(rr); + } +}; + +SrsRtmpFromRtcBridger* create_bridger(std::vector& video_tracks) { + return new SimulcastBridgerAdapter(video_tracks); +} + #endif SrsCodecPayload::SrsCodecPayload() diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index c65411ac3f3..f1adc6837d6 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -285,7 +285,7 @@ class SrsRtcFromRtmpBridger : public ISrsLiveSourceBridger class SrsRtmpFromRtcBridger : public ISrsRtcSourceBridger { -private: +protected: SrsLiveSource *source_; SrsAudioTranscoder *codec_; bool is_first_audio; @@ -311,12 +311,12 @@ class SrsRtmpFromRtcBridger : public ISrsRtcSourceBridger SrsRtmpFromRtcBridger(SrsLiveSource *src); virtual ~SrsRtmpFromRtcBridger(); public: - srs_error_t initialize(SrsRequest* r); + virtual srs_error_t initialize(SrsRequest* r); public: virtual srs_error_t on_publish(); virtual srs_error_t on_rtp(SrsRtpPacket *pkt); virtual void on_unpublish(); -private: +public: srs_error_t transcode_audio(SrsRtpPacket *pkt); void packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header); srs_error_t packet_video(SrsRtpPacket* pkt); @@ -512,7 +512,7 @@ class SrsRtcSourceDescription class SrsRtcRecvTrack { -protected: +public: SrsRtcTrackDescription* track_desc_; protected: SrsRtcConnection* session_; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index f7156103307..f27fe45ea01 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -255,6 +255,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow { srs_error_t err = srs_success; + srs_assert(msg->payload); msgs.push_back(msg); // If jitter is off, the timestamp of first sequence header is zero, which wll cause SRS to shrink and drop the @@ -1744,6 +1745,7 @@ srs_error_t SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPac srs_error_t SrsMetaCache::update_ash(SrsSharedPtrMessage* msg) { + srs_assert(msg->payload); srs_freep(audio); audio = msg->copy(); update_previous_ash();