Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabled cache to kick off right upon startup #4

Merged
merged 2 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/sonic-eventd/src/eventd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ validate_event(const internal_event_t &event, runtime_id_t &rid, sequence_t &seq
}


/*
* Initialize cache with set of events provided.
* Events read by cache service will be appended
*/
void
capture_service::init_capture_cache(const event_serialized_lst_t &lst)
{
Expand Down Expand Up @@ -137,6 +141,8 @@ capture_service::do_capture()
int rc;
int block_ms=300;
int init_cnt;
event_handle_t subs_handle = NULL;
void *sock = NULL;

typedef enum {
/*
Expand All @@ -154,7 +160,16 @@ capture_service::do_capture()

cap_state_t cap_state = CAP_STATE_INIT;

void *sock = zmq_socket(m_ctx, ZMQ_SUB);
/*
* Need subscription for publishers to publish. Start one.
* As we are reading off of capture socket, we don't read from
* this handle. Not reading is a not a concern, as zmq will cache
* few initial messages and rest it will drop.
*/
subs_handle = events_init_subscriber();
RET_ON_ERR(subs_handle != NULL, "failed to subscribe to all");

sock = zmq_socket(m_ctx, ZMQ_SUB);
RET_ON_ERR(sock != NULL, "failing to get ZMQ_SUB socket");

rc = zmq_connect(sock, get_config(string(CAPTURE_END_KEY)).c_str());
Expand Down Expand Up @@ -275,6 +290,7 @@ capture_service::do_capture()
* Capture stop will close the socket which fail the read
* and hence bail out.
*/
events_deinit_subscriber(subs_handle);
zmq_close(sock);
m_cap_run = false;
return;
Expand Down Expand Up @@ -383,6 +399,15 @@ run_eventd_service()

RET_ON_ERR(service.init_server(zctx) == 0, "Failed to init service");

/*
* Start cache service, right upon eventd starts so as not to lose
* events until telemetry starts.
* Telemetry will send a stop & collect cache upon startup
*/
capture = new capture_service(zctx, cache_max);
RET_ON_ERR(capture->set_control(INIT_CAPTURE) == 0, "Failed to init capture");
RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture");

while(code != EVENT_EXIT) {
int resp = -1;
event_serialized_lst_t req_data, resp_data;
Expand Down
1 change: 1 addition & 0 deletions src/sonic-eventd/src/eventd.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Header file for eventd daemon
*/
#include "events_service.h"
#include "events.h"

typedef map<runtime_id_t, event_serialized_t> last_events_t;

Expand Down
97 changes: 63 additions & 34 deletions src/sonic-eventd/tests/eventd_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ void *init_pub(void *zctx)
EXPECT_TRUE(NULL != mock_pub);
EXPECT_EQ(0, zmq_connect(mock_pub, get_config(XSUB_END_KEY).c_str()));

/* Provide time for async connect to complete */
this_thread::sleep_for(chrono::milliseconds(200));

return mock_pub;
}

Expand Down Expand Up @@ -264,9 +267,6 @@ TEST(eventd, proxy)
/* Init pub connection */
void *mock_pub = init_pub(zctx);

/* Provide time for async connect to complete */
this_thread::sleep_for(chrono::milliseconds(100));

EXPECT_TRUE(5 < ARRAY_SIZE(ldata));

for(int i=0; i<5; ++i) {
Expand Down Expand Up @@ -312,10 +312,6 @@ TEST(eventd, capture)
printf("Capture TEST started\n");
debug_on();

/*
* Need to run subscriber; Else publisher would skip publishing
* in the absence of any subscriber.
*/
bool term_sub = false;
string sub_source;
int sub_evts_sz = 0;
Expand Down Expand Up @@ -370,6 +366,8 @@ TEST(eventd, capture)
/*
* Collect events to publish for capture to cache
* re-publishing some events sent in cache.
* Hence i=1, when first init_cache events are already
* in crash.
*/
for(int i=1; i < ARRAY_SIZE(ldata); ++i) {
internal_event_t ev(create_ev(ldata[i]));
Expand All @@ -395,9 +393,6 @@ TEST(eventd, capture)
/* Init pub connection */
void *mock_pub = init_pub(zctx);

/* Provide time for async connect to complete */
this_thread::sleep_for(chrono::milliseconds(200));

/* Publish events from 1 to all. */
run_pub(mock_pub, wr_source, wr_evts);

Expand Down Expand Up @@ -527,9 +522,6 @@ TEST(eventd, captureCacheMax)
/* Init pub connection */
void *mock_pub = init_pub(zctx);

/* Provide time for async connect to complete */
this_thread::sleep_for(chrono::milliseconds(200));

/* Publish events from 1 to all. */
run_pub(mock_pub, wr_source, wr_evts);

Expand Down Expand Up @@ -590,12 +582,7 @@ TEST(eventd, service)
printf("Service TEST started\n");
debug_on();

/* capture related */
int init_cache = 4; /* provided along with start capture */

/* startup strings; expected list & read list from capture */
event_serialized_lst_t evts_start, evts_read;

event_service service;

void *zctx = zmq_ctx_new();
Expand All @@ -605,37 +592,79 @@ TEST(eventd, service)
* Start the eventd server side service
* It runs proxy & capture service
* It uses its own zmq context
* It starts to capture too.
*/
thread thread_service(&run_eventd_service);

/* Need client side service to interact with server side */
EXPECT_EQ(0, service.init_client(zctx));

EXPECT_EQ(-1, service.cache_stop());
{
/* eventd_service starts cache too; Test this caching */
/* Init pub connection */
void *mock_pub = init_pub(zctx);

EXPECT_TRUE(init_cache > 1);
internal_events_lst_t wr_evts;
int wr_sz = 2;
string wr_source("hello");

/* Collect few serailized strings of events for startup cache */
for(int i=0; i < init_cache; ++i) {
internal_event_t ev(create_ev(ldata[i]));
string evt_str;
serialize(ev, evt_str);
evts_start.push_back(evt_str);
/* Test service startup caching */
event_serialized_lst_t evts_start, evts_read;

for(int i=0; i<wr_sz; ++i) {
string evt_str;
internal_event_t ev(create_ev(ldata[i]));

wr_evts.push_back(ev);
serialize(ev, evt_str);
evts_start.push_back(evt_str);
}

/* Publish events. */
run_pub(mock_pub, wr_source, wr_evts);

EXPECT_EQ(0, service.cache_init());
EXPECT_EQ(0, service.cache_start(evts_start));
/* Published events must have been captured. Give a pause, to ensure sent. */
this_thread::sleep_for(chrono::milliseconds(200));

this_thread::sleep_for(chrono::milliseconds(200));
EXPECT_EQ(0, service.cache_stop());

/* Stop capture, closes socket & terminates the thread */
EXPECT_EQ(0, service.cache_stop());
/* Read the cache; expect wr_sz events */
EXPECT_EQ(0, service.cache_read(evts_read));

/* Read the cache */
EXPECT_EQ(0, service.cache_read(evts_read));
EXPECT_EQ(evts_read, evts_start);

EXPECT_EQ(evts_read, evts_start);
zmq_close(mock_pub);
}

{
/* Test normal cache op; init, start & stop via event_service APIs */
int init_cache = 4; /* provided along with start capture */
event_serialized_lst_t evts_start, evts_read;

EXPECT_TRUE(init_cache > 1);

/* Collect few serailized strings of events for startup cache */
for(int i=0; i < init_cache; ++i) {
internal_event_t ev(create_ev(ldata[i]));
string evt_str;
serialize(ev, evt_str);
evts_start.push_back(evt_str);
}


EXPECT_EQ(0, service.cache_init());
EXPECT_EQ(0, service.cache_start(evts_start));

this_thread::sleep_for(chrono::milliseconds(200));

/* Stop capture, closes socket & terminates the thread */
EXPECT_EQ(0, service.cache_stop());

/* Read the cache */
EXPECT_EQ(0, service.cache_read(evts_read));

EXPECT_EQ(evts_read, evts_start);
}

EXPECT_EQ(0, service.send_recv(EVENT_EXIT));

Expand Down