diff --git a/worker/include/RTC/Consumer.hpp b/worker/include/RTC/Consumer.hpp index 975d1dfcbf..34d1935283 100644 --- a/worker/include/RTC/Consumer.hpp +++ b/worker/include/RTC/Consumer.hpp @@ -143,9 +143,8 @@ namespace RTC virtual void ApplyLayers() = 0; virtual uint32_t GetDesiredBitrate() const = 0; virtual void SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr& sharedPacket) = 0; - virtual const std::vector& GetRtpStreams() const = 0; - virtual void GetRtcp( - RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) = 0; + virtual bool GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) = 0; + virtual const std::vector& GetRtpStreams() const = 0; virtual void NeedWorstRemoteFractionLost(uint32_t mappedSsrc, uint8_t& worstRemoteFractionLost) = 0; virtual void ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket) = 0; virtual void ReceiveKeyFrameRequest( diff --git a/worker/include/RTC/PipeConsumer.hpp b/worker/include/RTC/PipeConsumer.hpp index 2250d81ee2..5452608cb6 100644 --- a/worker/include/RTC/PipeConsumer.hpp +++ b/worker/include/RTC/PipeConsumer.hpp @@ -30,7 +30,7 @@ namespace RTC void ApplyLayers() override; uint32_t GetDesiredBitrate() const override; void SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr& sharedPacket) override; - void GetRtcp(RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) override; + bool GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) override; const std::vector& GetRtpStreams() const override { return this->rtpStreams; diff --git a/worker/include/RTC/Producer.hpp b/worker/include/RTC/Producer.hpp index 9dcb1f6a56..d3be43bda9 100644 --- a/worker/include/RTC/Producer.hpp +++ b/worker/include/RTC/Producer.hpp @@ -129,7 +129,7 @@ namespace RTC ReceiveRtpPacketResult ReceiveRtpPacket(RTC::RtpPacket* packet); void ReceiveRtcpSenderReport(RTC::RTCP::SenderReport* report); void ReceiveRtcpXrDelaySinceLastRr(RTC::RTCP::DelaySinceLastRr::SsrcInfo* ssrcInfo); - void GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs); + bool GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs); void RequestKeyFrame(uint32_t mappedSsrc); /* Methods inherited from Channel::ChannelSocket::RequestHandler. */ diff --git a/worker/include/RTC/RTCP/CompoundPacket.hpp b/worker/include/RTC/RTCP/CompoundPacket.hpp index 94bc0b953a..e38e019bc7 100644 --- a/worker/include/RTC/RTCP/CompoundPacket.hpp +++ b/worker/include/RTC/RTCP/CompoundPacket.hpp @@ -7,6 +7,7 @@ #include "RTC/RTCP/SenderReport.hpp" #include "RTC/RTCP/XrDelaySinceLastRr.hpp" #include "RTC/RTCP/XrReceiverReferenceTime.hpp" +#include "RTC/RtpPacket.hpp" // MtuSize. #include namespace RTC @@ -15,6 +16,13 @@ namespace RTC { class CompoundPacket { + public: + // Maximum size for a CompundPacket, leaving free space for encryption. + // 144 is the maximum number of octects that will be added to an RTP packet + // by srtp_protect(). + // srtp.h: SRTP_MAX_TRAILER_LEN (SRTP_MAX_TAG_LEN + SRTP_MAX_MKI_LEN) + constexpr static size_t MaxSize{ RTC::MtuSize - 144u }; + public: CompoundPacket() = default; @@ -23,10 +31,7 @@ namespace RTC { return this->header; } - size_t GetSize() const - { - return this->size; - } + size_t GetSize(); size_t GetSenderReportCount() const { return this->senderReportPacket.GetCount(); @@ -36,6 +41,22 @@ namespace RTC return this->receiverReportPacket.GetCount(); } void Dump(); + // RTCP additions per Consumer (non pipe). + // Adds the given data and returns true if there is enough space to hold it, + // false otherwise. + bool Add( + SenderReport* senderReport, SdesChunk* sdesChunk, DelaySinceLastRr* delaySinceLastRrReport); + // RTCP additions per Consumer (pipe). + // Adds the given data and returns true if there is enough space to hold it, + // false otherwise. + bool Add( + std::vector& senderReports, + std::vector& sdesChunks, + std::vector& delaySinceLastRrReports); + // RTCP additions per Producer. + // Adds the given data and returns true if there is enough space to hold it, + // false otherwise. + bool Add(std::vector&, ReceiverReferenceTime*); void AddSenderReport(SenderReport* report); void AddReceiverReport(ReceiverReport* report); void AddSdesChunk(SdesChunk* chunk); @@ -57,7 +78,6 @@ namespace RTC private: uint8_t* header{ nullptr }; - size_t size{ 0 }; SenderReportPacket senderReportPacket; ReceiverReportPacket receiverReportPacket; SdesPacket sdesPacket; diff --git a/worker/include/RTC/RTCP/ReceiverReport.hpp b/worker/include/RTC/RTCP/ReceiverReport.hpp index f681cd30c3..26f10394ff 100644 --- a/worker/include/RTC/RTCP/ReceiverReport.hpp +++ b/worker/include/RTC/RTCP/ReceiverReport.hpp @@ -131,6 +131,8 @@ namespace RTC class ReceiverReportPacket : public Packet { public: + static size_t MaxReportsPerPacket; + using Iterator = std::vector::iterator; public: @@ -163,6 +165,13 @@ namespace RTC { this->reports.push_back(report); } + void RemoveReport(ReceiverReport* report) + { + auto it = std::find(this->reports.begin(), this->reports.end(), report); + + if (it != this->reports.end()) + this->reports.erase(it); + } Iterator Begin() { return this->reports.begin(); @@ -190,12 +199,12 @@ namespace RTC } size_t GetSize() const override { - size_t size = Packet::CommonHeaderSize + 4u /* this->ssrc */; - - for (auto* report : reports) - { - size += report->GetSize(); - } + // A serialized packet can contain a maximum of 31 reports. + // If number of reports exceeds 31 then the required number of packets + // will be serialized which will take the size calculated below. + size_t size = (Packet::CommonHeaderSize + 4u /* this->ssrc */) * + ((this->GetCount() / MaxReportsPerPacket) + 1); + size += ReceiverReport::HeaderSize * this->GetCount(); return size; } diff --git a/worker/include/RTC/RTCP/Sdes.hpp b/worker/include/RTC/RTCP/Sdes.hpp index 79a021a41f..eb7750f980 100644 --- a/worker/include/RTC/RTCP/Sdes.hpp +++ b/worker/include/RTC/RTCP/Sdes.hpp @@ -154,6 +154,8 @@ namespace RTC class SdesPacket : public Packet { public: + static size_t MaxChunksPerPacket; + using Iterator = std::vector::iterator; public: @@ -178,6 +180,13 @@ namespace RTC { this->chunks.push_back(chunk); } + void RemoveChunk(SdesChunk* chunk) + { + auto it = std::find(this->chunks.begin(), this->chunks.end(), chunk); + + if (it != this->chunks.end()) + this->chunks.erase(it); + } Iterator Begin() { return this->chunks.begin(); @@ -197,7 +206,10 @@ namespace RTC } size_t GetSize() const override { - size_t size = Packet::CommonHeaderSize; + // A serialized packet can contain a maximum of 31 chunks. + // If number of chunks exceeds 31 then the required number of packets + // will be serialized which will take the size calculated below. + size_t size = Packet::CommonHeaderSize * ((this->GetCount() / MaxChunksPerPacket) + 1); for (auto* chunk : this->chunks) { diff --git a/worker/include/RTC/RTCP/SenderReport.hpp b/worker/include/RTC/RTCP/SenderReport.hpp index 3dd7419e91..80b2fcc38a 100644 --- a/worker/include/RTC/RTCP/SenderReport.hpp +++ b/worker/include/RTC/RTCP/SenderReport.hpp @@ -128,6 +128,13 @@ namespace RTC { this->reports.push_back(report); } + void RemoveReport(SenderReport* report) + { + auto it = std::find(this->reports.begin(), this->reports.end(), report); + + if (it != this->reports.end()) + this->reports.erase(it); + } Iterator Begin() { return this->reports.begin(); @@ -143,14 +150,17 @@ namespace RTC size_t Serialize(uint8_t* buffer) override; size_t GetCount() const override { - return 0; + return this->reports.size(); } size_t GetSize() const override { - size_t size = Packet::CommonHeaderSize; + // A serialized packet consists of a series of SR packets with + // one SR report each. + size_t size{ 0 }; for (auto* report : this->reports) { + size += Packet::CommonHeaderSize; size += report->GetSize(); } diff --git a/worker/include/RTC/RTCP/XR.hpp b/worker/include/RTC/RTCP/XR.hpp index 27a49b54e8..aadc54aff8 100644 --- a/worker/include/RTC/RTCP/XR.hpp +++ b/worker/include/RTC/RTCP/XR.hpp @@ -102,6 +102,13 @@ namespace RTC { this->reports.push_back(report); } + void RemoveReport(ExtendedReportBlock* report) + { + auto it = std::find(this->reports.begin(), this->reports.end(), report); + + if (it != this->reports.end()) + this->reports.erase(it); + } uint32_t GetSsrc() const { return this->ssrc; diff --git a/worker/include/RTC/SimpleConsumer.hpp b/worker/include/RTC/SimpleConsumer.hpp index 8e05ac1ef5..60a0df567b 100644 --- a/worker/include/RTC/SimpleConsumer.hpp +++ b/worker/include/RTC/SimpleConsumer.hpp @@ -44,7 +44,7 @@ namespace RTC { return this->rtpStreams; } - void GetRtcp(RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) override; + bool GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) override; void NeedWorstRemoteFractionLost(uint32_t mappedSsrc, uint8_t& worstRemoteFractionLost) override; void ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket) override; void ReceiveKeyFrameRequest(RTC::RTCP::FeedbackPs::MessageType messageType, uint32_t ssrc) override; diff --git a/worker/include/RTC/SimulcastConsumer.hpp b/worker/include/RTC/SimulcastConsumer.hpp index 7702e2b7b2..406442be77 100644 --- a/worker/include/RTC/SimulcastConsumer.hpp +++ b/worker/include/RTC/SimulcastConsumer.hpp @@ -56,7 +56,7 @@ namespace RTC void ApplyLayers() override; uint32_t GetDesiredBitrate() const override; void SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr& sharedPacket) override; - void GetRtcp(RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) override; + bool GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) override; const std::vector& GetRtpStreams() const override { return this->rtpStreams; diff --git a/worker/include/RTC/SvcConsumer.hpp b/worker/include/RTC/SvcConsumer.hpp index d44c902332..34baba4e51 100644 --- a/worker/include/RTC/SvcConsumer.hpp +++ b/worker/include/RTC/SvcConsumer.hpp @@ -51,7 +51,7 @@ namespace RTC void ApplyLayers() override; uint32_t GetDesiredBitrate() const override; void SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr& sharedPacket) override; - void GetRtcp(RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) override; + bool GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) override; const std::vector& GetRtpStreams() const override { return this->rtpStreams; diff --git a/worker/src/RTC/DirectTransport.cpp b/worker/src/RTC/DirectTransport.cpp index 7a8a027bc8..9be530fe91 100644 --- a/worker/src/RTC/DirectTransport.cpp +++ b/worker/src/RTC/DirectTransport.cpp @@ -156,6 +156,8 @@ namespace RTC { MS_TRACE(); + packet->Serialize(RTC::RTCP::Buffer); + const uint8_t* data = packet->GetData(); size_t len = packet->GetSize(); diff --git a/worker/src/RTC/PipeConsumer.cpp b/worker/src/RTC/PipeConsumer.cpp index ecfef30d4c..12738a88cf 100644 --- a/worker/src/RTC/PipeConsumer.cpp +++ b/worker/src/RTC/PipeConsumer.cpp @@ -289,15 +289,10 @@ namespace RTC packet->SetSequenceNumber(origSeq); } - void PipeConsumer::GetRtcp( - RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) + bool PipeConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) { MS_TRACE(); - MS_ASSERT( - std::find(this->rtpStreams.begin(), this->rtpStreams.end(), rtpStream) != this->rtpStreams.end(), - "RTP stream does exist"); - // Special condition for PipeConsumer since this method will be called in a loop for // each stream in this PipeConsumer. // clang-format off @@ -307,32 +302,44 @@ namespace RTC ) // clang-format on { - return; + return true; } - auto* report = rtpStream->GetRtcpSenderReport(nowMs); + std::vector senderReports; + std::vector sdesChunks; + std::vector xrReports; - if (!report) - return; + for (auto* rtpStream : this->rtpStreams) + { + auto* report = rtpStream->GetRtcpSenderReport(nowMs); - packet->AddSenderReport(report); + if (!report) + continue; - // Build SDES chunk for this sender. - auto* sdesChunk = rtpStream->GetRtcpSdesChunk(); + senderReports.push_back(report); - packet->AddSdesChunk(sdesChunk); + // Build SDES chunk for this sender. + auto* sdesChunk = rtpStream->GetRtcpSdesChunk(); + sdesChunks.push_back(sdesChunk); - auto* dlrr = rtpStream->GetRtcpXrDelaySinceLastRr(nowMs); + auto* dlrr = rtpStream->GetRtcpXrDelaySinceLastRr(nowMs); - if (dlrr) - { - auto* report = new RTC::RTCP::DelaySinceLastRr(); + if (dlrr) + { + auto* report = new RTC::RTCP::DelaySinceLastRr(); + report->AddSsrcInfo(dlrr); - report->AddSsrcInfo(dlrr); - packet->AddDelaySinceLastRr(report); + xrReports.push_back(report); + } } + // RTCP Compound packet buffer cannot hold the data. + if (!packet->Add(senderReports, sdesChunks, xrReports)) + return false; + this->lastRtcpSentTime = nowMs; + + return true; } void PipeConsumer::NeedWorstRemoteFractionLost(uint32_t /*mappedSsrc*/, uint8_t& worstRemoteFractionLost) diff --git a/worker/src/RTC/PipeTransport.cpp b/worker/src/RTC/PipeTransport.cpp index f1db0e4e19..0092e001bd 100644 --- a/worker/src/RTC/PipeTransport.cpp +++ b/worker/src/RTC/PipeTransport.cpp @@ -509,6 +509,8 @@ namespace RTC if (!IsConnected()) return; + packet->Serialize(RTC::RTCP::Buffer); + const uint8_t* data = packet->GetData(); auto intLen = static_cast(packet->GetSize()); diff --git a/worker/src/RTC/PlainTransport.cpp b/worker/src/RTC/PlainTransport.cpp index ffe427209c..e020d21ab3 100644 --- a/worker/src/RTC/PlainTransport.cpp +++ b/worker/src/RTC/PlainTransport.cpp @@ -780,6 +780,8 @@ namespace RTC if (!IsConnected()) return; + packet->Serialize(RTC::RTCP::Buffer); + const uint8_t* data = packet->GetData(); auto intLen = static_cast(packet->GetSize()); diff --git a/worker/src/RTC/Producer.cpp b/worker/src/RTC/Producer.cpp index 4dc92a5607..793b9912f7 100644 --- a/worker/src/RTC/Producer.cpp +++ b/worker/src/RTC/Producer.cpp @@ -825,38 +825,46 @@ namespace RTC rtpStream->ReceiveRtcpXrDelaySinceLastRr(ssrcInfo); } - void Producer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) + bool Producer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) { MS_TRACE(); if (static_cast((nowMs - this->lastRtcpSentTime) * 1.15) < this->maxRtcpInterval) - return; + return true; + + std::vector receiverReports; + RTCP::ReceiverReferenceTime* receiverReferenceTimeReport{ nullptr }; for (auto& kv : this->mapSsrcRtpStream) { auto* rtpStream = kv.second; auto* report = rtpStream->GetRtcpReceiverReport(); - packet->AddReceiverReport(report); + receiverReports.push_back(report); auto* rtxReport = rtpStream->GetRtxRtcpReceiverReport(); if (rtxReport) - packet->AddReceiverReport(rtxReport); + receiverReports.push_back(rtxReport); } // Add a receiver reference time report if no present in the packet. if (!packet->HasReceiverReferenceTime()) { - auto ntp = Utils::Time::TimeMs2Ntp(nowMs); - auto* report = new RTC::RTCP::ReceiverReferenceTime(); + auto ntp = Utils::Time::TimeMs2Ntp(nowMs); + receiverReferenceTimeReport = new RTC::RTCP::ReceiverReferenceTime(); - report->SetNtpSec(ntp.seconds); - report->SetNtpFrac(ntp.fractions); - packet->AddReceiverReferenceTime(report); + receiverReferenceTimeReport->SetNtpSec(ntp.seconds); + receiverReferenceTimeReport->SetNtpFrac(ntp.fractions); } + // RTCP Compound packet buffer cannot hold the data. + if (!packet->Add(receiverReports, receiverReferenceTimeReport)) + return false; + this->lastRtcpSentTime = nowMs; + + return true; } void Producer::RequestKeyFrame(uint32_t mappedSsrc) diff --git a/worker/src/RTC/RTCP/CompoundPacket.cpp b/worker/src/RTC/RTCP/CompoundPacket.cpp index 1fff255ade..ff1b875373 100644 --- a/worker/src/RTC/RTCP/CompoundPacket.cpp +++ b/worker/src/RTC/RTCP/CompoundPacket.cpp @@ -10,83 +10,180 @@ namespace RTC { /* Instance methods. */ + size_t CompoundPacket::GetSize() + { + size_t size{ 0 }; + + if (this->senderReportPacket.GetCount() > 0u) + { + size += this->senderReportPacket.GetSize(); + } + if (this->receiverReportPacket.GetCount() > 0u) + { + size += this->receiverReportPacket.GetSize(); + } + + if (this->sdesPacket.GetCount() > 0u) + size += this->sdesPacket.GetSize(); + + if (this->xrPacket.Begin() != this->xrPacket.End()) + size += this->xrPacket.GetSize(); + + return size; + } + void CompoundPacket::Serialize(uint8_t* data) { MS_TRACE(); this->header = data; - // Calculate the total required size for the entire message. - if (HasSenderReport()) - { - this->size = this->senderReportPacket.GetSize(); + // Fill it. + size_t offset{ 0 }; - if (this->receiverReportPacket.GetCount() != 0u) - { - this->size += ReceiverReport::HeaderSize * this->receiverReportPacket.GetCount(); - } + MS_ASSERT( + this->senderReportPacket.GetCount() > 0u || this->receiverReportPacket.GetCount() > 0u, + "no Sender or Receiver report present"); + + if (this->senderReportPacket.GetCount() > 0u) + { + offset += this->senderReportPacket.Serialize(this->header); } - // If no sender nor receiver reports are present send an empty Receiver Report - // packet as the head of the compound packet. - else + + if (this->receiverReportPacket.GetCount() > 0u) { - this->size = this->receiverReportPacket.GetSize(); + offset += this->receiverReportPacket.Serialize(this->header + offset); } - if (this->sdesPacket.GetCount() != 0u) - this->size += this->sdesPacket.GetSize(); + if (this->sdesPacket.GetCount() > 0u) + { + offset += this->sdesPacket.Serialize(this->header + offset); + } if (this->xrPacket.Begin() != this->xrPacket.End()) - this->size += this->xrPacket.GetSize(); + { + offset += this->xrPacket.Serialize(this->header + offset); + } + } - // Fill it. - size_t offset{ 0 }; + bool CompoundPacket::Add( + SenderReport* senderReport, SdesChunk* sdesChunk, DelaySinceLastRr* delaySinceLastRrReport) + { + // Add the items into the packet. - if (HasSenderReport()) + if (senderReport) + this->senderReportPacket.AddReport(senderReport); + + if (sdesChunk) + this->sdesPacket.AddChunk(sdesChunk); + + if (delaySinceLastRrReport) + this->xrPacket.AddReport(delaySinceLastRrReport); + + // New items can hold in the packet, report it. + if (GetSize() <= MaxSize) + return true; + + // New items can not hold in the packet, remove them, + // delete and report it. + + if (senderReport) { - this->senderReportPacket.Serialize(this->header); - offset = this->senderReportPacket.GetSize(); + this->senderReportPacket.RemoveReport(senderReport); + delete senderReport; + } - // Fix header count field. - auto* header = reinterpret_cast(this->header); + if (sdesChunk) + { + this->sdesPacket.RemoveChunk(sdesChunk); + delete sdesChunk; + } - header->count = 0; + if (delaySinceLastRrReport) + { + this->xrPacket.RemoveReport(delaySinceLastRrReport); + delete delaySinceLastRrReport; + } - if (this->receiverReportPacket.GetCount() != 0u) - { - // Fix header length field. - size_t length = - ((SenderReport::HeaderSize + - (ReceiverReport::HeaderSize * this->receiverReportPacket.GetCount())) / - 4); + return false; + } - header->length = uint16_t{ htons(length) }; + bool CompoundPacket::Add( + std::vector& senderReports, + std::vector& sdesChunks, + std::vector& delaySinceLastRrReports) + { + // Add the items into the packet. + + for (auto* report : senderReports) + this->senderReportPacket.AddReport(report); + + for (auto* chunk : sdesChunks) + this->sdesPacket.AddChunk(chunk); - // Fix header count field. - header->count = this->receiverReportPacket.GetCount(); + for (auto* report : delaySinceLastRrReports) + this->xrPacket.AddReport(report); - auto it = this->receiverReportPacket.Begin(); + // New items can hold in the packet, report it. + if (GetSize() <= MaxSize) + return true; - for (; it != this->receiverReportPacket.End(); ++it) - { - ReceiverReport* report = (*it); + // New items can not hold in the packet, remove them, + // delete and report it. - report->Serialize(this->header + offset); - offset += ReceiverReport::HeaderSize; - } - } + for (auto* report : senderReports) + { + this->senderReportPacket.RemoveReport(report); + delete report; } - else + + for (auto* chunk : sdesChunks) { - this->receiverReportPacket.Serialize(this->header); - offset = this->receiverReportPacket.GetSize(); + this->sdesPacket.RemoveChunk(chunk); + delete chunk; } - if (this->sdesPacket.GetCount() != 0u) - offset += this->sdesPacket.Serialize(this->header + offset); + for (auto* report : delaySinceLastRrReports) + { + this->xrPacket.RemoveReport(report); + delete report; + } - if (this->xrPacket.Begin() != this->xrPacket.End()) - this->xrPacket.Serialize(this->header + offset); + return false; + } + + bool CompoundPacket::Add( + std::vector& receiverReports, + ReceiverReferenceTime* receiverReferenceTimeReport) + { + // Add the items into the packet. + + for (auto* report : receiverReports) + this->receiverReportPacket.AddReport(report); + + if (receiverReferenceTimeReport) + this->xrPacket.AddReport(receiverReferenceTimeReport); + + // New items can hold in the packet, report it. + if (GetSize() <= MaxSize) + return true; + + // New items can not hold in the packet, remove them, + // delete and report it. + + for (auto* report : receiverReports) + { + this->receiverReportPacket.RemoveReport(report); + delete report; + } + + if (receiverReferenceTimeReport) + { + this->xrPacket.RemoveReport(receiverReferenceTimeReport); + delete receiverReferenceTimeReport; + } + + return false; } void CompoundPacket::Dump() @@ -120,8 +217,6 @@ namespace RTC { MS_TRACE(); - MS_ASSERT(!HasSenderReport(), "a Sender Report is already present"); - this->senderReportPacket.AddReport(report); } diff --git a/worker/src/RTC/RTCP/ReceiverReport.cpp b/worker/src/RTC/RTCP/ReceiverReport.cpp index 3394f381b8..761f6dcd49 100644 --- a/worker/src/RTC/RTCP/ReceiverReport.cpp +++ b/worker/src/RTC/RTCP/ReceiverReport.cpp @@ -57,6 +57,10 @@ namespace RTC return HeaderSize; } + /* Static Class members */ + + size_t ReceiverReportPacket::MaxReportsPerPacket = 31; + /* Class methods. */ /** @@ -116,16 +120,35 @@ namespace RTC { MS_TRACE(); - size_t offset = Packet::Serialize(buffer); - - // Copy the SSRC. - Utils::Byte::Set4Bytes(buffer, Packet::CommonHeaderSize, this->ssrc); - offset += 4u; + size_t offset = 0; + uint8_t* header = { nullptr }; - // Serialize reports. - for (auto* report : this->reports) + for (size_t i = 0; i < this->GetCount(); i++) { - offset += report->Serialize(buffer + offset); + // Create a new RR packet header for each 31 reports. + if (i % MaxReportsPerPacket == 0) + { + // Reference current common header. + header = buffer + offset; + offset += Packet::Serialize(buffer + offset); + + // Copy the SSRC. + Utils::Byte::Set4Bytes(header, Packet::CommonHeaderSize, this->ssrc); + offset += 4u; + } + + // Serialize next report. + offset += this->reports[i]->Serialize(buffer + offset); + + // Adjust the header count field. + reinterpret_cast(header)->count = + static_cast((i % MaxReportsPerPacket) + 1); + + // Adjust the header length field. + size_t length = (Packet::CommonHeaderSize + 4u /* this->ssrc */); + length += ReceiverReport::HeaderSize * ((i % MaxReportsPerPacket) + 1); + + reinterpret_cast(header)->length = uint16_t{ htons((length / 4) - 1) }; } return offset; diff --git a/worker/src/RTC/RTCP/Sdes.cpp b/worker/src/RTC/RTCP/Sdes.cpp index a31cd64965..574df0b4dc 100644 --- a/worker/src/RTC/RTCP/Sdes.cpp +++ b/worker/src/RTC/RTCP/Sdes.cpp @@ -179,6 +179,10 @@ namespace RTC MS_DUMP(""); } + /* Static Class members */ + + size_t SdesPacket::MaxChunksPerPacket = 31; + /* Class methods. */ SdesPacket* SdesPacket::Parse(const uint8_t* data, size_t len) @@ -215,11 +219,34 @@ namespace RTC { MS_TRACE(); - size_t offset = Packet::Serialize(buffer); + size_t offset = 0; + size_t length = 0; + uint8_t* header = { nullptr }; - for (auto* chunk : this->chunks) + for (size_t i = 0; i < this->GetCount(); i++) { - offset += chunk->Serialize(buffer + offset); + // Create a new SDES packet header for each 31 chunks. + if (i % MaxChunksPerPacket == 0) + { + // Reference current common header. + header = buffer + offset; + offset += Packet::Serialize(buffer + offset); + + length = Packet::CommonHeaderSize; + } + + // Serialize the next chunk. + auto chunkSize = chunks[i]->Serialize(buffer + offset); + + offset += chunkSize; + length += chunkSize; + + // Adjust the header count field. + reinterpret_cast(header)->count = + static_cast((i % MaxChunksPerPacket) + 1); + + // Adjust the header length field. + reinterpret_cast(header)->length = uint16_t{ htons((length / 4) - 1) }; } return offset; diff --git a/worker/src/RTC/RTCP/SenderReport.cpp b/worker/src/RTC/RTCP/SenderReport.cpp index 1b40faf95d..a3d2171588 100644 --- a/worker/src/RTC/RTCP/SenderReport.cpp +++ b/worker/src/RTC/RTCP/SenderReport.cpp @@ -81,14 +81,26 @@ namespace RTC { MS_TRACE(); - MS_ASSERT(this->reports.size() == 1, "invalid number of sender reports"); + size_t offset{ 0 }; + uint8_t* header = { nullptr }; - size_t offset = Packet::Serialize(buffer); - - // Serialize reports. + // Serialize packets (common header + 1 report) each. for (auto* report : this->reports) { + // Reference current common header. + header = buffer + offset; + + offset += Packet::Serialize(buffer + offset); offset += report->Serialize(buffer + offset); + + // Adjust the header count field. + reinterpret_cast(header)->count = 0; + + // Adjust the header length field. + size_t length = Packet::CommonHeaderSize; + length += SenderReport::HeaderSize; + + reinterpret_cast(header)->length = uint16_t{ htons((length / 4) - 1) }; } return offset; diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp index ded781e1c6..c8c7c9de72 100644 --- a/worker/src/RTC/SimpleConsumer.cpp +++ b/worker/src/RTC/SimpleConsumer.cpp @@ -363,39 +363,38 @@ namespace RTC packet->SetSequenceNumber(origSeq); } - void SimpleConsumer::GetRtcp( - RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) + bool SimpleConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) { MS_TRACE(); - MS_ASSERT(rtpStream == this->rtpStream, "RTP stream does not match"); - if (static_cast((nowMs - this->lastRtcpSentTime) * 1.15) < this->maxRtcpInterval) - return; + return true; - auto* report = this->rtpStream->GetRtcpSenderReport(nowMs); - - if (!report) - return; + auto* senderReport = this->rtpStream->GetRtcpSenderReport(nowMs); - packet->AddSenderReport(report); + if (!senderReport) + return true; // Build SDES chunk for this sender. auto* sdesChunk = this->rtpStream->GetRtcpSdesChunk(); - packet->AddSdesChunk(sdesChunk); + RTC::RTCP::DelaySinceLastRr* delaySinceLastRrReport{ nullptr }; auto* dlrr = this->rtpStream->GetRtcpXrDelaySinceLastRr(nowMs); if (dlrr) { - auto* report = new RTC::RTCP::DelaySinceLastRr(); - - report->AddSsrcInfo(dlrr); - packet->AddDelaySinceLastRr(report); + delaySinceLastRrReport = new RTC::RTCP::DelaySinceLastRr(); + delaySinceLastRrReport->AddSsrcInfo(dlrr); } + // RTCP Compound packet buffer cannot hold the data. + if (!packet->Add(senderReport, sdesChunk, delaySinceLastRrReport)) + return false; + this->lastRtcpSentTime = nowMs; + + return true; } void SimpleConsumer::NeedWorstRemoteFractionLost( diff --git a/worker/src/RTC/SimulcastConsumer.cpp b/worker/src/RTC/SimulcastConsumer.cpp index 15b5b79fb0..be37da5bba 100644 --- a/worker/src/RTC/SimulcastConsumer.cpp +++ b/worker/src/RTC/SimulcastConsumer.cpp @@ -957,39 +957,38 @@ namespace RTC packet->RestorePayload(); } - void SimulcastConsumer::GetRtcp( - RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) + bool SimulcastConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) { MS_TRACE(); - MS_ASSERT(rtpStream == this->rtpStream, "RTP stream does not match"); - if (static_cast((nowMs - this->lastRtcpSentTime) * 1.15) < this->maxRtcpInterval) - return; + return true; - auto* report = this->rtpStream->GetRtcpSenderReport(nowMs); - - if (!report) - return; + auto* senderReport = this->rtpStream->GetRtcpSenderReport(nowMs); - packet->AddSenderReport(report); + if (!senderReport) + return true; // Build SDES chunk for this sender. auto* sdesChunk = this->rtpStream->GetRtcpSdesChunk(); - packet->AddSdesChunk(sdesChunk); + RTC::RTCP::DelaySinceLastRr* delaySinceLastRrReport{ nullptr }; auto* dlrr = this->rtpStream->GetRtcpXrDelaySinceLastRr(nowMs); if (dlrr) { - auto* report = new RTC::RTCP::DelaySinceLastRr(); - - report->AddSsrcInfo(dlrr); - packet->AddDelaySinceLastRr(report); + delaySinceLastRrReport = new RTC::RTCP::DelaySinceLastRr(); + delaySinceLastRrReport->AddSsrcInfo(dlrr); } + // RTCP Compound packet buffer cannot hold the data. + if (!packet->Add(senderReport, sdesChunk, delaySinceLastRrReport)) + return false; + this->lastRtcpSentTime = nowMs; + + return true; } void SimulcastConsumer::NeedWorstRemoteFractionLost( diff --git a/worker/src/RTC/SvcConsumer.cpp b/worker/src/RTC/SvcConsumer.cpp index e9cc89ee71..8735425aea 100644 --- a/worker/src/RTC/SvcConsumer.cpp +++ b/worker/src/RTC/SvcConsumer.cpp @@ -673,39 +673,38 @@ namespace RTC packet->RestorePayload(); } - void SvcConsumer::GetRtcp( - RTC::RTCP::CompoundPacket* packet, RTC::RtpStreamSend* rtpStream, uint64_t nowMs) + bool SvcConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs) { MS_TRACE(); - MS_ASSERT(rtpStream == this->rtpStream, "RTP stream does not match"); - if (static_cast((nowMs - this->lastRtcpSentTime) * 1.15) < this->maxRtcpInterval) - return; + return true; - auto* report = this->rtpStream->GetRtcpSenderReport(nowMs); - - if (!report) - return; + auto* senderReport = this->rtpStream->GetRtcpSenderReport(nowMs); - packet->AddSenderReport(report); + if (!senderReport) + return true; // Build SDES chunk for this sender. auto* sdesChunk = this->rtpStream->GetRtcpSdesChunk(); - packet->AddSdesChunk(sdesChunk); + RTC::RTCP::DelaySinceLastRr* delaySinceLastRrReport{ nullptr }; auto* dlrr = this->rtpStream->GetRtcpXrDelaySinceLastRr(nowMs); if (dlrr) { - auto* report = new RTC::RTCP::DelaySinceLastRr(); - - report->AddSsrcInfo(dlrr); - packet->AddDelaySinceLastRr(report); + delaySinceLastRrReport = new RTC::RTCP::DelaySinceLastRr(); + delaySinceLastRrReport->AddSsrcInfo(dlrr); } + // RTCP Compound packet buffer cannot hold the data. + if (!packet->Add(senderReport, sdesChunk, delaySinceLastRrReport)) + return false; + this->lastRtcpSentTime = nowMs; + + return true; } void SvcConsumer::NeedWorstRemoteFractionLost(uint32_t /*mappedSsrc*/, uint8_t& worstRemoteFractionLost) diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index f50c25f1c9..ae9d277173 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -2236,51 +2236,43 @@ namespace RTC { MS_TRACE(); - std::unique_ptr packet{ nullptr }; + std::unique_ptr packet{ new RTC::RTCP::CompoundPacket() }; for (auto& kv : this->mapConsumers) { auto* consumer = kv.second; - for (auto* rtpStream : consumer->GetRtpStreams()) + // Send the RTCP compound packet if it's full. + if (!consumer->GetRtcp(packet.get(), nowMs)) { - // Reset the Compound packet. - packet.reset(new RTC::RTCP::CompoundPacket()); - - consumer->GetRtcp(packet.get(), rtpStream, nowMs); + SendRtcpCompoundPacket(packet.get()); - // Send the RTCP compound packet if there is a sender report. - if (packet->HasSenderReport()) - { - packet->Serialize(RTC::RTCP::Buffer); - SendRtcpCompoundPacket(packet.get()); - } + // Create a new compount packet. + packet.reset(new RTC::RTCP::CompoundPacket()); } - } - // Reset the Compound packet. - packet.reset(new RTC::RTCP::CompoundPacket()); + consumer->GetRtcp(packet.get(), nowMs); + } for (auto& kv : this->mapProducers) { auto* producer = kv.second; - producer->GetRtcp(packet.get(), nowMs); - - // One more RR would exceed the MTU, send the compound packet now. - if (packet->GetSize() + sizeof(RTCP::ReceiverReport::Header) > RTC::MtuSize) + // Send the RTCP compound packet if it's full. + if (!producer->GetRtcp(packet.get(), nowMs)) { - packet->Serialize(RTC::RTCP::Buffer); SendRtcpCompoundPacket(packet.get()); - // Reset the Compound packet. + // Create a new compount packet. packet.reset(new RTC::RTCP::CompoundPacket()); } + + producer->GetRtcp(packet.get(), nowMs); } - if (packet->GetReceiverReportCount() != 0u) + // Send the RTCP compound packet if there is any sender or receiver report. + if (packet->GetReceiverReportCount() > 0u || packet->GetSenderReportCount() > 0u) { - packet->Serialize(RTC::RTCP::Buffer); SendRtcpCompoundPacket(packet.get()); } } @@ -3038,34 +3030,12 @@ namespace RTC SendRtcp(nowMs); - // Recalculate next RTCP interval. - if (!this->mapConsumers.empty()) - { - // Transmission rate in kbps. - uint32_t rate{ 0 }; - - // Get the RTP sending rate. - for (auto& kv : this->mapConsumers) - { - auto* consumer = kv.second; - - rate += consumer->GetTransmissionRate(nowMs) / 1000; - } - - // Calculate bandwidth: 360 / transmission bandwidth in kbit/s. - if (rate != 0u) - interval = 360000 / rate; - - if (interval > RTC::RTCP::MaxVideoIntervalMs) - interval = RTC::RTCP::MaxVideoIntervalMs; - } - /* * The interval between RTCP packets is varied randomly over the range - * [0.5,1.5] times the calculated interval to avoid unintended synchronization + * [1.0,1.5] times the calculated interval to avoid unintended synchronization * of all participants. */ - interval *= static_cast(Utils::Crypto::GetRandomUInt(5, 15)) / 10; + interval *= static_cast(Utils::Crypto::GetRandomUInt(10, 15)) / 10; this->rtcpTimer->Start(interval); } diff --git a/worker/src/RTC/WebRtcTransport.cpp b/worker/src/RTC/WebRtcTransport.cpp index 596d13c239..698231dbbc 100644 --- a/worker/src/RTC/WebRtcTransport.cpp +++ b/worker/src/RTC/WebRtcTransport.cpp @@ -918,6 +918,8 @@ namespace RTC if (!IsConnected()) return; + packet->Serialize(RTC::RTCP::Buffer); + const uint8_t* data = packet->GetData(); auto intLen = static_cast(packet->GetSize()); diff --git a/worker/test/src/RTC/RTCP/TestReceiverReport.cpp b/worker/test/src/RTC/RTCP/TestReceiverReport.cpp index eac415f1d1..879ab0c080 100644 --- a/worker/test/src/RTC/RTCP/TestReceiverReport.cpp +++ b/worker/test/src/RTC/RTCP/TestReceiverReport.cpp @@ -51,10 +51,12 @@ using namespace TestReceiverReport; SCENARIO("RTCP RR parsing", "[parser][rtcp][rr]") { - SECTION("parse RR packet") + SECTION("parse RR packet with a single report") { ReceiverReportPacket* packet = ReceiverReportPacket::Parse(buffer, sizeof(buffer)); + REQUIRE(packet->GetCount() == 1); + auto* report = *(packet->Begin()); verify(report); @@ -65,6 +67,22 @@ SCENARIO("RTCP RR parsing", "[parser][rtcp][rr]") packet->Serialize(serialized); + ReceiverReportPacket* packet2 = ReceiverReportPacket::Parse(serialized, sizeof(buffer)); + + REQUIRE(packet2->GetType() == Type::RR); + REQUIRE(packet2->GetCount() == 1); + REQUIRE(packet2->GetSize() == 32); + + auto* buf = reinterpret_cast(buffer); + + REQUIRE(ntohs(buf->length) == 7); + + report = *(packet2->Begin()); + + verify(report); + + delete packet2; + SECTION("compare serialized packet with original buffer") { REQUIRE(std::memcmp(buffer, serialized, sizeof(buffer)) == 0); @@ -85,7 +103,80 @@ SCENARIO("RTCP RR parsing", "[parser][rtcp][rr]") delete report; } - SECTION("create RR") + SECTION("create RR packet with more than 31 reports") + { + const size_t count = 33; + + ReceiverReportPacket packet; + + for (auto i = 1; i <= count; i++) + { + // Create report and add to packet. + ReceiverReport* report = new ReceiverReport(); + + report->SetSsrc(i); + report->SetFractionLost(i); + report->SetTotalLost(i); + report->SetLastSeq(i); + report->SetJitter(i); + report->SetLastSenderReport(i); + report->SetDelaySinceLastSenderReport(i); + + packet.AddReport(report); + } + + REQUIRE(packet.GetCount() == count); + + uint8_t buffer[1500] = { 0 }; + + // Serialization must contain 2 RR packets since report count exceeds 31. + packet.Serialize(buffer); + + auto* packet2 = static_cast(Packet::Parse(buffer, sizeof(buffer))); + + REQUIRE(packet2 != nullptr); + REQUIRE(packet2->GetCount() == 31); + + auto reportIt = packet2->Begin(); + + for (auto i = 1; i <= 31; i++, reportIt++) + { + auto* report = *reportIt; + + REQUIRE(report->GetSsrc() == i); + REQUIRE(report->GetFractionLost() == i); + REQUIRE(report->GetTotalLost() == i); + REQUIRE(report->GetLastSeq() == i); + REQUIRE(report->GetJitter() == i); + REQUIRE(report->GetLastSenderReport() == i); + REQUIRE(report->GetDelaySinceLastSenderReport() == i); + } + + ReceiverReportPacket* packet3 = static_cast(packet2->GetNext()); + + REQUIRE(packet3 != nullptr); + REQUIRE(packet3->GetCount() == 2); + + reportIt = packet3->Begin(); + + for (auto i = 1; i <= 2; i++, reportIt++) + { + auto* report = *reportIt; + + REQUIRE(report->GetSsrc() == 31 + i); + REQUIRE(report->GetFractionLost() == 31 + i); + REQUIRE(report->GetTotalLost() == 31 + i); + REQUIRE(report->GetLastSeq() == 31 + i); + REQUIRE(report->GetJitter() == 31 + i); + REQUIRE(report->GetLastSenderReport() == 31 + i); + REQUIRE(report->GetDelaySinceLastSenderReport() == 31 + i); + } + + delete packet2; + delete packet3; + } + + SECTION("create RR report") { // Create local report and check content. ReceiverReport report1; diff --git a/worker/test/src/RTC/RTCP/TestSdes.cpp b/worker/test/src/RTC/RTCP/TestSdes.cpp index 765c81d1c3..46bf9ae77c 100644 --- a/worker/test/src/RTC/RTCP/TestSdes.cpp +++ b/worker/test/src/RTC/RTCP/TestSdes.cpp @@ -52,6 +52,9 @@ SCENARIO("RTCP SDES parsing", "[parser][rtcp][sdes]") SdesPacket* packet = SdesPacket::Parse(buffer, sizeof(buffer)); SdesChunk* chunk = *(packet->Begin()); + auto* header = reinterpret_cast(buffer); + + REQUIRE(ntohs(header->length) == 6); REQUIRE(chunk); verify(chunk); @@ -70,6 +73,75 @@ SCENARIO("RTCP SDES parsing", "[parser][rtcp][sdes]") delete packet; } + SECTION("create SDES packet with more than 31 chunks") + { + const size_t count = 33; + + SdesPacket packet; + + for (auto i = 1; i <= count; i++) + { + // Create chunk and add to packet. + SdesChunk* chunk = new SdesChunk(i /*ssrc*/); + + auto* item = new RTC::RTCP::SdesItem(SdesItem::Type::CNAME, value.size(), value.c_str()); + + chunk->AddItem(item); + + packet.AddChunk(chunk); + } + + REQUIRE(packet.GetCount() == count); + + uint8_t buffer[1500] = { 0 }; + + // Serialization must contain 2 RR packets since report count exceeds 31. + packet.Serialize(buffer); + + auto* packet2 = static_cast(Packet::Parse(buffer, sizeof(buffer))); + + REQUIRE(packet2 != nullptr); + REQUIRE(packet2->GetCount() == 31); + + auto reportIt = packet2->Begin(); + + for (auto i = 1; i <= 31; i++, reportIt++) + { + auto* chunk = *reportIt; + + REQUIRE(chunk->GetSsrc() == i); + + auto* item = *(chunk->Begin()); + + REQUIRE(item->GetType() == SdesItem::Type::CNAME); + REQUIRE(item->GetSize() == 2 + value.size()); + REQUIRE(std::string(item->GetValue()) == value); + } + + SdesPacket* packet3 = static_cast(packet2->GetNext()); + + REQUIRE(packet3 != nullptr); + REQUIRE(packet3->GetCount() == 2); + + reportIt = packet3->Begin(); + + for (auto i = 1; i <= 2; i++, reportIt++) + { + auto* chunk = *reportIt; + + REQUIRE(chunk->GetSsrc() == 31 + i); + + auto* item = *(chunk->Begin()); + + REQUIRE(item->GetType() == SdesItem::Type::CNAME); + REQUIRE(item->GetSize() == 2 + value.size()); + REQUIRE(std::string(item->GetValue()) == value); + } + + delete packet2; + delete packet3; + } + SECTION("create SdesChunk") { auto* item = new SdesItem(type, length, value.c_str()); diff --git a/worker/test/src/RTC/RTCP/TestSenderReport.cpp b/worker/test/src/RTC/RTCP/TestSenderReport.cpp index 55693a19fc..48862861a0 100644 --- a/worker/test/src/RTC/RTCP/TestSenderReport.cpp +++ b/worker/test/src/RTC/RTCP/TestSenderReport.cpp @@ -94,6 +94,71 @@ SCENARIO("RTCP SR parsing", "[parser][rtcp][sr]") delete report; } + SECTION("create SR packet multiple reports") + { + const size_t count = 3; + + SenderReportPacket packet; + + for (auto i = 1; i <= count; i++) + { + // Create report and add to packet. + SenderReport* report = new SenderReport(); + + report->SetSsrc(i); + report->SetNtpSec(i); + report->SetNtpFrac(i); + report->SetRtpTs(i); + report->SetPacketCount(i); + report->SetOctetCount(i); + + packet.AddReport(report); + } + + uint8_t buffer[1500] = { 0 }; + + // Serialization must contain 3 SR packets. + packet.Serialize(buffer); + + SenderReport* reports[count]{ nullptr }; + + auto* packet2 = static_cast(Packet::Parse(buffer, sizeof(buffer))); + + REQUIRE(packet2 != nullptr); + + reports[0] = *(packet2->Begin()); + + auto* packet3 = static_cast(packet2->GetNext()); + + REQUIRE(packet3 != nullptr); + + reports[1] = *(packet3->Begin()); + + auto* packet4 = static_cast(packet3->GetNext()); + + REQUIRE(packet4 != nullptr); + + reports[2] = *(packet4->Begin()); + + for (auto i = 1; i <= count; i++) + { + auto* report = reports[i - 1]; + + REQUIRE(report != nullptr); + + REQUIRE(report->GetSsrc() == i); + REQUIRE(report->GetNtpSec() == i); + REQUIRE(report->GetNtpFrac() == i); + REQUIRE(report->GetRtpTs() == i); + REQUIRE(report->GetPacketCount() == i); + REQUIRE(report->GetOctetCount() == i); + } + + delete packet2; + delete packet3; + delete packet4; + } + SECTION("create SR") { // Create local report and check content.