Skip to content

Commit

Permalink
Added more logging around delivery (#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethouris authored and rndi committed Nov 4, 2019
1 parent 291bdf2 commit aaef22a
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 19 deletions.
74 changes: 74 additions & 0 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,65 @@ CPacket* CRcvBuffer::getRcvReadyPacket()
return 0;
}

#if ENABLE_HEAVY_LOGGING
// This function is for debug purposes only and it's called only
// from within HLOG* macros.
void CRcvBuffer::reportBufferStats()
{
int nmissing = 0;
int32_t low_seq= -1, high_seq = -1;
int32_t low_ts = 0, high_ts = 0;

for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize)
{
if ( m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD )
{
low_seq = m_pUnit[i]->m_Packet.m_iSeqNo;
low_ts = m_pUnit[i]->m_Packet.m_iTimeStamp;
break;
}
++nmissing;
}

// Not sure if a packet MUST BE at the last ack pos position, so check, just in case.
int n = m_iLastAckPos;
if (m_pUnit[n] && m_pUnit[n]->m_iFlag == CUnit::GOOD)
{
high_ts = m_pUnit[n]->m_Packet.m_iTimeStamp;
high_seq = m_pUnit[n]->m_Packet.m_iSeqNo;
}
else
{
// Possibilities are:
// m_iStartPos == m_iLastAckPos, high_ts == low_ts, defined.
// No packet: low_ts == 0, so high_ts == 0, too.
high_ts = low_ts;
}
// The 32-bit timestamps are relative and roll over oftten; what
// we really need is the timestamp difference. The only place where
// we can ask for the time base is the upper time because when trying
// to receive the time base for the lower time we'd break the requirement
// for monotonic clock.

uint64_t upper_time = high_ts;
uint64_t lower_time = low_ts;

if (lower_time > upper_time)
upper_time += uint64_t(CPacket::MAX_TIMESTAMP)+1;

int32_t timespan = upper_time - lower_time;
int seqspan = 0;
if (low_seq != -1 && high_seq != -1)
{
seqspan = CSeqNo::seqoff(low_seq, high_seq);
}

LOGC(dlog.Debug, log << "RCV BUF STATS: seqspan=%(" << low_seq << "-" << high_seq << ":" << seqspan << ") missing=" << nmissing << "pkts");
LOGC(dlog.Debug, log << "RCV BUF STATS: timespan=" << timespan << "us (lo=" << FormatTime(lower_time) << " hi=" << FormatTime(upper_time) << ")");
}

#endif // ENABLE_HEAVY_LOGGING

bool CRcvBuffer::isRcvDataReady()
{
uint64_t tsbpdtime;
Expand Down Expand Up @@ -1546,7 +1605,18 @@ void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mut
printDriftOffset(m_DriftTracer.overdrift(), m_DriftTracer.drift());
#endif /* SRT_DEBUG_TSBPD_DRIFT */

#if ENABLE_HEAVY_LOGGING
uint64_t oldbase = m_ullTsbPdTimeBase;
#endif
m_ullTsbPdTimeBase += m_DriftTracer.overdrift();

HLOGC(dlog.Debug, log << "DRIFT=" << (iDrift/1000.0) << "ms AVG="
<< (m_DriftTracer.drift()/1000.0) << "ms, TB: "
<< FormatTime(oldbase) << " UPDATED TO: " << FormatTime(m_ullTsbPdTimeBase));
}
else
{
HLOGC(dlog.Debug, log << "DRIFT=" << (iDrift/1000.0) << "ms TB REMAINS: " << FormatTime(m_ullTsbPdTimeBase));
}

CGuard::leaveCS(mutex_to_lock);
Expand All @@ -1567,6 +1637,10 @@ int CRcvBuffer::readMsg(char* data, int len, ref_t<SRT_MSGCTRL> r_msgctl)
bool empty = true;
uint64_t& rplaytime = msgctl.srctime;

#ifdef ENABLE_HEAVY_LOGGING
reportBufferStats();
#endif

if (m_bTsbPdMode)
{
passack = false;
Expand Down
7 changes: 6 additions & 1 deletion srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ class CRcvBuffer

void skipData(int len);

#if ENABLE_HEAVY_LOGGING
void reportBufferStats(); // Heavy logging Debug only
#endif

private:
/// Adjust receive queue to 1st ready to play message (tsbpdtime < now).
Expand All @@ -402,10 +405,12 @@ class CRcvBuffer

bool getRcvReadyMsg(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpktseq);

public:

// (This is exposed as used publicly in logs)
/// Get packet delivery local time base (adjusted for wrap around)
/// @param [in] timestamp packet timestamp (relative to peer StartTime), wrapping around every ~72 min
/// @return local delivery time (usec)

uint64_t getTsbPdTimeBase(uint32_t timestamp_us);

/// Get packet local delivery time
Expand Down
17 changes: 10 additions & 7 deletions srtcore/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,20 +423,23 @@ int CChannel::sendto(const sockaddr* addr, CPacket& packet) const

if (packet.isControl())
{
spec << " CONTROL size=" << packet.getLength()
spec << " type=CONTROL"
<< " cmd=" << MessageTypeStr(packet.getType(), packet.getExtendedType())
<< " arg=" << packet.header(SRT_PH_MSGNO);
}
else
{
spec << " DATA size=" << packet.getLength()
<< " seq=" << packet.getSeqNo();
if (packet.getRexmitFlag())
spec << " [REXMIT]";
spec << " type=DATA"
<< " %" << packet.getSeqNo()
<< " msgno=" << MSGNO_SEQ::unwrap(packet.m_iMsgNo)
<< packet.MessageFlagStr()
<< " !" << BufferStamp(packet.m_pcData, packet.getLength());
}

HLOGC(mglog.Debug, log << "CChannel::sendto: SENDING NOW DST=" << SockaddrToString(addr)
<< " target=%" << packet.m_iID
LOGC(mglog.Debug, log << "CChannel::sendto: SENDING NOW DST=" << SockaddrToString(addr)
<< " target=@" << packet.m_iID
<< " size=" << packet.getLength()
<< " pkt.ts=" << FormatTime(packet.m_iTimeStamp)
<< spec.str());
#endif

Expand Down
12 changes: 4 additions & 8 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ modified by

using namespace std;

#if ENABLE_HEAVY_LOGGING
#define IF_HEAVY_LOGGING(instr) instr
#else
#define IF_HEAVY_LOGGING(instr) (void)0
#endif

namespace srt_logging
{

Expand Down Expand Up @@ -8560,8 +8554,10 @@ int CUDT::processData(CUnit *in_unit)
}

HLOGC(mglog.Debug,
log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo << " offset=" << offset << " (" << exc_type << "/"
<< rexmitstat[pktrexmitflag] << rexmit_reason << ") FLAGS: " << packet.MessageFlagStr());
log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo << " offset=" << offset
<< " BUFr=" << avail_bufsize
<< " (" << exc_type << "/" << rexmitstat[pktrexmitflag] << rexmit_reason << ") FLAGS: "
<< packet.MessageFlagStr());

// Decryption should have made the crypto flags EK_NOENC.
// Otherwise it's an error.
Expand Down
50 changes: 48 additions & 2 deletions srtcore/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,22 @@ modified by

#include "common.h"
#include "epoll.h"
#include "logging.h"
#include "udt.h"

using namespace std;

namespace srt_logging
{
extern Logger mglog;
}

using namespace srt_logging;

#if ENABLE_HEAVY_LOGGING
#define IF_DIRNAME(tested, flag, name) (tested & flag ? name : "")
#endif

CEPoll::CEPoll():
m_iIDSeed(0)
{
Expand Down Expand Up @@ -456,6 +468,9 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
int total = 0;

int64_t entertime = CTimer::getTime();

HLOGC(mglog.Debug, log << "CEPoll::wait: START for eid=" << eid);

while (true)
{
{
Expand Down Expand Up @@ -485,10 +500,13 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}

IF_HEAVY_LOGGING(int total_noticed);
IF_HEAVY_LOGGING(ostringstream debug_sockets);
// Sockets with exceptions are returned to both read and write sets.
for (CEPollDesc::enotice_t::iterator it = ed.enotice_begin(), it_next = it; it != ed.enotice_end(); it = it_next)
{
++it_next;
IF_HEAVY_LOGGING(++total_noticed);
if (readfds && ((it->events & UDT_EPOLL_IN) || (it->events & UDT_EPOLL_ERR)))
{
if (readfds->insert(it->fd).second)
Expand All @@ -501,16 +519,28 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
++total;
}

ed.checkEdge(it); // NOTE: potentially erases 'it'.
IF_HEAVY_LOGGING(debug_sockets << " " << it->fd << ":"
<< IF_DIRNAME(it->events, SRT_EPOLL_IN, "R")
<< IF_DIRNAME(it->events, SRT_EPOLL_OUT, "W")
<< IF_DIRNAME(it->events, SRT_EPOLL_ERR, "E"));

if (ed.checkEdge(it)) // NOTE: potentially erases 'it'.
{
IF_HEAVY_LOGGING(debug_sockets << "!");
}
}

HLOGC(mglog.Debug, log << "CEPoll::wait: REPORTED " << total << "/" << total_noticed
<< debug_sockets.str());

if (lrfds || lwfds)
{
#ifdef LINUX
const int max_events = ed.m_sLocals.size();
epoll_event ev[max_events];
int nfds = ::epoll_wait(ed.m_iLocalID, ev, max_events, 0);

IF_HEAVY_LOGGING(const int prev_total = total);
for (int i = 0; i < nfds; ++ i)
{
if ((NULL != lrfds) && (ev[i].events & EPOLLIN))
Expand All @@ -524,12 +554,15 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
++ total;
}
}
HLOGC(mglog.Debug, log << "CEPoll::wait: LINUX: picking up " << (total - prev_total) << " ready fds.");

#elif defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1)
struct timespec tmout = {0, 0};
const int max_events = ed.m_sLocals.size();
struct kevent ke[max_events];

int nfds = kevent(ed.m_iLocalID, NULL, 0, ke, max_events, &tmout);
IF_HEAVY_LOGGING(const int prev_total = total);

for (int i = 0; i < nfds; ++ i)
{
Expand All @@ -544,6 +577,9 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
++ total;
}
}

HLOGC(mglog.Debug, log << "CEPoll::wait: Darwin/BSD: picking up " << (total - prev_total) << " ready fds.");

#else
//currently "select" is used for all non-Linux platforms.
//faster approaches can be applied for specific systems in the future.
Expand All @@ -566,6 +602,7 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
max_fd = *i;
}

IF_HEAVY_LOGGING(const int prev_total = total);
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 0;
Expand All @@ -585,18 +622,27 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
}
}
}

HLOGC(mglog.Debug, log << "CEPoll::wait: select(otherSYS): picking up " << (total - prev_total) << " ready fds.");
#endif
}

} // END-LOCK: m_EPollLock

HLOGC(mglog.Debug, log << "CEPoll::wait: Total of " << total << " READY SOCKETS");

if (total > 0)
return total;

if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * int64_t(1000)))
{
HLOGP(mglog.Debug, "... not waiting longer - timeout");
throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
}

CTimer::waitForEvent();
CTimer::EWait wt ATR_UNUSED = CTimer::waitForEvent();
HLOGC(mglog.Debug, log << "CEPoll::wait: EVENT WAITING: "
<< (wt == CTimer::WT_TIMEOUT ? "CHECKPOINT" : wt == CTimer::WT_EVENT ? "TRIGGERED" : "ERROR"));
}

return 0;
Expand Down
6 changes: 5 additions & 1 deletion srtcore/epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,18 @@ struct CEPollDesc
}
}

void checkEdge(enotice_t::iterator i)
bool checkEdge(enotice_t::iterator i)
{
// This function should check if this event was subscribed
// as edge-triggered, and if so, clear the event from the notice.
// Update events and check edge mode at the subscriber
i->events &= ~i->parent->edgeOnly();
if(!i->events)
{
removeExistingNotices(*i->parent);
return true;
}
return false;
}
};

Expand Down
4 changes: 4 additions & 0 deletions srtcore/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,16 @@ written by
#define HLOGP LOGP
#define HLOGF LOGF

#define IF_HEAVY_LOGGING(instr) instr

#else

#define HLOGC(...)
#define HLOGF(...)
#define HLOGP(...)

#define IF_HEAVY_LOGGING(instr) (void)0

#endif

#else
Expand Down

0 comments on commit aaef22a

Please sign in to comment.