diff --git a/include/DTLSICETransport.h b/include/DTLSICETransport.h index f78ce8948..a9125cd8d 100644 --- a/include/DTLSICETransport.h +++ b/include/DTLSICETransport.h @@ -17,6 +17,7 @@ #include #include +#include "FecProbeGenerator.h" #include "config.h" #include "stunmessage.h" #include "dtls.h" @@ -139,6 +140,7 @@ class DTLSICETransport : void ReSendPacket(RTPOutgoingSourceGroup *group,WORD seq); DWORD SendProbe(const RTPPacket::shared& packet); DWORD SendProbe(RTPOutgoingSourceGroup *group,BYTE padding); + DWORD SendFecProbe(const RTPPacket::shared& packet, DWORD protectedSsrc); void SendTransportWideFeedbackMessage(DWORD ssrc); int SetLocalCryptoSDES(const char* suite, const BYTE* key, const DWORD len); @@ -182,6 +184,7 @@ class DTLSICETransport : std::map rids; std::map> mids; CircularQueue history; + FecProbeGenerator fecProbeGenerator; DWORD mainSSRC = 1; DWORD lastMediaSSRC = 0; diff --git a/src/DTLSICETransport.cpp b/src/DTLSICETransport.cpp index c93dff990..4061a5eae 100644 --- a/src/DTLSICETransport.cpp +++ b/src/DTLSICETransport.cpp @@ -57,6 +57,7 @@ DTLSICETransport::DTLSICETransport(Sender *sender,TimeService& timeService, Obje endpoint(timeService), dtls(*this,timeService,endpoint.GetTransport()), history(MaxProbingHistorySize, false), + fecProbeGenerator(history, sendMaps.ext), outgoingBitrate(250, 1E3, 250), rtxBitrate(250, 1E3, 250), probingBitrate(250, 1E3, 250), @@ -567,6 +568,167 @@ int DTLSICETransport::onData(const ICERemoteCandidate* candidate,const BYTE* dat return 1; } +DWORD DTLSICETransport::SendFecProbe(const RTPPacket::shared& packet, DWORD protectedSsrc) +{ + //Check packet + if (!packet) + //Done + return Error("-DTLSICETransport::SendFecProbe() | No packet\n"); + + //Check if we have an active DTLS connection yet + if (!send.IsSetup()) + //Done + return Warning("-DTLSICETransport::SendFecProbe() | Send SRTPSession is not setup yet\n");; + + //Get outgoing group + RTPOutgoingSourceGroup* group = GetOutgoingSourceGroup(protectedSsrc); + + //If not found + if (!group) + //Error + return Warning("-DTLSICETransport::SendFecProbe() | Outgoind source not registered for ssrc:%u\n",protectedSsrc); + + //Get current time + auto now = getTime(); + + BYTE flexfec03 = sendMaps.rtp.GetTypeForCodec(VideoCodec::FLEXFEC03); + + //Check if we ar using rtx or not + if (flexfec03==RTPMap::NotFound) + return Error("-DTLSICETransport::SendFecProbe() | No flexfec-03 found [group:%p,ssrc:%u]\n", group, group->fec.ssrc); + + //Get fec source + RTPOutgoingSource& source = group->fec; + + //Update fec headers + packet->SetSSRC(source.ssrc); + packet->SetSeqNum(source.NextSeqNum()); + packet->SetPayloadType(flexfec03); + //No padding + packet->SetPadding(0); + + //Add transport wide cc on video + if (group->type == MediaFrame::Video && sendMaps.ext.GetTypeForCodec(RTPHeaderExtension::TransportWideCC)!=RTPMap::NotFound) + //Set transport wide seq num + packet->SetTransportSeqNum(++transportSeqNum); + else + //Disable transport wide cc + packet->DisableTransportSeqNum(); + + //If we are using abs send time for sending + if (sendMaps.ext.GetTypeForCodec(RTPHeaderExtension::AbsoluteSendTime)!=RTPMap::NotFound) + //Set abs send time + packet->SetAbsSentTime(now/1000); + else + //Disable it + packet->DisableAbsSentTime(); + + //Disable rid & repair id + packet->DisableRId(); + packet->DisableRepairedId(); + + //Update mid + if (!group->mid.empty()) + //Set new mid + packet->SetMediaStreamId(group->mid); + else + //Disable it + packet->DisableMediaStreamId(); + + //No frame markings + packet->DisableFrameMarkings(); + + //Pick one packet buffer from the pool + Packet buffer = packetPool.pick(); + BYTE* data = buffer.GetData(); + DWORD size = buffer.GetCapacity(); + + //Serialize data + int len = packet->Serialize(data,size,sendMaps.ext); + + //IF failed + if (!len) + { + //Return packet to pool + packetPool.release(std::move(buffer)); + //Log warning and exit + return Warning("-DTLSICETransport::SendFecProbe() | Could not serialize packet\n"); + } + + //If we don't have an active candidate yet + if (!active) + { + //Return packet to pool + packetPool.release(std::move(buffer)); + //Log warning and exit + //Error + return Warning("-DTLSICETransport::SendFecProbe() | We don't have an active candidate yet\n"); + } + + //If dumping + if (dumper && dumpOutRTP) + { + //Get truncate size + DWORD truncate = dumpRTPHeadersOnly ? len - packet->GetMediaLength() + 16 : 0; + //Write udp packet + dumper->WriteUDP(now/1000,0x7F000001,5004,active->GetIPAddress(),active->GetPort(),data,len,truncate); + } + + //Encript + len = send.ProtectRTP(data,len); + + //Check size + if (!len) + { + //Return packet to pool + packetPool.release(std::move(buffer)); + //Log warning and exit + //Error + return Error("-DTLSICETransport::SendFecProbe() | Error protecting RTP packet [ssrc:%u,%s]\n",source.ssrc,send.GetLastError()); + } + + //Store candidate before unlocking + ICERemoteCandidate* candidate = active; + + //Set buffer size + buffer.SetSize(len); + + //Check if we are using transport wide for this packet + if (packet->HasTransportWideCC() && senderSideEstimationEnabled) + //Send packet and update stats in callback + sender->Send(candidate, std::move(buffer), [ + weak = std::weak_ptr(senderSideBandwidthEstimator), + stats = PacketStats::CreateProbing(packet, len, now) + ](std::chrono::milliseconds now) mutable { + //Get shared pointer from weak reference + auto senderSideBandwidthEstimator = weak.lock(); + //If already gone + if (!senderSideBandwidthEstimator) + //Ignore + return; + //Update sent timestamp + stats.timestamp = now.count(); + //Add new stat + senderSideBandwidthEstimator->SentPacket(stats); + } + ); + else + //Send packet + sender->Send(candidate, std::move(buffer)); + + //Update current time after sending + now = getTime(); + //Update bitrate + outgoingBitrate.Update(now/1000,len); + + //Update last send time and stats + source.Update(now/1000, packet, len); + + //Log("-DTLSICETransport::SendFecProbe() | Sent probe [size:%d]\n", len); + + return len; +} + DWORD DTLSICETransport::SendProbe(const RTPPacket::shared& original) { //Check packet @@ -2846,74 +3008,101 @@ void DTLSICETransport::Probe(QWORD now) DWORD target = senderSideBandwidthEstimator->GetAvailableBitrate(); DWORD limit = std::min(target, probingBitrateLimit); - //Log(">DTLSICETransport::Probe() | [target:%ubps,bitrate:%ubps,probing:%ubps,history:%d,probingBitrateLimit=%ubps,maxProbingBitrate=%ubps]\n",target,bitrate,probing,history.size(),probingBitrateLimit,maxProbingBitrate); + //Check if flexfec - 03 codec is available + bool useFecForProbing = sendMaps.rtp.HasCodec(VideoCodec::FLEXFEC03) && recvMaps.rtp.HasCodec(VideoCodec::FLEXFEC03); + + //Log(">DTLSICETransport::Probe() | [target:%ubps,bitrate:%ubps,probing:%ubps,history:%d,probingBitrateLimit=%ubps,maxProbingBitrate=%ubps,useFecForProbing=%d]\n",target,bitrate,probing,history.size(),probingBitrateLimit,maxProbingBitrate,useFecForProbing); //If we can still send more if (bitrate(limit - bitrate, maxProbingBitrate); + DWORD probeBitrate = std::min(limit - bitrate, maxProbingBitrate); //Get size of probes DWORD probeSize = probe*sleep/8000; - //Log("-DTLSICETransport::Probe() | Sending probe packets [target:%ubps,bitrate:%ubps,limit:%ubps,probe:%u,probingSize:%d,sleep:%d]\n", target, bitrate, limit, probe, probeSize, sleep); + //Log("-DTLSICETransport::Probe() | Sending probe packets [target:%ubps,bitrate:%ubps,limit:%ubps,probeBitrate:%ubps,probingSize:%d,sleep:%d]\n", target, bitrate, limit, probeBitrate, probeSize, sleep); //If we have packet history if (!history.empty()) { - int found = true; - //Get first packet - auto smallest = history.front(); - - //Sent until no more probe - while (probeSize && found) - { - //We need to always send one at minimun - found = false; - //For each other packet in history - for (size_t i=1; iGetMediaLength()>probeSize) + //Sent until no more probe + while (probeSize && found) + { + //We need to always send one at minimun + found = false; + //For each other packet in history + for (size_t i = 1; i < history.length(); ++i) { - if (candidate->GetMediaLength() < smallest->GetMediaLength()) - smallest = candidate; - //Try next - continue; + //Get candidate + auto candidate = history.at(i); + + //Don't send too much data + if (candidate->GetMediaLength() > probeSize) + { + if (candidate->GetMediaLength() < smallest->GetMediaLength()) + smallest = candidate; + //Try next + continue; + } + //Send probe packet + DWORD len = SendProbe(candidate); + //Check len + if (!len) + //Error + return; + //Update probing + probingBitrate.Update(now, len); + //Check size + if (len > probeSize) + //Done + break; + //Reduce probe + probeSize -= len; + found = true; } - //Send probe packet - DWORD len = SendProbe(candidate); + } + //If we have not found any packet + if (!found) + { + //Send the smallest one + DWORD len = SendProbe(smallest); //Check len if (!len) - //Error + //Done return; //Update probing - probingBitrate.Update(now,len); - //Check size - if (len>probeSize) - //Done - break; - //Reduce probe - probeSize -= len; - found = true; + probingBitrate.Update(now, len); } } - //If we have not found any packet - if (!found) - { - //Send the smallest one - DWORD len = SendProbe(smallest); - //Check len - if (!len) - //Done - return; - //Update probing - probingBitrate.Update(now, len); - } } else { //Ensure we send at least one packet DWORD size = std::min(255u, probeSize); @@ -3019,7 +3208,7 @@ void DTLSICETransport::SetBandwidthProbing(bool probe) Debug("-DTLSICETransport::SetBandwidthProbing() [probe:%d]\n", probe); //Set probing status this->probe = probe; - + //Check if we need to start the timar CheckProbeTimer(); }