Skip to content

Commit

Permalink
ZMQ lib (wait).
Browse files Browse the repository at this point in the history
  • Loading branch information
divyagayathri-hcl committed Oct 29, 2024
1 parent 352234a commit 646ee51
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 28 deletions.
62 changes: 62 additions & 0 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,66 @@ void ZmqClient::sendMsg(
throw system_error(make_error_code(errc::io_error), message);
}

bool ZmqClient::wait(std::string& dbName,

std::string& tableName,

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos,

std::vector<char>& buffer)

{

SWSS_LOG_ENTER();

int rc;

for (int i = 0; true ; ++i)

{

rc = zmq_recv(m_socket, buffer.data(), buffer.size(), 0);

if (rc < 0)

{

if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)

{

continue;

}

SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());

}

if (rc >= (int)buffer.size())

{

SWSS_LOG_THROW(

"zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",

(int)buffer.size(), rc);

}

break;

}

buffer.at(rc) = 0; // make sure that we end string with zero before parse

kcos.clear();

BinarySerializer::deserializeBuffer(buffer.data(), buffer.size(), dbName, tableName, kcos);

return true;

}

}
10 changes: 10 additions & 0 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace swss {
class ZmqClient
{
public:

ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
~ZmqClient();
Expand All @@ -24,6 +25,15 @@ class ZmqClient
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer);

bool wait(std::string& dbName,

std::string& tableName,

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos,

std::vector<char>& buffer);

private:
void initialize(const std::string& endpoint, const std::string& vrf);

Expand Down
12 changes: 12 additions & 0 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,18 @@ void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos
}
}

bool ZmqProducerStateTable::wait(std::string& dbName,

std::string& tableName,

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)

{

return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer);

}

size_t ZmqProducerStateTable::dbUpdaterQueueSize()
{
if (m_asyncDBUpdater == nullptr)
Expand Down
8 changes: 8 additions & 0 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ class ZmqProducerStateTable : public ProducerStateTable
// Batched send that can include both SET and DEL requests.
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);

// This method should only be used if the ZmqClient enables one-to-one sync.

virtual bool wait(std::string& dbName,

std::string& tableName,

std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

size_t dbUpdaterQueueSize();
private:
void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence);
Expand Down
43 changes: 15 additions & 28 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ using namespace std;
namespace swss {

ZmqServer::ZmqServer(const std::string& endpoint)
: ZmqServer(endpoint, "")
: m_endpoint(endpoint)
{
}

ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
: m_endpoint(endpoint),
m_vrf(vrf)
m_vrf(vrf), m_allowZmqPoll(true)
{
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
Expand All @@ -29,8 +29,14 @@ ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)

ZmqServer::~ZmqServer()
{
m_allowZmqPoll = true;
m_runThread = false;
m_mqPollThread->join();

zmq_close(m_socket);

zmq_ctx_destroy(m_context);

}

void ZmqServer::registerMessageHandler(
Expand Down Expand Up @@ -90,39 +96,20 @@ void ZmqServer::mqPollThread()
SWSS_LOG_ENTER();
SWSS_LOG_NOTICE("mqPollThread begin");

// Producer/Consumer state table are n:1 mapping, so need use PUSH/PULL pattern http://api.zeromq.org/master:zmq-socket
void* context = zmq_ctx_new();;
void* socket = zmq_socket(context, ZMQ_PULL);

// Increase recv buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

int rc = zmq_bind(socket, m_endpoint.c_str());
if (rc != 0)
{
SWSS_LOG_THROW("zmq_bind failed on endpoint: %s, zmqerrno: %d",
m_endpoint.c_str(),
zmq_errno());
}

// zmq_poll will use less CPU
zmq_pollitem_t poll_item;
poll_item.fd = 0;
poll_item.socket = socket;
poll_item.socket = m_socket;
poll_item.events = ZMQ_POLLIN;
poll_item.revents = 0;

SWSS_LOG_NOTICE("bind to zmq endpoint: %s", m_endpoint.c_str());
while (m_runThread)
{
m_allowZmqPoll = false;

// receive message
rc = zmq_poll(&poll_item, 1, 1000);
auto rc = zmq_poll(&poll_item, 1, 1000);
if (rc == 0 || !(poll_item.revents & ZMQ_POLLIN))
{
// timeout or other event
Expand All @@ -131,7 +118,7 @@ void ZmqServer::mqPollThread()
}

// receive message
rc = zmq_recv(socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
rc = zmq_recv(m_socket, m_buffer.data(), MQ_RESPONSE_MAX_COUNT, ZMQ_DONTWAIT);
if (rc < 0)
{
int zmq_err = zmq_errno();
Expand Down Expand Up @@ -160,8 +147,8 @@ void ZmqServer::mqPollThread()
handleReceivedData(m_buffer.data(), rc);
}

zmq_close(socket);
zmq_ctx_destroy(context);
zmq_close(m_socket);
zmq_ctx_destroy(m_context);

SWSS_LOG_NOTICE("mqPollThread end");
}
Expand Down
9 changes: 9 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class ZmqServer
ZmqMessageHandler* handler);

private:

void connect();

void handleReceivedData(const char* buffer, const size_t size);

void mqPollThread();
Expand All @@ -56,6 +59,12 @@ class ZmqServer

std::string m_vrf;

void* m_context;

void* m_socket;

bool m_allowZmqPoll;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down

0 comments on commit 646ee51

Please sign in to comment.