Skip to content

Commit

Permalink
Support Simulcast To RTMP (ossrs#2792)
Browse files Browse the repository at this point in the history
* add SimulcastBridgerAdapter, support simulcast to rtmp.
* shared rtmp packet for dispatch_audio.
  • Loading branch information
johzzy committed Apr 6, 2022
1 parent b780ffa commit 7f95974
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 16 deletions.
60 changes: 48 additions & 12 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,6 +1123,51 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
stat->on_disconnect(cid_.c_str());
}

#ifdef SRS_FFMPEG_FIT
inline SrsRtmpFromRtcBridger* create_bridger(SrsLiveSource *rtmp) {
return new SrsRtmpFromRtcBridger(rtmp);
}

extern SrsRtmpFromRtcBridger* create_bridger(std::vector<SrsRtcVideoRecvTrack*>& video_tracks);

srs_error_t do_bridger_for_simulcast(SrsRtmpFromRtcBridger *&bridger, SrsRequest* r, std::vector<SrsRtcVideoRecvTrack*>& video_tracks) {
srs_error_t err = srs_success;
bridger = create_bridger(video_tracks);
if ((err = bridger->initialize(r)) != srs_success) {
srs_freep(bridger);
return srs_error_wrap(err, "create bridger");
}
return err;
}

srs_error_t do_bridger_for_single(SrsRtmpFromRtcBridger *&bridger, SrsRequest *r) {
srs_error_t err = srs_success;
SrsLiveSource *rtmp = NULL;
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) {
return srs_error_wrap(err, "create source");
}

// Disable GOP cache for RTC2RTMP bridger, to keep the streams in sync,
// especially for stream merging.
rtmp->set_cache(false);

bridger = create_bridger(rtmp);
if ((err = bridger->initialize(r)) != srs_success) {
srs_freep(bridger);
return srs_error_wrap(err, "create bridger");
}
return err;
}

srs_error_t do_bridger(SrsRtmpFromRtcBridger *&bridger, SrsRequest* r, std::vector<SrsRtcVideoRecvTrack*>& video_tracks) {
if (video_tracks.size() > 1) {
return do_bridger_for_simulcast(bridger, r, video_tracks);
} else {
return do_bridger_for_single(bridger, r);
}
}
#endif

srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescription* stream_desc)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -1200,18 +1245,9 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost);
if (rtc_to_rtmp) {
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) {
return srs_error_wrap(err, "create source");
}

// Disable GOP cache for RTC2RTMP bridger, to keep the streams in sync,
// especially for stream merging.
rtmp->set_cache(false);

SrsRtmpFromRtcBridger *bridger = new SrsRtmpFromRtcBridger(rtmp);
if ((err = bridger->initialize(r)) != srs_success) {
srs_freep(bridger);
return srs_error_wrap(err, "create bridger");
SrsRtmpFromRtcBridger *bridger = NULL;
if ((err = do_bridger(bridger, r, video_tracks_)) != srs_success) {
return srs_error_wrap(err, "do_bridger");
}

source->set_bridger(bridger);
Expand Down
211 changes: 211 additions & 0 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,217 @@ bool SrsRtmpFromRtcBridger::check_frame_complete(const uint16_t start, const uin

return fu_s_c == fu_e_c;
}

extern srs_error_t do_bridger_for_single(SrsRtmpFromRtcBridger *&bridger, SrsRequest *r);

// NOTE: One simulcast(low,mid,high) stream to Multiple rtmp streams (rtmp://xxx_{low,mid,high})
std::string update_stream(size_t i, const SrsRtcTrackDescription* track_desc, const SrsRequest* rr) {
std::stringstream name;
name << rr->stream << '_';
const std::string& ext = track_desc->rid_.rid;
if (ext.empty()) {
name << i;
} else {
name << ext;
}
return name.str();
}

struct BridgerContext {
SrsRtcTrackDescription* track_desc;
SrsRequest* r;
SrsRtmpFromRtcBridger* bridger;

srs_error_t create_bridger(size_t i, SrsRequest* rr) {
srs_assert(!bridger);
srs_assert(!r);
srs_assert(rr);
r = rr->copy();
r->stream = update_stream(i, track_desc, rr);
srs_error_t err = do_bridger_for_single(bridger, r);
if (err != srs_success) {
return srs_error_wrap(err, "do_bridger_for_single");
}
err = bridger->on_publish();
if (err != srs_success) {
return srs_error_wrap(err, "bridger on publish");
}
return err;
}

void clear() {
if (bridger) {
bridger->on_unpublish();
srs_freep(bridger);
}
srs_freep(r);
}
};

class SimulcastBridgerAdapter: public SrsRtmpFromRtcBridger {
SrsRequest* r;
std::vector<BridgerContext> contexts_;
std::map<uint32_t, SrsRtmpFromRtcBridger*> bridgers_;
std::vector<SrsLiveSource*> sources_;

srs_error_t dispatch_audio(SrsCommonMessage* rtmp) {
srs_error_t err = srs_success;
if (!rtmp->payload) {
srs_error("rtmp audio packet is nullptr, skip it");
return err;
}

SrsCommonMessage copy;
for (size_t i=0; i<sources_.size(); ++i) {
srs_assert(rtmp->payload);
SrsLiveSource* source = sources_.at(i);

// backup
// @see SrsSharedPtrMessage::create(SrsCommonMessage* msg)
copy.create_payload(rtmp->size);
copy.size = rtmp->size;
memcpy(copy.payload, rtmp->payload, rtmp->size);

if ((err = source->on_audio(rtmp)) != srs_success) {
err = srs_error_wrap(err, "source[%p] on audio", source);
break;
}

// restore
std::swap(rtmp->payload, copy.payload);
std::swap(rtmp->size, copy.size);
}
return err;
}

srs_error_t dispatch_video(SrsRtpPacket* pkt) {
srs_error_t err = srs_success;
uint32_t ssrc = pkt->header.get_ssrc();
if (bridgers_.end() != bridgers_.find(ssrc)) {
return bridgers_[ssrc]->packet_video(pkt);
}

for (size_t i =0; i<contexts_.size(); i++) {
BridgerContext& ctx = contexts_.at(i);
if (!ctx.track_desc->has_ssrc(ssrc)) {
continue;
}
if (!ctx.bridger && (err = ctx.create_bridger(i, r)) != srs_success) {
return srs_error_wrap(err, "BridgerContext::create_bridger");
}
srs_assert(ctx.bridger);
bridgers_[ssrc] = ctx.bridger;
SimulcastBridgerAdapter* b = (SimulcastBridgerAdapter*)ctx.bridger;
sources_.push_back(b->source_);
return ctx.bridger->packet_video(pkt);
}
return err;
}

public:
SimulcastBridgerAdapter(std::vector<SrsRtcVideoRecvTrack*>& video_tracks): SrsRtmpFromRtcBridger(NULL), r(NULL) {
for (size_t i=0; i<video_tracks.size(); ++i) {
SrsRtcVideoRecvTrack* track = video_tracks.at(i);
contexts_.push_back(BridgerContext{track->track_desc_, NULL, NULL});
}
}

srs_error_t transcode_audio(SrsRtpPacket *pkt) {
srs_error_t err = srs_success;

// to common message.
uint32_t ts = pkt->get_avsync_time();
if (is_first_audio) {
int header_len = 0;
uint8_t* header = NULL;
codec_->aac_codec_header(&header, &header_len);

SrsCommonMessage out_rtmp;
packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio);

if ((err = dispatch_audio(&out_rtmp)) != srs_success) {
return srs_error_wrap(err, "source on audio");
}

is_first_audio = false;
}

std::vector<SrsAudioFrame *> out_pkts;
SrsRtpRawPayload *payload = dynamic_cast<SrsRtpRawPayload *>(pkt->payload());

SrsAudioFrame frame;
frame.add_sample(payload->payload, payload->nn_payload);
frame.dts = ts;
frame.cts = 0;

err = codec_->transcode(&frame, out_pkts);
if (err != srs_success) {
return err;
}

for (std::vector<SrsAudioFrame *>::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) {
SrsCommonMessage out_rtmp;
out_rtmp.header.timestamp = (*it)->dts;
packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio);

if ((err = dispatch_audio(&out_rtmp)) != srs_success) {
err = srs_error_wrap(err, "source on audio");
break;
}
}
codec_->free_frames(out_pkts);

return err;
}

srs_error_t on_rtp(SrsRtpPacket *pkt) override {
srs_error_t err = srs_success;

if (!pkt->payload()) {
return err;
}

// Have no received any sender report, can't calculate avsync_time,
// discard it to avoid timestamp problem in live source
if (pkt->get_avsync_time() <= 0) {
return err;
}

if (pkt->is_audio()) {
err = transcode_audio(pkt);
} else {
err = dispatch_video(pkt);
}

return err;
}

void on_unpublish() override {
for (size_t i=0;i<contexts_.size(); ++i) {
BridgerContext& ctx = contexts_.at(i);
ctx.clear();
}
srs_freep(r);
bridgers_.clear();
sources_.clear();
}

srs_error_t on_publish() override {
// note delay on publish.
return srs_success;
}

srs_error_t initialize(SrsRequest* rr) override {
// note: delay initialization.
r = rr->copy();
return SrsRtmpFromRtcBridger::initialize(rr);
}
};

SrsRtmpFromRtcBridger* create_bridger(std::vector<SrsRtcVideoRecvTrack*>& video_tracks) {
return new SimulcastBridgerAdapter(video_tracks);
}

#endif

SrsCodecPayload::SrsCodecPayload()
Expand Down
8 changes: 4 additions & 4 deletions trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ class SrsRtcFromRtmpBridger : public ISrsLiveSourceBridger

class SrsRtmpFromRtcBridger : public ISrsRtcSourceBridger
{
private:
protected:
SrsLiveSource *source_;
SrsAudioTranscoder *codec_;
bool is_first_audio;
Expand All @@ -311,12 +311,12 @@ class SrsRtmpFromRtcBridger : public ISrsRtcSourceBridger
SrsRtmpFromRtcBridger(SrsLiveSource *src);
virtual ~SrsRtmpFromRtcBridger();
public:
srs_error_t initialize(SrsRequest* r);
virtual srs_error_t initialize(SrsRequest* r);
public:
virtual srs_error_t on_publish();
virtual srs_error_t on_rtp(SrsRtpPacket *pkt);
virtual void on_unpublish();
private:
public:
srs_error_t transcode_audio(SrsRtpPacket *pkt);
void packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header);
srs_error_t packet_video(SrsRtpPacket* pkt);
Expand Down Expand Up @@ -512,7 +512,7 @@ class SrsRtcSourceDescription

class SrsRtcRecvTrack
{
protected:
public:
SrsRtcTrackDescription* track_desc_;
protected:
SrsRtcConnection* session_;
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
{
srs_error_t err = srs_success;

srs_assert(msg->payload);
msgs.push_back(msg);

// If jitter is off, the timestamp of first sequence header is zero, which wll cause SRS to shrink and drop the
Expand Down Expand Up @@ -1744,6 +1745,7 @@ srs_error_t SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPac

srs_error_t SrsMetaCache::update_ash(SrsSharedPtrMessage* msg)
{
srs_assert(msg->payload);
srs_freep(audio);
audio = msg->copy();
update_previous_ash();
Expand Down

0 comments on commit 7f95974

Please sign in to comment.