Skip to content

Commit

Permalink
Added ::select to Windows sockets.
Browse files Browse the repository at this point in the history
This fixes the high CPU load issue on Windows with non-blocking sockets (after PR Haivision#483).
Fixed type casting warnings in channel.cpp.
  • Loading branch information
maxsharabayko committed Dec 5, 2018
1 parent b1cabc0 commit 3a0184c
Showing 1 changed file with 55 additions and 36 deletions.
91 changes: 55 additions & 36 deletions srtcore/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ void CChannel::open(const sockaddr* addr)
if (0 != ::getaddrinfo(NULL, "0", &hints, &res))
throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR);

if (0 != ::bind(m_iSocket, res->ai_addr, res->ai_addrlen))
if (0 != ::bind(m_iSocket, res->ai_addr, (int) res->ai_addrlen))
throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR);
memcpy(&m_BindAddr, res->ai_addr, res->ai_addrlen);
m_BindAddr.len = res->ai_addrlen;
m_BindAddr.len = (socklen_t) res->ai_addrlen;

::freeaddrinfo(res);
}
Expand Down Expand Up @@ -390,7 +390,7 @@ int CChannel::sendto(const sockaddr* addr, CPacket& packet) const
// convert control information into network order
// XXX USE HtoNLA!
if (packet.isControl())
for (int i = 0, n = packet.getLength() / 4; i < n; ++ i)
for (ptrdiff_t i = 0, n = (ptrdiff_t) packet.getLength() / 4; i < n; ++i)
*((uint32_t *)packet.m_pcData + i) = htonl(*((uint32_t *)packet.m_pcData + i));

// convert packet header into network order
Expand All @@ -415,7 +415,7 @@ int CChannel::sendto(const sockaddr* addr, CPacket& packet) const

int res = ::sendmsg(m_iSocket, &mh, 0);
#else
DWORD size = CPacket::HDR_SIZE + packet.getLength();
DWORD size = (DWORD) (CPacket::HDR_SIZE + packet.getLength());
int addrsize = m_iSockAddrSize;
int res = ::WSASendTo(m_iSocket, (LPWSABUF)packet.m_PacketVector, 2, &size, 0, addr, addrsize, NULL, NULL);
res = (0 == res) ? size : -1;
Expand All @@ -433,7 +433,7 @@ int CChannel::sendto(const sockaddr* addr, CPacket& packet) const

if (packet.isControl())
{
for (int l = 0, n = packet.getLength() / 4; l < n; ++ l)
for (ptrdiff_t l = 0, n = packet.getLength() / 4; l < n; ++ l)
*((uint32_t *)packet.m_pcData + l) = ntohl(*((uint32_t *)packet.m_pcData + l));
}

Expand All @@ -444,29 +444,42 @@ EReadStatus CChannel::recvfrom(sockaddr* addr, CPacket& packet) const
{
EReadStatus status = RST_OK;

#ifndef _WIN32
msghdr mh;
mh.msg_name = addr;
mh.msg_namelen = m_iSockAddrSize;
mh.msg_iov = packet.m_PacketVector;
mh.msg_iovlen = 2;
mh.msg_control = NULL;
mh.msg_controllen = 0;
mh.msg_flags = 0;

#ifdef UNIX
#if defined(UNIX) || defined(_WIN32)
fd_set set;
timeval tv;
FD_ZERO(&set);
FD_SET(m_iSocket, &set);
tv.tv_sec = 0;
tv.tv_sec = 0;
tv.tv_usec = 10000;
::select(m_iSocket+1, &set, NULL, &set, &tv);
const int select_ret = ::select((int) m_iSocket + 1, &set, NULL, &set, &tv);
#else
const int select_ret = 1; // the socket is expected to be in the blocking mode itself
#endif

int res = ::recvmsg(m_iSocket, &mh, 0);
int msg_flags = mh.msg_flags;
if (select_ret == 0) // timeout
{
status = RST_AGAIN;
goto Return_error;
}

#ifndef _WIN32

int msg_flags = 0;
int recv_size = -1;
if (select_ret > 0)
{
msghdr mh;
mh.msg_name = addr;
mh.msg_namelen = m_iSockAddrSize;
mh.msg_iov = packet.m_PacketVector;
mh.msg_iovlen = 2;
mh.msg_control = NULL;
mh.msg_controllen = 0;
mh.msg_flags = 0;

recv_size = ::recvmsg(m_iSocket, &mh, 0);
msg_flags = mh.msg_flags;
}

// Note that there are exactly four groups of possible errors
// reported by recvmsg():
Expand All @@ -489,9 +502,10 @@ EReadStatus CChannel::recvfrom(sockaddr* addr, CPacket& packet) const
// Return: RST_ERROR. This will simply make the worker thread exit, which is
// expected to happen after CChannel::close() is called by another thread.

if (res == -1)
// We do not handle <= SOCKET_ERROR as they are handled further by checking the recv_size
if (select_ret == -1 || recv_size == -1)
{
int err = NET_ERROR;
const int err = NET_ERROR;
if (err == EAGAIN || err == EINTR || err == ECONNREFUSED) // For EAGAIN, this isn't an error, just a useless call.
{
status = RST_AGAIN;
Expand Down Expand Up @@ -522,20 +536,25 @@ EReadStatus CChannel::recvfrom(sockaddr* addr, CPacket& packet) const
// value one Windows than 0, unless this procedure below is rewritten
// to use WSARecvMsg().

DWORD size = CPacket::HDR_SIZE + packet.getLength();
int recv_size = -1;
int recv_ret = SOCKET_ERROR;
int msg_flags = 0;
DWORD flag = 0;
int addrsize = m_iSockAddrSize;

int msg_flags = 0;
int sockerror = ::WSARecvFrom(m_iSocket, (LPWSABUF)packet.m_PacketVector, 2, &size, &flag, addr, &addrsize, NULL, NULL);
int res;
if (sockerror == 0)
if (select_ret > 0) // the total number of socket handles that are ready
{
res = size;
DWORD size = (DWORD) (CPacket::HDR_SIZE + packet.getLength());
int addrsize = m_iSockAddrSize;

recv_ret = ::WSARecvFrom(m_iSocket, (LPWSABUF)packet.m_PacketVector, 2, &size, &flag, addr, &addrsize, NULL, NULL);
if (recv_ret == 0)
recv_size = size;
}
else // == SOCKET_ERROR

// We do not handle <= SOCKET_ERROR as they are handled further by checking the recv_size
if (select_ret == SOCKET_ERROR || recv_ret == SOCKET_ERROR) // == SOCKET_ERROR
{
res = -1;
recv_size = -1;
// On Windows this is a little bit more complicated, so simply treat every error
// as an "again" situation. This should still be probably fixed, but it needs more
// thorough research. For example, the problem usually reported from here is
Expand All @@ -551,7 +570,7 @@ EReadStatus CChannel::recvfrom(sockaddr* addr, CPacket& packet) const
WSA_OPERATION_ABORTED
};
static const int* fatals_end = fatals + Size(fatals);
int err = NET_ERROR;
const int err = NET_ERROR;
if (std::find(fatals, fatals_end, err) != fatals_end)
{
HLOGC(mglog.Debug, log << CONID() << "(sys)WSARecvFrom: " << SysStrError(err) << " [" << err << "]");
Expand All @@ -572,10 +591,10 @@ EReadStatus CChannel::recvfrom(sockaddr* addr, CPacket& packet) const


// Sanity check for a case when it didn't fill in even the header
if ( size_t(res) < CPacket::HDR_SIZE )
if (size_t(recv_size) < CPacket::HDR_SIZE)
{
status = RST_AGAIN;
HLOGC(mglog.Debug, log << CONID() << "POSSIBLE ATTACK: received too short packet with " << res << " bytes");
HLOGC(mglog.Debug, log << CONID() << "POSSIBLE ATTACK: received too short packet with " << recv_size << " bytes");
goto Return_error;
}

Expand All @@ -598,13 +617,13 @@ EReadStatus CChannel::recvfrom(sockaddr* addr, CPacket& packet) const
// packet was received, so the packet will be then retransmitted.
if ( msg_flags != 0 )
{
HLOGC(mglog.Debug, log << CONID() << "NET ERROR: packet size=" << res
HLOGC(mglog.Debug, log << CONID() << "NET ERROR: packet size=" << recv_size
<< " msg_flags=0x" << hex << msg_flags << ", possibly MSG_TRUNC (0x" << hex << int(MSG_TRUNC) << ")");
status = RST_AGAIN;
goto Return_error;
}

packet.setLength(res - CPacket::HDR_SIZE);
packet.setLength(recv_size - CPacket::HDR_SIZE);

// convert back into local host order
// XXX use NtoHLA().
Expand Down

0 comments on commit 3a0184c

Please sign in to comment.