Skip to content

Commit

Permalink
Remove control character tracking from do_capture
Browse files Browse the repository at this point in the history
  • Loading branch information
zbud-msft committed Aug 27, 2024
1 parent 967caa8 commit afa2e6b
Showing 1 changed file with 3 additions and 43 deletions.
46 changes: 3 additions & 43 deletions src/sonic-eventd/src/eventd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ static bool
validate_event(const internal_event_t &event, runtime_id_t &rid, sequence_t &seq)
{
bool ret = false;

if(event.empty()) {
return ret;
}
internal_event_t::const_iterator itc_r, itc_s, itc_e;
itc_r = event.find(EVENT_RUNTIME_ID);
itc_s = event.find(EVENT_SEQUENCE);
Expand Down Expand Up @@ -357,7 +359,6 @@ capture_service::do_capture()
int init_cnt;
void *cap_sub_sock = NULL;
counters_t total_overflow = 0;
static bool init_done = false;

typedef enum {
/*
Expand Down Expand Up @@ -394,47 +395,6 @@ capture_service::do_capture()

m_cap_run = true;

if(!init_done) {
zmq_msg_t msg;
zmq_msg_init(&msg);
int rc = zmq_msg_recv(&msg, cap_sub_sock, 0);

/*
* When XSUB socket connects to XPUB, a subscription message is sent as a single byte 1.
* When capture service begins to read, the very first message that it will read is this
* control character.
*
* We will handle by reading this message and dropping it before we begin reading for
* cached events.
*
* This behavior will only happen once when XSUB connects to XPUB not everytime cache is started.
*
* There are chances that there are events already published to XSUB endpoint before XSUB is able to connect to XPUB, so we can receive events
before the subscription message
*/


if(rc == 1) { // Expected case to receive subscription message as very first message
SWSS_LOG_INFO("Received subscription message when XSUB connects to XPUB");
} else if (rc > 1) { // If there are events already published to XSUB before XSUB connects to XPUB, we can receive events before subscription message
string event_source((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
SWSS_LOG_DEBUG("Receiving event from source: %s, will read second part of event", event_source.c_str());
int more = 0;
size_t more_size = sizeof(more);
zmq_getsockopt(cap_sub_sock, ZMQ_RCVMORE, &more, &more_size);
if(more) {
zmq_msg_t msg_part;
zmq_msg_init(&msg_part);
zmq_msg_recv(&msg_part, cap_sub_sock, 0);
zmq_msg_close(&msg_part);
}
} else {
SWSS_LOG_ERROR("Error reading from ZMQ socket, rc=%d", rc);
}
zmq_msg_close(&msg);
init_done = true;
}

while (m_ctrl != START_CAPTURE) {
/* Wait for capture start */
this_thread::sleep_for(chrono::milliseconds(10));
Expand Down

0 comments on commit afa2e6b

Please sign in to comment.