From ede0c0add9d16379aa8adfb3d03bac118cc9685d Mon Sep 17 00:00:00 2001 From: johzzy Date: Thu, 6 Jan 2022 19:41:18 +0800 Subject: [PATCH] Support Simulcast To RTMP (#2792) * add SimulcastBridgeAdapter, support simulcast to rtmp. * shared rtmp packet for dispatch_audio. --- trunk/src/app/srs_app_rtc_conn.cpp | 60 ++++++-- trunk/src/app/srs_app_rtc_source.cpp | 211 +++++++++++++++++++++++++++ trunk/src/app/srs_app_rtc_source.hpp | 8 +- trunk/src/app/srs_app_source.cpp | 2 + 4 files changed, 265 insertions(+), 16 deletions(-) diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 5d61b5d2adc..55622367593 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1135,6 +1135,51 @@ SrsRtcPublishStream::~SrsRtcPublishStream() stat->on_disconnect(cid_.c_str()); } +#ifdef SRS_FFMPEG_FIT +inline SrsRtmpFromRtcBridge* create_bridge(SrsLiveSource *rtmp) { + return new SrsRtmpFromRtcBridge(rtmp); +} + +extern SrsRtmpFromRtcBridge* create_bridge(std::vector& video_tracks); + +srs_error_t do_bridge_for_simulcast(SrsRtmpFromRtcBridge *&bridge, SrsRequest* r, std::vector& video_tracks) { + srs_error_t err = srs_success; + bridge = create_bridge(video_tracks); + if ((err = bridge->initialize(r)) != srs_success) { + srs_freep(bridge); + return srs_error_wrap(err, "create bridge"); + } + return err; +} + +srs_error_t do_bridge_for_single(SrsRtmpFromRtcBridge *&bridge, SrsRequest *r) { + srs_error_t err = srs_success; + SrsLiveSource *rtmp = NULL; + if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) { + return srs_error_wrap(err, "create source"); + } + + // Disable GOP cache for RTC2RTMP bridge, to keep the streams in sync, + // especially for stream merging. + rtmp->set_cache(false); + + bridge = create_bridge(rtmp); + if ((err = bridge->initialize(r)) != srs_success) { + srs_freep(bridge); + return srs_error_wrap(err, "create bridge"); + } + return err; +} + +srs_error_t do_bridge(SrsRtmpFromRtcBridge *&bridge, SrsRequest* r, std::vector& video_tracks) { + if (video_tracks.size() > 1) { + return do_bridge_for_simulcast(bridge, r, video_tracks); + } else { + return do_bridge_for_single(bridge, r); + } +} +#endif + srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescription* stream_desc) { srs_error_t err = srs_success; @@ -1218,18 +1263,9 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti #if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost); if (rtc_to_rtmp) { - if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) { - return srs_error_wrap(err, "create source"); - } - - // Disable GOP cache for RTC2RTMP bridge, to keep the streams in sync, - // especially for stream merging. - rtmp->set_cache(false); - - SrsRtmpFromRtcBridge *bridge = new SrsRtmpFromRtcBridge(rtmp); - if ((err = bridge->initialize(r)) != srs_success) { - srs_freep(bridge); - return srs_error_wrap(err, "create bridge"); + SrsRtmpFromRtcBridge *bridge = NULL; + if ((err = do_bridge(bridge, r, video_tracks_)) != srs_success) { + return srs_error_wrap(err, "do_bridge"); } source->set_bridge(bridge); diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 5cbb987805d..5f5e64228dd 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -1780,6 +1780,217 @@ bool SrsRtmpFromRtcBridge::check_frame_complete(const uint16_t start, const uint return fu_s_c == fu_e_c; } + +extern srs_error_t do_bridge_for_single(SrsRtmpFromRtcBridge *&bridge, SrsRequest *r); + +// NOTE: One simulcast(low,mid,high) stream to Multiple rtmp streams (rtmp://xxx_{low,mid,high}) +std::string update_stream(size_t i, const SrsRtcTrackDescription* track_desc, const SrsRequest* rr) { + std::stringstream name; + name << rr->stream << '_'; + const std::string& ext = track_desc->rid_.rid; + if (ext.empty()) { + name << i; + } else { + name << ext; + } + return name.str(); +} + +struct BridgeContext { + SrsRtcTrackDescription* track_desc; + SrsRequest* r; + SrsRtmpFromRtcBridge* bridge; + + srs_error_t create_bridge(size_t i, SrsRequest* rr) { + srs_assert(!bridge); + srs_assert(!r); + srs_assert(rr); + r = rr->copy(); + r->stream = update_stream(i, track_desc, rr); + srs_error_t err = do_bridge_for_single(bridge, r); + if (err != srs_success) { + return srs_error_wrap(err, "do_bridge_for_single"); + } + err = bridge->on_publish(); + if (err != srs_success) { + return srs_error_wrap(err, "bridge on publish"); + } + return err; + } + + void clear() { + if (bridge) { + bridge->on_unpublish(); + srs_freep(bridge); + } + srs_freep(r); + } +}; + +class SimulcastBridgeAdapter: public SrsRtmpFromRtcBridge { + SrsRequest* r; + std::vector contexts_; + std::map bridges_; + std::vector sources_; + + srs_error_t dispatch_audio(SrsCommonMessage* rtmp) { + srs_error_t err = srs_success; + if (!rtmp->payload) { + srs_error("rtmp audio packet is nullptr, skip it"); + return err; + } + + SrsCommonMessage copy; + for (size_t i=0; ipayload); + SrsLiveSource* source = sources_.at(i); + + // backup + // @see SrsSharedPtrMessage::create(SrsCommonMessage* msg) + copy.create_payload(rtmp->size); + copy.size = rtmp->size; + memcpy(copy.payload, rtmp->payload, rtmp->size); + + if ((err = source->on_audio(rtmp)) != srs_success) { + err = srs_error_wrap(err, "source[%p] on audio", source); + break; + } + + // restore + std::swap(rtmp->payload, copy.payload); + std::swap(rtmp->size, copy.size); + } + return err; + } + + srs_error_t dispatch_video(SrsRtpPacket* pkt) { + srs_error_t err = srs_success; + uint32_t ssrc = pkt->header.get_ssrc(); + if (bridges_.end() != bridges_.find(ssrc)) { + return bridges_[ssrc]->packet_video(pkt); + } + + for (size_t i =0; ihas_ssrc(ssrc)) { + continue; + } + if (!ctx.bridge && (err = ctx.create_bridge(i, r)) != srs_success) { + return srs_error_wrap(err, "BridgeContext::create_bridge"); + } + srs_assert(ctx.bridge); + bridges_[ssrc] = ctx.bridge; + SimulcastBridgeAdapter* b = (SimulcastBridgeAdapter*)ctx.bridge; + sources_.push_back(b->source_); + return ctx.bridge->packet_video(pkt); + } + return err; + } + +public: + SimulcastBridgeAdapter(std::vector& video_tracks): SrsRtmpFromRtcBridge(NULL), r(NULL) { + for (size_t i=0; itrack_desc_, NULL, NULL}); + } + } + + srs_error_t transcode_audio(SrsRtpPacket *pkt) { + srs_error_t err = srs_success; + + // to common message. + uint32_t ts = pkt->get_avsync_time(); + if (is_first_audio) { + int header_len = 0; + uint8_t* header = NULL; + codec_->aac_codec_header(&header, &header_len); + + SrsCommonMessage out_rtmp; + packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio); + + if ((err = dispatch_audio(&out_rtmp)) != srs_success) { + return srs_error_wrap(err, "source on audio"); + } + + is_first_audio = false; + } + + std::vector out_pkts; + SrsRtpRawPayload *payload = dynamic_cast(pkt->payload()); + + SrsAudioFrame frame; + frame.add_sample(payload->payload, payload->nn_payload); + frame.dts = ts; + frame.cts = 0; + + err = codec_->transcode(&frame, out_pkts); + if (err != srs_success) { + return err; + } + + for (std::vector::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) { + SrsCommonMessage out_rtmp; + out_rtmp.header.timestamp = (*it)->dts; + packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio); + + if ((err = dispatch_audio(&out_rtmp)) != srs_success) { + err = srs_error_wrap(err, "source on audio"); + break; + } + } + codec_->free_frames(out_pkts); + + return err; + } + + srs_error_t on_rtp(SrsRtpPacket *pkt) override { + srs_error_t err = srs_success; + + if (!pkt->payload()) { + return err; + } + + // Have no received any sender report, can't calculate avsync_time, + // discard it to avoid timestamp problem in live source + if (pkt->get_avsync_time() <= 0) { + return err; + } + + if (pkt->is_audio()) { + err = transcode_audio(pkt); + } else { + err = dispatch_video(pkt); + } + + return err; + } + + void on_unpublish() override { + for (size_t i=0;icopy(); + return SrsRtmpFromRtcBridge::initialize(rr); + } +}; + +SrsRtmpFromRtcBridge* create_bridge(std::vector& video_tracks) { + return new SimulcastBridgeAdapter(video_tracks); +} + #endif SrsCodecPayload::SrsCodecPayload() diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index 2b9df361a18..84ffd2e40d3 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -285,7 +285,7 @@ class SrsRtcFromRtmpBridge : public ISrsLiveSourceBridge class SrsRtmpFromRtcBridge : public ISrsRtcSourceBridge { -private: +protected: SrsLiveSource *source_; SrsAudioTranscoder *codec_; bool is_first_audio; @@ -311,12 +311,12 @@ class SrsRtmpFromRtcBridge : public ISrsRtcSourceBridge SrsRtmpFromRtcBridge(SrsLiveSource *src); virtual ~SrsRtmpFromRtcBridge(); public: - srs_error_t initialize(SrsRequest* r); + virtual srs_error_t initialize(SrsRequest* r); public: virtual srs_error_t on_publish(); virtual srs_error_t on_rtp(SrsRtpPacket *pkt); virtual void on_unpublish(); -private: +public: srs_error_t transcode_audio(SrsRtpPacket *pkt); void packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header); srs_error_t packet_video(SrsRtpPacket* pkt); @@ -512,7 +512,7 @@ class SrsRtcSourceDescription class SrsRtcRecvTrack { -protected: +public: SrsRtcTrackDescription* track_desc_; protected: SrsRtcConnection* session_; diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index a5d2e2a892f..c8402d91097 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -255,6 +255,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow { srs_error_t err = srs_success; + srs_assert(msg->payload); msgs.push_back(msg); // If jitter is off, the timestamp of first sequence header is zero, which wll cause SRS to shrink and drop the @@ -1747,6 +1748,7 @@ srs_error_t SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPac srs_error_t SrsMetaCache::update_ash(SrsSharedPtrMessage* msg) { + srs_assert(msg->payload); srs_freep(audio); audio = msg->copy(); update_previous_ash();