Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Stats: do not count discarded packets as dropped. #2932

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ int CRcvBuffer::insert(CUnit* unit)
return 0;
}

int CRcvBuffer::dropUpTo(int32_t seqno)
std::pair<int, int> CRcvBuffer::dropUpTo(int32_t seqno)
{
IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo);
Expand All @@ -215,16 +215,23 @@ int CRcvBuffer::dropUpTo(int32_t seqno)
if (len <= 0)
{
IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop.");
return 0;
return std::make_pair(0, 0);
}

m_iMaxPosOff -= len;
if (m_iMaxPosOff < 0)
m_iMaxPosOff = 0;

const int iDropCnt = len;
int iNumDropped = 0; // Number of dropped packets that were missing.
int iNumDiscarded = 0; // The number of dropped packets that existed in the buffer.
while (len > 0)
{
// Note! Dropping a EntryState_Read must not be counted as a drop because it was read.
// Note! Dropping a EntryState_Drop must not be counted as a drop because it was already dropped and counted earlier.
if (m_entries[m_iStartPos].status == EntryState_Avail)
++iNumDiscarded;
else if (m_entries[m_iStartPos].status == EntryState_Empty)
++iNumDropped;
dropUnitInPos(m_iStartPos);
m_entries[m_iStartPos].status = EntryState_Empty;
SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty);
Expand All @@ -246,7 +253,7 @@ int CRcvBuffer::dropUpTo(int32_t seqno)
}
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
updateFirstReadableOutOfOrder();
return iDropCnt;
return std::make_pair(iNumDropped, iNumDiscarded);
}

int CRcvBuffer::dropAll()
Expand All @@ -255,7 +262,8 @@ int CRcvBuffer::dropAll()
return 0;

const int end_seqno = CSeqNo::incseq(m_iStartSeqNo, m_iMaxPosOff);
return dropUpTo(end_seqno);
const std::pair<int, int> numDropped = dropUpTo(end_seqno);
return numDropped.first + numDropped.second;
}

int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting)
Expand Down
4 changes: 2 additions & 2 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class CRcvBuffer

/// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno).
/// @param [in] seqno drop units up to this sequence number
/// @return number of dropped packets.
int dropUpTo(int32_t seqno);
/// @return number of dropped (missing) and discarded (available) packets as a pair(dropped, discarded).
std::pair<int, int> dropUpTo(int32_t seqno);

/// @brief Drop all the packets in the receiver buffer.
/// The starting position and seqno are shifted right after the last packet in the buffer.
Expand Down
19 changes: 13 additions & 6 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5576,24 +5576,30 @@ void * srt::CUDT::tsbpd(void* param)
return NULL;
}

int srt::CUDT::rcvDropTooLateUpTo(int seqno)
int srt::CUDT::rcvDropTooLateUpTo(int seqno, DropReason reason)
{
// Make sure that it would not drop over m_iRcvCurrSeqNo, which may break senders.
if (CSeqNo::seqcmp(seqno, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0)
seqno = CSeqNo::incseq(m_iRcvCurrSeqNo);

dropFromLossLists(SRT_SEQNO_NONE, CSeqNo::decseq(seqno));

const int iDropCnt = m_pRcvBuffer->dropUpTo(seqno);
if (iDropCnt > 0)
const std::pair<int, int> iDropDiscardedPkts = m_pRcvBuffer->dropUpTo(seqno);
const int iDropCnt = iDropDiscardedPkts.first;
const int iDiscardedCnt = iDropDiscardedPkts.second;
const int iDropCntTotal = iDropCnt + iDiscardedCnt;

// In case of DROP_TOO_LATE discarded packets should also be counted because they are not read from another member socket.
const int iDropStatCnt = (reason == DROP_DISCARD) ? iDropCnt : iDropCntTotal;
if (iDropStatCnt > 0)
{
enterCS(m_StatsLock);
// Estimate dropped bytes from average payload size.
const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize();
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt));
m_stats.rcvr.dropped.count(stats::BytesPackets(iDropStatCnt * avgpayloadsz, (uint32_t)iDropStatCnt));
leaveCS(m_StatsLock);
}
return iDropCnt;
return iDropCntTotal;
}

void srt::CUDT::setInitialRcvSeq(int32_t isn)
Expand Down Expand Up @@ -7835,7 +7841,7 @@ void srt::CUDT::dropToGroupRecvBase()
return;

ScopedLock lck(m_RcvBufferLock);
int cnt = rcvDropTooLateUpTo(CSeqNo::incseq(group_recv_base));
const int cnt = rcvDropTooLateUpTo(CSeqNo::incseq(group_recv_base), DROP_DISCARD);
if (cnt > 0)
{
HLOGC(grlog.Debug,
Expand Down Expand Up @@ -8063,6 +8069,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
string reason; // just for "a reason" of giving particular % for ACK

#if ENABLE_BONDING
// TODO: The group drops other members upon reading, maybe no longer needed here?
dropToGroupRecvBase();
#endif

Expand Down
9 changes: 8 additions & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -755,11 +755,18 @@ class CUDT
// TSBPD thread main function.
static void* tsbpd(void* param);

enum DropReason
{
DROP_TOO_LATE, //< Drop to keep up to the live pace (TLPKTDROP).
DROP_DISCARD //< Drop because another group member already provided these packets.
};

/// Drop too late packets (receiver side). Update loss lists and ACK positions.
/// The @a seqno packet itself is not dropped.
/// @param seqno [in] The sequence number of the first packets following those to be dropped.
/// @param reason A reason for dropping (see @a DropReason).
/// @return The number of packets dropped.
int rcvDropTooLateUpTo(int seqno);
int rcvDropTooLateUpTo(int seqno, DropReason reason = DROP_TOO_LATE);

static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt);
static loss_seqs_t groupPacketArrival(void* vself, CPacket& pkt);
Expand Down
2 changes: 1 addition & 1 deletion srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2324,7 +2324,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo), CUDT::DROP_DISCARD);
if (cnt > 0)
{
HLOGC(grlog.Debug,
Expand Down
Loading