Skip to content

Commit

Permalink
First run code complete
Browse files Browse the repository at this point in the history
  • Loading branch information
renukamanavalan committed May 19, 2022
1 parent 4bba076 commit ff78343
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 46 deletions.
155 changes: 116 additions & 39 deletions src/sonic-eventd/eventd/eventd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,28 @@
* Main proxy service that runs XSUB/XPUB ends
*/

#define MB(N) ((N) * 1024 * 1024)
#define EVT_SIZE_AVG 150

#define MAX_CACHE_SIZE (MB(100) / (EVT_SIZE_AVG))

/* Count of elements returned in each read */
#define READ_SET_SIZE 100

/*
* Max count of possible concurrent event publishers
* A rough estimate only, more for mem reserve.
* So this does not limit any usage
*/
#define MAX_PUBLISHERS_COUNT 1000


/*
 * eventd_server constructor.
 *
 * Creates the zmq context shared by all sockets of this service and
 * reads the cache-size cap from config (falls back to MAX_CACHE_SIZE,
 * which is sized as ~100MB of average-sized events).
 *
 * NOTE(review): the diff render showed two overlapping init-list lines,
 * one ending in a dangling comma; reconstructed as a single valid
 * initializer list. m_capture starts NULL and is created lazily on
 * EVENT_CACHE_INIT.
 */
eventd_server::eventd_server() : m_capture(NULL)
{
    m_ctx = zmq_ctx_new();
    RET_ON_ERR(m_ctx != NULL, "Failed to get zmq ctx");

    /* Configurable upper bound on number of cached events. */
    m_cache_max = get_config_data(CACHE_MAX_CNT, (uint32_t)MAX_CACHE_SIZE);

out:
    return;
}
Expand Down Expand Up @@ -82,61 +97,108 @@ eventd_server::zproxy_service_run(void *frontend, void *backend, void *capture)


int
eventd_server::capture_events()
eventd_server::capture_events(events_data_lst_t &lst)
{
/* clean any pre-existing cache */
int ret = -1;
int i;

events_data_lst_t.swap(m_events);
last_events_t.swap(m_last_events);

/*
* Reserve a MAX_PUBLISHERS_COUNT entries for last events, as we use it only
* upon m_events/vector overflow, which might block adding new entries in map
* if overall mem consumption is too high. Clearing the map just before use
* is likely to help.
*/
for (i=0; i<MAX_PUBLISHERS_COUNT; ++i) {
m_last_events[to_string(i)] = "";
}

vector<strings>().swap(m_events);
map<runtime_id_t, string>.swap(m_last_events);
/* Cache last
typedef map<runtime_id_t, sequence_t> pre_exist_id_t;
pre_exist_id_t pre_exist_id;
RET_ON_ERR(m_capture != NULL, "capture sock is not initialized yet");
while(true) {
zmq_msg_t msg;
internal_event_t event;
int more = 0;
size_t more_size = sizeof (more);
if (!lst.empty()) {
for (events_data_lst_t::it = lst.begin(); it != lst.end(); ++it) {
internal_event_t event;
{
zmq_msg_t pat;
zmq_msg_init(&pat);
RET_ON_ERR(zmq_msg_recv(&pat, m_capture, 0) != -1,
"Failed to capture pattern");
zmq_msg_close(&pat);
deserialize(*itc, event);
pre_exist_id[event[EVENT_RUNTIME_ID]] = events_base::str_to_seq(event[EVENT_SEQUENCE]);
}
m_events.swap(lst);
}
RET_ON_ERR(zmq_getsockopt (m_capture, ZMQ_RCVMORE, &more, &more_size) == 0,
"Failed to get sockopt for capture sock");
RET_ON_ERR(more, "Event data expected, but more is false");
if (!pre_exist_id.empty()) {
/* Check read events against provided cache for 2 seconds to skip */
chrono::steady_clock::timepoint start = chrono::steady_clock::now();
while(!pre_exist_id.empty()) {
internal_event_t event;
string source, evt_str;

zmq_msg_init(&msg);
RET_ON_ERR(zmq_msg_recv(&msg, m_capture, 0) != -1,
"Failed to read event data");
RET_ON_ERR(zmq_message_read(m_socket, 0, source, evt_str) == 0,
"Failed to read from capture socket");

string s((const char *)zmq_msg_data(&msg), zmq_msg_size(&msg));
zmq_msg_close(&msg);
deserialize(evt_str, event);

deserialize(s, event);
pre_exist_id_t::iterator it = pre_exist_id.find(event[EVENT_RUNTIME_ID]);
if (it != pre_exist_id.end()) {
seq = events_base::str_to_seq(event[EVENT_SEQUENCE]);
if (seq > it->second) {
m_events.push_back(evt_str);
}
if (seq >= it->second) {
pre_exist_id.erase(it);
}
}
if(chrono::steady_clock::now() - start > chrono::seconds(2))
break;
}
pre_exist_id_t().swap(pre_exist_id);
}

m_last_events[event[EVENT_RUNTIME_ID]] = s;
/* Save until max allowed */
while(m_events.size() < m_cache_max) {
string source, evt_str;

RET_ON_ERR(zmq_message_read(m_socket, ZMQ_DONTWAIT, source, evt_str) == 0,
"Failed to read from capture socket");
try
{
m_events.push_back(s);
m_events.push_back(evt_str);
}
catch (exception& e)
{
stringstream ss;
ss << e.what();
SWSS_LOG_ERROR("Cache save event failed with %s events:size=%d",
ss.str().c_str(), m_events.size());
goto out;
break;
}
}


/* Save only last event per sender */
m_last_events.clear();
while(true) {
internal_event_t event;
string source, evt_str;

RET_ON_ERR(zmq_message_read(m_socket, ZMQ_DONTWAIT, source, evt_str) == 0,
"Failed to read from capture socket");

deserialize(evt_str, event);

m_last_events[event[EVENT_RUNTIME_ID]] = evt_str;
}
out:
/* Destroy the service and exit the thread */
close();
/*
* Capture stop will close the socket which fail the read
* and hence bail out.
*/
return 0;
}

Expand All @@ -152,13 +214,14 @@ eventd_server::eventd_service()

while(true) {
int code, resp = -1;
vector<events_cache_type_t> req_data, resp_data;
events_data_lst_t req_data, resp_data;

RET_ON_ERR(channel_read(code, data) == 0,
"Failed to read request");

switch(code) {
case EVENT_CACHE_START:
case EVENT_CACHE_INIT:
/* connect only*/
if (m_capture != NULL) {
resp_code = 1;
break;
Expand All @@ -172,21 +235,33 @@ eventd_server::eventd_service()
rc = zmq_setsockopt(sub_read, ZMQ_SUBSCRIBE, "", 0);
RET_ON_ERR(rc == 0, "Failing to ZMQ_SUBSCRIBE");

resp_code = 0;
break;


case EVENT_CACHE_START:
if (m_capture == NULL) {
resp_code = -1;
break;
}
/* Kick off the service */
m_thread_capture = thread(&eventd_server::capture_events, this);
m_thread_capture = thread(&eventd_server::capture_events, this, req_data);

resp_code = 0;
break;


case EVENT_CACHE_STOP:
resp_code = 0;
if (m_capture != NULL) {
close(m_capture);
m_capture = NULL;

/* Wait for thread to end */
m_thread_capture.join();
resp_code = 0;
}
else {
resp_code = -1;
}
break;

Expand All @@ -204,13 +279,15 @@ eventd_server::eventd_service()

int sz = m_events.size() < READ_SET_SIZE ? m_events.size() : READ_SET_SIZE;

auto it = std::next(m_events.begin(), sz);
move(m_events.begin(), m_events.end(), back_inserter(resp_data));
if (sz != 0) {
auto it = std::next(m_events.begin(), sz);
move(m_events.begin(), m_events.end(), back_inserter(resp_data));

if (sz == m_events.size()) {
events_data_lst_t().swap(m_events);
} else {
m_events.erase(m_events.begin(), it);
if (sz == m_events.size()) {
events_data_lst_t().swap(m_events);
} else {
m_events.erase(m_events.begin(), it);
}
}
break;

Expand Down
10 changes: 3 additions & 7 deletions src/sonic-eventd/eventd/eventd.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ class eventd_server {
*
* Thread is started upon creating SUB end of capture socket.
*/
int capture_events();
int capture_events(events_data_lst_t &);


private:
uint32_t m_cache_max;

void *m_ctx;

events_data_lst_t m_events;
Expand All @@ -96,9 +98,3 @@ class eventd_server {
};








0 comments on commit ff78343

Please sign in to comment.