Skip to content

Commit

Permalink
Do not clone audio RTP packets... (#850)
Browse files Browse the repository at this point in the history
Only clone RTP packets when needed
  • Loading branch information
jmillan authored Jul 6, 2022
1 parent 41ff822 commit afd3bbb
Show file tree
Hide file tree
Showing 14 changed files with 264 additions and 79 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Changelog


### NEXT

* Do not clone RTP packets if not needed (PR #850).


### 3.10.3

* `SimpleConsumer`: Fix. Only process Opus codec (PR #865).
Expand Down
4 changes: 2 additions & 2 deletions worker/include/RTC/Consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ namespace RTC
virtual uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) = 0;
virtual void ApplyLayers() = 0;
virtual uint32_t GetDesiredBitrate() const = 0;
virtual void SendRtpPacket(std::shared_ptr<RTC::RtpPacket> packet) = 0;
virtual std::vector<RTC::RtpStreamSend*> GetRtpStreams() = 0;
virtual void SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket) = 0;
virtual std::vector<RTC::RtpStreamSend*> GetRtpStreams() = 0;
virtual void GetRtcp(
RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) = 0;
virtual void NeedWorstRemoteFractionLost(uint32_t mappedSsrc, uint8_t& worstRemoteFractionLost) = 0;
Expand Down
2 changes: 1 addition & 1 deletion worker/include/RTC/PipeConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace RTC
uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override;
void ApplyLayers() override;
uint32_t GetDesiredBitrate() const override;
void SendRtpPacket(std::shared_ptr<RTC::RtpPacket> packet) override;
void SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket) override;
void GetRtcp(RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) override;
std::vector<RTC::RtpStreamSend*> GetRtpStreams() override
{
Expand Down
4 changes: 2 additions & 2 deletions worker/include/RTC/RtpStreamSend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ namespace RTC

void FillJsonStats(json& jsonObject) override;
void SetRtx(uint8_t payloadType, uint32_t ssrc) override;
bool ReceivePacket(std::shared_ptr<RTC::RtpPacket> packet);
bool ReceivePacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
void ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket);
void ReceiveKeyFrameRequest(RTC::RTCP::FeedbackPs::MessageType messageType);
void ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report);
Expand All @@ -90,7 +90,7 @@ namespace RTC
uint32_t GetLayerBitrate(uint64_t nowMs, uint8_t spatialLayer, uint8_t temporalLayer) override;

private:
void StorePacket(std::shared_ptr<RTC::RtpPacket> packet);
void StorePacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket);
void ClearOldPackets(const RtpPacket* packet);
void ClearBuffer();
void FillRetransmissionContainer(uint16_t seq, uint16_t bitmask);
Expand Down
2 changes: 1 addition & 1 deletion worker/include/RTC/SimpleConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace RTC
uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override;
void ApplyLayers() override;
uint32_t GetDesiredBitrate() const override;
void SendRtpPacket(std::shared_ptr<RTC::RtpPacket> packet) override;
void SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket) override;
std::vector<RTC::RtpStreamSend*> GetRtpStreams() override
{
return this->rtpStreams;
Expand Down
2 changes: 1 addition & 1 deletion worker/include/RTC/SimulcastConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ namespace RTC
uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override;
void ApplyLayers() override;
uint32_t GetDesiredBitrate() const override;
void SendRtpPacket(std::shared_ptr<RTC::RtpPacket> packet) override;
void SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket) override;
void GetRtcp(RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) override;
std::vector<RTC::RtpStreamSend*> GetRtpStreams() override
{
Expand Down
2 changes: 1 addition & 1 deletion worker/include/RTC/SvcConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace RTC
uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override;
void ApplyLayers() override;
uint32_t GetDesiredBitrate() const override;
void SendRtpPacket(std::shared_ptr<RTC::RtpPacket> packet) override;
void SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket) override;
void GetRtcp(RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) override;
std::vector<RTC::RtpStreamSend*> GetRtpStreams() override
{
Expand Down
8 changes: 4 additions & 4 deletions worker/src/RTC/PipeConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ namespace RTC
return 0u;
}

void PipeConsumer::SendRtpPacket(std::shared_ptr<RTC::RtpPacket> packet)
void PipeConsumer::SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket)
{
MS_TRACE();

Expand Down Expand Up @@ -253,13 +253,13 @@ namespace RTC
}

// Process the packet.
if (rtpStream->ReceivePacket(packet))
if (rtpStream->ReceivePacket(packet, sharedPacket))
{
// Send the packet.
this->listener->OnConsumerSendRtpPacket(this, packet.get());
this->listener->OnConsumerSendRtpPacket(this, packet);

// May emit 'trace' event.
EmitTraceEventRtpAndKeyFrameTypes(packet.get());
EmitTraceEventRtpAndKeyFrameTypes(packet);
}
else
{
Expand Down
9 changes: 5 additions & 4 deletions worker/src/RTC/Router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -798,9 +798,10 @@ namespace RTC

if (!consumers.empty())
{
// Clone the packet so it holds its own buffer, usable for future
// retransmissions.
std::shared_ptr<RTC::RtpPacket> sharedPacket(packet->Clone());
// Cloned ref-counted packet that RtpStreamSend will store for as long as
// needed avoiding multiple allocations unless absolutely necessary.
// Clone only happens if needed.
std::shared_ptr<RTC::RtpPacket> sharedPacket;

for (auto* consumer : consumers)
{
Expand All @@ -810,7 +811,7 @@ namespace RTC
if (!mid.empty())
packet->UpdateMid(mid);

consumer->SendRtpPacket(sharedPacket);
consumer->SendRtpPacket(packet, sharedPacket);
}
}

Expand Down
20 changes: 13 additions & 7 deletions worker/src/RTC/RtpStreamSend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,20 +194,20 @@ namespace RTC
this->rtxSeq = Utils::Crypto::GetRandomUInt(0u, 0xFFFF);
}

bool RtpStreamSend::ReceivePacket(std::shared_ptr<RTC::RtpPacket> packet)
bool RtpStreamSend::ReceivePacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket)
{
MS_TRACE();

// Call the parent method.
if (!RtpStream::ReceiveStreamPacket(packet.get()))
if (!RtpStream::ReceiveStreamPacket(packet))
return false;

// If NACK is enabled, store the packet into the buffer.
if (this->params.useNack)
StorePacket(packet);
StorePacket(packet, sharedPacket);

// Increase transmission counter.
this->transmissionCounter.Update(packet.get());
this->transmissionCounter.Update(packet);

return true;
}
Expand Down Expand Up @@ -433,7 +433,7 @@ namespace RTC
MS_ABORT("invalid method call");
}

void RtpStreamSend::StorePacket(std::shared_ptr<RTC::RtpPacket> packet)
void RtpStreamSend::StorePacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket)
{
MS_TRACE();

Expand All @@ -452,7 +452,7 @@ namespace RTC
return;
}

this->ClearOldPackets(packet.get());
this->ClearOldPackets(packet);

auto seq = packet->GetSequenceNumber();
auto* storageItem = this->storageItemBuffer.Get(seq);
Expand All @@ -476,8 +476,14 @@ namespace RTC
this->storageItemBuffer.Insert(seq, storageItem);
}

// Only clone once and only if necessary.
if (!sharedPacket.get())
{
sharedPacket.reset(packet->Clone());
}

// Store original packet and some extra info into the storage item.
storageItem->packet = packet;
storageItem->packet = sharedPacket;
storageItem->ssrc = packet->GetSsrc();
storageItem->sequenceNumber = packet->GetSequenceNumber();
storageItem->timestamp = packet->GetTimestamp();
Expand Down
8 changes: 4 additions & 4 deletions worker/src/RTC/SimpleConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ namespace RTC
return desiredBitrate;
}

void SimpleConsumer::SendRtpPacket(std::shared_ptr<RTC::RtpPacket> packet)
void SimpleConsumer::SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket)
{
MS_TRACE();

Expand Down Expand Up @@ -328,13 +328,13 @@ namespace RTC
}

// Process the packet.
if (this->rtpStream->ReceivePacket(packet))
if (this->rtpStream->ReceivePacket(packet, sharedPacket))
{
// Send the packet.
this->listener->OnConsumerSendRtpPacket(this, packet.get());
this->listener->OnConsumerSendRtpPacket(this, packet);

// May emit 'trace' event.
EmitTraceEventRtpAndKeyFrameTypes(packet.get());
EmitTraceEventRtpAndKeyFrameTypes(packet);
}
else
{
Expand Down
9 changes: 5 additions & 4 deletions worker/src/RTC/SimulcastConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,8 @@ namespace RTC
return desiredBitrate;
}

void SimulcastConsumer::SendRtpPacket(std::shared_ptr<RTC::RtpPacket> packet)
void SimulcastConsumer::SendRtpPacket(
RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket)
{
MS_TRACE();

Expand Down Expand Up @@ -912,16 +913,16 @@ namespace RTC
}

// Process the packet.
if (this->rtpStream->ReceivePacket(packet))
if (this->rtpStream->ReceivePacket(packet, sharedPacket))
{
if (this->rtpSeqManager.GetMaxOutput() == packet->GetSequenceNumber())
this->lastSentPacketHasMarker = packet->HasMarker();

// Send the packet.
this->listener->OnConsumerSendRtpPacket(this, packet.get());
this->listener->OnConsumerSendRtpPacket(this, packet);

// May emit 'trace' event.
EmitTraceEventRtpAndKeyFrameTypes(packet.get());
EmitTraceEventRtpAndKeyFrameTypes(packet);
}
else
{
Expand Down
8 changes: 4 additions & 4 deletions worker/src/RTC/SvcConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ namespace RTC
return desiredBitrate;
}

void SvcConsumer::SendRtpPacket(std::shared_ptr<RTC::RtpPacket> packet)
void SvcConsumer::SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket)
{
MS_TRACE();

Expand Down Expand Up @@ -633,13 +633,13 @@ namespace RTC
}

// Process the packet.
if (this->rtpStream->ReceivePacket(packet))
if (this->rtpStream->ReceivePacket(packet, sharedPacket))
{
// Send the packet.
this->listener->OnConsumerSendRtpPacket(this, packet.get());
this->listener->OnConsumerSendRtpPacket(this, packet);

// May emit 'trace' event.
EmitTraceEventRtpAndKeyFrameTypes(packet.get());
EmitTraceEventRtpAndKeyFrameTypes(packet);
}
else
{
Expand Down
Loading

0 comments on commit afd3bbb

Please sign in to comment.