Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix MaxBW limitation #2232

Merged
merged 2 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion srtcore/congctl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class LiveCC: public SrtCongestionControlBase
{
m_llSndMaxBW = BW_INFINITE; // 1 Gbbps in Bytes/sec BW_INFINITE
m_zMaxPayloadSize = parent->OPT_PayloadSize();
if ( m_zMaxPayloadSize == 0 )
if (m_zMaxPayloadSize == 0)
m_zMaxPayloadSize = parent->maxPayloadSize();
m_zSndAvgPayloadSize = m_zMaxPayloadSize;

Expand Down
140 changes: 67 additions & 73 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5174,8 +5174,7 @@ void * srt::CUDT::tsbpd(void* param)
rxready = true;
if (info.seq_gap)
{
const int iDropCnt SRT_ATR_UNUSED = self->dropTooLateUpTo(info.seqno);

const int iDropCnt SRT_ATR_UNUSED = self->rcvDropTooLateUpTo(info.seqno);
#if ENABLE_EXPERIMENTAL_BONDING
shall_update_group = true;
#endif
Expand Down Expand Up @@ -5303,7 +5302,7 @@ void * srt::CUDT::tsbpd(void* param)
return NULL;
}

int srt::CUDT::dropTooLateUpTo(int seqno)
int srt::CUDT::rcvDropTooLateUpTo(int seqno)
{
const int seq_gap_len = CSeqNo::seqoff(m_iRcvLastSkipAck, seqno);

Expand Down Expand Up @@ -6327,10 +6326,10 @@ int srt::CUDT::receiveBuffer(char *data, int len)

// [[using maybe_locked(CUDTGroup::m_GroupLock, m_parent->m_GroupOf != NULL)]];
// [[using locked(m_SendLock)]];
bool srt::CUDT::checkNeedDrop()
int srt::CUDT::sndDropTooLate()
{
if (!m_bPeerTLPktDrop)
return false;
return 0;

if (!m_config.bMessageAPI)
{
Expand All @@ -6352,72 +6351,64 @@ bool srt::CUDT::checkNeedDrop()
+ (2 * COMM_SYN_INTERVAL_US / 1000)
: 0;

bool bCongestion = false;
if (threshold_ms && buffdelay_ms > threshold_ms)
{
// protect packet retransmission
enterCS(m_RecvAckLock);
int dbytes;
int32_t first_msgno;
int dpkts = m_pSndBuffer->dropLateData((dbytes), (first_msgno), tnow - milliseconds_from(threshold_ms));
if (dpkts > 0)
{
enterCS(m_StatsLock);
m_stats.sndr.dropped.count(stats::BytesPackets(dbytes, dpkts));
leaveCS(m_StatsLock);
if (threshold_ms == 0 || buffdelay_ms <= threshold_ms)
return 0;

IF_HEAVY_LOGGING(const int32_t realack = m_iSndLastDataAck);
const int32_t fakeack = CSeqNo::incseq(m_iSndLastDataAck, dpkts);
// protect packet retransmission
ScopedLock rcvlck(m_RecvAckLock);
int dbytes;
int32_t first_msgno;
const int dpkts = m_pSndBuffer->dropLateData((dbytes), (first_msgno), tnow - milliseconds_from(threshold_ms));
if (dpkts <= 0)
return 0;

m_iSndLastAck = fakeack;
m_iSndLastDataAck = fakeack;
// If some packets were dropped update stats, socket state, loss list and the parent group if any.
enterCS(m_StatsLock);
m_stats.sndr.dropped.count(dbytes);;
leaveCS(m_StatsLock);

int32_t minlastack = CSeqNo::decseq(m_iSndLastDataAck);
m_pSndLossList->removeUpTo(minlastack);
/* If we dropped packets not yet sent, advance current position */
// THIS MEANS: m_iSndCurrSeqNo = MAX(m_iSndCurrSeqNo, m_iSndLastDataAck-1)
if (CSeqNo::seqcmp(m_iSndCurrSeqNo, minlastack) < 0)
{
m_iSndCurrSeqNo = minlastack;
}
IF_HEAVY_LOGGING(const int32_t realack = m_iSndLastDataAck);
const int32_t fakeack = CSeqNo::incseq(m_iSndLastDataAck, dpkts);

HLOGC(aslog.Debug, log << "SND-DROP: %(" << realack << "-" << m_iSndCurrSeqNo << ") n="
<< dpkts << "pkt " << dbytes << "B, span=" << buffdelay_ms << " ms, FIRST #" << first_msgno);
m_iSndLastAck = fakeack;
m_iSndLastDataAck = fakeack;

#if ENABLE_EXPERIMENTAL_BONDING
// This is done with a presumption that the group
// exists and if this is not NULL, it means that this
// function was called with locked m_GroupLock, as sendmsg2
// function was called from inside CUDTGroup::send, which
// locks the whole function.
//
// XXX This is true only because all existing groups are managed
// groups, that is, sockets cannot be added or removed from group
// manually, nor can send/recv operation be done on a single socket
// from the API call directly. This should be extra verified, if that
// changes in the future.
//
if (m_parent->m_GroupOf)
{
// What's important is that the lock on GroupLock cannot be applied
// here, both because it might be applied already, that is, according
// to the condition defined at this function's header, it is applied
// under this condition. Hence ackMessage can be defined as 100% locked.
m_parent->m_GroupOf->ackMessage(first_msgno);
}
#endif
}
bCongestion = true;
leaveCS(m_RecvAckLock);
}
else if (buffdelay_ms > (m_iPeerTsbPdDelay_ms / 2))
const int32_t minlastack = CSeqNo::decseq(m_iSndLastDataAck);
m_pSndLossList->removeUpTo(minlastack);
/* If we dropped packets not yet sent, advance current position */
// THIS MEANS: m_iSndCurrSeqNo = MAX(m_iSndCurrSeqNo, m_iSndLastDataAck-1)
if (CSeqNo::seqcmp(m_iSndCurrSeqNo, minlastack) < 0)
{
HLOGC(aslog.Debug,
log << "cong TIMESPAN " << buffdelay_ms << "ms");
m_iSndCurrSeqNo = minlastack;
}

bCongestion = true;
HLOGC(aslog.Debug, log << "SND-DROP: %(" << realack << "-" << m_iSndCurrSeqNo << ") n="
<< dpkts << "pkt " << dbytes << "B, span=" << buffdelay_ms << " ms, FIRST #" << first_msgno);

#if ENABLE_EXPERIMENTAL_BONDING
// This is done with a presumption that the group
// exists and if this is not NULL, it means that this
// function was called with locked m_GroupLock, as sendmsg2
// function was called from inside CUDTGroup::send, which
// locks the whole function.
//
// XXX This is true only because all existing groups are managed
// groups, that is, sockets cannot be added or removed from group
// manually, nor can send/recv operation be done on a single socket
// from the API call directly. This should be extra verified, if that
// changes in the future.
//
if (m_parent->m_GroupOf)
{
// What's important is that the lock on GroupLock cannot be applied
// here, both because it might be applied already, that is, according
// to the condition defined at this function's header, it is applied
// under this condition. Hence ackMessage can be defined as 100% locked.
m_parent->m_GroupOf->ackMessage(first_msgno);
}
return bCongestion;
#endif

return dpkts;
}

int srt::CUDT::sendmsg(const char *data, int len, int msttl, bool inorder, int64_t srctime)
Expand Down Expand Up @@ -6520,9 +6511,9 @@ int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl)
m_iReXmitCount = 1;
}

// checkNeedDrop(...) may lock m_RecvAckLock
// sndDropTooLate(...) may lock m_RecvAckLock
// to modify m_pSndBuffer and m_pSndLossList
const bool bCongestion = checkNeedDrop();
const int iPktsTLDropped SRT_ATR_UNUSED = sndDropTooLate();

int minlen = 1; // Minimum sender buffer space required for STREAM API
if (m_config.bMessageAPI)
Expand Down Expand Up @@ -6701,12 +6692,13 @@ int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl)
}
}

// insert this socket to the snd list if it is not on the list yet
// Insert this socket to the snd list if it is not on the list already.
// m_pSndUList->pop may lock CSndUList::m_ListLock and then m_RecvAckLock
m_pSndQueue->m_pSndUList->update(this, CSndUList::rescheduleIf(bCongestion));
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);

#ifdef SRT_ENABLE_ECN
if (bCongestion)
// IF there was a packet drop on the sender side, report congestion to the app.
if (iPktsTLDropped > 0)
{
LOGC(aslog.Error, log << "sendmsg2: CONGESTION; reporting error");
throw CUDTException(MJ_AGAIN, MN_CONGESTION, 0);
Expand Down Expand Up @@ -8192,7 +8184,7 @@ void srt::CUDT::updateSndLossListOnACK(int32_t ackdata_seqno)

// Guard access to m_iSndAckedMsgNo field
// Note: This can't be done inside CUDTGroup::ackMessage
// because this function is also called from CUDT::checkNeedDrop
// because this function is also called from CUDT::sndDropTooLate
// called from CUDT::sendmsg2 called from CUDTGroup::send, which
// applies the lock on m_GroupLock already.
ScopedLock glk (*m_parent->m_GroupOf->exp_groupLock());
Expand Down Expand Up @@ -8696,7 +8688,7 @@ void srt::CUDT::processCtrlLossReport(const CPacket& ctrlpkt)
}

// the lost packet (retransmission) should be sent out immediately
m_pSndQueue->m_pSndUList->update(this, CSndUList::DO_RESCHEDULE);
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);

enterCS(m_StatsLock);
m_stats.sndr.recvdNak.count(1);
Expand Down Expand Up @@ -9321,18 +9313,20 @@ std::pair<bool, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
m_stats.sndr.sentUnique.count(payload);
leaveCS(m_StatsLock);

const duration sendint = m_tdSendInterval;
if (probe)
{
// sends out probing packet pair
m_tsNextSendTime = enter_time;
// Sending earlier, need to adjust the pace later on.
m_tdSendTimeDiff = m_tdSendTimeDiff.load() - sendint;
probe = false;
}
else
{
#if USE_BUSY_WAITING
m_tsNextSendTime = enter_time + m_tdSendInterval.load();
#else
const duration sendint = m_tdSendInterval;
const duration sendbrw = m_tdSendTimeDiff;

if (sendbrw >= sendint)
Expand Down Expand Up @@ -11166,8 +11160,8 @@ void srt::CUDT::checkRexmitTimer(const steady_clock::time_point& currtime)
const ECheckTimerStage stage = is_fastrexmit ? TEV_CHT_FASTREXMIT : TEV_CHT_REXMIT;
updateCC(TEV_CHECKTIMER, EventVariant(stage));

// immediately restart transmission
m_pSndQueue->m_pSndUList->update(this, CSndUList::DO_RESCHEDULE);
// schedule sending if not scheduled already
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
}

void srt::CUDT::checkTimers()
Expand Down
9 changes: 6 additions & 3 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,10 @@ class CUDT

void updateIdleLinkFrom(CUDT* source);

bool checkNeedDrop();
/// @brief Drop packets too late to be delivered if any.
/// @returns the number of packets actually dropped.
SRT_ATTR_REQUIRES(m_RecvAckLock, m_StatsLock)
int sndDropTooLate();

/// Connect to a UDT entity as per hs request. This will update
/// required data in the entity, then update them also in the hs structure,
Expand Down Expand Up @@ -706,11 +709,11 @@ class CUDT
static void* tsbpd(void* param);

#if ENABLE_NEW_RCVBUFFER
/// Drop too late packets. Updaet loss lists and ACK positions.
/// Drop too late packets (receiver side). Updaet loss lists and ACK positions.
/// The @a seqno packet itself is not dropped.
/// @param seqno [in] The sequence number of the first packets following those to be dropped.
/// @return The number of packets dropped.
int dropTooLateUpTo(int seqno);
int rcvDropTooLateUpTo(int seqno);
#endif

void updateForgotten(int seqlen, int32_t lastack, int32_t skiptoseqno);
Expand Down
3 changes: 1 addition & 2 deletions srtcore/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ class BytesPackets

uint64_t bytesWithHdr() const
{
static const int PKT_HDR_SIZE = CPacket::HDR_SIZE + CPacket::UDP_HDR_SIZE;
return m_bytes + m_packets * PKT_HDR_SIZE;
return m_bytes + m_packets * CPacket::SRT_DATA_HDR_SIZE;
}

private:
Expand Down