Skip to content

Commit

Permalink
Support Simulcast To RTMP (ossrs#2792)
Browse files Browse the repository at this point in the history
* add SimulcastBridgeAdapter, support simulcast to rtmp.
* shared rtmp packet for dispatch_audio.
  • Loading branch information
johzzy committed Nov 26, 2022
1 parent f75c177 commit ede0c0a
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 16 deletions.
60 changes: 48 additions & 12 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,51 @@ SrsRtcPublishStream::~SrsRtcPublishStream()
stat->on_disconnect(cid_.c_str());
}

#ifdef SRS_FFMPEG_FIT
inline SrsRtmpFromRtcBridge* create_bridge(SrsLiveSource *rtmp) {
return new SrsRtmpFromRtcBridge(rtmp);
}

extern SrsRtmpFromRtcBridge* create_bridge(std::vector<SrsRtcVideoRecvTrack*>& video_tracks);

srs_error_t do_bridge_for_simulcast(SrsRtmpFromRtcBridge *&bridge, SrsRequest* r, std::vector<SrsRtcVideoRecvTrack*>& video_tracks) {
srs_error_t err = srs_success;
bridge = create_bridge(video_tracks);
if ((err = bridge->initialize(r)) != srs_success) {
srs_freep(bridge);
return srs_error_wrap(err, "create bridge");
}
return err;
}

srs_error_t do_bridge_for_single(SrsRtmpFromRtcBridge *&bridge, SrsRequest *r) {
srs_error_t err = srs_success;
SrsLiveSource *rtmp = NULL;
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) {
return srs_error_wrap(err, "create source");
}

// Disable GOP cache for RTC2RTMP bridge, to keep the streams in sync,
// especially for stream merging.
rtmp->set_cache(false);

bridge = create_bridge(rtmp);
if ((err = bridge->initialize(r)) != srs_success) {
srs_freep(bridge);
return srs_error_wrap(err, "create bridge");
}
return err;
}

srs_error_t do_bridge(SrsRtmpFromRtcBridge *&bridge, SrsRequest* r, std::vector<SrsRtcVideoRecvTrack*>& video_tracks) {
if (video_tracks.size() > 1) {
return do_bridge_for_simulcast(bridge, r, video_tracks);
} else {
return do_bridge_for_single(bridge, r);
}
}
#endif

srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescription* stream_desc)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -1218,18 +1263,9 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti
#if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT)
bool rtc_to_rtmp = _srs_config->get_rtc_to_rtmp(req_->vhost);
if (rtc_to_rtmp) {
if ((err = _srs_sources->fetch_or_create(r, _srs_hybrid->srs()->instance(), &rtmp)) != srs_success) {
return srs_error_wrap(err, "create source");
}

// Disable GOP cache for RTC2RTMP bridge, to keep the streams in sync,
// especially for stream merging.
rtmp->set_cache(false);

SrsRtmpFromRtcBridge *bridge = new SrsRtmpFromRtcBridge(rtmp);
if ((err = bridge->initialize(r)) != srs_success) {
srs_freep(bridge);
return srs_error_wrap(err, "create bridge");
SrsRtmpFromRtcBridge *bridge = NULL;
if ((err = do_bridge(bridge, r, video_tracks_)) != srs_success) {
return srs_error_wrap(err, "do_bridge");
}

source->set_bridge(bridge);
Expand Down
211 changes: 211 additions & 0 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1780,6 +1780,217 @@ bool SrsRtmpFromRtcBridge::check_frame_complete(const uint16_t start, const uint

return fu_s_c == fu_e_c;
}

extern srs_error_t do_bridge_for_single(SrsRtmpFromRtcBridge *&bridge, SrsRequest *r);

// NOTE: One simulcast(low,mid,high) stream to Multiple rtmp streams (rtmp://xxx_{low,mid,high})
std::string update_stream(size_t i, const SrsRtcTrackDescription* track_desc, const SrsRequest* rr) {
std::stringstream name;
name << rr->stream << '_';
const std::string& ext = track_desc->rid_.rid;
if (ext.empty()) {
name << i;
} else {
name << ext;
}
return name.str();
}

struct BridgeContext {
SrsRtcTrackDescription* track_desc;
SrsRequest* r;
SrsRtmpFromRtcBridge* bridge;

srs_error_t create_bridge(size_t i, SrsRequest* rr) {
srs_assert(!bridge);
srs_assert(!r);
srs_assert(rr);
r = rr->copy();
r->stream = update_stream(i, track_desc, rr);
srs_error_t err = do_bridge_for_single(bridge, r);
if (err != srs_success) {
return srs_error_wrap(err, "do_bridge_for_single");
}
err = bridge->on_publish();
if (err != srs_success) {
return srs_error_wrap(err, "bridge on publish");
}
return err;
}

void clear() {
if (bridge) {
bridge->on_unpublish();
srs_freep(bridge);
}
srs_freep(r);
}
};

class SimulcastBridgeAdapter: public SrsRtmpFromRtcBridge {
SrsRequest* r;
std::vector<BridgeContext> contexts_;
std::map<uint32_t, SrsRtmpFromRtcBridge*> bridges_;
std::vector<SrsLiveSource*> sources_;

srs_error_t dispatch_audio(SrsCommonMessage* rtmp) {
srs_error_t err = srs_success;
if (!rtmp->payload) {
srs_error("rtmp audio packet is nullptr, skip it");
return err;
}

SrsCommonMessage copy;
for (size_t i=0; i<sources_.size(); ++i) {
srs_assert(rtmp->payload);
SrsLiveSource* source = sources_.at(i);

// backup
// @see SrsSharedPtrMessage::create(SrsCommonMessage* msg)
copy.create_payload(rtmp->size);
copy.size = rtmp->size;
memcpy(copy.payload, rtmp->payload, rtmp->size);

if ((err = source->on_audio(rtmp)) != srs_success) {
err = srs_error_wrap(err, "source[%p] on audio", source);
break;
}

// restore
std::swap(rtmp->payload, copy.payload);
std::swap(rtmp->size, copy.size);
}
return err;
}

srs_error_t dispatch_video(SrsRtpPacket* pkt) {
srs_error_t err = srs_success;
uint32_t ssrc = pkt->header.get_ssrc();
if (bridges_.end() != bridges_.find(ssrc)) {
return bridges_[ssrc]->packet_video(pkt);
}

for (size_t i =0; i<contexts_.size(); i++) {
BridgeContext& ctx = contexts_.at(i);
if (!ctx.track_desc->has_ssrc(ssrc)) {
continue;
}
if (!ctx.bridge && (err = ctx.create_bridge(i, r)) != srs_success) {
return srs_error_wrap(err, "BridgeContext::create_bridge");
}
srs_assert(ctx.bridge);
bridges_[ssrc] = ctx.bridge;
SimulcastBridgeAdapter* b = (SimulcastBridgeAdapter*)ctx.bridge;
sources_.push_back(b->source_);
return ctx.bridge->packet_video(pkt);
}
return err;
}

public:
SimulcastBridgeAdapter(std::vector<SrsRtcVideoRecvTrack*>& video_tracks): SrsRtmpFromRtcBridge(NULL), r(NULL) {
for (size_t i=0; i<video_tracks.size(); ++i) {
SrsRtcVideoRecvTrack* track = video_tracks.at(i);
contexts_.push_back(BridgeContext{track->track_desc_, NULL, NULL});
}
}

srs_error_t transcode_audio(SrsRtpPacket *pkt) {
srs_error_t err = srs_success;

// to common message.
uint32_t ts = pkt->get_avsync_time();
if (is_first_audio) {
int header_len = 0;
uint8_t* header = NULL;
codec_->aac_codec_header(&header, &header_len);

SrsCommonMessage out_rtmp;
packet_aac(&out_rtmp, (char *)header, header_len, ts, is_first_audio);

if ((err = dispatch_audio(&out_rtmp)) != srs_success) {
return srs_error_wrap(err, "source on audio");
}

is_first_audio = false;
}

std::vector<SrsAudioFrame *> out_pkts;
SrsRtpRawPayload *payload = dynamic_cast<SrsRtpRawPayload *>(pkt->payload());

SrsAudioFrame frame;
frame.add_sample(payload->payload, payload->nn_payload);
frame.dts = ts;
frame.cts = 0;

err = codec_->transcode(&frame, out_pkts);
if (err != srs_success) {
return err;
}

for (std::vector<SrsAudioFrame *>::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) {
SrsCommonMessage out_rtmp;
out_rtmp.header.timestamp = (*it)->dts;
packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio);

if ((err = dispatch_audio(&out_rtmp)) != srs_success) {
err = srs_error_wrap(err, "source on audio");
break;
}
}
codec_->free_frames(out_pkts);

return err;
}

srs_error_t on_rtp(SrsRtpPacket *pkt) override {
srs_error_t err = srs_success;

if (!pkt->payload()) {
return err;
}

// Have no received any sender report, can't calculate avsync_time,
// discard it to avoid timestamp problem in live source
if (pkt->get_avsync_time() <= 0) {
return err;
}

if (pkt->is_audio()) {
err = transcode_audio(pkt);
} else {
err = dispatch_video(pkt);
}

return err;
}

void on_unpublish() override {
for (size_t i=0;i<contexts_.size(); ++i) {
BridgeContext& ctx = contexts_.at(i);
ctx.clear();
}
srs_freep(r);
bridges_.clear();
sources_.clear();
}

srs_error_t on_publish() override {
// note delay on publish.
return srs_success;
}

srs_error_t initialize(SrsRequest* rr) override {
// note: delay initialization.
r = rr->copy();
return SrsRtmpFromRtcBridge::initialize(rr);
}
};

SrsRtmpFromRtcBridge* create_bridge(std::vector<SrsRtcVideoRecvTrack*>& video_tracks) {
return new SimulcastBridgeAdapter(video_tracks);
}

#endif

SrsCodecPayload::SrsCodecPayload()
Expand Down
8 changes: 4 additions & 4 deletions trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ class SrsRtcFromRtmpBridge : public ISrsLiveSourceBridge

class SrsRtmpFromRtcBridge : public ISrsRtcSourceBridge
{
private:
protected:
SrsLiveSource *source_;
SrsAudioTranscoder *codec_;
bool is_first_audio;
Expand All @@ -311,12 +311,12 @@ class SrsRtmpFromRtcBridge : public ISrsRtcSourceBridge
SrsRtmpFromRtcBridge(SrsLiveSource *src);
virtual ~SrsRtmpFromRtcBridge();
public:
srs_error_t initialize(SrsRequest* r);
virtual srs_error_t initialize(SrsRequest* r);
public:
virtual srs_error_t on_publish();
virtual srs_error_t on_rtp(SrsRtpPacket *pkt);
virtual void on_unpublish();
private:
public:
srs_error_t transcode_audio(SrsRtpPacket *pkt);
void packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header);
srs_error_t packet_video(SrsRtpPacket* pkt);
Expand Down Expand Up @@ -512,7 +512,7 @@ class SrsRtcSourceDescription

class SrsRtcRecvTrack
{
protected:
public:
SrsRtcTrackDescription* track_desc_;
protected:
SrsRtcConnection* session_;
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow
{
srs_error_t err = srs_success;

srs_assert(msg->payload);
msgs.push_back(msg);

// If jitter is off, the timestamp of first sequence header is zero, which wll cause SRS to shrink and drop the
Expand Down Expand Up @@ -1747,6 +1748,7 @@ srs_error_t SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPac

srs_error_t SrsMetaCache::update_ash(SrsSharedPtrMessage* msg)
{
srs_assert(msg->payload);
srs_freep(audio);
audio = msg->copy();
update_previous_ash();
Expand Down

0 comments on commit ede0c0a

Please sign in to comment.