Skip to content

Commit

Permalink
UDP engine aborts on networking-related errors from socket syscalls z…
Browse files Browse the repository at this point in the history
  • Loading branch information
atomashpolskiy committed Aug 22, 2019
1 parent fba7c34 commit 5db9915
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions src/udp_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)

// Bind the socket to a device if applicable
if (!_options.bound_device.empty ())
rc |= bind_to_device (_fd, _options.bound_device);
rc = rc | bind_to_device (_fd, _options.bound_device);

if (_send_enabled) {
if (!_options.raw_socket) {
Expand All @@ -130,15 +130,15 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)

if (out->is_multicast ()) {
bool is_ipv6 = (out->family () == AF_INET6);
rc |= set_udp_multicast_loop (_fd, is_ipv6,
rc = rc | set_udp_multicast_loop (_fd, is_ipv6,
_options.multicast_loop);

if (_options.multicast_hops > 0) {
rc |= set_udp_multicast_ttl (_fd, is_ipv6,
rc = rc | set_udp_multicast_ttl (_fd, is_ipv6,
_options.multicast_hops);
}

rc |= set_udp_multicast_iface (_fd, is_ipv6, udp_addr);
rc = rc | set_udp_multicast_iface (_fd, is_ipv6, udp_addr);
}
} else {
/// XXX fixme ?
Expand All @@ -149,7 +149,7 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
}

if (_recv_enabled) {
rc |= set_udp_reuse_address (_fd, true);
rc = rc | set_udp_reuse_address (_fd, true);

const ip_addr_t *bind_addr = udp_addr->bind_addr ();
ip_addr_t any = ip_addr_t::any (bind_addr->family ());
Expand All @@ -161,7 +161,7 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
// Multicast addresses should be allowed to bind to more than
// one port as all ports should receive the message
#ifdef SO_REUSEPORT
rc |= set_udp_reuse_port (_fd, true);
rc = rc | set_udp_reuse_port (_fd, true);
#endif

// In multicast we should bind ANY and use the mreq struct to
Expand All @@ -174,15 +174,15 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
}

#ifdef ZMQ_HAVE_VXWORKS
rc |= bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (),
rc = rc | bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (),
real_bind_addr->sockaddr_len ());
#else
rc |= bind (_fd, real_bind_addr->as_sockaddr (),
rc = rc | bind (_fd, real_bind_addr->as_sockaddr (),
real_bind_addr->sockaddr_len ());
#endif

if (multicast) {
rc |= add_membership (_fd, udp_addr);
rc = rc | add_membership (_fd, udp_addr);
}
}

Expand Down Expand Up @@ -277,17 +277,21 @@ int zmq::udp_engine_t::set_udp_reuse_address (fd_t s_, bool on_)

int zmq::udp_engine_t::set_udp_reuse_port (fd_t s_, bool on_)
{
#ifndef SO_REUSEPORT
return 0;
#else
int on = on_ ? 1 : 0;
int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEPORT,
reinterpret_cast<char *> (&on), sizeof (on));
assert_socket_tuning_error (s_, rc);
return rc;
#endif
}

int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_)
{
const ip_addr_t *mcast_addr = addr_->target_addr ();
int rc;
int rc = 0;

if (mcast_addr->family () == AF_INET) {
struct ip_mreq mreq;
Expand Down

0 comments on commit 5db9915

Please sign in to comment.