diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 660f2b335..e2efcefff 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -1722,6 +1722,75 @@ unsigned CRcvBuffer::getRcvAvgPayloadSize() const return m_uAvgPayloadSz; } +CRcvBuffer::ReadingState CRcvBuffer::debugGetReadingState() const +{ + ReadingState readstate; + + readstate.iNumAcknowledged = 0; + readstate.iNumUnacknowledged = m_iMaxPos; + + if ((NULL != m_pUnit[m_iStartPos]) && (m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD)) + { + if (m_tsbpd.isEnabled()) + readstate.tsStart = m_tsbpd.getPktTsbPdTime(m_pUnit[m_iStartPos]->m_Packet.getMsgTimeStamp()); + + readstate.iNumAcknowledged = m_iLastAckPos > m_iStartPos + ? m_iLastAckPos - m_iStartPos + : m_iLastAckPos + (m_iSize - m_iStartPos); + } + + // All further stats are valid if TSBPD is enabled. + if (!m_tsbpd.isEnabled()) + return readstate; + + // m_iLastAckPos points to the first unacknowledged packet + const int iLastAckPos = (m_iLastAckPos - 1) % m_iSize; + if (m_iLastAckPos != m_iStartPos && (NULL != m_pUnit[iLastAckPos]) && (m_pUnit[iLastAckPos]->m_iFlag == CUnit::GOOD)) + { + readstate.tsLastAck = m_tsbpd.getPktTsbPdTime(m_pUnit[iLastAckPos]->m_Packet.getMsgTimeStamp()); + } + + const int iEndPos = (m_iLastAckPos + m_iMaxPos - 1) % m_iSize; + if (m_iMaxPos == 0) + { + readstate.tsEnd = readstate.tsLastAck; + } + else if ((NULL != m_pUnit[iEndPos]) && (m_pUnit[iEndPos]->m_iFlag == CUnit::GOOD)) + { + readstate.tsEnd = m_tsbpd.getPktTsbPdTime(m_pUnit[iEndPos]->m_Packet.getMsgTimeStamp()); + } + + return readstate; +} + +string CRcvBuffer::strFullnessState(const time_point& tsNow) const +{ + const ReadingState bufstate = debugGetReadingState(); + stringstream ss; + + ss << "Space avail " << getAvailBufSize() << "/" << m_iSize; + ss << " pkts. Packets ACKed: " << bufstate.iNumAcknowledged; + if (!is_zero(bufstate.tsStart) && !is_zero(bufstate.tsLastAck)) + { + ss << " (TSBPD ready in "; + ss << count_milliseconds(bufstate.tsStart - tsNow); + ss << " : "; + ss << count_milliseconds(bufstate.tsLastAck - tsNow); + ss << " ms)"; + } + + ss << ", not ACKed: " << bufstate.iNumUnacknowledged; + if (!is_zero(bufstate.tsStart) && !is_zero(bufstate.tsEnd)) + { + ss << ", timespan "; + ss << count_milliseconds(bufstate.tsEnd - bufstate.tsStart); + ss << " ms"; + } + + ss << ". " SRT_SYNC_CLOCK_STR " drift " << getDrift() / 1000 << " ms."; + return ss.str(); +} + void CRcvBuffer::dropMsg(int32_t msgno, bool using_rexmit_flag) { for (int i = m_iStartPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i)) diff --git a/srtcore/buffer.h b/srtcore/buffer.h index 6a21df1ce..d32e1d29e 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -348,6 +348,21 @@ class CRcvBuffer /// @return size (bytes) of payload size unsigned getRcvAvgPayloadSize() const; + struct ReadingState + { + time_point tsStart; + time_point tsLastAck; + time_point tsEnd; + int iNumAcknowledged; + int iNumUnacknowledged; + }; + + ReadingState debugGetReadingState() const; + + /// Form a string of the current buffer fullness state. + /// number of packets acknowledged, TSBPD readiness, etc. + std::string strFullnessState(const time_point& tsNow) const; + /// Mark the message to be dropped from the message list. /// @param [in] msgno message number. /// @param [in] using_rexmit_flag whether the MSGNO field uses rexmit flag (if not, one more bit is part of the @@ -462,6 +477,7 @@ class CRcvBuffer bool getRcvReadyMsg(time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto); public: + /// @brief Get clock drift in microseconds. int64_t getDrift() const { return m_tsbpd.drift(); } public: diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 35db04142..81eaa9628 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -9622,12 +9622,11 @@ int CUDT::processData(CUnit* in_unit) } else { - LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet: offset=" - << offset << " avail=" << avail_bufsize - << " ack.seq=" << m_iRcvLastSkipAck << " pkt.seq=" << rpkt.m_iSeqNo - << " rcv-remain=" << m_pRcvBuffer->debugGetSize() - << " drift=" << m_pRcvBuffer->getDrift() - ); + LOGC(qrlog.Warn, log << CONID() << "No room to store incoming packet seqno " << rpkt.m_iSeqNo + << ", insert offset " << offset << ". " + << m_pRcvBuffer->strFullnessState(steady_clock::now()) + ); + return -1; } }