Skip to content

Commit

Permalink
UDP engine aborts on networking-related errors from socket syscalls z…
Browse files Browse the repository at this point in the history
  • Loading branch information
atomashpolskiy committed Aug 22, 2019
1 parent 56ace6d commit 4e2c85e
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 173 deletions.
44 changes: 44 additions & 0 deletions src/ip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -681,3 +681,47 @@ void zmq::make_socket_noninheritable (fd_t sock_)
LIBZMQ_UNUSED (sock_);
#endif
}

void zmq::assert_socket_tuning_error (zmq::fd_t s_, int rc_)
{
if (rc_ == 0)
return;

// Check whether an error occurred
int err = 0;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int len = sizeof err;
#else
socklen_t len = sizeof err;
#endif

int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);

// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (rc == 0);
if (err != 0) {
wsa_assert (err == WSAECONNREFUSED || err == WSAECONNRESET
|| err == WSAECONNABORTED || err == WSAEINTR
|| err == WSAETIMEDOUT || err == WSAEHOSTUNREACH
|| err == WSAENETUNREACH || err == WSAENETDOWN
|| err == WSAENETRESET || err == WSAEACCES
|| err == WSAEINVAL || err == WSAEADDRINUSE);
}
#else
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
if (rc == -1)
err = errno;
if (err != 0) {
errno = err;
errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
|| errno == ECONNABORTED || errno == EINTR
|| errno == ETIMEDOUT || errno == EHOSTUNREACH
|| errno == ENETUNREACH || errno == ENETDOWN
|| errno == ENETRESET || errno == EINVAL);
}
#endif
}
4 changes: 4 additions & 0 deletions src/ip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ int make_fdpair (fd_t *r_, fd_t *w_);
// Makes a socket non-inheritable to child processes.
// Asserts on any failure.
void make_socket_noninheritable (fd_t sock_);

// Asserts that an internal error did not occur. Does not assert
// on network errors such as reset or aborted connections.
void assert_socket_tuning_error (fd_t s_, int rc_);
}

#endif
65 changes: 13 additions & 52 deletions src/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ int zmq::tune_tcp_socket (fd_t s_)
int nodelay = 1;
int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char *> (&nodelay), sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;

Expand All @@ -71,7 +71,7 @@ int zmq::tune_tcp_socket (fd_t s_)
int nodelack = 1;
rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char *) &nodelack,
sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
#endif
return rc;
}
Expand All @@ -81,7 +81,7 @@ int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
const int rc =
setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF,
reinterpret_cast<char *> (&bufsize_), sizeof bufsize_);
tcp_assert_tuning_error (sockfd_, rc);
assert_socket_tuning_error (sockfd_, rc);
return rc;
}

Expand All @@ -90,7 +90,7 @@ int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
const int rc =
setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
reinterpret_cast<char *> (&bufsize_), sizeof bufsize_);
tcp_assert_tuning_error (sockfd_, rc);
assert_socket_tuning_error (sockfd_, rc);
return rc;
}

Expand Down Expand Up @@ -123,7 +123,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
sizeof (keepalive_opts), NULL, 0,
&num_bytes_returned, NULL, NULL);
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc == SOCKET_ERROR)
return rc;
}
Expand All @@ -133,15 +133,15 @@ int zmq::tune_tcp_keepalives (fd_t s_,
int rc =
setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
reinterpret_cast<char *> (&keepalive_), sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;

#ifdef ZMQ_HAVE_TCP_KEEPCNT
if (keepalive_cnt_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_,
sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;
}
Expand All @@ -151,7 +151,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
if (keepalive_idle_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
&keepalive_idle_, sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;
}
Expand All @@ -160,7 +160,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
if (keepalive_idle_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
&keepalive_idle_, sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;
}
Expand All @@ -171,7 +171,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
if (keepalive_intvl_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
&keepalive_intvl_, sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;
}
Expand All @@ -196,13 +196,13 @@ int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
int rc =
setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT,
reinterpret_cast<char *> (&timeout_), sizeof (timeout_));
tcp_assert_tuning_error (sockfd_, rc);
assert_socket_tuning_error (sockfd_, rc);
return rc;
// FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
#elif defined(TCP_USER_TIMEOUT)
int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
sizeof (timeout_));
tcp_assert_tuning_error (sockfd_, rc);
assert_socket_tuning_error (sockfd_, rc);
return rc;
#else
return 0;
Expand Down Expand Up @@ -318,46 +318,7 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)

void zmq::tcp_assert_tuning_error (zmq::fd_t s_, int rc_)
{
if (rc_ == 0)
return;

// Check whether an error occurred
int err = 0;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int len = sizeof err;
#else
socklen_t len = sizeof err;
#endif

int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);

// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (rc == 0);
if (err != 0) {
wsa_assert (err == WSAECONNREFUSED || err == WSAECONNRESET
|| err == WSAECONNABORTED || err == WSAEINTR
|| err == WSAETIMEDOUT || err == WSAEHOSTUNREACH
|| err == WSAENETUNREACH || err == WSAENETDOWN
|| err == WSAENETRESET || err == WSAEACCES
|| err == WSAEINVAL || err == WSAEADDRINUSE);
}
#else
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
if (rc == -1)
err = errno;
if (err != 0) {
errno = err;
errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
|| errno == ECONNABORTED || errno == EINTR
|| errno == ETIMEDOUT || errno == EHOSTUNREACH
|| errno == ENETUNREACH || errno == ENETDOWN
|| errno == ENETRESET || errno == EINVAL);
}
#endif
assert_socket_tuning_error (s_, rc_);
}

void zmq::tcp_tune_loopback_fast_path (const fd_t socket_)
Expand Down
Loading

0 comments on commit 4e2c85e

Please sign in to comment.