From dc2e7bc402d11cb1e96c9be6e354fea8250c9fd5 Mon Sep 17 00:00:00 2001 From: Zain Budhwani Date: Fri, 23 Aug 2024 21:49:05 +0000 Subject: [PATCH 1/4] Initial commit --- src/sonic-eventd/src/eventd.cpp | 55 +++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/src/sonic-eventd/src/eventd.cpp b/src/sonic-eventd/src/eventd.cpp index eb692d5b3a9b..b8cd285d2193 100644 --- a/src/sonic-eventd/src/eventd.cpp +++ b/src/sonic-eventd/src/eventd.cpp @@ -398,7 +398,7 @@ capture_service::do_capture() zmq_msg_t msg; zmq_msg_init(&msg); int rc = zmq_msg_recv(&msg, cap_sub_sock, 0); - RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB"); + /* * When XSUB socket connects to XPUB, a subscription message is sent as a single byte 1. * When capture service begins to read, the very first message that it will read is this @@ -409,8 +409,59 @@ capture_service::do_capture() * * This behavior will only happen once when XSUB connects to XPUB not everytime cache is started. * + * There are chances that there are events already published to XSUB endpoint before XSUB is able to connect to XPUB, so we can receive events + before the subscription message */ - init_done = true; + + + if(rc == 1) { // Expected case to receive subscription message as very first message + SWSS_LOG_INFO("Received subscription message when XSUB connects to XPUB"); + zmq_msg_close(&msg); + } else if (rc > 1) { // If there are events already published to XSUB when XSUB connects to XPUB, we can receive events before subscription message + string event_source((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); + SWSS_LOG_INFO("Receiving event from source: %s, will read second part of event", event_source.c_str()); + zmq_msg_close(&msg); + int more = 0; + size_t more_size = sizeof(more); + zmq_getsockopt(cap_sub_sock, ZMQ_RCVMORE, &more, &more_size); + // Read multi-part message + if(more) { + zmq_msg_t msg_part; + zmq_msg_init(&msg_part); + rc = zmq_msg_recv(&msg_part, cap_sub_sock, 0); + if(rc > 0) { + string event_data((const char*)zmq_msg_data(&msg_part),zmq_msg_size(&msg_part)); + SWSS_LOG_INFO("Received second part of event: %s", event_data.c_str()); + zmq_msg_close(&msg_part); + internal_event_t event; + if(deserialize(event_data, event) == 0) { + runtime_id_t rid; + sequence_t seq; + + if(validate_event(event, rid, seq)) { + m_pre_exist_id[rid] = seq; + m_events.push_back(event_data); + } + rc = 1; + } else { + SWSS_LOG_ERROR("Unable to deserialize first event"); + rc = -1; + } + } else { + SWSS_LOG_ERROR("Unable to read second part of first event, rc=%d", rc); + zmq_msg_close(&msg_part); + rc = -1; + } + } else { // No second part to read + rc = 1; + } + } else { + zmq_msg_close(&msg); + SWSS_LOG_ERROR("Error reading from ZMQ socket, rc=%d", rc); + } + + RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB"); + init_done = true; } while (m_ctrl != START_CAPTURE) { From 3d5f62dfbafbc7dd2887e236e5c96af32fe919bb Mon Sep 17 00:00:00 2001 From: Zain Budhwani Date: Tue, 27 Aug 2024 00:16:45 +0000 Subject: [PATCH 2/4] Remove failure from initial do_capture read --- src/sonic-eventd/src/eventd.cpp | 15 ++++----------- src/sonic-eventd/tests/eventd_ut.cpp | 23 ++++++----------------- 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/src/sonic-eventd/src/eventd.cpp b/src/sonic-eventd/src/eventd.cpp index b8cd285d2193..4b92ff974773 100644 --- a/src/sonic-eventd/src/eventd.cpp +++ b/src/sonic-eventd/src/eventd.cpp @@ -419,7 +419,7 @@ capture_service::do_capture() zmq_msg_close(&msg); } else if (rc > 1) { // If there are events already published to XSUB when XSUB connects to XPUB, we can receive events before subscription message string event_source((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); - SWSS_LOG_INFO("Receiving event from source: %s, will read second part of event", event_source.c_str()); + SWSS_LOG_DEBUG("Receiving event from source: %s, will read second part of event", event_source.c_str()); zmq_msg_close(&msg); int more = 0; size_t more_size = sizeof(more); @@ -431,7 +431,7 @@ capture_service::do_capture() rc = zmq_msg_recv(&msg_part, cap_sub_sock, 0); if(rc > 0) { string event_data((const char*)zmq_msg_data(&msg_part),zmq_msg_size(&msg_part)); - SWSS_LOG_INFO("Received second part of event: %s", event_data.c_str()); + SWSS_LOG_DEBUG("Received second part of event: %s", event_data.c_str()); zmq_msg_close(&msg_part); internal_event_t event; if(deserialize(event_data, event) == 0) { @@ -442,25 +442,18 @@ capture_service::do_capture() m_pre_exist_id[rid] = seq; m_events.push_back(event_data); } - rc = 1; } else { - SWSS_LOG_ERROR("Unable to deserialize first event"); - rc = -1; + SWSS_LOG_DEBUG("Unable to deserialize first event"); } } else { - SWSS_LOG_ERROR("Unable to read second part of first event, rc=%d", rc); + SWSS_LOG_DEBUG("Unable to read second part of first event, rc=%d", rc); zmq_msg_close(&msg_part); - rc = -1; } - } else { // No second part to read - rc = 1; } } else { zmq_msg_close(&msg); SWSS_LOG_ERROR("Error reading from ZMQ socket, rc=%d", rc); } - - RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB"); init_done = true; } diff --git a/src/sonic-eventd/tests/eventd_ut.cpp b/src/sonic-eventd/tests/eventd_ut.cpp index 729563fcd39d..a499bb67a95e 100644 --- a/src/sonic-eventd/tests/eventd_ut.cpp +++ b/src/sonic-eventd/tests/eventd_ut.cpp @@ -150,9 +150,8 @@ static const test_data_t ldata[] = { }, }; - void run_cap(void *zctx, bool &term, string &read_source, - int &cnt, bool &should_read_control) + int &cnt) { void *mock_cap = zmq_socket (zctx, ZMQ_SUB); string source; @@ -165,11 +164,10 @@ void run_cap(void *zctx, bool &term, string &read_source, EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0)); EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms))); - if(should_read_control) { - zmq_msg_t msg; - zmq_msg_init(&msg); - EXPECT_NE(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message should be read by do_capture - } + zmq_msg_t msg; + zmq_msg_init(&msg); + int rc = zmq_msg_recv(&msg, mock_cap, 0); + EXPECT_EQ(1, rc); // read control character while(!term) { string source; @@ -228,7 +226,6 @@ void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst) TEST(eventd, proxy) { printf("Proxy TEST started\n"); - bool should_read_control = false; bool term_sub = false; bool term_cap = false; string rd_csource, rd_source, wr_source("hello"); @@ -246,7 +243,7 @@ TEST(eventd, proxy) EXPECT_EQ(0, pxy->init()); /* capture in a thread */ - thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control)); + thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz)); /* subscriber in a thread */ thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz)); @@ -283,17 +280,9 @@ TEST(eventd, proxy) zmq_close(mock_pub); - /* Do control test */ - - should_read_control = true; - - /* capture in a thread */ - thread thrcc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control)); - delete pxy; pxy = NULL; - thrcc.join(); zmq_ctx_term(zctx); /* Provide time for async proxy removal to complete */ From 967caa8618a9d9a7c57eebb212582978feacbaf5 Mon Sep 17 00:00:00 2001 From: Zain Budhwani Date: Tue, 27 Aug 2024 00:45:45 +0000 Subject: [PATCH 3/4] Drop messages until proxy is setup --- src/sonic-eventd/src/eventd.cpp | 30 ++++-------------------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/src/sonic-eventd/src/eventd.cpp b/src/sonic-eventd/src/eventd.cpp index 4b92ff974773..6727038aeb87 100644 --- a/src/sonic-eventd/src/eventd.cpp +++ b/src/sonic-eventd/src/eventd.cpp @@ -416,44 +416,22 @@ capture_service::do_capture() if(rc == 1) { // Expected case to receive subscription message as very first message SWSS_LOG_INFO("Received subscription message when XSUB connects to XPUB"); - zmq_msg_close(&msg); - } else if (rc > 1) { // If there are events already published to XSUB when XSUB connects to XPUB, we can receive events before subscription message + } else if (rc > 1) { // If there are events already published to XSUB before XSUB connects to XPUB, we can receive events before subscription message string event_source((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); SWSS_LOG_DEBUG("Receiving event from source: %s, will read second part of event", event_source.c_str()); - zmq_msg_close(&msg); int more = 0; size_t more_size = sizeof(more); zmq_getsockopt(cap_sub_sock, ZMQ_RCVMORE, &more, &more_size); - // Read multi-part message if(more) { zmq_msg_t msg_part; zmq_msg_init(&msg_part); - rc = zmq_msg_recv(&msg_part, cap_sub_sock, 0); - if(rc > 0) { - string event_data((const char*)zmq_msg_data(&msg_part),zmq_msg_size(&msg_part)); - SWSS_LOG_DEBUG("Received second part of event: %s", event_data.c_str()); - zmq_msg_close(&msg_part); - internal_event_t event; - if(deserialize(event_data, event) == 0) { - runtime_id_t rid; - sequence_t seq; - - if(validate_event(event, rid, seq)) { - m_pre_exist_id[rid] = seq; - m_events.push_back(event_data); - } - } else { - SWSS_LOG_DEBUG("Unable to deserialize first event"); - } - } else { - SWSS_LOG_DEBUG("Unable to read second part of first event, rc=%d", rc); - zmq_msg_close(&msg_part); - } + zmq_msg_recv(&msg_part, cap_sub_sock, 0); + zmq_msg_close(&msg_part); } } else { - zmq_msg_close(&msg); SWSS_LOG_ERROR("Error reading from ZMQ socket, rc=%d", rc); } + zmq_msg_close(&msg); init_done = true; } From afa2e6b8e55ce8632e289e468747df69bf75f8ef Mon Sep 17 00:00:00 2001 From: Zain Budhwani Date: Tue, 27 Aug 2024 21:51:27 +0000 Subject: [PATCH 4/4] Remove control character tracking from do_capture --- src/sonic-eventd/src/eventd.cpp | 46 +++------------------------------ 1 file changed, 3 insertions(+), 43 deletions(-) diff --git a/src/sonic-eventd/src/eventd.cpp b/src/sonic-eventd/src/eventd.cpp index 6727038aeb87..d4da44f32526 100644 --- a/src/sonic-eventd/src/eventd.cpp +++ b/src/sonic-eventd/src/eventd.cpp @@ -302,7 +302,9 @@ static bool validate_event(const internal_event_t &event, runtime_id_t &rid, sequence_t &seq) { bool ret = false; - + if(event.empty()) { + return ret; + } internal_event_t::const_iterator itc_r, itc_s, itc_e; itc_r = event.find(EVENT_RUNTIME_ID); itc_s = event.find(EVENT_SEQUENCE); @@ -357,7 +359,6 @@ capture_service::do_capture() int init_cnt; void *cap_sub_sock = NULL; counters_t total_overflow = 0; - static bool init_done = false; typedef enum { /* @@ -394,47 +395,6 @@ capture_service::do_capture() m_cap_run = true; - if(!init_done) { - zmq_msg_t msg; - zmq_msg_init(&msg); - int rc = zmq_msg_recv(&msg, cap_sub_sock, 0); - - /* - * When XSUB socket connects to XPUB, a subscription message is sent as a single byte 1. - * When capture service begins to read, the very first message that it will read is this - * control character. - * - * We will handle by reading this message and dropping it before we begin reading for - * cached events. - * - * This behavior will only happen once when XSUB connects to XPUB not everytime cache is started. - * - * There are chances that there are events already published to XSUB endpoint before XSUB is able to connect to XPUB, so we can receive events - before the subscription message - */ - - - if(rc == 1) { // Expected case to receive subscription message as very first message - SWSS_LOG_INFO("Received subscription message when XSUB connects to XPUB"); - } else if (rc > 1) { // If there are events already published to XSUB before XSUB connects to XPUB, we can receive events before subscription message - string event_source((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); - SWSS_LOG_DEBUG("Receiving event from source: %s, will read second part of event", event_source.c_str()); - int more = 0; - size_t more_size = sizeof(more); - zmq_getsockopt(cap_sub_sock, ZMQ_RCVMORE, &more, &more_size); - if(more) { - zmq_msg_t msg_part; - zmq_msg_init(&msg_part); - zmq_msg_recv(&msg_part, cap_sub_sock, 0); - zmq_msg_close(&msg_part); - } - } else { - SWSS_LOG_ERROR("Error reading from ZMQ socket, rc=%d", rc); - } - zmq_msg_close(&msg); - init_done = true; - } - while (m_ctrl != START_CAPTURE) { /* Wait for capture start */ this_thread::sleep_for(chrono::milliseconds(10));