Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] use seq larger than m_RcvBaseSeqNo to update group readablity #2026

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 42 additions & 20 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1116,8 +1116,10 @@ size_t CRcvBuffer::dropData(int len)
bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
bool& w_passack,
int32_t& w_skipseqno,
int32_t& w_curpktseq)
int32_t& w_curpktseq,
int32_t base_seq)
{
HLOGC(brlog.Debug, log << "getRcvFirstMsg: base_seq=" << base_seq);
w_skipseqno = SRT_SEQNO_NONE;
w_passack = false;
// tsbpdtime will be retrieved by the below call
Expand All @@ -1130,8 +1132,8 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,

/* Check the acknowledged packets */
// getRcvReadyMsg returns true if the time to play for the first message
// (returned in w_tsbpdtime) is in the past.
if (getRcvReadyMsg((w_tsbpdtime), (w_curpktseq), -1))
// that larger than base_seq is in the past.
if (getRcvReadyMsg((w_tsbpdtime), (w_curpktseq), -1, base_seq))
{
HLOGC(brlog.Debug, log << "getRcvFirstMsg: ready CONTIG packet: %" << w_curpktseq);
return true;
Expand Down Expand Up @@ -1160,9 +1162,10 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
* No acked packets ready but caller want to know next packet to wait for
* Check the not yet acked packets that may be stuck by missing packet(s).
*/
bool haslost = false;
w_tsbpdtime = steady_clock::time_point(); // redundant, for clarity
w_passack = true;
bool haslost = false;
steady_clock::time_point tsbpdtime = steady_clock::time_point();
w_tsbpdtime = steady_clock::time_point();
w_passack = true;

// XXX SUSPECTED ISSUE with this algorithm:
// The above call to getRcvReadyMsg() should report as to whether:
Expand All @@ -1188,8 +1191,11 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
// When done so, the below loop would be completely unnecessary.

// Logical description of the below algorithm:
// 1. Check if the VERY FIRST PACKET is valid; if so then:
// - check if it's ready to play, return boolean value that marks it.
// 1. update w_tsbpdtime and w_curpktseq if found one packet ready to play
// - keep check the next packet if still smaller than base_seq
// 2. set w_skipseqno if found packets before w_curpktseq lost
// if no packets larger than base_seq ready to play, return the largest RTP
// else return the first one that larger than base_seq and rady to play

for (int i = m_iLastAckPos, n = shift(m_iLastAckPos, m_iMaxPos); i != n; i = shiftFwd(i))
{
Expand All @@ -1201,19 +1207,21 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
}
else
{
/* We got the 1st valid packet */
w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
if (w_tsbpdtime <= steady_clock::now())
tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
if (tsbpdtime <= steady_clock::now())
{
/* Packet ready to play */
w_tsbpdtime = tsbpdtime;
w_curpktseq = m_pUnit[i]->m_Packet.m_iSeqNo;
if (haslost)
w_skipseqno = w_curpktseq;

if (base_seq != SRT_SEQNO_NONE && CSeqNo::seqoff(base_seq, w_curpktseq) <= 0)
gou4shi1 marked this conversation as resolved.
Show resolved Hide resolved
{
/*
* Packet stuck on non-acked side because of missing packets.
* Tell 1st valid packet seqno so caller can skip (drop) the missing packets.
*/
w_skipseqno = m_pUnit[i]->m_Packet.m_iSeqNo;
w_curpktseq = w_skipseqno;
HLOGC(brlog.Debug,
log << "getRcvFirstMsg: found ready packet %" << w_curpktseq
<< " but not larger than base_seq, try next");
continue;
}

HLOGC(brlog.Debug,
Expand All @@ -1227,6 +1235,10 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
// ...
return true;
}

if (!is_zero(w_tsbpdtime)) {
return true;
}
HLOGC(brlog.Debug,
log << "getRcvFirstMsg: found NOT READY packet, nSKIPPED: "
<< ((i - m_iLastAckPos + m_iSize) % m_iSize));
Expand All @@ -1239,6 +1251,9 @@ bool CRcvBuffer::getRcvFirstMsg(steady_clock::time_point& w_tsbpdtime,
// the 'haslost' is set, which means that it continues only to find the first valid
// packet after stating that the very first packet isn't valid.
}
if (!is_zero(w_tsbpdtime)) {
return true;
}
HLOGC(brlog.Debug, log << "getRcvFirstMsg: found NO PACKETS");
return false;
}
Expand Down Expand Up @@ -1269,7 +1284,7 @@ int32_t CRcvBuffer::getTopMsgno() const
return m_pUnit[m_iStartPos]->m_Packet.getMsgSeq();
}

bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto)
bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto, int base_seq)
{
const bool havelimit = upto != -1;
int end = -1, past_end = -1;
Expand Down Expand Up @@ -1335,7 +1350,8 @@ bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t&
// 1. Get the TSBPD time of the unit. Stop and return false if this unit
// is not yet ready to play.
// 2. If it's ready to play, check also if it's decrypted. If not, skip it.
// 3. If it's ready to play and decrypted, stop and return it.
// 3. Check also if it's larger than base_seq, if not, skip it.
// 4. If it's ready to play, decrypted and larger than base, stop and return it.
if (!havelimit)
{
w_tsbpdtime = getPktTsbPdTime(m_pUnit[i]->m_Packet.getMsgTimeStamp());
Expand All @@ -1354,6 +1370,12 @@ bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t&
IF_HEAVY_LOGGING(reason = "DECRYPTION FAILED");
freeunit = true; /* packet not decrypted */
}
else if (base_seq != SRT_SEQNO_NONE && CSeqNo::seqoff(base_seq, w_curpktseq) <= 0)
gou4shi1 marked this conversation as resolved.
Show resolved Hide resolved
{
IF_HEAVY_LOGGING(reason = "smaller than base_seq");
w_tsbpdtime = steady_clock::time_point();
freeunit = true;
}
else
{
HLOGC(brlog.Debug,
Expand Down Expand Up @@ -1408,7 +1430,7 @@ bool CRcvBuffer::getRcvReadyMsg(steady_clock::time_point& w_tsbpdtime, int32_t&

if (freeunit)
{
HLOGC(brlog.Debug, log << "getRcvReadyMsg: POS=" << i << " FREED");
HLOGC(brlog.Debug, log << "getRcvReadyMsg: POS=" << i << " FREED: " << reason);
/* removed skipped, dropped, undecryptable bytes from rcv buffer */
const int rmbytes = (int)m_pUnit[i]->m_Packet.getLength();
countBytes(-1, -rmbytes, true);
Expand Down
10 changes: 8 additions & 2 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,17 @@ class CRcvBuffer
/// if none
/// @param [out] w_passack true if 1st ready packet is not yet acknowleged (allowed to be delivered to the app)
/// @param [out] w_skipseqno SRT_SEQNO_NONE or seq number of 1st unacknowledged pkt ready to play preceeded by
/// @param base_seq SRT_SEQNO_NONE or desired, ignore seq smaller than base if exist RTP larger than base
gou4shi1 marked this conversation as resolved.
Show resolved Hide resolved
/// missing packets.
/// @retval true 1st packet ready to play (tsbpdtime <= now). Not yet acknowledged if passack == true
/// @retval false IF tsbpdtime = 0: rcv buffer empty; ELSE:
/// IF skipseqno != SRT_SEQNO_NONE, packet ready to play preceeded by missing packets.;
/// IF skipseqno == SRT_SEQNO_NONE, no missing packet but 1st not ready to play.
bool getRcvFirstMsg(time_point& w_tsbpdtime, bool& w_passack, int32_t& w_skipseqno, int32_t& w_curpktseq);
bool getRcvFirstMsg(time_point& w_tsbpdtime,
bool& w_passack,
int32_t& w_skipseqno,
int32_t& w_curpktseq,
int32_t base_seq = SRT_SEQNO_NONE);

/// Update the ACK point of the buffer.
/// @param [in] len size of data to be skip & acknowledged.
Expand Down Expand Up @@ -473,9 +478,10 @@ class CRcvBuffer
/// Parameters (of the 1st packet queue, ready to play or not):
/// @param [out] tsbpdtime localtime-based (uSec) packet time stamp including buffering delay of 1st packet or 0 if
/// none
/// @param base_seq SRT_SEQNO_NONE or desired, ignore seq smaller than base
/// @retval true 1st packet ready to play without discontinuity (no hole)
/// @retval false tsbpdtime = 0: no packet ready to play
bool getRcvReadyMsg(time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto);
bool getRcvReadyMsg(time_point& w_tsbpdtime, int32_t& w_curpktseq, int upto, int base_seq = SRT_SEQNO_NONE);

public:
/// @brief Get clock drift in microseconds.
Expand Down
11 changes: 10 additions & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5149,8 +5149,17 @@ void * srt::CUDT::tsbpd(void *param)
int32_t current_pkt_seq = 0;
steady_clock::time_point tsbpdtime;
bool rxready = false;
int32_t rcv_base_seq = SRT_SEQNO_NONE;
#if ENABLE_EXPERIMENTAL_BONDING
bool shall_update_group = false;
if (gkeeper.group)
{
// Functions called below will lock m_GroupLock, which in hierarchy
// lies after m_RecvLock. Must unlock m_RecvLock to be able to lock
// m_GroupLock inside the calls.
InvertedLock unrecv(self->m_RecvLock);
rcv_base_seq = gkeeper.group->getRcvBaseSeqNo();
}
#endif

enterCS(self->m_RcvBufferLock);
Expand All @@ -5162,7 +5171,7 @@ void * srt::CUDT::tsbpd(void *param)
int32_t skiptoseqno = SRT_SEQNO_NONE;
bool passack = true; // Get next packet to wait for even if not acked

rxready = self->m_pRcvBuffer->getRcvFirstMsg((tsbpdtime), (passack), (skiptoseqno), (current_pkt_seq));
rxready = self->m_pRcvBuffer->getRcvFirstMsg((tsbpdtime), (passack), (skiptoseqno), (current_pkt_seq), rcv_base_seq);

HLOGC(tslog.Debug,
log << boolalpha << "NEXT PKT CHECK: rdy=" << rxready << " passack=" << passack << " skipto=%"
Expand Down
6 changes: 6 additions & 0 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2134,6 +2134,12 @@ void CUDTGroup::updateReadState(SRTSOCKET /* not sure if needed */, int32_t sequ
}
}

int32_t CUDTGroup::getRcvBaseSeqNo()
{
ScopedLock lg(m_GroupLock);
return m_RcvBaseSeqNo;
}

void CUDTGroup::updateWriteState()
{
ScopedLock lg(m_GroupLock);
Expand Down
1 change: 1 addition & 0 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ class CUDTGroup
void updateWriteState();
void updateFailedLink();
void activateUpdateEvent(bool still_have_items);
int32_t getRcvBaseSeqNo();

/// Update the in-group array of packet providers per sequence number.
/// Also basing on the information already provided by possibly other sockets,
Expand Down