From 2a9a3403ae60db017dd3a4296bc883dae2347fe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Fri, 29 Sep 2017 13:20:58 +0200 Subject: [PATCH 1/2] General refactoring around existing API --- common/socketoptions.cpp | 3 +- common/utilities.h | 12 + haicrypt/hc_openssl_aes.c | 6 +- haicrypt/hc_openssl_evp_ctr.c | 2 +- haicrypt/hcrypt.h | 6 + haicrypt/hcrypt_ctx_tx.c | 2 +- srtcore/api.cpp | 26 -- srtcore/buffer.cpp | 519 +++++++++++++++++++++------------- srtcore/buffer.h | 14 +- srtcore/common.cpp | 6 +- srtcore/common.h | 7 +- srtcore/core.cpp | 150 +++++----- srtcore/core.h | 48 ++-- srtcore/crypto.cpp | 2 +- srtcore/crypto.h | 4 +- srtcore/epoll.cpp | 16 +- srtcore/epoll.h | 22 +- srtcore/handshake.h | 49 ++-- srtcore/logging.h | 11 +- srtcore/packet.h | 4 +- srtcore/queue.cpp | 18 +- srtcore/queue.h | 14 +- srtcore/smoother.cpp | 42 ++- srtcore/srt.h | 80 ++++-- srtcore/srt4udt.h | 7 - srtcore/srt_c_api.cpp | 58 ++-- srtcore/udt.h | 5 - 27 files changed, 649 insertions(+), 484 deletions(-) diff --git a/common/socketoptions.cpp b/common/socketoptions.cpp index bd5769d5b..696ba387e 100644 --- a/common/socketoptions.cpp +++ b/common/socketoptions.cpp @@ -9,8 +9,7 @@ extern const set false_names = { "0", "no", "off", "false" }; extern const std::map enummap_transtype = { { "live", SRTT_LIVE }, - { "vod", SRTT_VOD }, - { "file", SRTT_VOD } + { "file", SRTT_FILE } }; SocketOption::Mode SrtConfigurePre(SRTSOCKET socket, string host, map options, vector* failures) diff --git a/common/utilities.h b/common/utilities.h index 013387fb5..4e7e0a578 100644 --- a/common/utilities.h +++ b/common/utilities.h @@ -256,6 +256,9 @@ struct ref_t: public std::reference_wrapper { this->get() = i; } + + T operator->() const + { return this->get(); } }; // This alias was created so that 'Ref' (not 'ref') is used everywhere. @@ -370,6 +373,9 @@ class ref_t Type& get() const { return *m_data; } + + Type operator->() const + { return *m_data; } }; template @@ -609,4 +615,10 @@ inline size_t safe_advance(It& it, size_t num, It end) template inline ATR_CONSTEXPR size_t Size(const V (&)[N]) ATR_NOEXCEPT { return N; } +template +inline ValueType avg_iir(ValueType old_value, ValueType new_value) +{ + return (old_value*(DEPRLEN-1) + new_value)/DEPRLEN; +} + #endif diff --git a/haicrypt/hc_openssl_aes.c b/haicrypt/hc_openssl_aes.c index 07b26fb73..74d9a39a6 100644 --- a/haicrypt/hc_openssl_aes.c +++ b/haicrypt/hc_openssl_aes.c @@ -209,9 +209,9 @@ static int hcOpenSSL_AES_SetKey(hcrypt_CipherData *cipher_data, hcrypt_Ctx *ctx, } static int hcOpenSSL_AES_Encrypt( - hcrypt_CipherData *cipher_data, + hcrypt_CipherData *cipher_data, hcrypt_Ctx *ctx, - hcrypt_DataDesc *in_data, int nbin, + hcrypt_DataDesc *in_data, int nbin ATR_UNUSED, void *out_p[], size_t out_len_p[], int *nbout_p) { hcOpenSSL_AES_data *aes_data = (hcOpenSSL_AES_data *)cipher_data; @@ -356,7 +356,7 @@ static int hcOpenSSL_AES_Encrypt( static int hcOpenSSL_AES_Decrypt(hcrypt_CipherData *cipher_data, hcrypt_Ctx *ctx, - hcrypt_DataDesc *in_data, int nbin, void *out_p[], size_t out_len_p[], int *nbout_p) + hcrypt_DataDesc *in_data, int nbin ATR_UNUSED, void *out_p[], size_t out_len_p[], int *nbout_p) { hcOpenSSL_AES_data *aes_data = (hcOpenSSL_AES_data *)cipher_data; unsigned char *out_txt; diff --git a/haicrypt/hc_openssl_evp_ctr.c b/haicrypt/hc_openssl_evp_ctr.c index 3adbf202b..5e4ccfb5b 100644 --- a/haicrypt/hc_openssl_evp_ctr.c +++ b/haicrypt/hc_openssl_evp_ctr.c @@ -235,7 +235,7 @@ static int hcOpenSSL_EVP_CTR_SetKey(hcrypt_CipherData *cipher_data, hcrypt_Ctx * } static int hcOpenSSL_EVP_CTR_Crypt(hcrypt_CipherData *cipher_data, hcrypt_Ctx *ctx, - hcrypt_DataDesc *in_data, int nbin, void *out_p[], size_t out_len_p[], int *nbout_p) + hcrypt_DataDesc *in_data, int nbin ATR_UNUSED, void *out_p[], size_t out_len_p[], int *nbout_p) { hcOpenSSL_EVP_CTR_data *evp_data = (hcOpenSSL_EVP_CTR_data *)cipher_data; unsigned char iv[HCRYPT_EVP_CTR_BLK_SZ]; diff --git a/haicrypt/hcrypt.h b/haicrypt/hcrypt.h index 093aebe31..88ea4c558 100644 --- a/haicrypt/hcrypt.h +++ b/haicrypt/hcrypt.h @@ -48,6 +48,12 @@ written by #include #endif +#ifdef __GNUC__ +#define ATR_UNUSED __attribute__((unused)) +#else +#define ATR_UNUSED +#endif + #include "haicrypt.h" #include "hcrypt_msg.h" #include "hcrypt_ctx.h" diff --git a/haicrypt/hcrypt_ctx_tx.c b/haicrypt/hcrypt_ctx_tx.c index d91d20ac8..1fcacf581 100644 --- a/haicrypt/hcrypt_ctx_tx.c +++ b/haicrypt/hcrypt_ctx_tx.c @@ -333,7 +333,7 @@ int hcryptCtx_Tx_ManageKM(hcrypt_Session *crypto) } int hcryptCtx_Tx_InjectKM(hcrypt_Session *crypto, - void *out_p[], size_t out_len_p[], int maxout) + void *out_p[], size_t out_len_p[], int maxout ATR_UNUSED) { int i, nbout = 0; diff --git a/srtcore/api.cpp b/srtcore/api.cpp index cec26fe9a..cb2b0820f 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -2161,7 +2161,6 @@ int CUDT::recv(SRTSOCKET u, char* buf, int len, int) } } -#ifdef SRT_ENABLE_SRCTIMESTAMP int CUDT::sendmsg( SRTSOCKET u, const char* buf, int len, int ttl, bool inorder, uint64_t srctime) @@ -2170,15 +2169,6 @@ int CUDT::sendmsg( { CUDT* udt = s_UDTUnited.lookup(u); return udt->sendmsg(buf, len, ttl, inorder, srctime); -#else -int CUDT::sendmsg( - SRTSOCKET u, const char* buf, int len, int ttl, bool inorder) -{ - try - { - CUDT* udt = s_UDTUnited.lookup(u); - return udt->sendmsg(buf, len, ttl, inorder); -#endif } catch (CUDTException e) { @@ -2222,7 +2212,6 @@ int CUDT::recvmsg(SRTSOCKET u, char* buf, int len) } } -#ifdef SRT_ENABLE_SRCTIMESTAMP int CUDT::recvmsg(SRTSOCKET u, char* buf, int len, uint64_t& srctime) { try @@ -2244,7 +2233,6 @@ int CUDT::recvmsg(SRTSOCKET u, char* buf, int len, uint64_t& srctime) return ERROR; } } -#endif int64_t CUDT::sendfile( SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block) @@ -2766,7 +2754,6 @@ int recv(SRTSOCKET u, char* buf, int len, int flags) return CUDT::recv(u, buf, len, flags); } -#ifdef SRT_ENABLE_SRCTIMESTAMP int sendmsg( SRTSOCKET u, const char* buf, int len, int ttl, bool inorder, @@ -2781,19 +2768,6 @@ int recvmsg(SRTSOCKET u, char* buf, int len, uint64_t& srctime) return CUDT::recvmsg(u, buf, len, srctime); } -#else -int sendmsg( - SRTSOCKET u, - const char* buf, - int len, - int ttl, - bool inorder, - uint64_t /*ignored*/) -{ - return CUDT::sendmsg(u, buf, len, ttl, inorder); -} - -#endif int recvmsg(SRTSOCKET u, char* buf, int len) { diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 113f2f193..ccf905680 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -83,7 +83,7 @@ m_iSize(size), m_iMSS(mss), m_iCount(0) ,m_iBytesCount(0) -,m_LastOriginTime(0) +,m_ullLastOriginTime_us(0) #ifdef SRT_ENABLE_SNDBUFSZ_MAVG ,m_LastSamplingTime(0) ,m_iCountMAvg(0) @@ -150,19 +150,20 @@ CSndBuffer::~CSndBuffer() pthread_mutex_destroy(&m_BufLock); } -#ifdef SRT_ENABLE_SRCTIMESTAMP void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order, uint64_t srctime) -#else -void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order) -#endif { int size = len / m_iMSS; if ((len % m_iMSS) != 0) size ++; + LOGC(mglog.Debug) << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for " << len << " bytes"; + // dynamically increase sender buffer while (size + m_iCount >= m_iSize) + { + LOGC(mglog.Debug) << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers..."; increase(); + } uint64_t time = CTimer::getTime(); int32_t inorder = order ? MSGNO_PACKET_INORDER::mask : 0; @@ -194,10 +195,8 @@ void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order) // [PB_FIRST] [PB_LAST] - 2 packets per message // [PB_SOLO] - 1 packet per message -#ifdef SRT_ENABLE_SRCTIMESTAMP - s->m_SourceTime = srctime; -#endif - s->m_OriginTime = time; + s->m_ullSourceTime_us = srctime; + s->m_ullOriginTime_us = time; s->m_iTTL = ttl; // XXX unchecked condition: s->m_pNext == NULL. @@ -211,9 +210,9 @@ void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order) m_iBytesCount += len; #ifdef SRT_ENABLE_CBRTIMESTAMP - m_LastOriginTime = srctime; + m_ullLastOriginTime_us = srctime; #else - m_LastOriginTime = time; + m_ullLastOriginTime_us = time; #endif /* SRT_ENABLE_CBRTIMESTAMP */ updInputRate(time, size, len); @@ -302,9 +301,17 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len) if ((len % m_iMSS) != 0) size ++; + LOGC(mglog.Debug) << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for " << len << " bytes"; + // dynamically increase sender buffer while (size + m_iCount >= m_iSize) + { + LOGC(mglog.Debug) << "addBufferFromFile: ... still lacking " << (size + m_iCount - m_iSize) << " buffers..."; increase(); + } + + LOGC(dlog.Debug) << CONID() << "addBufferFromFile: adding " + << size << " packets (" << len << " bytes) to send, msgno=" << m_iNextMsgNo; Block* s = m_pLastBlock; int total = 0; @@ -317,6 +324,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len) if (pktlen > m_iMSS) pktlen = m_iMSS; + LOGC(dlog.Debug) << "addBufferFromFile: reading from=" << (i*m_iMSS) << " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData; ifs.read(s->m_pcData, pktlen); if ((pktlen = int(ifs.gcount())) <= 0) break; @@ -387,11 +395,9 @@ int CSndBuffer::readData(char** data, int32_t& msgno_bitset, uint64_t& srctime, m_pCurrBlock->m_iMsgNoBitset |= MSGNO_ENCKEYSPEC::wrap(kflgs); msgno_bitset = m_pCurrBlock->m_iMsgNoBitset; - srctime = -#ifdef SRT_ENABLE_SRCTIMESTAMP - m_pCurrBlock->m_SourceTime ? m_pCurrBlock->m_SourceTime : -#endif /* SRT_ENABLE_SRCTIMESTAMP */ - m_pCurrBlock->m_OriginTime; + srctime = + m_pCurrBlock->m_ullSourceTime_us ? m_pCurrBlock->m_ullSourceTime_us : + m_pCurrBlock->m_ullOriginTime_us; m_pCurrBlock = m_pCurrBlock->m_pNext; @@ -425,7 +431,7 @@ int CSndBuffer::readData(char** data, const int offset, int32_t& msgno_bitset, u // if found block is stale // (This is for messages that have declared TTL - messages that fail to be sent // before the TTL defined time comes, will be dropped). - if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_OriginTime) / 1000 > (uint64_t)p->m_iTTL)) + if ((p->m_iTTL >= 0) && ((CTimer::getTime() - p->m_ullOriginTime_us) / 1000 > (uint64_t)p->m_iTTL)) { int32_t msgno = p->getMsgSeq(); msglen = 1; @@ -462,10 +468,8 @@ int CSndBuffer::readData(char** data, const int offset, int32_t& msgno_bitset, u msgno_bitset = p->m_iMsgNoBitset; srctime = -#ifdef SRT_ENABLE_SRCTIMESTAMP - p->m_SourceTime ? p->m_SourceTime : -#endif /* SRT_ENABLE_SRCTIMESTAMP */ - p->m_OriginTime; + p->m_ullSourceTime_us ? p->m_ullSourceTime_us : + p->m_ullOriginTime_us; LOGC(dlog.Debug) << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send [REXMIT]"; @@ -562,9 +566,9 @@ int CSndBuffer::getCurrBufSize(ref_t bytes, ref_t timespan) * Therefore, always add 1 ms if not empty. */ #ifdef SRT_ENABLE_CBRTIMESTAMP - timespan = 0 < m_iCount ? int((m_LastOriginTime - m_pFirstBlock->m_SourceTime) / 1000) + 1 : 0; + timespan = 0 < m_iCount ? int((m_ullLastOriginTime_us - m_pFirstBlock->m_ullSourceTime_us) / 1000) + 1 : 0; #else - timespan = 0 < m_iCount ? int((m_LastOriginTime - m_pFirstBlock->m_OriginTime) / 1000) + 1 : 0; + timespan = 0 < m_iCount ? int((m_ullLastOriginTime_us - m_pFirstBlock->m_ullOriginTime_us) / 1000) + 1 : 0; #endif return m_iCount; @@ -577,7 +581,7 @@ int CSndBuffer::dropLateData(int &bytes, uint64_t latetime) bool move = false; CGuard bufferguard(m_BufLock); - for (int i = 0; i < m_iCount && m_pFirstBlock->m_OriginTime < latetime; ++ i) + for (int i = 0; i < m_iCount && m_pFirstBlock->m_ullOriginTime_us < latetime; ++ i) { dpkts++; dbytes += m_pFirstBlock->m_iLength; @@ -642,8 +646,6 @@ void CSndBuffer::increase() pb = pb->m_pNext; } - LOGC(dlog.Debug) << "CSndBuffer: BUFFER FULL - adding " << (unitsize*m_iMSS) << " bytes spread to " << unitsize << " blocks"; - // insert the new blocks onto the existing one pb->m_pNext = m_pLastBlock->m_pNext; m_pLastBlock->m_pNext = nblk; @@ -658,6 +660,10 @@ void CSndBuffer::increase() } m_iSize += unitsize; + + LOGC(dlog.Debug) << "CSndBuffer: BUFFER FULL - adding " << (unitsize*m_iMSS) << " bytes spread to " << unitsize << " blocks" + << " (total size: " << m_iSize << " bytes)"; + } //////////////////////////////////////////////////////////////////////////////// @@ -928,27 +934,67 @@ void CRcvBuffer::skipData(int len) bool CRcvBuffer::getRcvFirstMsg(ref_t tsbpdtime, ref_t passack, ref_t skipseqno, CPacket** pppkt) { skipseqno = -1; + passack = false; + // tsbpdtime will be retrieved by the below call + // Returned values: + // - tsbpdtime: real time when the packet is ready to play (whether ready to play or not) + // - passack: false (the report concerns a packet with an exactly next sequence) + // - skipseqno == -1: no packets to skip towards the first RTP + // - ppkt: that exactly packet is reported (for debugging purposes) + // - @return: whether the reported packet is ready to play /* Check the acknowledged packets */ if (getRcvReadyMsg(tsbpdtime, pppkt)) { - passack = false; return true; } else if (tsbpdtime != 0) { - passack = false; return false; } + // getRcvReadyMsg returned false and tsbpdtime == 0. + + // Below this line we have only two options: + // - m_iMaxPos == 0, which means that no more packets are in the buffer + // - returned: tsbpdtime=0, passack=true, skipseqno=-1, ppkt=0, @return false + // - m_iMaxPos > 0, which means that there are packets arrived after a lost packet: + // - returned: tsbpdtime=PKT.TS, passack=true, skipseqno=PKT.SEQ, ppkt=PKT, @return LOCAL(PKT.TS) <= NOW + /* * No acked packets ready but caller want to know next packet to wait for * Check the not yet acked packets that may be stuck by missing packet(s). */ bool haslost = false; - tsbpdtime = 0; + tsbpdtime = 0; // redundant, for clarity passack = true; - skipseqno = -1; + + // XXX SUSPECTED ISSUE with this algorithm: + // The above call to getRcvReadyMsg() should report as to whether: + // - there is an EXACTLY NEXT SEQUENCE packet + // - this packet is ready to play. + // + // Situations handled after the call are when: + // - there's the next sequence packet available and it is ready to play + // - there are no packets at all, ready to play or not + // + // So, the remaining situation is that THERE ARE PACKETS that follow + // the current sequence, but they are not ready to play. This includes + // packets that have the exactly next sequence and packets that jump + // over a lost packet. + // + // As the getRcvReadyMsg() function walks through the incoming units + // to see if there's anything that satisfies these conditions, it *SHOULD* + // be also capable of checking if the next available packet, if it is + // there, is the next sequence packet or not. Retrieving this exactly + // packet would be most useful, as the test for play-readiness and + // "monotonicness" can be done on it directly. + // + // When done so, the below loop would be completely unnecessary. + + // Logical description of the below algorithm: + // 1. Check if the VERY FIRST PACKET is valid; if so then: + // - check if it's ready to play, return boolean value that marks it. for (int i = m_iLastAckPos, n = (m_iLastAckPos + m_iMaxPos) % m_iSize; i != n; i = (i + 1) % m_iSize) { @@ -972,11 +1018,25 @@ bool CRcvBuffer::getRcvFirstMsg(ref_t tsbpdtime, ref_t passack, * Tell 1st valid packet seqno so caller can skip (drop) the missing packets. */ skipseqno = m_pUnit[i]->m_Packet.m_iSeqNo; + if ( pppkt ) + *pppkt = &m_pUnit[i]->m_Packet; } + + // NOTE: if haslost is not set, it means that this is the VERY FIRST + // packet, that is, packet currently at pos = m_iLastAckPos. There's no + // possibility that it is so otherwise because: + // - if this first good packet is ready to play, THIS HERE RETURNS NOW. + // ... return true; } + // ... and if this first good packet WASN'T ready to play, THIS HERE RETURNS NOW, TOO, + // just states that there's no ready packet to play. + // ... return false; } + // ... and if this first packet WASN'T GOOD, the loop continues, however since now + // the 'haslost' is set, which means that it continues only to find the first valid + // packet after stating that the very first packet isn't valid. } return false; } @@ -1389,7 +1449,7 @@ void CRcvBuffer::printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg) } #endif /* SRT_DEBUG_TSBPD_DRIFT */ -void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp) +void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mutex_to_lock) { if (!m_bTsbPdMode) // Not checked unless in TSBPD mode return; @@ -1412,6 +1472,9 @@ void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp) // either schedule time or a time supplied by the application). int64_t iDrift = CTimer::getTime() - (getTsbPdTimeBase(timestamp) + timestamp); + + CGuard::enterCS(mutex_to_lock); + bool updated = m_DriftTracer.update(iDrift); #ifdef SRT_DEBUG_TSBPD_DRIFT @@ -1427,6 +1490,7 @@ void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp) m_ullTsbPdTimeBase += m_DriftTracer.overdrift(); } + CGuard::leaveCS(mutex_to_lock); } int CRcvBuffer::readMsg(char* data, int len) @@ -1438,223 +1502,272 @@ int CRcvBuffer::readMsg(char* data, int len) int CRcvBuffer::readMsg(char* data, int len, uint64_t& tsbpdtime) { - int p, q; - bool passack; - bool empty = true; + int p, q; + bool passack; + bool empty = true; - if (m_bTsbPdMode) - { - passack = false; + if (m_bTsbPdMode) + { + passack = false; - if (getRcvReadyMsg(Ref(tsbpdtime))) - { - empty = false; - p = q = m_iStartPos; + if (getRcvReadyMsg(Ref(tsbpdtime))) + { + empty = false; + + // In TSBPD mode you always read one message + // at a time and a message always fits in one UDP packet, + // so in one "unit". + p = q = m_iStartPos; #ifdef SRT_DEBUG_TSBPD_OUTJITTER - uint64_t now = CTimer::getTime(); - if ((now - tsbpdtime)/10 < 10) - m_ulPdHisto[0][(now - tsbpdtime)/10]++; - else if ((now - tsbpdtime)/100 < 10) - m_ulPdHisto[1][(now - tsbpdtime)/100]++; - else if ((now - tsbpdtime)/1000 < 10) - m_ulPdHisto[2][(now - tsbpdtime)/1000]++; - else - m_ulPdHisto[3][1]++; + uint64_t now = CTimer::getTime(); + if ((now - tsbpdtime)/10 < 10) + m_ulPdHisto[0][(now - tsbpdtime)/10]++; + else if ((now - tsbpdtime)/100 < 10) + m_ulPdHisto[1][(now - tsbpdtime)/100]++; + else if ((now - tsbpdtime)/1000 < 10) + m_ulPdHisto[2][(now - tsbpdtime)/1000]++; + else + m_ulPdHisto[3][1]++; #endif /* SRT_DEBUG_TSBPD_OUTJITTER */ - } - } - else - { - tsbpdtime = 0; - if (scanMsg(p, q, passack)) - empty = false; + } + } + else + { + tsbpdtime = 0; + if (scanMsg(p, q, passack)) + empty = false; - } + } - if (empty) - return 0; + if (empty) + return 0; - int rs = len; - while (p != (q + 1) % m_iSize) - { - int unitsize = m_pUnit[p]->m_Packet.getLength(); - if ((rs >= 0) && (unitsize > rs)) - unitsize = rs; + int rs = len; + while (p != (q + 1) % m_iSize) + { + int unitsize = m_pUnit[p]->m_Packet.getLength(); + if ((rs >= 0) && (unitsize > rs)) + unitsize = rs; - if (unitsize > 0) - { - memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize); - data += unitsize; - rs -= unitsize; - /* we removed bytes form receive buffer */ - countBytes(-1, -unitsize, true); + if (unitsize > 0) + { + memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize); + data += unitsize; + rs -= unitsize; + /* we removed bytes form receive buffer */ + countBytes(-1, -unitsize, true); #if ENABLE_LOGGING - { - static uint64_t prev_now; - static uint64_t prev_srctime; + { + static uint64_t prev_now; + static uint64_t prev_srctime; - int32_t seq = m_pUnit[p]->m_Packet.m_iSeqNo; + int32_t seq = m_pUnit[p]->m_Packet.m_iSeqNo; - uint64_t nowtime = CTimer::getTime(); - //CTimer::rdtsc(nowtime); - uint64_t srctime = getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()); + uint64_t nowtime = CTimer::getTime(); + //CTimer::rdtsc(nowtime); + uint64_t srctime = getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()); - int64_t timediff = nowtime - srctime; - int64_t nowdiff = prev_now ? (nowtime - prev_now) : 0; - uint64_t srctimediff = prev_srctime ? (srctime - prev_srctime) : 0; + int64_t timediff = nowtime - srctime; + int64_t nowdiff = prev_now ? (nowtime - prev_now) : 0; + uint64_t srctimediff = prev_srctime ? (srctime - prev_srctime) : 0; - LOGC(dlog.Debug) << CONID() << "readMsg: DELIVERED seq=" << seq << " T=" << logging::FormatTime(srctime) << " in " << (timediff/1000.0) << "ms - " - "TIME-PREVIOUS: PKT: " << (srctimediff/1000.0) << " LOCAL: " << (nowdiff/1000.0); + LOGC(dlog.Debug) << CONID() << "readMsg: DELIVERED seq=" << seq << " T=" << logging::FormatTime(srctime) << " in " << (timediff/1000.0) << "ms - " + "TIME-PREVIOUS: PKT: " << (srctimediff/1000.0) << " LOCAL: " << (nowdiff/1000.0); - prev_now = nowtime; - prev_srctime = srctime; - } + prev_now = nowtime; + prev_srctime = srctime; + } #endif - } + } - if (!passack) - { - CUnit* tmp = m_pUnit[p]; - m_pUnit[p] = NULL; - tmp->m_iFlag = CUnit::FREE; - -- m_pUnitQueue->m_iCount; - } - else - m_pUnit[p]->m_iFlag = CUnit::PASSACK; + if (!passack) + { + CUnit* tmp = m_pUnit[p]; + m_pUnit[p] = NULL; + tmp->m_iFlag = CUnit::FREE; + -- m_pUnitQueue->m_iCount; + } + else + m_pUnit[p]->m_iFlag = CUnit::PASSACK; - if (++ p == m_iSize) - p = 0; - } + if (++ p == m_iSize) + p = 0; + } - if (!passack) - m_iStartPos = (q + 1) % m_iSize; + if (!passack) + m_iStartPos = (q + 1) % m_iSize; - return len - rs; + return len - rs; } bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack) { - // empty buffer - if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0)) - return false; - - int rmpkts = 0; - int rmbytes = 0; - //skip all bad msgs at the beginning - while (m_iStartPos != m_iLastAckPos) - { - if (NULL == m_pUnit[m_iStartPos]) - { - if (++ m_iStartPos == m_iSize) - m_iStartPos = 0; - continue; - } + // empty buffer + if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0)) + return false; - // Note: PB_FIRST | PB_LAST == PB_SOLO. - // testing if boundary() & PB_FIRST tests if the msg is first OR solo. - if ( m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD - && m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() & PB_FIRST ) - { - bool good = true; + int rmpkts = 0; + int rmbytes = 0; + //skip all bad msgs at the beginning + while (m_iStartPos != m_iLastAckPos) + { + if (NULL == m_pUnit[m_iStartPos]) + { + if (++ m_iStartPos == m_iSize) + m_iStartPos = 0; + continue; + } - // look ahead for the whole message - for (int i = m_iStartPos; i != m_iLastAckPos;) - { - if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD) + // Note: PB_FIRST | PB_LAST == PB_SOLO. + // testing if boundary() & PB_FIRST tests if the msg is first OR solo. + if ( m_pUnit[m_iStartPos]->m_iFlag == CUnit::GOOD + && m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() & PB_FIRST ) + { + bool good = true; + + // look ahead for the whole message + + // We expect to see either of: + // [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST] + // [PB_SOLO] + // but not: + // [PB_FIRST] NULL ... + // [PB_FIRST] FREE/PASSACK/DROPPED... + // If the message didn't look as expected, interrupt this. + + // This begins with a message starting at m_iStartPos + // up to m_iLastAckPos OR until the PB_LAST message is found. + // If any of the units on this way isn't good, this OUTER loop + // will be interrupted. + for (int i = m_iStartPos; i != m_iLastAckPos;) { - good = false; - break; + if (!m_pUnit[i] || m_pUnit[i]->m_iFlag != CUnit::GOOD) + { + good = false; + break; + } + + // Likewise, boundary() & PB_LAST will be satisfied for last OR solo. + if ( m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST ) + break; + + if (++ i == m_iSize) + i = 0; } - // Likewise, boundary() & PB_LAST will be satisfied for last OR solo. - if ( m_pUnit[i]->m_Packet.getMsgBoundary() & PB_LAST ) - break; + if (good) + break; + } - if (++ i == m_iSize) - i = 0; - } + CUnit* tmp = m_pUnit[m_iStartPos]; + m_pUnit[m_iStartPos] = NULL; + rmpkts++; + rmbytes += tmp->m_Packet.getLength(); + tmp->m_iFlag = CUnit::FREE; + -- m_pUnitQueue->m_iCount; - if (good) - break; - } + if (++ m_iStartPos == m_iSize) + m_iStartPos = 0; + } + /* we removed bytes form receive buffer */ + countBytes(-rmpkts, -rmbytes, true); - CUnit* tmp = m_pUnit[m_iStartPos]; - m_pUnit[m_iStartPos] = NULL; - rmpkts++; - rmbytes += tmp->m_Packet.getLength(); - tmp->m_iFlag = CUnit::FREE; - -- m_pUnitQueue->m_iCount; + // Not sure if this is correct, but this above 'while' loop exits + // under the following conditions only: + // - m_iStartPos == m_iLastAckPos (that makes passack = true) + // - found at least GOOD unit with PB_FIRST and not all messages up to PB_LAST are good, + // in which case it returns with m_iStartPos <% m_iLastAckPos (earlier) + // Also all units that lied before m_iStartPos are removed. - if (++ m_iStartPos == m_iSize) - m_iStartPos = 0; - } - /* we removed bytes form receive buffer */ - countBytes(-rmpkts, -rmbytes, true); + p = -1; // message head + q = m_iStartPos; // message tail + passack = m_iStartPos == m_iLastAckPos; + bool found = false; - p = -1; // message head - q = m_iStartPos; // message tail - passack = m_iStartPos == m_iLastAckPos; - bool found = false; + // looking for the first message + //>>m_pUnit[size + m_iMaxPos] is not valid - // looking for the first message - //>>m_pUnit[size + m_iMaxPos] is not valid - for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i < n; ++ i) - { - if ((NULL != m_pUnit[q]) && (CUnit::GOOD == m_pUnit[q]->m_iFlag)) - { - switch (m_pUnit[q]->m_Packet.getMsgBoundary()) - { - case PB_SOLO: // 11 - p = q; - found = true; - break; + // XXX Would be nice to make some very thorough refactoring here. - case PB_FIRST: // 10 - p = q; - break; + // This rolls by q variable from m_iStartPos up to m_iLastAckPos, + // actually from the first message up to the one with PB_LAST + // or PB_SOLO boundary. - case PB_LAST: // 01 - if (p != -1) - found = true; - break; + // The 'i' variable used in this loop is just a stub, and the + // upper value is just to make it "virtually infinite, but with + // no exaggeration" (actually it makes sure that this loop does + // not roll more than around the whole cyclic container). This variable + // isn't used inside the loop at all. - case PB_SUBSEQUENT: - ; // do nothing - } - } - else - { - // a hole in this message, not valid, restart search - p = -1; - } + for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i < n; ++ i) + { + if ((NULL != m_pUnit[q]) && (CUnit::GOOD == m_pUnit[q]->m_iFlag)) + { + // Equivalent pseudocode: + // PacketBoundary bound = m_pUnit[q]->m_Packet.getMsgBoundary(); + // if ( IsSet(bound, PB_FIRST) ) + // p = q; + // if ( IsSet(bound, PB_LAST) && p != -1 ) + // found = true; + // + // Not implemented this way because it uselessly check p for -1 + // also after setting it explicitly. + + switch (m_pUnit[q]->m_Packet.getMsgBoundary()) + { + case PB_SOLO: // 11 + p = q; + found = true; + break; + + case PB_FIRST: // 10 + p = q; + break; + + case PB_LAST: // 01 + if (p != -1) + found = true; + break; + + case PB_SUBSEQUENT: + ; // do nothing + } + } + else + { + // a hole in this message, not valid, restart search + p = -1; + } - if (found) - { - // the msg has to be ack'ed or it is allowed to read out of order, and was not read before - if (!passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag()) - break; + // 'found' is set when the current iteration hit a message with PB_LAST + // (including PB_SOLO since the very first message). + if (found) + { + // the msg has to be ack'ed or it is allowed to read out of order, and was not read before + if (!passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag()) + break; - found = false; - } + found = false; + } - if (++ q == m_iSize) - q = 0; + if (++ q == m_iSize) + q = 0; - if (q == m_iLastAckPos) - passack = true; - } + if (q == m_iLastAckPos) + passack = true; + } - // no msg found - if (!found) - { - // if the message is larger than the receiver buffer, return part of the message - if ((p != -1) && ((q + 1) % m_iSize == p)) - found = true; - } + // no msg found + if (!found) + { + // if the message is larger than the receiver buffer, return part of the message + if ((p != -1) && ((q + 1) % m_iSize == p)) + found = true; + } - return found; + return found; } diff --git a/srtcore/buffer.h b/srtcore/buffer.h index 9d70c3a9b..d2f830f19 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -89,11 +89,7 @@ class CSndBuffer /// @param [in] ttl time to live in milliseconds /// @param [in] order if the block should be delivered in order, for DGRAM only -#ifdef SRT_ENABLE_SRCTIMESTAMP void addBuffer(const char* data, int len, int ttl = -1, bool order = false, uint64_t srctime = 0); -#else - void addBuffer(const char* data, int len, int ttl = -1, bool order = false); -#endif /// Read a block of data from file and insert it into the sending list. /// @param [in] ifs input file stream. @@ -156,10 +152,8 @@ class CSndBuffer int m_iLength; // length of the block int32_t m_iMsgNoBitset; // message number - uint64_t m_OriginTime; // original request time -#ifdef SRT_ENABLE_SRCTIMESTAMP - uint64_t m_SourceTime; -#endif + uint64_t m_ullOriginTime_us; // original request time + uint64_t m_ullSourceTime_us; int m_iTTL; // time to live (milliseconds) Block* m_pNext; // next block @@ -195,7 +189,7 @@ class CSndBuffer int m_iCount; // number of used blocks int m_iBytesCount; // number of payload bytes in queue - uint64_t m_LastOriginTime; + uint64_t m_ullLastOriginTime_us; #ifdef SRT_ENABLE_SNDBUFSZ_MAVG uint64_t m_LastSamplingTime; @@ -362,7 +356,7 @@ class CRcvBuffer /// Add packet timestamp for drift caclculation and compensation /// @param [in] timestamp packet time stamp - void addRcvTsbPdDriftSample(uint32_t timestamp); + void addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mutex_to_lock); #ifdef SRT_DEBUG_TSBPD_DRIFT void printDriftHistogram(int64_t iDrift); diff --git a/srtcore/common.cpp b/srtcore/common.cpp index 837ba5e7c..81fa24bb6 100644 --- a/srtcore/common.cpp +++ b/srtcore/common.cpp @@ -269,7 +269,7 @@ void CTimer::triggerEvent() pthread_cond_signal(&m_EventCond); } -void CTimer::waitForEvent() +CTimer::EWait CTimer::waitForEvent() { timeval now; timespec timeout; @@ -285,8 +285,10 @@ void CTimer::waitForEvent() timeout.tv_nsec = (now.tv_usec + 10000 - 1000000) * 1000; } pthread_mutex_lock(&m_EventLock); - pthread_cond_timedwait(&m_EventCond, &m_EventLock, &timeout); + int reason = pthread_cond_timedwait(&m_EventCond, &m_EventLock, &timeout); pthread_mutex_unlock(&m_EventLock); + + return reason == ETIMEDOUT ? WT_TIMEOUT : reason == 0 ? WT_EVENT : WT_ERROR; } void CTimer::sleep() diff --git a/srtcore/common.h b/srtcore/common.h index 1fd294b12..c2d6723e6 100644 --- a/srtcore/common.h +++ b/srtcore/common.h @@ -481,9 +481,14 @@ class CTimer static void triggerEvent(); + enum EWait {WT_EVENT, WT_ERROR, WT_TIMEOUT}; + /// wait for an event to br triggered by "triggerEvent". + /// @retval WT_EVENT The event has happened + /// @retval WT_TIMEOUT The event hasn't happened, the function exited due to timeout + /// @retval WT_ERROR The function has exit due to an error - static void waitForEvent(); + static EWait waitForEvent(); /// sleep for a short interval. exact sleep time does not matter diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 3a6320189..c796c5c54 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -115,7 +115,7 @@ logging::Logger rxlog(SRT_LOGFA_REXMIT, &srt_logger_config, "SRT.r"); CUDTUnited CUDT::s_UDTUnited; -const UDTSOCKET UDT::INVALID_SOCK = CUDT::INVALID_SOCK; +const SRTSOCKET UDT::INVALID_SOCK = CUDT::INVALID_SOCK; const int UDT::ERROR = CUDT::ERROR; // SRT Version constants @@ -592,7 +592,7 @@ void CUDT::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen) break; case SRTO_PBKEYLEN: - case SRTO_SNDPBKEYLEN: + case _DEPRECATED_SRTO_SNDPBKEYLEN: if (m_bConnected) throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0); @@ -675,6 +675,12 @@ void CUDT::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen) if (m_bConnected) throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0); + if (*(int*)optval > int(CPacket::SRT_MAX_PAYLOAD_SIZE)) + { + LOGC(mglog.Error) << "SRTO_PAYLOADSIZE: value exceeds SRT_LIVE_MAX_PLSIZE, maximum payload per MTU."; + throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + } + m_zOPT_ExpPayloadSize = *(int*)optval; break; @@ -703,7 +709,7 @@ void CUDT::setOpt(SRT_SOCKOPT optName, const void* optval, int optlen) m_Smoother.select("live"); break; - case SRTT_VOD: + case SRTT_FILE: // File transfer mode: // - tsbpd: off // - latency: 0 @@ -992,12 +998,22 @@ void CUDT::getOpt(SRT_SOCKOPT optName, void* optval, int& optlen) } break; + case SRTO_MESSAGEAPI: + optlen = sizeof (bool); + *(bool*)optval = m_bMessageAPI; + break; + + case SRTO_PAYLOADSIZE: + optlen = sizeof (int); + *(int*)optval = m_zOPT_ExpPayloadSize; + break; + default: throw CUDTException(MJ_NOTSUP, MN_NONE, 0); } } -bool CUDT::setstreamid(UDTSOCKET u, const std::string& sid) +bool CUDT::setstreamid(SRTSOCKET u, const std::string& sid) { CUDT* that = getUDTHandle(u); if (!that) @@ -1013,7 +1029,7 @@ bool CUDT::setstreamid(UDTSOCKET u, const std::string& sid) return true; } -std::string CUDT::getstreamid(UDTSOCKET u) +std::string CUDT::getstreamid(SRTSOCKET u) { CUDT* that = getUDTHandle(u); if (!that) @@ -1028,15 +1044,15 @@ void CUDT::clearData() { // Initial sequence number, loss, acknowledgement, etc. int udpsize = m_iMSS - CPacket::UDP_HDR_SIZE; - m_zMaxSRTPayloadSize = udpsize - CPacket::HDR_SIZE; + m_iMaxSRTPayloadSize = udpsize - CPacket::HDR_SIZE; - LOGC(mglog.Debug) << "clearData: PAYLOAD SIZE: " << m_zMaxSRTPayloadSize; + LOGC(mglog.Debug) << "clearData: PAYLOAD SIZE: " << m_iMaxSRTPayloadSize; m_iEXPCount = 1; m_iBandwidth = 1; //pkts/sec // XXX use some constant for this 16 m_iDeliveryRate = 16; - m_iByteDeliveryRate = 16 * m_zMaxSRTPayloadSize; + m_iByteDeliveryRate = 16 * m_iMaxSRTPayloadSize; m_iAckSeqNo = 0; m_ullLastAckTime_tk = 0; @@ -1535,7 +1551,7 @@ bool CUDT::createSrtHandshake(ref_t r_pkt, ref_t r_hs, // Now use the original function to store the actual SRT_HS data // ra_size after that - // NOTE: so far, ra_size is m_zMaxSRTPayloadSize expressed in number of elements. + // NOTE: so far, ra_size is m_iMaxSRTPayloadSize expressed in number of elements. // WILL BE CHANGED HERE. ra_size = fillSrtHandshake(p+offset, total_ra_size - offset, srths_cmd, HS_VERSION_SRT1); *pcmdspec = HS_CMDSPEC_CMD::wrap(srths_cmd) | HS_CMDSPEC_SIZE::wrap(ra_size); @@ -1552,7 +1568,7 @@ bool CUDT::createSrtHandshake(ref_t r_pkt, ref_t r_hs, // Now prepare the string with 4-byte alignment. The string size is limited // to half the payload size. Just a sanity check to not pack too much into // the conclusion packet. - size_t size_limit = m_zMaxSRTPayloadSize/2; + size_t size_limit = m_iMaxSRTPayloadSize/2; if ( m_sStreamName.size() >= size_limit ) { @@ -2516,7 +2532,7 @@ void CUDT::startConnect(const sockaddr* serv_addr, int32_t forced_isn) // Inform the server my configurations. CPacket reqpkt; reqpkt.setControl(UMSG_HANDSHAKE); - reqpkt.allocate(m_zMaxSRTPayloadSize); + reqpkt.allocate(m_iMaxSRTPayloadSize); // XXX NOTE: Now the memory for the payload part is allocated automatically, // and such allocated memory is also automatically deallocated in the // destructor. If you use CPacket::allocate, remember that you must not: @@ -2531,7 +2547,7 @@ void CUDT::startConnect(const sockaddr* serv_addr, int32_t forced_isn) // ID = 0, connection request reqpkt.m_iID = 0; - size_t hs_size = m_zMaxSRTPayloadSize; + size_t hs_size = m_iMaxSRTPayloadSize; m_ConnReq.store_to(reqpkt.m_pcData, Ref(hs_size)); // Note that CPacket::allocate() sets also the size @@ -2575,7 +2591,7 @@ void CUDT::startConnect(const sockaddr* serv_addr, int32_t forced_isn) // next incoming packet. CPacket response; response.setControl(UMSG_HANDSHAKE); - response.allocate(m_zMaxSRTPayloadSize); + response.allocate(m_iMaxSRTPayloadSize); CUDTException e; @@ -2617,7 +2633,7 @@ void CUDT::startConnect(const sockaddr* serv_addr, int32_t forced_isn) } EConnectStatus cst = CONN_CONTINUE; - response.setLength(m_zMaxSRTPayloadSize); + response.setLength(m_iMaxSRTPayloadSize); if (m_pRcvQueue->recvfrom(m_SocketID, Ref(response)) > 0) { LOGC(mglog.Debug) << CONID() << "startConnect: got response for connect request"; @@ -2689,12 +2705,12 @@ void CUDT::startConnect(const sockaddr* serv_addr, int32_t forced_isn) // Now serialize the handshake again to the existing buffer so that it's // then sent later in this loop. - // First, set the size back to the original size, m_zMaxSRTPayloadSize because + // First, set the size back to the original size, m_iMaxSRTPayloadSize because // this is the size of the originally allocated space. It might have been // shrunk by serializing the INDUCTION handshake (which was required before // sending this packet to the output queue) and therefore be too // small to store the CONCLUSION handshake (with HSv5 extensions). - reqpkt.setLength(m_zMaxSRTPayloadSize); + reqpkt.setLength(m_iMaxSRTPayloadSize); LOGC(mglog.Debug) << "startConnect: creating HS CONCLUSION: buffer size=" << reqpkt.getLength(); @@ -2785,7 +2801,7 @@ bool CUDT::processAsyncConnectRequest(EConnectStatus cst, const CPacket& respons CPacket request; request.setControl(UMSG_HANDSHAKE); - request.allocate(m_zMaxSRTPayloadSize); + request.allocate(m_iMaxSRTPayloadSize); uint64_t now = CTimer::getTime(); request.m_iTimeStamp = int(now - this->m_StartTime); @@ -2813,7 +2829,7 @@ bool CUDT::processAsyncConnectRequest(EConnectStatus cst, const CPacket& respons else { // (this procedure will be also run for HSv4 rendezvous) - size_t hs_size = m_zMaxSRTPayloadSize; + size_t hs_size = m_iMaxSRTPayloadSize; LOGC(mglog.Debug) << "processAsyncConnectRequest: serializing HS: buffer size=" << request.getLength(); if (!createSrtHandshake(Ref(request), Ref(m_ConnReq), SRT_CMD_HSREQ, SRT_CMD_KMREQ, 0, 0)) { @@ -2950,7 +2966,7 @@ EConnectStatus CUDT::processRendezvous(ref_t reqpkt, const CPacket& res LOGC(mglog.Debug) << "processConnectResponse: HSREQ extension ok, creating HSRSP response. kmdatasize=" << kmdatasize; - rpkt.setLength(m_zMaxSRTPayloadSize); + rpkt.setLength(m_iMaxSRTPayloadSize); if (!createSrtHandshake(reqpkt, Ref(m_ConnReq), SRT_CMD_HSRSP, SRT_CMD_KMRSP, kmdata, kmdatasize)) { LOGC(mglog.Debug) << "processRendezvous: rejecting due to problems in createSrtHandshake."; @@ -3000,7 +3016,7 @@ EConnectStatus CUDT::processRendezvous(ref_t reqpkt, const CPacket& res // serialization. m_ConnReq.m_extension = needs_extension; - rpkt.setLength(m_zMaxSRTPayloadSize); + rpkt.setLength(m_iMaxSRTPayloadSize); // needs_extension here distinguishes between cases 1 and 3. // NOTE: in case when interpretSrtHandshake was run under the conditions above (to interpret HSRSP), // then createSrtHandshake below will create only empty AGREEMENT message. @@ -3254,7 +3270,7 @@ void CUDT::applyResponseSettings() m_iMSS = m_ConnRes.m_iMSS; m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize; int udpsize = m_iMSS - CPacket::UDP_HDR_SIZE; - m_zMaxSRTPayloadSize = udpsize - CPacket::HDR_SIZE; + m_iMaxSRTPayloadSize = udpsize - CPacket::HDR_SIZE; m_iPeerISN = m_ConnRes.m_iISN; m_iRcvLastAck = m_ConnRes.m_iISN; #ifdef ENABLE_LOGGING @@ -3266,7 +3282,7 @@ void CUDT::applyResponseSettings() m_PeerID = m_ConnRes.m_iID; memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16); - LOGC(mglog.Debug) << CONID() << "applyResponseSettings: HANSHAKE CONCLUDED. SETTING: payload-size=" << m_zMaxSRTPayloadSize + LOGC(mglog.Debug) << CONID() << "applyResponseSettings: HANSHAKE CONCLUDED. SETTING: payload-size=" << m_iMaxSRTPayloadSize << " mss=" << m_ConnRes.m_iMSS << " flw=" << m_ConnRes.m_iFlightFlagSize << " isn=" << m_ConnRes.m_iISN @@ -3846,7 +3862,7 @@ bool CUDT::prepareConnectionObjects(const CHandShake& hs, HandshakeSide hsd, CUD try { - m_pSndBuffer = new CSndBuffer(32, m_zMaxSRTPayloadSize); + m_pSndBuffer = new CSndBuffer(32, m_iMaxSRTPayloadSize); m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize); // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space. m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); @@ -3926,8 +3942,8 @@ void CUDT::acceptAndRespond(const sockaddr* peer, CHandShake* hs, const CPacket& CIPAddress::ntop(peer, hs->m_piPeerIP, m_iIPversion); int udpsize = m_iMSS - CPacket::UDP_HDR_SIZE; - m_zMaxSRTPayloadSize = udpsize - CPacket::HDR_SIZE; - LOGC(mglog.Debug) << "acceptAndRespond: PAYLOAD SIZE: " << m_zMaxSRTPayloadSize; + m_iMaxSRTPayloadSize = udpsize - CPacket::HDR_SIZE; + LOGC(mglog.Debug) << "acceptAndRespond: PAYLOAD SIZE: " << m_iMaxSRTPayloadSize; // Prepare all structures prepareConnectionObjects(*hs, HSD_DRAW, 0); @@ -3977,7 +3993,7 @@ void CUDT::acceptAndRespond(const sockaddr* peer, CHandShake* hs, const CPacket& // XXX Here create CONCLUSION RESPONSE with: // - just the UDT handshake, if HS_VERSION_UDT4, // - if higher, the UDT handshake, the SRT HSRSP, the SRT KMRSP - size_t size = m_zMaxSRTPayloadSize; + size_t size = m_iMaxSRTPayloadSize; // Allocate the maximum possible memory for an SRT payload. // This is a maximum you can send once. CPacket response; @@ -4492,11 +4508,7 @@ int CUDT::recv(char* data, int len) return res; } -#ifdef SRT_ENABLE_SRCTIMESTAMP int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder, uint64_t srctime) -#else -int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder) -#endif { bool bCongestion = false; @@ -4514,7 +4526,7 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder) if (!m_Smoother->checkTransArgs(Smoother::STA_MESSAGE, Smoother::STAD_SEND, data, len, msttl, inorder)) throw CUDTException(MJ_NOTSUP, MN_INVALMSGAPI, 0); - if (len > int(m_iSndBufSize * m_zMaxSRTPayloadSize)) + if (len > int(m_iSndBufSize * m_iMaxSRTPayloadSize)) throw CUDTException(MJ_NOTSUP, MN_XSIZE, 0); CGuard sendguard(m_SendLock); @@ -4642,7 +4654,6 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder) m_llSndDurationCounter = CTimer::getTime(); // insert the user buffer into the sending list -#ifdef SRT_ENABLE_SRCTIMESTAMP #ifdef SRT_ENABLE_CBRTIMESTAMP if (srctime == 0) { @@ -4656,11 +4667,6 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder) m_pSndBuffer->addBuffer(data, len, msttl, inorder, srctime); LOGC(dlog.Debug) << CONID() << "sock:SENDING srctime: " << srctime << "us DATA SIZE: " << len; -#else /* SRT_ENABLE_SRCTIMESTAMP */ - m_pSndBuffer->addBuffer(data, len, msttl, inorder); -#endif /* SRT_ENABLE_SRCTIMESTAMP */ - - // insert this socket to the snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, CSndUList::rescheduleIf(bCongestion)); @@ -4679,14 +4685,12 @@ int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder) int CUDT::recvmsg(char* data, int len) { -#ifdef SRT_ENABLE_SRCTIMESTAMP uint64_t srctime; return(CUDT::recvmsg(data, len, srctime)); } int CUDT::recvmsg(char* data, int len, uint64_t& srctime) { -#endif /* SRT_ENABLE_SRCTIMESTAMP */ if (!m_bConnected || !m_Smoother.ready()) throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); @@ -4742,11 +4746,7 @@ int CUDT::recvmsg(char* data, int len, uint64_t& srctime) if (!m_bSynRecving) { -#ifdef SRT_ENABLE_SRCTIMESTAMP int res = m_pRcvBuffer->readMsg(data, len, srctime); -#else - int res = m_pRcvBuffer->readMsg(data, len); -#endif if (res == 0) { // read is not available any more @@ -4771,7 +4771,7 @@ int CUDT::recvmsg(char* data, int len, uint64_t& srctime) s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false); // After signaling the tsbpd for ready data, report the bandwidth. - double bw = Bps2Mbps( m_iBandwidth * m_zMaxSRTPayloadSize ); + double bw = Bps2Mbps( m_iBandwidth * m_iMaxSRTPayloadSize ); LOGC(mglog.Debug) << CONID() << "CURRENT BANDWIDTH: " << bw << "Mbps (" << m_iBandwidth << " buffers per second)"; } return res; @@ -4827,11 +4827,7 @@ int CUDT::recvmsg(char* data, int len, uint64_t& srctime) fputs(ptrn, stderr); // */ -#ifdef SRT_ENABLE_SRCTIMESTAMP res = m_pRcvBuffer->readMsg(data, len, srctime); -#else - res = m_pRcvBuffer->readMsg(data, len); -#endif if (m_bBroken || m_bClosing) { @@ -5095,15 +5091,15 @@ void CUDT::sample(CPerfMon* perf, bool clear) double interval = double(currtime - m_LastSampleTime); - perf->mbpsSendRate = double(m_llTraceSent) * m_zMaxSRTPayloadSize * 8.0 / interval; - perf->mbpsRecvRate = double(m_llTraceRecv) * m_zMaxSRTPayloadSize * 8.0 / interval; + perf->mbpsSendRate = double(m_llTraceSent) * m_iMaxSRTPayloadSize * 8.0 / interval; + perf->mbpsRecvRate = double(m_llTraceRecv) * m_iMaxSRTPayloadSize * 8.0 / interval; perf->usPktSndPeriod = m_ullInterval_tk / double(m_ullCPUFrequency); perf->pktFlowWindow = m_iFlowWindowSize; perf->pktCongestionWindow = (int)m_dCongestionWindow; perf->pktFlightSize = CSeqNo::seqlen(m_iSndLastAck, CSeqNo::incseq(m_iSndCurrSeqNo)) - 1; perf->msRTT = m_iRTT/1000.0; - perf->mbpsBandwidth = Bps2Mbps( m_iBandwidth * m_zMaxSRTPayloadSize ); + perf->mbpsBandwidth = Bps2Mbps( m_iBandwidth * m_iMaxSRTPayloadSize ); if (pthread_mutex_trylock(&m_ConnectionLock) == 0) { @@ -5226,7 +5222,7 @@ void CUDT::bstats(CBytePerfMon* perf, bool clear) //< uint32_t availbw = (uint64_t)(m_iBandwidth == 1 ? m_RcvTimeWindow.getBandwidth() : m_iBandwidth); - perf->mbpsBandwidth = Bps2Mbps( availbw * (m_zMaxSRTPayloadSize + pktHdrSize) ); + perf->mbpsBandwidth = Bps2Mbps( availbw * (m_iMaxSRTPayloadSize + pktHdrSize) ); if (pthread_mutex_trylock(&m_ConnectionLock) == 0) { @@ -5422,6 +5418,8 @@ void CUDT::updateCC(ETransmissionEvent evt, EventVariant arg) #endif } + LOGC(mglog.Debug) << "udpateCC: finished handling for EVENT:" << TransmissionEventStr(evt); + #if 0//debug static int callcnt = 0; if (!(callcnt++ % 250)) fprintf(stderr, "SndPeriod=%llu\n", (unsigned long long)m_ullInterval_tk/m_ullCPUFrequency); @@ -5675,7 +5673,7 @@ void CUDT::sendCtrl(UDTMessageType pkttype, void* lparam, void* rparam, int size if (m_lPeerSrtVersion == SrtVersion(1, 0, 2)) { data[ACKD_RCVRATE] = rcvRate; //bytes/sec - data[ACKD_XMRATE] = data[ACKD_BANDWIDTH] * m_zMaxSRTPayloadSize; //bytes/sec + data[ACKD_XMRATE] = data[ACKD_BANDWIDTH] * m_iMaxSRTPayloadSize; //bytes/sec ctrlsz = ACKD_FIELD_SIZE * ACKD_TOTAL_SIZE_VER102; } else if (m_lPeerSrtVersion >= SrtVersion(1, 0, 3)) @@ -5738,9 +5736,9 @@ void CUDT::sendCtrl(UDTMessageType pkttype, void* lparam, void* rparam, int size // this is periodically NAK report; make sure NAK cannot be sent back too often // read loss list from the local receiver loss list - int32_t* data = new int32_t[m_zMaxSRTPayloadSize / 4]; + int32_t* data = new int32_t[m_iMaxSRTPayloadSize / 4]; int losslen; - m_pRcvLossList->getLossArray(data, losslen, m_zMaxSRTPayloadSize / 4); + m_pRcvLossList->getLossArray(data, losslen, m_iMaxSRTPayloadSize / 4); if (0 < losslen) { @@ -6019,7 +6017,7 @@ void CUDT::processCtrl(CPacket& ctrlpkt) } // Start with checking the base size. - if ( acksize < ACKD_TOTAL_SIZE_UDTBASE ) + if ( acksize < ACKD_TOTAL_SIZE_SMALL ) { LOGC(mglog.Error) << CONID() << "Invalid ACK size " << acksize << " fields - less than minimum required!"; // Ack is already interpreted, just skip further parts. @@ -6034,8 +6032,8 @@ void CUDT::processCtrl(CPacket& ctrlpkt) // the current RTT calculations are exactly the same as in UDT4. int rtt = ackdata[ACKD_RTT]; - m_iRTTVar = threeFour(m_iRTTVar, abs(rtt - m_iRTT)); - m_iRTT = sevenEight(m_iRTT, rtt); + m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT)); + m_iRTT = avg_iir<8>(m_iRTT, rtt); /* Version-dependent fields: * Original UDT (total size: ACKD_TOTAL_SIZE_SMALL): @@ -6064,13 +6062,13 @@ void CUDT::processCtrl(CPacket& ctrlpkt) if (acksize > ACKD_TOTAL_SIZE_UDTBASE) bytesps = ackdata[ACKD_RCVRATE]; else - bytesps = pktps * m_zMaxSRTPayloadSize; + bytesps = pktps * m_iMaxSRTPayloadSize; - m_iBandwidth = sevenEight(m_iBandwidth, bandwidth); - m_iDeliveryRate = sevenEight(m_iDeliveryRate, pktps); - m_iByteDeliveryRate = sevenEight(m_iByteDeliveryRate, bytesps); + m_iBandwidth = avg_iir<8>(m_iBandwidth, bandwidth); + m_iDeliveryRate = avg_iir<8>(m_iDeliveryRate, pktps); + m_iByteDeliveryRate = avg_iir<8>(m_iByteDeliveryRate, bytesps); // XXX not sure if ACKD_XMRATE is of any use. This is simply - // calculated as ACKD_BANDWIDTH * m_zMaxSRTPayloadSize. + // calculated as ACKD_BANDWIDTH * m_iMaxSRTPayloadSize. // Update Estimated Bandwidth and packet delivery rate // m_iRcvRate = m_iDeliveryRate; @@ -6111,9 +6109,13 @@ void CUDT::processCtrl(CPacket& ctrlpkt) updateCC(TEV_ACKACK, ack); - CGuard::enterCS(m_RecvLock); - m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp()); - CGuard::leaveCS(m_RecvLock); + // This function will put a lock on m_RecvLock by itself, as needed. + // It must be done inside because this function reads the current time + // and if waiting for the lock has caused a delay, the time will be + // inaccurate. Additionally it won't lock if TSBPD mode is off, and + // won't update anything. Note that if you set TSBPD mode and use + // srt_recvfile (which doesn't make any sense), you'll have e deadlock. + m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), m_RecvLock); // update last ACK that has been received by the sender if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0) @@ -6293,7 +6295,7 @@ void CUDT::processCtrl(CPacket& ctrlpkt) // XXX here interpret SRT handshake extension CPacket response; response.setControl(UMSG_HANDSHAKE); - response.allocate(m_zMaxSRTPayloadSize); + response.allocate(m_iMaxSRTPayloadSize); // If createSrtHandshake failed, don't send anything. Actually it can only fail on IPE. // There is also no possible IPE condition in case of HSv4 - for this version it will always return true. @@ -6666,10 +6668,17 @@ int CUDT::packData(CPacket& packet, uint64_t& ts_tk) m_ullLastSndTime_tk = entertime_tk; considerLegacySrtHandshake(0); + + // WARNING: TEV_SEND is the only event that is reported from + // the CSndQueue::worker thread. All others are reported from + // CRcvQueue::worker. If you connect to this signal, make sure + // that you are aware of prospective simultaneous access. updateCC(TEV_SEND, &packet); // XXX This was a blocked code also originally in UDT. Probably not required. // Left untouched for historical reasons. + // Might be possible that it was because of that this is send from + // different thread than the rest of the signals. //m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp); m_ullTraceBytesSent += payload; @@ -7573,7 +7582,7 @@ void CUDT::checkTimers() // is sent, which doesn't contain statistical data and nothing more // than just the ACK number. The "fat ACK" packets will be still sent // normally according to the timely rules. - else if (m_iPktCount >= m_iSelfClockInterval * m_iLightACKCount) + else if (m_iPktCount >= SELF_CLOCK_INTERVAL * m_iLightACKCount) { //send a "light" ACK sendCtrl(UMSG_ACK, NULL, NULL, SEND_LITE_ACK); @@ -7637,7 +7646,7 @@ void CUDT::checkTimers() // UDT does not signal any information about this instead of to stop quietly. // Application will detect this when it calls any UDT methods next time. // - LOGC(mglog.Debug).form("connection expired after: %llu", (unsigned long long)(currtime_tk - m_ullLastRspTime_tk)/m_ullCPUFrequency); + LOGC(mglog.Debug) << "CONNECTION EXPIRED after " << ((currtime_tk - m_ullLastRspTime_tk)/m_ullCPUFrequency) << "ms"; m_bClosing = true; m_bBroken = true; m_iBrokenCounter = 30; @@ -7655,6 +7664,9 @@ void CUDT::checkTimers() return; } + LOGC(mglog.Debug) << "EXP TIMER: count=" << m_iEXPCount << "/" << (+COMM_RESPONSE_MAX_EXP) + << " elapsed=" << ((currtime_tk - m_ullLastRspTime_tk)*m_ullCPUFrequency) << "/" << (+COMM_RESPONSE_TIMEOUT_US) << "us"; + /* * This part is only used with FileSmoother. This retransmits * unacknowledged packet only when nothing in the loss list. diff --git a/srtcore/core.h b/srtcore/core.h index b42877507..dcb89e5de 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -113,13 +113,16 @@ enum AckDataItem ACKD_BUFFERLEFT = 3, ACKD_TOTAL_SIZE_SMALL = 4, - // Extra stats for SRT + // Extra fields existing in UDT (not always sent) + ACKD_RCVSPEED = 4, // length would be 16 ACKD_BANDWIDTH = 5, ACKD_TOTAL_SIZE_UDTBASE = 6, // length = 24 + // Extra stats for SRT + ACKD_RCVRATE = 6, ACKD_TOTAL_SIZE_VER101 = 7, // length = 28 - ACKD_XMRATE = 7, // XXX This is a weird compat stuff. Version 1.1.3 defines it as ACKD_BANDWIDTH*m_zMaxSRTPayloadSize when set. Never got. + ACKD_XMRATE = 7, // XXX This is a weird compat stuff. Version 1.1.3 defines it as ACKD_BANDWIDTH*m_iMaxSRTPayloadSize when set. Never got. // XXX NOTE: field number 7 may be used for something in future, need to confirm destruction of all !compat 1.0.2 version ACKD_TOTAL_SIZE_VER102 = 8, // 32 @@ -184,12 +187,8 @@ class CUDT static int setsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, const void* optval, int optlen); static int send(SRTSOCKET u, const char* buf, int len, int flags); static int recv(SRTSOCKET u, char* buf, int len, int flags); -#ifdef SRT_ENABLE_SRCTIMESTAMP static int sendmsg(SRTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false, uint64_t srctime = 0LL); static int recvmsg(SRTSOCKET u, char* buf, int len, uint64_t& srctime); -#else - static int sendmsg(SRTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false); -#endif static int recvmsg(SRTSOCKET u, char* buf, int len); static int64_t sendfile(SRTSOCKET u, std::fstream& ifs, int64_t& offset, int64_t size, int block = 364000); static int64_t recvfile(SRTSOCKET u, std::fstream& ofs, int64_t& offset, int64_t size, int block = 7280000); @@ -212,6 +211,12 @@ class CUDT static std::string getstreamid(SRTSOCKET u); static int getsndbuffer(SRTSOCKET u, size_t* blocks, size_t* bytes); + static int setError(const CUDTException& e) + { + s_UDTUnited.setError(new CUDTException(e)); + return SRT_ERROR; + } + public: // internal API static const SRTSOCKET INVALID_SOCK = -1; // invalid socket descriptor static const int ERROR = -1; // socket api error returned value @@ -263,9 +268,10 @@ class CUDT int bandwidth() { return m_iBandwidth; } int64_t maxBandwidth() { return m_llMaxBW; } int MSS() { return m_iMSS; } - size_t maxPayloadSize() { return m_zMaxSRTPayloadSize; } + size_t maxPayloadSize() { return m_iMaxSRTPayloadSize; } size_t OPT_PayloadSize() { return m_zOPT_ExpPayloadSize; } uint64_t minNAKInterval() { return m_ullMinNakInt_tk; } + int32_t ISN() { return m_iISN; } // XXX See CUDT::tsbpd() to see how to implement it. This should // do the same as TLPKTDROP feature when skipping packets that are agreed @@ -372,19 +378,13 @@ class CUDT /// @param srctime [in] Time when the data were ready to send. /// @return Actual size of data sent. -#ifdef SRT_ENABLE_SRCTIMESTAMP int sendmsg(const char* data, int len, int ttl, bool inorder, uint64_t srctime); -#else - int sendmsg(const char* data, int len, int ttl, bool inorder); -#endif /// Receive a message to buffer "data". /// @param data [out] data received. /// @param len [in] size of the buffer. /// @return Actual size of data received. -#ifdef SRT_ENABLE_SRCTIMESTAMP int recvmsg(char* data, int len, uint64_t& srctime); -#endif int recvmsg(char* data, int len); /// Request UDT to send out a file described as "fd", starting from "offset", with size of "size". @@ -464,9 +464,15 @@ class CUDT int sndSpaceLeft() { - return ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_zMaxSRTPayloadSize); + return sndBuffersLeft() * m_iMaxSRTPayloadSize; + } + + int sndBuffersLeft() + { + return m_iSndBufSize - m_pSndBuffer->getCurrBufSize(); } + // TSBPD thread main function. static void* tsbpd(void* param); @@ -481,7 +487,7 @@ class CUDT UDTSockType m_iSockType; // Type of the UDT connection (SOCK_STREAM or SOCK_DGRAM) SRTSOCKET m_PeerID; // peer id, for multiplexer - size_t m_zMaxSRTPayloadSize; // Maximum/regular payload size, in bytes + int m_iMaxSRTPayloadSize; // Maximum/regular payload size, in bytes size_t m_zOPT_ExpPayloadSize; // Expected average payload size (user option) // Options @@ -564,16 +570,6 @@ class CUDT int m_iDeliveryRate; // Packet arrival rate at the receiver side int m_iByteDeliveryRate; // Byte arrival rate at the receiver side - int sevenEight(int oldvalue, int newvalue) - { - return (oldvalue*7 + newvalue) >> 3; - } - - int threeFour(int oldvalue, int newvalue) - { - return (oldvalue*3 + newvalue) >> 2; - } - uint64_t m_ullLingerExpiration; // Linger expiration time (for GC to close a socket with data in sending buffer) CHandShake m_ConnReq; // connection request @@ -740,7 +736,7 @@ class CUDT public: - static const int m_iSelfClockInterval = 64; // ACK interval for self-clocking + static const int SELF_CLOCK_INTERVAL = 64; // ACK interval for self-clocking static const int SEND_LITE_ACK = sizeof(int32_t); // special size for ack containing only ack seq static const int PACKETPAIR_MASK = 0xF; diff --git a/srtcore/crypto.cpp b/srtcore/crypto.cpp index 6a9313111..916a72c2b 100644 --- a/srtcore/crypto.cpp +++ b/srtcore/crypto.cpp @@ -399,7 +399,7 @@ void CCryptoControl::regenCryptoKm(bool sendit, bool bidirectional) m_SndKmLastTime = CTimer::getTime(); } -CCryptoControl::CCryptoControl(CUDT* parent, UDTSOCKET id): +CCryptoControl::CCryptoControl(CUDT* parent, SRTSOCKET id): m_parent(parent), // should be initialized in createCC() m_SocketID(id), m_iSndKmKeyLen(0), diff --git a/srtcore/crypto.h b/srtcore/crypto.h index 36c501286..ae321c174 100644 --- a/srtcore/crypto.h +++ b/srtcore/crypto.h @@ -49,7 +49,7 @@ class CCryptoControl { //public: class CUDT* m_parent; - UDTSOCKET m_SocketID; + SRTSOCKET m_SocketID; size_t m_iSndKmKeyLen; //Key length size_t m_iRcvKmKeyLen; //Key length from rx KM @@ -117,7 +117,7 @@ class CCryptoControl return false; } - CCryptoControl(CUDT* parent, UDTSOCKET id); + CCryptoControl(CUDT* parent, SRTSOCKET id); std::string CONID() const; diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index 0c3761c53..b796ddb8c 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -133,7 +133,7 @@ ENOMEM: There was insufficient memory to create the kernel object. return desc.m_iID; } -int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events) +int CEPoll::add_usock(const int eid, const SRTSOCKET& u, const int* events) { CGuard pg(m_EPollLock); @@ -220,7 +220,7 @@ int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events) return 0; } -int CEPoll::remove_usock(const int eid, const UDTSOCKET& u) +int CEPoll::remove_usock(const int eid, const SRTSOCKET& u) { CGuard pg(m_EPollLock); @@ -275,7 +275,7 @@ int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s) } // Need this to atomically modify polled events (ex: remove write/keep read) -int CEPoll::update_usock(const int eid, const UDTSOCKET& u, const int* events) +int CEPoll::update_usock(const int eid, const SRTSOCKET& u, const int* events) { CGuard pg(m_EPollLock); @@ -379,7 +379,7 @@ int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events) return 0; } -int CEPoll::wait(const int eid, set* readfds, set* writefds, int64_t msTimeOut, set* lrfds, set* lwfds) +int CEPoll::wait(const int eid, set* readfds, set* writefds, int64_t msTimeOut, set* lrfds, set* lwfds) { // if all fields is NULL and waiting time is infinite, then this would be a deadlock if (!readfds && !writefds && !lrfds && lwfds && (msTimeOut < 0)) @@ -416,14 +416,14 @@ int CEPoll::wait(const int eid, set* readfds, set* writefd if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty())) { *readfds = p->second.m_sUDTReads; - for (set::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i) + for (set::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i) readfds->insert(*i); total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size(); } if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty())) { *writefds = p->second.m_sUDTWrites; - for (set::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i) + for (set::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i) writefds->insert(*i); total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size(); } @@ -560,7 +560,7 @@ int CEPoll::release(const int eid) namespace { -void update_epoll_sets(const UDTSOCKET& uid, const set& watch, set& result, bool enable) +void update_epoll_sets(const SRTSOCKET& uid, const set& watch, set& result, bool enable) { if (enable && (watch.find(uid) != watch.end())) { @@ -574,7 +574,7 @@ void update_epoll_sets(const UDTSOCKET& uid, const set& watch, set& eids, int events, bool enable) +int CEPoll::update_events(const SRTSOCKET& uid, std::set& eids, int events, bool enable) { CGuard pg(m_EPollLock); diff --git a/srtcore/epoll.h b/srtcore/epoll.h index f97a841e5..4746dd073 100644 --- a/srtcore/epoll.h +++ b/srtcore/epoll.h @@ -72,16 +72,16 @@ modified by struct CEPollDesc { int m_iID; // epoll ID - std::set m_sUDTSocksOut; // set of UDT sockets waiting for write events - std::set m_sUDTSocksIn; // set of UDT sockets waiting for read events - std::set m_sUDTSocksEx; // set of UDT sockets waiting for exceptions + std::set m_sUDTSocksOut; // set of UDT sockets waiting for write events + std::set m_sUDTSocksIn; // set of UDT sockets waiting for read events + std::set m_sUDTSocksEx; // set of UDT sockets waiting for exceptions int m_iLocalID; // local system epoll ID std::set m_sLocals; // set of local (non-UDT) descriptors - std::set m_sUDTWrites; // UDT sockets ready for write - std::set m_sUDTReads; // UDT sockets ready for read - std::set m_sUDTExcepts; // UDT sockets with exceptions (connection broken, etc.) + std::set m_sUDTWrites; // UDT sockets ready for write + std::set m_sUDTReads; // UDT sockets ready for read + std::set m_sUDTExcepts; // UDT sockets with exceptions (connection broken, etc.) }; class CEPoll @@ -106,7 +106,7 @@ friend class CRendezvousQueue; /// @param [in] events events to watch. /// @return 0 if success, otherwise an error number. - int add_usock(const int eid, const UDTSOCKET& u, const int* events = NULL); + int add_usock(const int eid, const SRTSOCKET& u, const int* events = NULL); /// add a system socket to an EPoll. /// @param [in] eid EPoll ID. @@ -121,7 +121,7 @@ friend class CRendezvousQueue; /// @param [in] u UDT socket ID. /// @return 0 if success, otherwise an error number. - int remove_usock(const int eid, const UDTSOCKET& u); + int remove_usock(const int eid, const SRTSOCKET& u); /// remove a system socket event from an EPoll; socket will be removed if no events to watch. /// @param [in] eid EPoll ID. @@ -135,7 +135,7 @@ friend class CRendezvousQueue; /// @param [in] events events to watch. /// @return 0 if success, otherwise an error number. - int update_usock(const int eid, const UDTSOCKET& u, const int* events = NULL); + int update_usock(const int eid, const SRTSOCKET& u, const int* events = NULL); /// update a system socket events from an EPoll. /// @param [in] eid EPoll ID. @@ -154,7 +154,7 @@ friend class CRendezvousQueue; /// @param [out] lwfds system file descriptors for writing. /// @return number of sockets available for IO. - int wait(const int eid, std::set* readfds, std::set* writefds, int64_t msTimeOut, std::set* lrfds, std::set* lwfds); + int wait(const int eid, std::set* readfds, std::set* writefds, int64_t msTimeOut, std::set* lrfds, std::set* lwfds); /// close and release an EPoll. /// @param [in] eid EPoll ID. @@ -171,7 +171,7 @@ friend class CRendezvousQueue; /// @param [in] enable true -> enable, otherwise disable /// @return 0 if success, otherwise an error number - int update_events(const UDTSOCKET& uid, std::set& eids, int events, bool enable); + int update_events(const SRTSOCKET& uid, std::set& eids, int events, bool enable); private: int m_iIDSeed; // seed to generate a new ID diff --git a/srtcore/handshake.h b/srtcore/handshake.h index 7e651facf..e9ef56c32 100644 --- a/srtcore/handshake.h +++ b/srtcore/handshake.h @@ -106,42 +106,43 @@ struct SrtHandshakeExtension struct SrtHSRequest: public SrtHandshakeExtension { - static const int32_t SRT_MAGIC_CODE = 0x4A171510; + // .... HAIVISIOn + static const int32_t SRT_MAGIC_CODE = 0x4A171510; private: - friend class CHandShake; + friend class CHandShake; - static const size_t SRT_HS_SIZE = 4*sizeof(uint32_t); // 4 existing fields - static const size_t SRT_EXT_HS_SIZE = 2*sizeof(uint32_t) + SRT_HS_SIZE; // SRT magic and SRT HS type, used only in UDT HS ext + static const size_t SRT_HS_SIZE = 4*sizeof(uint32_t); // 4 existing fields + static const size_t SRT_EXT_HS_SIZE = 2*sizeof(uint32_t) + SRT_HS_SIZE; // SRT magic and SRT HS type, used only in UDT HS ext - typedef Bits<15, 0> SRT_TSBPD_DELAY; + typedef Bits<15, 0> SRT_TSBPD_DELAY; - uint32_t m_iSrtVersion; - uint32_t m_iSrtFlags; - uint32_t m_iSrtTsbpd; - uint32_t m_iSrtReserved; + uint32_t m_iSrtVersion; + uint32_t m_iSrtFlags; + uint32_t m_iSrtTsbpd; + uint32_t m_iSrtReserved; public: - SrtHSRequest(): SrtHandshakeExtension(SRT_CMD_HSREQ), m_iSrtVersion(), m_iSrtFlags(), m_iSrtTsbpd(), m_iSrtReserved() {} + SrtHSRequest(): SrtHandshakeExtension(SRT_CMD_HSREQ), m_iSrtVersion(), m_iSrtFlags(), m_iSrtTsbpd(), m_iSrtReserved() {} - void setVersion(uint32_t v) { m_iSrtVersion = v; } - uint32_t version() const { return m_iSrtVersion; } + void setVersion(uint32_t v) { m_iSrtVersion = v; } + uint32_t version() const { return m_iSrtVersion; } - void setFlag(SrtOptions opt) { m_iSrtFlags |= uint32_t(opt); } - void clearFlag(SrtOptions opt) { m_iSrtFlags &= ~opt; } - uint32_t flags() const { return m_iSrtFlags; } + void setFlag(SrtOptions opt) { m_iSrtFlags |= uint32_t(opt); } + void clearFlag(SrtOptions opt) { m_iSrtFlags &= ~opt; } + uint32_t flags() const { return m_iSrtFlags; } - void setTsbPdDelay(uint16_t delay) { m_iSrtTsbpd |= SRT_TSBPD_DELAY::wrap(delay); } - // Unknown what the 1-16 bits have to be used for. - uint16_t tsbPdDelay() const - { - return SRT_TSBPD_DELAY::unwrap(m_iSrtTsbpd); - } + void setTsbPdDelay(uint16_t delay) { m_iSrtTsbpd |= SRT_TSBPD_DELAY::wrap(delay); } + // Unknown what the 1-16 bits have to be used for. + uint16_t tsbPdDelay() const + { + return SRT_TSBPD_DELAY::unwrap(m_iSrtTsbpd); + } - size_t size() const { return SRT_EXT_HS_SIZE; } + size_t size() const { return SRT_EXT_HS_SIZE; } - bool serialize(char* p, size_t size) const; - bool deserialize(const char* mem, size_t size); + bool serialize(char* p, size_t size) const; + bool deserialize(const char* mem, size_t size); }; struct SrtKMRequest: public SrtHandshakeExtension diff --git a/srtcore/logging.h b/srtcore/logging.h index 80ab6213e..9f64d1930 100644 --- a/srtcore/logging.h +++ b/srtcore/logging.h @@ -503,8 +503,9 @@ inline void PrintArgs(std::ostream& serr, Arg1&& arg1, Args&&... args) } template -inline void LogDispatcher::PrintLogLine(const char* file, int line, const std::string& area, Args&&... args) +inline void LogDispatcher::PrintLogLine(const char* file ATR_UNUSED, int line ATR_UNUSED, const std::string& area ATR_UNUSED, Args&&... args ATR_UNUSED) { +#ifdef ENABLE_LOGGING std::ostringstream serr; CreateLogLinePrefix(serr); PrintArgs(serr, args...); @@ -514,13 +515,15 @@ inline void LogDispatcher::PrintLogLine(const char* file, int line, const std::s // Not sure, but it wasn't ever used. SendLogLine(file, line, area, serr.str()); +#endif } #else template -inline void LogDispatcher::PrintLogLine(const char* file, int line, const std::string& area, const Arg& arg) +inline void LogDispatcher::PrintLogLine(const char* file ATR_UNUSED, int line ATR_UNUSED, const std::string& area ATR_UNUSED, const Arg& arg ATR_UNUSED) { +#ifdef ENABLE_LOGGING std::ostringstream serr; CreateLogLinePrefix(serr); serr << arg; @@ -530,10 +533,14 @@ inline void LogDispatcher::PrintLogLine(const char* file, int line, const std::s // Not sure, but it wasn't ever used. SendLogLine(file, line, area, serr.str()); +#endif } #endif +// SendLogLine can be compiled normally. It's intermediately used by: +// - Proxy object, which is replaced by DummyProxy when !ENABLE_LOGGING +// - PrintLogLine, which has empty body when !ENABLE_LOGGING inline void LogDispatcher::SendLogLine(const char* file, int line, const std::string& area, const std::string& msg) { src_config->lock(); diff --git a/srtcore/packet.h b/srtcore/packet.h index 58e0eae00..84d7a1e8c 100644 --- a/srtcore/packet.h +++ b/srtcore/packet.h @@ -281,7 +281,7 @@ friend class CRcvQueue; /// Read the message sequence number. /// @return packet header field [1] - int32_t getMsgSeq(bool has_rexmit) const; + int32_t getMsgSeq(bool has_rexmit = true) const; /// Read the message crypto key bits. /// @return packet header field [1] (bit 3~4). @@ -381,7 +381,7 @@ friend class CRcvQueue; char*& m_pcData; // alias: data/control information //static const int m_iPktHdrSize; // packet header size - static const size_t HDR_SIZE = sizeof(HEADER_TYPE); // packet header size + static const size_t HDR_SIZE = sizeof(HEADER_TYPE); // packet header size = PH_SIZE * sizeof(uint32_t) // Used in many computations // Actually this can be also calculated as: sizeof(struct ether_header) + sizeof(struct ip) + sizeof(struct udphdr). diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 26f25bd24..544e873ae 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -585,7 +585,7 @@ void* CSndQueue::worker(void* param) } self->m_pChannel->sendto(addr, pkt); -#if defined(HAI_DEBUG_SNDQ_HIGHRATE) +#if defined(SRT_DEBUG_SNDQ_HIGHRATE) self->m_WorkerStats.lSendTo++; #endif /* SRT_DEBUG_SNDQ_HIGHRATE */ } @@ -823,7 +823,7 @@ CRendezvousQueue::~CRendezvousQueue() m_lRendezvousID.clear(); } -void CRendezvousQueue::insert(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl) +void CRendezvousQueue::insert(const SRTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl) { CGuard vg(m_RIDVectorLock); @@ -838,7 +838,7 @@ void CRendezvousQueue::insert(const UDTSOCKET& id, CUDT* u, int ipv, const socka m_lRendezvousID.push_back(r); } -void CRendezvousQueue::remove(const UDTSOCKET& id, bool should_lock) +void CRendezvousQueue::remove(const SRTSOCKET& id, bool should_lock) { CGuard vg(m_RIDVectorLock, should_lock); @@ -858,10 +858,10 @@ void CRendezvousQueue::remove(const UDTSOCKET& id, bool should_lock) } } -CUDT* CRendezvousQueue::retrieve(const sockaddr* addr, ref_t r_id) +CUDT* CRendezvousQueue::retrieve(const sockaddr* addr, ref_t r_id) { CGuard vg(m_RIDVectorLock); - UDTSOCKET& id = r_id; + SRTSOCKET& id = r_id; // TODO: optimize search for (list::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++ i) @@ -1056,7 +1056,7 @@ void* CRcvQueue::worker(void* param) while (!self->m_bClosing) { bool have_received = false; - EReadStatus rst = self->worker_RetrieveUnit(id, unit, &sa); + EReadStatus rst = self->worker_RetrieveUnit(Ref(id), Ref(unit), &sa); if (rst == RST_OK) { if ( id < 0 ) @@ -1173,7 +1173,7 @@ static string PacketInfo(const CPacket& pkt) return os.str(); } -EReadStatus CRcvQueue::worker_RetrieveUnit(int32_t& id, CUnit*& unit, sockaddr* addr) +EReadStatus CRcvQueue::worker_RetrieveUnit(ref_t id, ref_t unit, sockaddr* addr) { #ifdef NO_BUSY_WAITING m_pTimer->tick(); @@ -1449,12 +1449,12 @@ void CRcvQueue::removeListener(const CUDT* u) m_pListener = NULL; } -void CRcvQueue::registerConnector(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl) +void CRcvQueue::registerConnector(const SRTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl) { m_pRendezvousQueue->insert(id, u, ipv, addr, ttl); } -void CRcvQueue::removeConnector(const UDTSOCKET& id, bool should_lock) +void CRcvQueue::removeConnector(const SRTSOCKET& id, bool should_lock) { LOGC(mglog.Debug) << "removeConnector: removing %" << id; m_pRendezvousQueue->remove(id, should_lock); diff --git a/srtcore/queue.h b/srtcore/queue.h index 07ec0d857..ea18efef3 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -313,21 +313,21 @@ class CRendezvousQueue ~CRendezvousQueue(); public: - void insert(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl); + void insert(const SRTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl); // The should_lock parameter is given here to state as to whether // the lock should be applied here. If called from some internals // and the lock IS ALREADY APPLIED, use false here to prevent // double locking and deadlock in result. - void remove(const UDTSOCKET& id, bool should_lock); - CUDT* retrieve(const sockaddr* addr, ref_t id); + void remove(const SRTSOCKET& id, bool should_lock); + CUDT* retrieve(const sockaddr* addr, ref_t id); void updateConnStatus(EConnectStatus, const CPacket& response); private: struct CRL { - UDTSOCKET m_iID; // UDT socket ID (self) + SRTSOCKET m_iID; // UDT socket ID (self) CUDT* m_pUDT; // UDT instance int m_iIPversion; // IP version sockaddr* m_pPeerAddr; // UDT sonnection peer address @@ -456,7 +456,7 @@ friend class CUDTUnited; static void* worker(void* param); pthread_t m_WorkerThread; // Subroutines of worker - EReadStatus worker_RetrieveUnit(int32_t& id, CUnit*& unit, sockaddr* sa); + EReadStatus worker_RetrieveUnit(ref_t id, ref_t unit, sockaddr* sa); EConnectStatus worker_ProcessConnectionRequest(CUnit* unit, const sockaddr* sa); EConnectStatus worker_TryAsyncRend_OrStore(int32_t id, CUnit* unit, const sockaddr* sa); EConnectStatus worker_ProcessAddressedPacket(int32_t id, CUnit* unit, const sockaddr* sa); @@ -478,8 +478,8 @@ friend class CUDTUnited; int setListener(CUDT* u); void removeListener(const CUDT* u); - void registerConnector(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl); - void removeConnector(const UDTSOCKET& id, bool should_lock = true); + void registerConnector(const SRTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl); + void removeConnector(const SRTSOCKET& id, bool should_lock = true); void setNewEntry(CUDT* u); bool ifNewEntry(); diff --git a/srtcore/smoother.cpp b/srtcore/smoother.cpp index a5abd35cc..5ae54aadf 100644 --- a/srtcore/smoother.cpp +++ b/srtcore/smoother.cpp @@ -93,6 +93,9 @@ class LiveSmoother: public SmootherBase updatePktSndPeriod(); + + // NOTE: TEV_SEND gets dispatched from Sending thread, all others + // from receiving thread. parent->ConnectSignal(TEV_SEND, SSLOT(updatePayloadSize)); /* @@ -137,13 +140,22 @@ class LiveSmoother: public SmootherBase virtual int64_t sndBandwidth() ATR_OVERRIDE { return m_llSndMaxBW; } +private: // SLOTS: // TEV_SEND -> CPacket*. void updatePayloadSize(ETransmissionEvent, EventVariant var) { const CPacket& packet = *var.get(); - m_iSndAvgPayloadSize = ((m_iSndAvgPayloadSize * 127) + packet.getLength()) / 128; + + // XXX NOTE: TEV_SEND is sent from CSndQueue::worker thread, which is + // different to threads running any other events (TEV_CHECKTIMER and TEV_ACK). + // The m_iSndAvgPayloadSize field is however left unguarded because as + // 'int' type it's considered atomic, as well as there's no other modifier + // of this field. Worst case scenario, the procedure running in CRcvQueue::worker + // thread will pick up a "slightly outdated" average value from this + // field - this is insignificant. + m_iSndAvgPayloadSize = avg_iir<128, int>(m_iSndAvgPayloadSize, packet.getLength()); LOGC(mglog.Debug) << "LiveSmoother: avg payload size updated: " << m_iSndAvgPayloadSize; } @@ -160,8 +172,9 @@ class LiveSmoother: public SmootherBase void updatePktSndPeriod() { - double pktsize = m_iSndAvgPayloadSize + CPacket::HDR_SIZE + CPacket::UDP_HDR_SIZE; - m_dPktSndPeriod = 1000000.0 * (pktsize/m_llSndMaxBW); + // packet = payload + header + double pktsize = m_iSndAvgPayloadSize + CPacket::SRT_DATA_HDR_SIZE; + m_dPktSndPeriod = 1000*1000.0 * (pktsize/m_llSndMaxBW); LOGC(mglog.Debug) << "LiveSmoother: sending period updated: " << m_iSndAvgPayloadSize; } @@ -323,6 +336,8 @@ class FileSmoother: public SmootherBase } } +private: + // SLOTS void updateSndPeriod(ETransmissionEvent, EventVariant arg) { @@ -346,13 +361,28 @@ class FileSmoother: public SmootherBase { m_bSlowStart = false; if (m_parent->deliveryRate() > 0) + { m_dPktSndPeriod = 1000000.0 / m_parent->deliveryRate(); + LOGC(mglog.Debug) << "FileSmoother: UPD (slowstart:ENDED) wndsize=" + << m_dCWndSize << "/" << m_dMaxCWndSize + << " sndperiod=" << m_dPktSndPeriod << "us = mega/(" + << m_parent->deliveryRate() << "B/s)"; + } else + { m_dPktSndPeriod = m_dCWndSize / (m_parent->RTT() + m_iRCInterval); + LOGC(mglog.Debug) << "FileSmoother: UPD (slowstart:ENDED) wndsize=" + << m_dCWndSize << "/" << m_dMaxCWndSize + << " sndperiod=" << m_dPktSndPeriod << "us = wndsize/(RTT+RCIV) RTT=" + << m_parent->RTT() << " RCIV=" << m_iRCInterval; + } + } + else + { + LOGC(mglog.Debug) << "FileSmoother: UPD (slowstart:KEPT) wndsize=" + << m_dCWndSize << "/" << m_dMaxCWndSize + << " sndperiod=" << m_dPktSndPeriod << "us"; } - - LOGC(mglog.Debug) << "FileSmoother: UPD (slowstart:" - << (m_bSlowStart ? "KEPT" : "ENDED") << ") wndsize=" << m_dCWndSize << " sndperiod=" << m_dPktSndPeriod << "us"; } else { diff --git a/srtcore/srt.h b/srtcore/srt.h index 8225de40c..876e247ff 100644 --- a/srtcore/srt.h +++ b/srtcore/srt.h @@ -125,27 +125,26 @@ typedef enum SRT_SOCKSTATUS { // backward compatibility until all compat is destroyed. typedef enum SRT_SOCKOPT { - SRTO_MSS, // the Maximum Transfer Unit - SRTO_SNDSYN, // if sending is blocking - SRTO_RCVSYN, // if receiving is blocking - SRTO_CC, // custom congestion control algorithm - SRTO_FC, // Flight flag size (window size) - SRTO_SNDBUF, // maximum buffer in sending queue - SRTO_RCVBUF, // UDT receiving buffer size - SRTO_LINGER, // waiting for unsent data when closing - SRTO_UDP_SNDBUF, // UDP sending buffer size - SRTO_UDP_RCVBUF, // UDP receiving buffer size - SRTO_MAXMSG, // maximum datagram message size - SRTO_MSGTTL, // time-to-live of a datagram message - SRTO_RENDEZVOUS, // rendezvous connection mode - SRTO_SNDTIMEO, // send() timeout - SRTO_RCVTIMEO, // recv() timeout - SRTO_REUSEADDR, // reuse an existing port or create a new one - SRTO_MAXBW, // maximum bandwidth (bytes per second) that the connection can use - SRTO_STATE, // current socket state, see UDTSTATUS, read only - SRTO_EVENT, // current available events associated with the socket - SRTO_SNDDATA, // size of data in the sending buffer - SRTO_RCVDATA, // size of data available for recv + SRTO_MSS = 0, // the Maximum Transfer Unit + SRTO_SNDSYN = 1, // if sending is blocking + SRTO_RCVSYN = 2, // if receiving is blocking + SRTO_FC = 4, // Flight flag size (window size) + SRTO_SNDBUF = 5, // maximum buffer in sending queue + SRTO_RCVBUF = 6, // UDT receiving buffer size + SRTO_LINGER = 7, // waiting for unsent data when closing + SRTO_UDP_SNDBUF = 8, // UDP sending buffer size + SRTO_UDP_RCVBUF = 9, // UDP receiving buffer size + // XXX Here space free for 2 options + // after deprecated ones are removed + SRTO_RENDEZVOUS = 12, // rendezvous connection mode + SRTO_SNDTIMEO = 13, // send() timeout + SRTO_RCVTIMEO = 14, // recv() timeout + SRTO_REUSEADDR = 15, // reuse an existing port or create a new one + SRTO_MAXBW = 16, // maximum bandwidth (bytes per second) that the connection can use + SRTO_STATE = 17, // current socket state, see UDTSTATUS, read only + SRTO_EVENT = 18, // current available events associated with the socket + SRTO_SNDDATA = 19, // size of data in the sending buffer + SRTO_RCVDATA = 20, // size of data available for recv SRTO_SENDER = 21, // Sender mode (independent of conn mode), for encryption, tsbpd handshake. SRTO_TSBPDMODE = 22, // Enable/Disable TsbPd. Enable -> Tx set origin timestamp, Rx deliver packet at origin time + delay SRTO_LATENCY = 23, // DEPRECATED. SET: to both SRTO_RCVLATENCY and SRTO_PEERLATENCY. GET: same as SRTO_RCVLATENCY. @@ -163,10 +162,10 @@ typedef enum SRT_SOCKOPT { SRTO_VERSION = 34, // Local SRT Version SRTO_PEERVERSION, // Peer SRT Version (from SRT Handshake) SRTO_CONNTIMEO = 36, // Connect timeout in msec. Ccaller default: 3000, rendezvous (x 10) - // deprecated: SRTO_TWOWAYDATA (@c below) - SRTO_SNDPBKEYLEN = 38, // (DEPRECATED: use SRTO_PBKEYLEN) - SRTO_RCVPBKEYLEN, // (DEPRECATED: use SRTO_PBKEYLEN) - SRTO_SNDPEERKMSTATE, // (GET) the current state of the encryption at the peer side + // deprecated: SRTO_TWOWAYDATA, SRTO_SNDPBKEYLEN, SRTO_RCVPBKEYLEN (@c below) + _DEPRECATED_SRTO_SNDPBKEYLEN = 38, // (needed to use inside the code without generating -Wswitch) + // + SRTO_SNDPEERKMSTATE = 40, // (GET) the current state of the encryption at the peer side SRTO_RCVKMSTATE, // (GET) the current state of the encryption at the agent side SRTO_LOSSMAXTTL, // Maximum possible packet reorder tolerance (number of packets to receive after loss to send lossreport) SRTO_RCVLATENCY, // TsbPd receiver delay (mSec) to absorb burst of missed packet retransmission @@ -179,18 +178,45 @@ typedef enum SRT_SOCKOPT { SRTO_TRANSTYPE // Transmission type (set of options required for given transmission type) } SRT_SOCKOPT; +// DEPRECATED OPTIONS: + // SRTO_TWOWAYDATA: not to be used. SRT connection is always bidirectional if -// both clients support HSv5 - that is, since version 1.3.0 +// both clients support HSv5 - that is, since version 1.3.0. This flag was +// introducted around 1.2.0 version when full bidirectional support was added, +// but the bidirectional feature was decided no to be enabled due to huge +// differences between bidirectional support (especially concerning encryption) +// with HSv4 and HSv5 (that is, HSv4 was decided to remain unidirectional only, +// even though partial support is already provided in this version). static const SRT_SOCKOPT SRTO_TWOWAYDATA SRT_ATR_DEPRECATED = (SRT_SOCKOPT)37; // This has been deprecated a long time ago, treat this as never implemented. static const SRT_SOCKOPT SRTO_TSBPDMAXLAG SRT_ATR_DEPRECATED = (SRT_SOCKOPT)32; +// This option is a derivative from UDT; the mechanism that uses it is now +// known as Smoother and settable by SRTO_SMOOTHER, or more generally by +// SRTO_TRANSTYPE. The freed number will be reused for some other +// option. This option should have never been used anywhere, just for safety +// this is temporarily declared as deprecated. +static const SRT_SOCKOPT SRTO_CC SRT_ATR_DEPRECATED = (SRT_SOCKOPT)3; + +// These two flags were derived from UDT, but they were never used. +// Probably it didn't make sense anyway. The maximum size of the message +// in File/Message mode is defined by SRTO_SNDBUF, and the MSGTTL is +// a parameter used in `srt_sendmsg` and `srt_sendmsg2`. +static const SRT_SOCKOPT SRTO_MAXMSG SRT_ATR_DEPRECATED = (SRT_SOCKOPT)10; +static const SRT_SOCKOPT SRTO_MSGTTL SRT_ATR_DEPRECATED = (SRT_SOCKOPT)11; + +// These flags come from an older experimental implementation of bidirectional +// encryption support, which were used two different SEKs, KEKs and passphrases +// per direction. The current implementation uses just one in both directions, +// so SRTO_PBKEYLEN should be used for both cases. +static const SRT_SOCKOPT SRTO_SNDPBKEYLEN SRT_ATR_DEPRECATED = (SRT_SOCKOPT)38; +static const SRT_SOCKOPT SRTO_RCVPBKEYLEN SRT_ATR_DEPRECATED = (SRT_SOCKOPT)39; typedef enum SRT_TRANSTYPE { SRTT_LIVE, - SRTT_VOD, + SRTT_FILE, SRTT_INVALID } SRT_TRANSTYPE; diff --git a/srtcore/srt4udt.h b/srtcore/srt4udt.h index dd3d2d9d5..034026c20 100644 --- a/srtcore/srt4udt.h +++ b/srtcore/srt4udt.h @@ -30,7 +30,6 @@ written by #endif //undef SRT_ENABLE_ECN 1 /* Early Congestion Notification (for source bitrate control) */ -#define SRT_ENABLE_SRCTIMESTAMP 1 /* Support timestamp carryover from one SRT connection (Rx) to the next (Tx) */ #define SRT_ENABLE_CBRTIMESTAMP 1 /* Set timestamp for Constant Bit Rate flow (requires SRCTIMESTAMP) */ //undef SRT_DEBUG_TSBPD_OUTJITTER 1 /* Packet Delivery histogram */ @@ -40,12 +39,6 @@ written by //undef SRT_DEBUG_SNDQ_HIGHRATE 1 -/* -* SRT_ENABLE_FASTREXMIT -* Earlier [re-]retransmission of lost retransmitted packets -*/ -#define SRT_ENABLE_FASTREXMIT 1 - /* * SRT_ENABLE_CONNTIMEO * Option UDT_CONNTIMEO added to the API to set/get the connection timeout. diff --git a/srtcore/srt_c_api.cpp b/srtcore/srt_c_api.cpp index 2b53d6ae9..6abdf265d 100644 --- a/srtcore/srt_c_api.cpp +++ b/srtcore/srt_c_api.cpp @@ -34,15 +34,15 @@ extern "C" { int srt_startup() { return CUDT::startup(); } int srt_cleanup() { return CUDT::cleanup(); } -UDTSOCKET srt_socket(int af, int type, int protocol) { return CUDT::socket(af, type, protocol); } -int srt_bind(UDTSOCKET u, const struct sockaddr * name, int namelen) { return CUDT::bind(u, name, namelen); } -int srt_bind_peerof(UDTSOCKET u, UDPSOCKET udpsock) { return CUDT::bind(u, udpsock); } -int srt_listen(UDTSOCKET u, int backlog) { return CUDT::listen(u, backlog); } -UDTSOCKET srt_accept(UDTSOCKET u, struct sockaddr * addr, int * addrlen) { return CUDT::accept(u, addr, addrlen); } -int srt_connect(UDTSOCKET u, const struct sockaddr * name, int namelen) { return CUDT::connect(u, name, namelen, 0); } -int srt_connect_debug(UDTSOCKET u, const struct sockaddr * name, int namelen, int32_t forced_isn) { return CUDT::connect(u, name, namelen, forced_isn); } - -int srt_rendezvous(UDTSOCKET u, const struct sockaddr* local_name, int local_namelen, +SRTSOCKET srt_socket(int af, int type, int protocol) { return CUDT::socket(af, type, protocol); } +int srt_bind(SRTSOCKET u, const struct sockaddr * name, int namelen) { return CUDT::bind(u, name, namelen); } +int srt_bind_peerof(SRTSOCKET u, UDPSOCKET udpsock) { return CUDT::bind(u, udpsock); } +int srt_listen(SRTSOCKET u, int backlog) { return CUDT::listen(u, backlog); } +SRTSOCKET srt_accept(SRTSOCKET u, struct sockaddr * addr, int * addrlen) { return CUDT::accept(u, addr, addrlen); } +int srt_connect(SRTSOCKET u, const struct sockaddr * name, int namelen) { return CUDT::connect(u, name, namelen, 0); } +int srt_connect_debug(SRTSOCKET u, const struct sockaddr * name, int namelen, int32_t forced_isn) { return CUDT::connect(u, name, namelen, forced_isn); } + +int srt_rendezvous(SRTSOCKET u, const struct sockaddr* local_name, int local_namelen, const struct sockaddr* remote_name, int remote_namelen) { bool yes = 1; @@ -67,7 +67,7 @@ int srt_rendezvous(UDTSOCKET u, const struct sockaddr* local_name, int local_nam return srt_connect(u, remote_name, remote_namelen); } -int srt_close(UDTSOCKET u) +int srt_close(SRTSOCKET u) { SRT_SOCKSTATUS st = srt_getsockstate(u); @@ -82,24 +82,24 @@ int srt_close(UDTSOCKET u) return CUDT::close(u); } -int srt_getpeername(UDTSOCKET u, struct sockaddr * name, int * namelen) { return CUDT::getpeername(u, name, namelen); } -int srt_getsockname(UDTSOCKET u, struct sockaddr * name, int * namelen) { return CUDT::getsockname(u, name, namelen); } -int srt_getsockopt(UDTSOCKET u, int level, SRT_SOCKOPT optname, void * optval, int * optlen) +int srt_getpeername(SRTSOCKET u, struct sockaddr * name, int * namelen) { return CUDT::getpeername(u, name, namelen); } +int srt_getsockname(SRTSOCKET u, struct sockaddr * name, int * namelen) { return CUDT::getsockname(u, name, namelen); } +int srt_getsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, void * optval, int * optlen) { return CUDT::getsockopt(u, level, optname, optval, optlen); } -int srt_setsockopt(UDTSOCKET u, int level, SRT_SOCKOPT optname, const void * optval, int optlen) +int srt_setsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, const void * optval, int optlen) { return CUDT::setsockopt(u, level, optname, optval, optlen); } -int srt_getsockflag(UDTSOCKET u, SRT_SOCKOPT opt, void* optval, int* optlen) +int srt_getsockflag(SRTSOCKET u, SRT_SOCKOPT opt, void* optval, int* optlen) { return CUDT::getsockopt(u, 0, opt, optval, optlen); } -int srt_setsockflag(UDTSOCKET u, SRT_SOCKOPT opt, const void* optval, int optlen) +int srt_setsockflag(SRTSOCKET u, SRT_SOCKOPT opt, const void* optval, int optlen) { return CUDT::setsockopt(u, 0, opt, optval, optlen); } -int srt_send(UDTSOCKET u, const char * buf, int len, int flags) { return CUDT::send(u, buf, len, flags); } -int srt_recv(UDTSOCKET u, char * buf, int len, int flags) { return CUDT::recv(u, buf, len, flags); } -int srt_sendmsg(UDTSOCKET u, const char * buf, int len, int ttl, int inorder) { return CUDT::sendmsg(u, buf, len, ttl, 0!= inorder); } -int srt_recvmsg(UDTSOCKET u, char * buf, int len) { return CUDT::recvmsg(u, buf, len); } +int srt_send(SRTSOCKET u, const char * buf, int len, int flags) { return CUDT::send(u, buf, len, flags); } +int srt_recv(SRTSOCKET u, char * buf, int len, int flags) { return CUDT::recv(u, buf, len, flags); } +int srt_sendmsg(SRTSOCKET u, const char * buf, int len, int ttl, int inorder) { return CUDT::sendmsg(u, buf, len, ttl, 0!= inorder); } +int srt_recvmsg(SRTSOCKET u, char * buf, int len) { return CUDT::recvmsg(u, buf, len); } -int srt_sendmsg2(UDTSOCKET u, const char * buf, int len, SRT_MSGCTRL *mctrl) +int srt_sendmsg2(SRTSOCKET u, const char * buf, int len, SRT_MSGCTRL *mctrl) { if (mctrl) return CUDT::sendmsg(u, buf, len, -1, true, mctrl->srctime); @@ -107,7 +107,7 @@ int srt_sendmsg2(UDTSOCKET u, const char * buf, int len, SRT_MSGCTRL *mctrl) return CUDT::sendmsg(u, buf, len); } -int srt_recvmsg2(UDTSOCKET u, char * buf, int len, SRT_MSGCTRL *mctrl) +int srt_recvmsg2(SRTSOCKET u, char * buf, int len, SRT_MSGCTRL *mctrl) { uint64_t srctime = 0; int rc = CUDT::recvmsg(u, buf, len, srctime); @@ -143,17 +143,17 @@ void srt_clearlasterror() UDT::getlasterror().clear(); } -int srt_perfmon(UDTSOCKET u, SRT_TRACEINFO * perf, int clear) { return CUDT::perfmon(u, perf, 0!= clear); } -int srt_bstats(UDTSOCKET u, SRT_TRACEBSTATS * perf, int clear) { return CUDT::bstats(u, perf, 0!= clear); } +int srt_perfmon(SRTSOCKET u, SRT_TRACEINFO * perf, int clear) { return CUDT::perfmon(u, perf, 0!= clear); } +int srt_bstats(SRTSOCKET u, SRT_TRACEBSTATS * perf, int clear) { return CUDT::bstats(u, perf, 0!= clear); } -SRT_SOCKSTATUS srt_getsockstate(UDTSOCKET u) { return SRT_SOCKSTATUS((int)CUDT::getsockstate(u)); } +SRT_SOCKSTATUS srt_getsockstate(SRTSOCKET u) { return SRT_SOCKSTATUS((int)CUDT::getsockstate(u)); } // event mechanism int srt_epoll_create() { return CUDT::epoll_create(); } // You can use either SRT_EPOLL_* flags or EPOLL* flags from , both are the same. IN/OUT/ERR only. // events == NULL accepted, in which case all flags are set. -int srt_epoll_add_usock(int eid, UDTSOCKET u, const int * events) { return CUDT::epoll_add_usock(eid, u, events); } +int srt_epoll_add_usock(int eid, SRTSOCKET u, const int * events) { return CUDT::epoll_add_usock(eid, u, events); } int srt_epoll_add_ssock(int eid, SYSSOCKET s, const int * events) { @@ -179,10 +179,10 @@ int srt_epoll_add_ssock(int eid, SYSSOCKET s, const int * events) return CUDT::epoll_add_ssock(eid, s, &flag); } -int srt_epoll_remove_usock(int eid, UDTSOCKET u) { return CUDT::epoll_remove_usock(eid, u); } +int srt_epoll_remove_usock(int eid, SRTSOCKET u) { return CUDT::epoll_remove_usock(eid, u); } int srt_epoll_remove_ssock(int eid, SYSSOCKET s) { return CUDT::epoll_remove_ssock(eid, s); } -int srt_epoll_update_usock(int eid, UDTSOCKET u, const int * events) +int srt_epoll_update_usock(int eid, SRTSOCKET u, const int * events) { int srt_ev = 0; @@ -221,7 +221,7 @@ int srt_epoll_update_ssock(int eid, SYSSOCKET s, const int * events) int srt_epoll_wait( int eid, - UDTSOCKET* readfds, int* rnum, UDTSOCKET* writefds, int* wnum, + SRTSOCKET* readfds, int* rnum, SRTSOCKET* writefds, int* wnum, int64_t msTimeOut, SYSSOCKET* lrfds, int* lrnum, SYSSOCKET* lwfds, int* lwnum) { diff --git a/srtcore/udt.h b/srtcore/udt.h index 659fedb50..bead29f11 100644 --- a/srtcore/udt.h +++ b/srtcore/udt.h @@ -352,13 +352,8 @@ UDT_API int setsockopt(UDTSOCKET u, int level, SRT_SOCKOPT optname, const void* UDT_API int send(UDTSOCKET u, const char* buf, int len, int flags); UDT_API int recv(UDTSOCKET u, char* buf, int len, int flags); -// If SRT_ENABLE_SRCTIMESTAMP is NOT enabled, 'srctime' argument is ignored. UDT_API int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false, uint64_t srctime = 0LL); -#ifdef SRT_ENABLE_SRCTIMESTAMP -// For non-SRCTIMESTAMP, this version is not available for the application UDT_API int recvmsg(UDTSOCKET u, char* buf, int len, uint64_t& srctime); -#endif -// For SRCTIMESTAMP this version is still available, it just ignores the received timestamp. UDT_API int recvmsg(UDTSOCKET u, char* buf, int len); UDT_API int64_t sendfile(UDTSOCKET u, std::fstream& ifs, int64_t& offset, int64_t size, int block = 364000); From f85de73b070f80ab1466764d026c496c7ad3fe62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Ma=C5=82ecki?= Date: Thu, 5 Oct 2017 12:30:03 +0200 Subject: [PATCH 2/2] Withdrawn a fix for drift tracer locking --- srtcore/buffer.cpp | 6 +----- srtcore/buffer.h | 2 +- srtcore/core.cpp | 10 +++------- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index ccf905680..c67fe124c 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -1449,7 +1449,7 @@ void CRcvBuffer::printDriftOffset(int tsbPdOffset, int tsbPdDriftAvg) } #endif /* SRT_DEBUG_TSBPD_DRIFT */ -void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mutex_to_lock) +void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp) { if (!m_bTsbPdMode) // Not checked unless in TSBPD mode return; @@ -1472,9 +1472,6 @@ void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mut // either schedule time or a time supplied by the application). int64_t iDrift = CTimer::getTime() - (getTsbPdTimeBase(timestamp) + timestamp); - - CGuard::enterCS(mutex_to_lock); - bool updated = m_DriftTracer.update(iDrift); #ifdef SRT_DEBUG_TSBPD_DRIFT @@ -1490,7 +1487,6 @@ void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mut m_ullTsbPdTimeBase += m_DriftTracer.overdrift(); } - CGuard::leaveCS(mutex_to_lock); } int CRcvBuffer::readMsg(char* data, int len) diff --git a/srtcore/buffer.h b/srtcore/buffer.h index d2f830f19..fd416bd3f 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -356,7 +356,7 @@ class CRcvBuffer /// Add packet timestamp for drift caclculation and compensation /// @param [in] timestamp packet time stamp - void addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mutex_to_lock); + void addRcvTsbPdDriftSample(uint32_t timestamp); #ifdef SRT_DEBUG_TSBPD_DRIFT void printDriftHistogram(int64_t iDrift); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index c796c5c54..8da9b4f7b 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -6109,13 +6109,9 @@ void CUDT::processCtrl(CPacket& ctrlpkt) updateCC(TEV_ACKACK, ack); - // This function will put a lock on m_RecvLock by itself, as needed. - // It must be done inside because this function reads the current time - // and if waiting for the lock has caused a delay, the time will be - // inaccurate. Additionally it won't lock if TSBPD mode is off, and - // won't update anything. Note that if you set TSBPD mode and use - // srt_recvfile (which doesn't make any sense), you'll have e deadlock. - m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), m_RecvLock); + CGuard::enterCS(m_RecvLock); + m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp()); + CGuard::leaveCS(m_RecvLock); // update last ACK that has been received by the sender if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)