Skip to content

Commit

Permalink
DIOS-4033 Use fec packets for network probing
Browse files Browse the repository at this point in the history
  • Loading branch information
akola-dolby authored and murillo128 committed Aug 21, 2024
1 parent b79feea commit 86beae4
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 46 deletions.
3 changes: 3 additions & 0 deletions include/DTLSICETransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <arpa/inet.h>
#include <list>

#include "FecProbeGenerator.h"
#include "config.h"
#include "stunmessage.h"
#include "dtls.h"
Expand Down Expand Up @@ -139,6 +140,7 @@ class DTLSICETransport :
void ReSendPacket(RTPOutgoingSourceGroup *group,WORD seq);
DWORD SendProbe(const RTPPacket::shared& packet);
DWORD SendProbe(RTPOutgoingSourceGroup *group,BYTE padding);
DWORD SendFecProbe(const RTPPacket::shared& packet, DWORD protectedSsrc);
void SendTransportWideFeedbackMessage(DWORD ssrc);

int SetLocalCryptoSDES(const char* suite, const BYTE* key, const DWORD len);
Expand Down Expand Up @@ -182,6 +184,7 @@ class DTLSICETransport :
std::map<std::string,RTPIncomingSourceGroup*> rids;
std::map<std::string,std::set<RTPIncomingSourceGroup*>> mids;
CircularQueue<RTPPacket::shared> history;
FecProbeGenerator fecProbeGenerator;

DWORD mainSSRC = 1;
DWORD lastMediaSSRC = 0;
Expand Down
281 changes: 235 additions & 46 deletions src/DTLSICETransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ DTLSICETransport::DTLSICETransport(Sender *sender,TimeService& timeService, Obje
endpoint(timeService),
dtls(*this,timeService,endpoint.GetTransport()),
history(MaxProbingHistorySize, false),
fecProbeGenerator(history, sendMaps.ext),
outgoingBitrate(250, 1E3, 250),
rtxBitrate(250, 1E3, 250),
probingBitrate(250, 1E3, 250),
Expand Down Expand Up @@ -567,6 +568,167 @@ int DTLSICETransport::onData(const ICERemoteCandidate* candidate,const BYTE* dat
return 1;
}

DWORD DTLSICETransport::SendFecProbe(const RTPPacket::shared& packet, DWORD protectedSsrc)
{
//Check packet
if (!packet)
//Done
return Error("-DTLSICETransport::SendFecProbe() | No packet\n");

//Check if we have an active DTLS connection yet
if (!send.IsSetup())
//Done
return Warning("-DTLSICETransport::SendFecProbe() | Send SRTPSession is not setup yet\n");;

//Get outgoing group
RTPOutgoingSourceGroup* group = GetOutgoingSourceGroup(protectedSsrc);

//If not found
if (!group)
//Error
return Warning("-DTLSICETransport::SendFecProbe() | Outgoind source not registered for ssrc:%u\n",protectedSsrc);

//Get current time
auto now = getTime();

BYTE flexfec03 = sendMaps.rtp.GetTypeForCodec(VideoCodec::FLEXFEC03);

//Check if we ar using rtx or not
if (flexfec03==RTPMap::NotFound)
return Error("-DTLSICETransport::SendFecProbe() | No flexfec-03 found [group:%p,ssrc:%u]\n", group, group->fec.ssrc);

//Get fec source
RTPOutgoingSource& source = group->fec;

//Update fec headers
packet->SetSSRC(source.ssrc);
packet->SetSeqNum(source.NextSeqNum());
packet->SetPayloadType(flexfec03);
//No padding
packet->SetPadding(0);

//Add transport wide cc on video
if (group->type == MediaFrame::Video && sendMaps.ext.GetTypeForCodec(RTPHeaderExtension::TransportWideCC)!=RTPMap::NotFound)
//Set transport wide seq num
packet->SetTransportSeqNum(++transportSeqNum);
else
//Disable transport wide cc
packet->DisableTransportSeqNum();

//If we are using abs send time for sending
if (sendMaps.ext.GetTypeForCodec(RTPHeaderExtension::AbsoluteSendTime)!=RTPMap::NotFound)
//Set abs send time
packet->SetAbsSentTime(now/1000);
else
//Disable it
packet->DisableAbsSentTime();

//Disable rid & repair id
packet->DisableRId();
packet->DisableRepairedId();

//Update mid
if (!group->mid.empty())
//Set new mid
packet->SetMediaStreamId(group->mid);
else
//Disable it
packet->DisableMediaStreamId();

//No frame markings
packet->DisableFrameMarkings();

//Pick one packet buffer from the pool
Packet buffer = packetPool.pick();
BYTE* data = buffer.GetData();
DWORD size = buffer.GetCapacity();

//Serialize data
int len = packet->Serialize(data,size,sendMaps.ext);

//IF failed
if (!len)
{
//Return packet to pool
packetPool.release(std::move(buffer));
//Log warning and exit
return Warning("-DTLSICETransport::SendFecProbe() | Could not serialize packet\n");
}

//If we don't have an active candidate yet
if (!active)
{
//Return packet to pool
packetPool.release(std::move(buffer));
//Log warning and exit
//Error
return Warning("-DTLSICETransport::SendFecProbe() | We don't have an active candidate yet\n");
}

//If dumping
if (dumper && dumpOutRTP)
{
//Get truncate size
DWORD truncate = dumpRTPHeadersOnly ? len - packet->GetMediaLength() + 16 : 0;
//Write udp packet
dumper->WriteUDP(now/1000,0x7F000001,5004,active->GetIPAddress(),active->GetPort(),data,len,truncate);
}

//Encript
len = send.ProtectRTP(data,len);

//Check size
if (!len)
{
//Return packet to pool
packetPool.release(std::move(buffer));
//Log warning and exit
//Error
return Error("-DTLSICETransport::SendFecProbe() | Error protecting RTP packet [ssrc:%u,%s]\n",source.ssrc,send.GetLastError());
}

//Store candidate before unlocking
ICERemoteCandidate* candidate = active;

//Set buffer size
buffer.SetSize(len);

//Check if we are using transport wide for this packet
if (packet->HasTransportWideCC() && senderSideEstimationEnabled)
//Send packet and update stats in callback
sender->Send(candidate, std::move(buffer), [
weak = std::weak_ptr<SendSideBandwidthEstimation>(senderSideBandwidthEstimator),
stats = PacketStats::CreateProbing(packet, len, now)
](std::chrono::milliseconds now) mutable {
//Get shared pointer from weak reference
auto senderSideBandwidthEstimator = weak.lock();
//If already gone
if (!senderSideBandwidthEstimator)
//Ignore
return;
//Update sent timestamp
stats.timestamp = now.count();
//Add new stat
senderSideBandwidthEstimator->SentPacket(stats);
}
);
else
//Send packet
sender->Send(candidate, std::move(buffer));

//Update current time after sending
now = getTime();
//Update bitrate
outgoingBitrate.Update(now/1000,len);

//Update last send time and stats
source.Update(now/1000, packet, len);

//Log("-DTLSICETransport::SendFecProbe() | Sent probe [size:%d]\n", len);

return len;
}

DWORD DTLSICETransport::SendProbe(const RTPPacket::shared& original)
{
//Check packet
Expand Down Expand Up @@ -2846,74 +3008,101 @@ void DTLSICETransport::Probe(QWORD now)
DWORD target = senderSideBandwidthEstimator->GetAvailableBitrate();
DWORD limit = std::min(target, probingBitrateLimit);

//Log(">DTLSICETransport::Probe() | [target:%ubps,bitrate:%ubps,probing:%ubps,history:%d,probingBitrateLimit=%ubps,maxProbingBitrate=%ubps]\n",target,bitrate,probing,history.size(),probingBitrateLimit,maxProbingBitrate);
//Check if flexfec - 03 codec is available
bool useFecForProbing = sendMaps.rtp.HasCodec(VideoCodec::FLEXFEC03) && recvMaps.rtp.HasCodec(VideoCodec::FLEXFEC03);

//Log(">DTLSICETransport::Probe() | [target:%ubps,bitrate:%ubps,probing:%ubps,history:%d,probingBitrateLimit=%ubps,maxProbingBitrate=%ubps,useFecForProbing=%d]\n",target,bitrate,probing,history.size(),probingBitrateLimit,maxProbingBitrate,useFecForProbing);

//If we can still send more
if (bitrate<limit && probing<maxProbingBitrate)
{
//Calculate how much probe bitrate should we sent
DWORD probe = std::min<DWORD>(limit - bitrate, maxProbingBitrate);
DWORD probeBitrate = std::min<DWORD>(limit - bitrate, maxProbingBitrate);

//Get size of probes
DWORD probeSize = probe*sleep/8000;

//Log("-DTLSICETransport::Probe() | Sending probe packets [target:%ubps,bitrate:%ubps,limit:%ubps,probe:%u,probingSize:%d,sleep:%d]\n", target, bitrate, limit, probe, probeSize, sleep);
//Log("-DTLSICETransport::Probe() | Sending probe packets [target:%ubps,bitrate:%ubps,limit:%ubps,probeBitrate:%ubps,probingSize:%d,sleep:%d]\n", target, bitrate, limit, probeBitrate, probeSize, sleep);

//If we have packet history
if (!history.empty())
{
int found = true;
//Get first packet
auto smallest = history.front();

//Sent until no more probe
while (probeSize && found)
{
//We need to always send one at minimun
found = false;
//For each other packet in history
for (size_t i=1; i<history.length(); ++i)
//Use flecfec-03 packets for probing
if (useFecForProbing)
{
DWORD mediaBitrate = bitrate - probing;
//Calculate fec rate as Q8 number. It is 200 instead of 255 to avoid probe exceeding.
//TODO: fecRate can become big, when bitrate is low (e.g. streaming starts), limit it.
WORD fecRate = mediaBitrate ? probeBitrate * 200 / mediaBitrate : 0;

auto fecPackets = fecProbeGenerator.PrepareFecProbePackets(fecRate, 1);
//Log("-DTLSICETransport::Probe() | Fec probing packets produced [fecRate:%u,fecPackets.size(): %u]\n", fecRate, fecPackets.size());

for (auto fecPacket : fecPackets)
{
//Get candidate
auto candidate = history.at(i);
DWORD len = SendFecProbe(fecPacket, fecProbeGenerator.GetLastProtectedSsrc().value_or(0));
//Check len
if (!len)
//Done
return;
//Update probing
probingBitrate.Update(now, len);
}
} else {

int found = true;
//Get first packet
auto smallest = history.front();

//Don't send too much data
if (candidate->GetMediaLength()>probeSize)
//Sent until no more probe
while (probeSize && found)
{
//We need to always send one at minimun
found = false;
//For each other packet in history
for (size_t i = 1; i < history.length(); ++i)
{
if (candidate->GetMediaLength() < smallest->GetMediaLength())
smallest = candidate;
//Try next
continue;
//Get candidate
auto candidate = history.at(i);

//Don't send too much data
if (candidate->GetMediaLength() > probeSize)
{
if (candidate->GetMediaLength() < smallest->GetMediaLength())
smallest = candidate;
//Try next
continue;
}
//Send probe packet
DWORD len = SendProbe(candidate);
//Check len
if (!len)
//Error
return;
//Update probing
probingBitrate.Update(now, len);
//Check size
if (len > probeSize)
//Done
break;
//Reduce probe
probeSize -= len;
found = true;
}
//Send probe packet
DWORD len = SendProbe(candidate);
}
//If we have not found any packet
if (!found)
{
//Send the smallest one
DWORD len = SendProbe(smallest);
//Check len
if (!len)
//Error
//Done
return;
//Update probing
probingBitrate.Update(now,len);
//Check size
if (len>probeSize)
//Done
break;
//Reduce probe
probeSize -= len;
found = true;
probingBitrate.Update(now, len);
}
}
//If we have not found any packet
if (!found)
{
//Send the smallest one
DWORD len = SendProbe(smallest);
//Check len
if (!len)
//Done
return;
//Update probing
probingBitrate.Update(now, len);
}
} else {
//Ensure we send at least one packet
DWORD size = std::min(255u, probeSize);
Expand Down Expand Up @@ -3019,7 +3208,7 @@ void DTLSICETransport::SetBandwidthProbing(bool probe)
Debug("-DTLSICETransport::SetBandwidthProbing() [probe:%d]\n", probe);
//Set probing status
this->probe = probe;

//Check if we need to start the timar
CheckProbeTimer();
}
Expand Down

0 comments on commit 86beae4

Please sign in to comment.