Skip to content

Commit

Permalink
Merge pull request #4 from renukamanavalan/remanava_events
Browse files Browse the repository at this point in the history
Enabled cache to kick off right upon startup
  • Loading branch information
renukamanavalan authored Jul 14, 2022
2 parents 076a6ac + 91d7633 commit 65f42ed
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 35 deletions.
27 changes: 26 additions & 1 deletion src/sonic-eventd/src/eventd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ validate_event(const internal_event_t &event, runtime_id_t &rid, sequence_t &seq
}


/*
* Initialize cache with set of events provided.
* Events read by cache service will be appended
*/
void
capture_service::init_capture_cache(const event_serialized_lst_t &lst)
{
Expand Down Expand Up @@ -137,6 +141,8 @@ capture_service::do_capture()
int rc;
int block_ms=300;
int init_cnt;
event_handle_t subs_handle = NULL;
void *sock = NULL;

typedef enum {
/*
Expand All @@ -154,7 +160,16 @@ capture_service::do_capture()

cap_state_t cap_state = CAP_STATE_INIT;

void *sock = zmq_socket(m_ctx, ZMQ_SUB);
/*
* Need subscription for publishers to publish. Start one.
* As we are reading off of capture socket, we don't read from
* this handle. Not reading is a not a concern, as zmq will cache
* few initial messages and rest it will drop.
*/
subs_handle = events_init_subscriber();
RET_ON_ERR(subs_handle != NULL, "failed to subscribe to all");

sock = zmq_socket(m_ctx, ZMQ_SUB);
RET_ON_ERR(sock != NULL, "failing to get ZMQ_SUB socket");

rc = zmq_connect(sock, get_config(string(CAPTURE_END_KEY)).c_str());
Expand Down Expand Up @@ -275,6 +290,7 @@ capture_service::do_capture()
* Capture stop will close the socket which fail the read
* and hence bail out.
*/
events_deinit_subscriber(subs_handle);
zmq_close(sock);
m_cap_run = false;
return;
Expand Down Expand Up @@ -383,6 +399,15 @@ run_eventd_service()

RET_ON_ERR(service.init_server(zctx) == 0, "Failed to init service");

/*
* Start cache service, right upon eventd starts so as not to lose
* events until telemetry starts.
* Telemetry will send a stop & collect cache upon startup
*/
capture = new capture_service(zctx, cache_max);
RET_ON_ERR(capture->set_control(INIT_CAPTURE) == 0, "Failed to init capture");
RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture");

while(code != EVENT_EXIT) {
int resp = -1;
event_serialized_lst_t req_data, resp_data;
Expand Down
1 change: 1 addition & 0 deletions src/sonic-eventd/src/eventd.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Header file for eventd daemon
*/
#include "events_service.h"
#include "events.h"

typedef map<runtime_id_t, event_serialized_t> last_events_t;

Expand Down
97 changes: 63 additions & 34 deletions src/sonic-eventd/tests/eventd_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ void *init_pub(void *zctx)
EXPECT_TRUE(NULL != mock_pub);
EXPECT_EQ(0, zmq_connect(mock_pub, get_config(XSUB_END_KEY).c_str()));

/* Provide time for async connect to complete */
this_thread::sleep_for(chrono::milliseconds(200));

return mock_pub;
}

Expand Down Expand Up @@ -264,9 +267,6 @@ TEST(eventd, proxy)
/* Init pub connection */
void *mock_pub = init_pub(zctx);

/* Provide time for async connect to complete */
this_thread::sleep_for(chrono::milliseconds(100));

EXPECT_TRUE(5 < ARRAY_SIZE(ldata));

for(int i=0; i<5; ++i) {
Expand Down Expand Up @@ -312,10 +312,6 @@ TEST(eventd, capture)
printf("Capture TEST started\n");
debug_on();

/*
* Need to run subscriber; Else publisher would skip publishing
* in the absence of any subscriber.
*/
bool term_sub = false;
string sub_source;
int sub_evts_sz = 0;
Expand Down Expand Up @@ -370,6 +366,8 @@ TEST(eventd, capture)
/*
* Collect events to publish for capture to cache
* re-publishing some events sent in cache.
* Hence i=1, when first init_cache events are already
* in crash.
*/
for(int i=1; i < ARRAY_SIZE(ldata); ++i) {
internal_event_t ev(create_ev(ldata[i]));
Expand All @@ -395,9 +393,6 @@ TEST(eventd, capture)
/* Init pub connection */
void *mock_pub = init_pub(zctx);

/* Provide time for async connect to complete */
this_thread::sleep_for(chrono::milliseconds(200));

/* Publish events from 1 to all. */
run_pub(mock_pub, wr_source, wr_evts);

Expand Down Expand Up @@ -527,9 +522,6 @@ TEST(eventd, captureCacheMax)
/* Init pub connection */
void *mock_pub = init_pub(zctx);

/* Provide time for async connect to complete */
this_thread::sleep_for(chrono::milliseconds(200));

/* Publish events from 1 to all. */
run_pub(mock_pub, wr_source, wr_evts);

Expand Down Expand Up @@ -590,12 +582,7 @@ TEST(eventd, service)
printf("Service TEST started\n");
debug_on();

/* capture related */
int init_cache = 4; /* provided along with start capture */

/* startup strings; expected list & read list from capture */
event_serialized_lst_t evts_start, evts_read;

event_service service;

void *zctx = zmq_ctx_new();
Expand All @@ -605,37 +592,79 @@ TEST(eventd, service)
* Start the eventd server side service
* It runs proxy & capture service
* It uses its own zmq context
* It starts to capture too.
*/
thread thread_service(&run_eventd_service);

/* Need client side service to interact with server side */
EXPECT_EQ(0, service.init_client(zctx));

EXPECT_EQ(-1, service.cache_stop());
{
/* eventd_service starts cache too; Test this caching */
/* Init pub connection */
void *mock_pub = init_pub(zctx);

EXPECT_TRUE(init_cache > 1);
internal_events_lst_t wr_evts;
int wr_sz = 2;
string wr_source("hello");

/* Collect few serailized strings of events for startup cache */
for(int i=0; i < init_cache; ++i) {
internal_event_t ev(create_ev(ldata[i]));
string evt_str;
serialize(ev, evt_str);
evts_start.push_back(evt_str);
/* Test service startup caching */
event_serialized_lst_t evts_start, evts_read;

for(int i=0; i<wr_sz; ++i) {
string evt_str;
internal_event_t ev(create_ev(ldata[i]));

wr_evts.push_back(ev);
serialize(ev, evt_str);
evts_start.push_back(evt_str);
}

/* Publish events. */
run_pub(mock_pub, wr_source, wr_evts);

EXPECT_EQ(0, service.cache_init());
EXPECT_EQ(0, service.cache_start(evts_start));
/* Published events must have been captured. Give a pause, to ensure sent. */
this_thread::sleep_for(chrono::milliseconds(200));

this_thread::sleep_for(chrono::milliseconds(200));
EXPECT_EQ(0, service.cache_stop());

/* Stop capture, closes socket & terminates the thread */
EXPECT_EQ(0, service.cache_stop());
/* Read the cache; expect wr_sz events */
EXPECT_EQ(0, service.cache_read(evts_read));

/* Read the cache */
EXPECT_EQ(0, service.cache_read(evts_read));
EXPECT_EQ(evts_read, evts_start);

EXPECT_EQ(evts_read, evts_start);
zmq_close(mock_pub);
}

{
/* Test normal cache op; init, start & stop via event_service APIs */
int init_cache = 4; /* provided along with start capture */
event_serialized_lst_t evts_start, evts_read;

EXPECT_TRUE(init_cache > 1);

/* Collect few serailized strings of events for startup cache */
for(int i=0; i < init_cache; ++i) {
internal_event_t ev(create_ev(ldata[i]));
string evt_str;
serialize(ev, evt_str);
evts_start.push_back(evt_str);
}


EXPECT_EQ(0, service.cache_init());
EXPECT_EQ(0, service.cache_start(evts_start));

this_thread::sleep_for(chrono::milliseconds(200));

/* Stop capture, closes socket & terminates the thread */
EXPECT_EQ(0, service.cache_stop());

/* Read the cache */
EXPECT_EQ(0, service.cache_read(evts_read));

EXPECT_EQ(evts_read, evts_start);
}

EXPECT_EQ(0, service.send_recv(EVENT_EXIT));

Expand Down

0 comments on commit 65f42ed

Please sign in to comment.