Skip to content

Commit

Permalink
Add retry on zmq functions if fail with EINTR. (sonic-net#1109)
Browse files Browse the repository at this point in the history
  • Loading branch information
mint570 committed Sep 2, 2022
1 parent 594b242 commit e8a01a8
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 44 deletions.
97 changes: 53 additions & 44 deletions lib/ZeroMQChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using namespace sairedis;

#define ZMQ_RESPONSE_BUFFER_SIZE (4*1024*1024)
#define ZMQ_MAX_RETRY 10

ZeroMQChannel::ZeroMQChannel(
_In_ const std::string& endpoint,
Expand Down Expand Up @@ -219,14 +220,22 @@ void ZeroMQChannel::set(

SWSS_LOG_DEBUG("sending: %s", msg.c_str());

int rc = zmq_send(m_socket, msg.c_str(), msg.length(), 0);

if (rc <= 0)
for (int i = 0; true ; ++i)
{
SWSS_LOG_THROW("zmq_send failed, on endpoint %s, zmqerrno: %d: %s",
m_endpoint.c_str(),
zmq_errno(),
zmq_strerror(zmq_errno()));
int rc = zmq_send(m_socket, msg.c_str(), msg.length(), 0);

if (rc <= 0 && zmq_errno() == EINTR && i < ZMQ_MAX_RETRY)
{
continue;
}
if (rc <= 0)
{
SWSS_LOG_THROW("zmq_send failed, on endpoint %s, zmqerrno: %d: %s",
m_endpoint.c_str(),
zmq_errno(),
zmq_strerror(zmq_errno()));
}
break;
}
}

Expand All @@ -238,23 +247,7 @@ void ZeroMQChannel::del(

std::vector<swss::FieldValueTuple> values;

swss::FieldValueTuple opdata(key, command);

values.insert(values.begin(), opdata);

std::string msg = swss::JSon::buildJson(values);

SWSS_LOG_DEBUG("sending: %s", msg.c_str());

int rc = zmq_send(m_socket, msg.c_str(), msg.length(), 0);

if (rc <= 0)
{
SWSS_LOG_THROW("zmq_send failed, on endpoint %s, zmqerrno: %d: %s",
m_endpoint.c_str(),
zmq_errno(),
zmq_strerror(zmq_errno()));
}
set(key, values, command);
}

sai_status_t ZeroMQChannel::wait(
Expand All @@ -270,35 +263,51 @@ sai_status_t ZeroMQChannel::wait(
items[0].socket = m_socket;
items[0].events = ZMQ_POLLIN;

int rc = zmq_poll(items, 1, (int)m_responseTimeoutMs);
int rc;

if (rc == 0)
for (int i = 0; true ; ++i)
{
SWSS_LOG_ERROR("zmq_poll timed out for: %s", command.c_str());
rc = zmq_poll(items, 1, (int)m_responseTimeoutMs);

// notice, at this point we could throw, since in REP/REQ pattern
// we are forced to use send/recv in that specific order
if (rc == 0)
{
SWSS_LOG_ERROR("zmq_poll timed out for: %s", command.c_str());

return SAI_STATUS_FAILURE;
}
// notice, at this point we could throw, since in REP/REQ pattern
// we are forced to use send/recv in that specific order

if (rc < 0)
{
SWSS_LOG_THROW("zmq_poll failed, zmqerrno: %d", zmq_errno());
return SAI_STATUS_FAILURE;
}
if (rc < 0 && zmq_errno() == EINTR && i < ZMQ_MAX_RETRY)
{
continue;
}
if (rc < 0)
{
SWSS_LOG_THROW("zmq_poll failed, zmqerrno: %d", zmq_errno());
}
break;
}

rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0);

if (rc < 0)
for (int i = 0; true ; ++i)
{
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
}
rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0);

if (rc >= ZMQ_RESPONSE_BUFFER_SIZE)
{
SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
ZMQ_RESPONSE_BUFFER_SIZE,
rc);
if (rc < 0 && zmq_errno() == EINTR && i < ZMQ_MAX_RETRY)
{
continue;
}
if (rc < 0)
{
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
}
if (rc >= ZMQ_RESPONSE_BUFFER_SIZE)
{
SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
ZMQ_RESPONSE_BUFFER_SIZE,
rc);
}
break;
}

m_buffer.at(rc) = 0; // make sure that we end string with zero before parse
Expand Down
55 changes: 55 additions & 0 deletions tests/tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include "meta/sai_serialize.h"

#include <unistd.h>
#include <signal.h>
#include <thread>
#include <memory>

Expand All @@ -19,6 +21,18 @@ using namespace sairedis;

#define ASSERT_EQ(a,b) if ((a) != (b)) { SWSS_LOG_THROW("ASSERT EQ FAILED: " #a " != " #b); }

#define ASSERT_THROW(a,b) \
try { \
a; \
SWSS_LOG_ERROR("ASSERT_THROW FAILED"); \
exit(1); \
} \
catch(const b &e) { \
} \
catch(...) { \
SWSS_LOG_THROW("ASSERT_THROW FAILED"); \
}

/*
* Test if destructor proper clean and join zeromq socket and context, and
* break recv method.
Expand Down Expand Up @@ -86,13 +100,54 @@ static void test_zeromqchannel_first_notification()
}
}

void send_signals()
{
SWSS_LOG_ENTER();
pid_t pid = getpid();
for (int i = 0; i < 11; ++i)
{
sleep(1);
kill(pid, SIGHUP);
}
};

/*
* Test if runtime_error will be thrown if zmq wait reaches max retry due to
* signal interrupt.
*/
static void test_zeromqchannel_eintr_errno_on_wait()
{
SWSS_LOG_ENTER();

std::cout << " * " << __FUNCTION__ << std::endl;

ZeroMQChannel z("ipc:///tmp/feeds1", "ipc:///tmp/feeds2", nullptr);
z.setResponseTimeout(60000);

std::thread signal_thread(send_signals);

swss::KeyOpFieldsValuesTuple kco;
ASSERT_THROW(z.wait("foo", kco), std::runtime_error);

signal_thread.join();
}

void sighup_handler(int signo)
{
SWSS_LOG_ENTER();
}

int main()
{
SWSS_LOG_ENTER();

signal(SIGHUP, sighup_handler);

test_zeromqchannel_destructor();

test_zeromqchannel_first_notification();

test_zeromqchannel_eintr_errno_on_wait();

return 0;
}

0 comments on commit e8a01a8

Please sign in to comment.