From 20747fa63d59f4aadaa06da282e23d8e1e0b9fdf Mon Sep 17 00:00:00 2001 From: "Sudharsan D.G" Date: Mon, 28 Oct 2019 18:09:16 -0500 Subject: [PATCH] Sflow orchagent changes (#1012) * Sflow orchagent changes --- cfgmgr/Makefile.am | 9 +- cfgmgr/sflowmgr.cpp | 376 +++++++++++++++++++++++++++++++++++++++ cfgmgr/sflowmgr.h | 67 +++++++ cfgmgr/sflowmgrd.cpp | 88 +++++++++ orchagent/Makefile.am | 1 + orchagent/orchdaemon.cpp | 9 +- orchagent/orchdaemon.h | 1 + orchagent/saihelper.cpp | 3 + orchagent/sfloworch.cpp | 347 ++++++++++++++++++++++++++++++++++++ orchagent/sfloworch.h | 47 +++++ tests/test_sflow.py | 130 ++++++++++++++ 11 files changed, 1075 insertions(+), 3 deletions(-) create mode 100644 cfgmgr/sflowmgr.cpp create mode 100644 cfgmgr/sflowmgr.h create mode 100644 cfgmgr/sflowmgrd.cpp create mode 100644 orchagent/sfloworch.cpp create mode 100644 orchagent/sfloworch.h create mode 100644 tests/test_sflow.py diff --git a/cfgmgr/Makefile.am b/cfgmgr/Makefile.am index ee5c3b5ce69e..c1e3f06996f9 100644 --- a/cfgmgr/Makefile.am +++ b/cfgmgr/Makefile.am @@ -3,7 +3,7 @@ CFLAGS_SAI = -I /usr/include/sai LIBNL_CFLAGS = -I/usr/include/libnl3 LIBNL_LIBS = -lnl-genl-3 -lnl-route-3 -lnl-3 -bin_PROGRAMS = vlanmgrd teammgrd portmgrd intfmgrd buffermgrd vrfmgrd nbrmgrd vxlanmgrd +bin_PROGRAMS = vlanmgrd teammgrd portmgrd intfmgrd buffermgrd vrfmgrd nbrmgrd vxlanmgrd sflowmgrd if DEBUG DBGFLAGS = -ggdb -DDEBUG @@ -49,4 +49,9 @@ nbrmgrd_LDADD = -lswsscommon $(LIBNL_LIBS) vxlanmgrd_SOURCES = vxlanmgrd.cpp vxlanmgr.cpp $(top_srcdir)/orchagent/orch.cpp $(top_srcdir)/orchagent/request_parser.cpp shellcmd.h vxlanmgrd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) vxlanmgrd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) -vxlanmgrd_LDADD = -lswsscommon \ No newline at end of file +vxlanmgrd_LDADD = -lswsscommon + +sflowmgrd_SOURCES = sflowmgrd.cpp sflowmgr.cpp $(top_srcdir)/orchagent/orch.cpp $(top_srcdir)/orchagent/request_parser.cpp shellcmd.h +sflowmgrd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) +sflowmgrd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) +sflowmgrd_LDADD = -lswsscommon diff --git a/cfgmgr/sflowmgr.cpp b/cfgmgr/sflowmgr.cpp new file mode 100644 index 000000000000..98d6a7775696 --- /dev/null +++ b/cfgmgr/sflowmgr.cpp @@ -0,0 +1,376 @@ +#include "logger.h" +#include "dbconnector.h" +#include "producerstatetable.h" +#include "tokenize.h" +#include "ipprefix.h" +#include "sflowmgr.h" +#include "exec.h" +#include "shellcmd.h" + +using namespace std; +using namespace swss; + +map sflowSpeedRateInitMap = +{ + {SFLOW_SAMPLE_RATE_KEY_400G, SFLOW_SAMPLE_RATE_VALUE_400G}, + {SFLOW_SAMPLE_RATE_KEY_100G, SFLOW_SAMPLE_RATE_VALUE_100G}, + {SFLOW_SAMPLE_RATE_KEY_50G, SFLOW_SAMPLE_RATE_VALUE_50G}, + {SFLOW_SAMPLE_RATE_KEY_40G, SFLOW_SAMPLE_RATE_VALUE_40G}, + {SFLOW_SAMPLE_RATE_KEY_25G, SFLOW_SAMPLE_RATE_VALUE_25G}, + {SFLOW_SAMPLE_RATE_KEY_10G, SFLOW_SAMPLE_RATE_VALUE_10G}, + {SFLOW_SAMPLE_RATE_KEY_1G, SFLOW_SAMPLE_RATE_VALUE_1G} +}; + +SflowMgr::SflowMgr(DBConnector *cfgDb, DBConnector *appDb, const vector &tableNames) : + Orch(cfgDb, tableNames), + m_cfgSflowTable(cfgDb, CFG_SFLOW_TABLE_NAME), + m_cfgSflowSessionTable(cfgDb, CFG_SFLOW_SESSION_TABLE_NAME), + m_appSflowTable(appDb, APP_SFLOW_TABLE_NAME), + m_appSflowSessionTable(appDb, APP_SFLOW_SESSION_TABLE_NAME) +{ + m_intfAllConf = true; + m_gEnable = false; +} + +void SflowMgr::sflowHandleService(bool enable) +{ + stringstream cmd; + string res; + + SWSS_LOG_ENTER(); + + if (enable) + { + cmd << "service hsflowd restart"; + } + else + { + cmd << "service hsflowd stop"; + } + + int ret = swss::exec(cmd.str(), res); + if (ret) + { + SWSS_LOG_ERROR("Command '%s' failed with rc %d", cmd.str().c_str(), ret); + } + else + { + SWSS_LOG_NOTICE("Starting hsflowd service"); + SWSS_LOG_INFO("Command '%s' succeeded", cmd.str().c_str()); + } + +} + +void SflowMgr::sflowUpdatePortInfo(Consumer &consumer) +{ + auto it = consumer.m_toSync.begin(); + + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + + string key = kfvKey(t); + string op = kfvOp(t); + auto values = kfvFieldsValues(t); + + if (op == SET_COMMAND) + { + SflowPortInfo port_info; + bool new_port = false; + + auto sflowPortConf = m_sflowPortConfMap.find(key); + if (sflowPortConf == m_sflowPortConfMap.end()) + { + new_port = true; + port_info.local_conf = false; + port_info.speed = SFLOW_ERROR_SPEED_STR; + port_info.rate = ""; + port_info.admin = ""; + m_sflowPortConfMap[key] = port_info; + } + for (auto i : values) + { + if (fvField(i) == "speed") + { + m_sflowPortConfMap[key].speed = fvValue(i); + } + } + + if (new_port) + { + if (m_gEnable && m_intfAllConf) + { + vector fvs; + sflowGetGlobalInfo(fvs, m_sflowPortConfMap[key].speed); + m_appSflowSessionTable.set(key, fvs); + } + } + } + else if (op == DEL_COMMAND) + { + auto sflowPortConf = m_sflowPortConfMap.find(key); + if (sflowPortConf != m_sflowPortConfMap.end()) + { + bool local_cfg = m_sflowPortConfMap[key].local_conf; + + m_sflowPortConfMap.erase(key); + if ((m_intfAllConf && m_gEnable) || local_cfg) + { + m_appSflowSessionTable.del(key); + } + } + } + it = consumer.m_toSync.erase(it); + } +} + +void SflowMgr::sflowHandleSessionAll(bool enable) +{ + for (auto it: m_sflowPortConfMap) + { + if (!it.second.local_conf) + { + vector fvs; + sflowGetGlobalInfo(fvs, it.second.speed); + if (enable) + { + m_appSflowSessionTable.set(it.first, fvs); + } + else + { + m_appSflowSessionTable.del(it.first); + } + } + } +} + +void SflowMgr::sflowHandleSessionLocal(bool enable) +{ + for (auto it: m_sflowPortConfMap) + { + if (it.second.local_conf) + { + vector fvs; + sflowGetPortInfo(fvs, it.second); + if (enable) + { + m_appSflowSessionTable.set(it.first, fvs); + } + else + { + m_appSflowSessionTable.del(it.first); + } + } + } +} + +void SflowMgr::sflowGetGlobalInfo(vector &fvs, string speed) +{ + string rate; + FieldValueTuple fv1("admin_state", "up"); + fvs.push_back(fv1); + + if (speed != SFLOW_ERROR_SPEED_STR) + { + rate = sflowSpeedRateInitMap[speed]; + } + else + { + rate = SFLOW_ERROR_SPEED_STR; + } + FieldValueTuple fv2("sample_rate",rate); + fvs.push_back(fv2); +} + +void SflowMgr::sflowGetPortInfo(vector &fvs, SflowPortInfo &local_info) +{ + if (local_info.admin.length() > 0) + { + FieldValueTuple fv1("admin_state", local_info.admin); + fvs.push_back(fv1); + } + + FieldValueTuple fv2("sample_rate", local_info.rate); + fvs.push_back(fv2); +} + +void SflowMgr::sflowCheckAndFillValues(string alias, vector &fvs) +{ + string rate; + bool admin_present = false; + bool rate_present = false; + + for (auto i : fvs) + { + if (fvField(i) == "sample_rate") + { + rate_present = true; + m_sflowPortConfMap[alias].rate = fvValue(i); + } + if (fvField(i) == "admin_state") + { + admin_present = true; + m_sflowPortConfMap[alias].admin = fvValue(i); + } + } + + if (!rate_present) + { + if (m_sflowPortConfMap[alias].rate == "") + { + string speed = m_sflowPortConfMap[alias].speed; + + if (speed != SFLOW_ERROR_SPEED_STR) + { + rate = sflowSpeedRateInitMap[speed]; + } + else + { + rate = SFLOW_ERROR_SPEED_STR; + } + m_sflowPortConfMap[alias].rate = rate; + } + FieldValueTuple fv("sample_rate", m_sflowPortConfMap[alias].rate); + fvs.push_back(fv); + } + + if (!admin_present) + { + if (m_sflowPortConfMap[alias].admin == "") + { + /* By default admin state is enable if not set explicitely */ + m_sflowPortConfMap[alias].admin = "up"; + } + FieldValueTuple fv("admin_state", m_sflowPortConfMap[alias].admin); + fvs.push_back(fv); + } +} + +void SflowMgr::doTask(Consumer &consumer) +{ + SWSS_LOG_ENTER(); + + auto table = consumer.getTableName(); + + if (table == CFG_PORT_TABLE_NAME) + { + sflowUpdatePortInfo(consumer); + return; + } + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + + string key = kfvKey(t); + string op = kfvOp(t); + auto values = kfvFieldsValues(t); + + if (op == SET_COMMAND) + { + if (table == CFG_SFLOW_TABLE_NAME) + { + for (auto i : values) + { + if (fvField(i) == "admin_state") + { + bool enable = false; + if (fvValue(i) == "up") + { + enable = true; + } + if (enable == m_gEnable) + { + break; + } + m_gEnable = enable; + sflowHandleService(enable); + if (m_intfAllConf) + { + sflowHandleSessionAll(enable); + } + sflowHandleSessionLocal(enable); + } + } + m_appSflowTable.set(key, values); + } + else if (table == CFG_SFLOW_SESSION_TABLE_NAME) + { + if (key == "all") + { + for (auto i : values) + { + if (fvField(i) == "admin_state") + { + bool enable = false; + + if (fvValue(i) == "up") + { + enable = true; + } + if ((enable != m_intfAllConf) && (m_gEnable)) + { + sflowHandleSessionAll(enable); + } + m_intfAllConf = enable; + } + } + } + else + { + auto sflowPortConf = m_sflowPortConfMap.find(key); + + if (sflowPortConf == m_sflowPortConfMap.end()) + { + it++; + continue; + } + sflowCheckAndFillValues(key,values); + m_sflowPortConfMap[key].local_conf = true; + m_appSflowSessionTable.set(key, values); + } + } + } + else if (op == DEL_COMMAND) + { + if (table == CFG_SFLOW_TABLE_NAME) + { + if (m_gEnable) + { + sflowHandleService(false); + sflowHandleSessionAll(false); + } + m_gEnable = false; + m_appSflowTable.del(key); + } + else if (table == CFG_SFLOW_SESSION_TABLE_NAME) + { + if (key == "all") + { + if (!m_intfAllConf) + { + sflowHandleSessionAll(true); + } + m_intfAllConf = true; + } + else + { + m_appSflowSessionTable.del(key); + m_sflowPortConfMap[key].local_conf = false; + m_sflowPortConfMap[key].rate = ""; + m_sflowPortConfMap[key].admin = ""; + + /* If Global configured, set global session on port after local config is deleted */ + if (m_intfAllConf) + { + vector fvs; + sflowGetGlobalInfo(fvs, m_sflowPortConfMap[key].speed); + m_appSflowSessionTable.set(key,fvs); + } + } + } + } + it = consumer.m_toSync.erase(it); + } +} diff --git a/cfgmgr/sflowmgr.h b/cfgmgr/sflowmgr.h new file mode 100644 index 000000000000..16cd22379887 --- /dev/null +++ b/cfgmgr/sflowmgr.h @@ -0,0 +1,67 @@ +#pragma once + +#include "dbconnector.h" +#include "orch.h" +#include "producerstatetable.h" + +#include +#include +#include + +namespace swss { + +#define SFLOW_SAMPLE_RATE_KEY_400G "400000" +#define SFLOW_SAMPLE_RATE_KEY_100G "100000" +#define SFLOW_SAMPLE_RATE_KEY_50G "50000" +#define SFLOW_SAMPLE_RATE_KEY_40G "40000" +#define SFLOW_SAMPLE_RATE_KEY_25G "25000" +#define SFLOW_SAMPLE_RATE_KEY_10G "10000" +#define SFLOW_SAMPLE_RATE_KEY_1G "1000" + +#define SFLOW_SAMPLE_RATE_VALUE_400G "40000" +#define SFLOW_SAMPLE_RATE_VALUE_100G "10000" +#define SFLOW_SAMPLE_RATE_VALUE_50G "5000" +#define SFLOW_SAMPLE_RATE_VALUE_40G "4000" +#define SFLOW_SAMPLE_RATE_VALUE_25G "2500" +#define SFLOW_SAMPLE_RATE_VALUE_10G "1000" +#define SFLOW_SAMPLE_RATE_VALUE_1G "100" + +#define SFLOW_ERROR_SPEED_STR "error" + +struct SflowPortInfo +{ + bool local_conf; + std::string speed; + std::string rate; + std::string admin; +}; + +/* Port to Local config map */ +typedef std::map SflowPortConfMap; + +class SflowMgr : public Orch +{ +public: + SflowMgr(DBConnector *cfgDb, DBConnector *appDb, const std::vector &tableNames); + + using Orch::doTask; +private: + Table m_cfgSflowTable; + Table m_cfgSflowSessionTable; + ProducerStateTable m_appSflowTable; + ProducerStateTable m_appSflowSessionTable; + SflowPortConfMap m_sflowPortConfMap; + bool m_intfAllConf; + bool m_gEnable; + + void doTask(Consumer &consumer); + void sflowHandleService(bool enable); + void sflowUpdatePortInfo(Consumer &consumer); + void sflowHandleSessionAll(bool enable); + void sflowHandleSessionLocal(bool enable); + void sflowCheckAndFillValues(std::string alias, std::vector &fvs); + void sflowGetPortInfo(std::vector &fvs, SflowPortInfo &local_info); + void sflowGetGlobalInfo(std::vector &fvs, std::string speed); +}; + +} diff --git a/cfgmgr/sflowmgrd.cpp b/cfgmgr/sflowmgrd.cpp new file mode 100644 index 000000000000..343f0ead0acf --- /dev/null +++ b/cfgmgr/sflowmgrd.cpp @@ -0,0 +1,88 @@ +#include +#include +#include +#include +#include + +#include "exec.h" +#include "sflowmgr.h" +#include "schema.h" +#include "select.h" + +using namespace std; +using namespace swss; + +/* select() function timeout retry time, in millisecond */ +#define SELECT_TIMEOUT 1000 + +/* + * Following global variables are defined here for the purpose of + * using existing Orch class which is to be refactored soon to + * eliminate the direct exposure of the global variables. + * + * Once Orch class refactoring is done, these global variables + * should be removed from here. + */ +int gBatchSize = 0; +bool gSwssRecord = false; +bool gLogRotate = false; +ofstream gRecordOfs; +string gRecordFile; +/* Global database mutex */ +mutex gDbMutex; + +int main(int argc, char **argv) +{ + Logger::linkToDbNative("sflowmgrd"); + SWSS_LOG_ENTER(); + + SWSS_LOG_NOTICE("--- Starting sflowmgrd ---"); + + try + { + vector cfg_sflow_tables = { + CFG_SFLOW_TABLE_NAME, + CFG_SFLOW_SESSION_TABLE_NAME, + CFG_PORT_TABLE_NAME + }; + + DBConnector cfgDb(CONFIG_DB, DBConnector::DEFAULT_UNIXSOCKET, 0); + DBConnector appDb(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0); + + SflowMgr sflowmgr(&cfgDb, &appDb, cfg_sflow_tables); + + vector cfgOrchList = {&sflowmgr}; + + swss::Select s; + for (Orch *o : cfgOrchList) + { + s.addSelectables(o->getSelectables()); + } + + while (true) + { + Selectable *sel; + int ret; + + ret = s.select(&sel, SELECT_TIMEOUT); + if (ret == Select::ERROR) + { + SWSS_LOG_NOTICE("Error: %s!", strerror(errno)); + continue; + } + if (ret == Select::TIMEOUT) + { + sflowmgr.doTask(); + continue; + } + + auto *c = (Executor *)sel; + c->execute(); + } + } + catch (const exception &e) + { + SWSS_LOG_ERROR("Runtime error: %s", e.what()); + } + return -1; +} diff --git a/orchagent/Makefile.am b/orchagent/Makefile.am index 7db2f403b13a..d7a705a490c8 100644 --- a/orchagent/Makefile.am +++ b/orchagent/Makefile.am @@ -53,6 +53,7 @@ orchagent_SOURCES = \ flexcounterorch.cpp \ watermarkorch.cpp \ policerorch.cpp \ + sfloworch.cpp \ chassisorch.cpp orchagent_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index e174160e0269..df151f996630 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -193,6 +193,13 @@ bool OrchDaemon::init() WatermarkOrch *wm_orch = new WatermarkOrch(m_configDb, wm_tables); + vector sflow_tables = { + APP_SFLOW_TABLE_NAME, + APP_SFLOW_SESSION_TABLE_NAME, + APP_SFLOW_SAMPLE_RATE_TABLE_NAME + }; + SflowOrch *sflow_orch = new SflowOrch(m_applDb, sflow_tables); + /* * The order of the orch list is important for state restore of warm start and * the queued processing in m_toSync map after gPortsOrch->allPortsReady() is set. @@ -201,7 +208,7 @@ bool OrchDaemon::init() * when iterating ConsumerMap. * That is ensured implicitly by the order of map key, "LAG_TABLE" is smaller than "VLAN_TABLE" in lexicographic order. */ - m_orchList = { gSwitchOrch, gCrmOrch, gBufferOrch, gPortsOrch, gIntfsOrch, gNeighOrch, gRouteOrch, copp_orch, tunnel_decap_orch, qos_orch, wm_orch, policer_orch }; + m_orchList = { gSwitchOrch, gCrmOrch, gBufferOrch, gPortsOrch, gIntfsOrch, gNeighOrch, gRouteOrch, copp_orch, tunnel_decap_orch, qos_orch, wm_orch, policer_orch, sflow_orch}; bool initialize_dtel = false; diff --git a/orchagent/orchdaemon.h b/orchagent/orchdaemon.h index 14d43fb2c1af..bfa52fdef818 100644 --- a/orchagent/orchdaemon.h +++ b/orchagent/orchdaemon.h @@ -27,6 +27,7 @@ #include "flexcounterorch.h" #include "watermarkorch.h" #include "policerorch.h" +#include "sfloworch.h" #include "directory.h" using namespace swss; diff --git a/orchagent/saihelper.cpp b/orchagent/saihelper.cpp index 4b01ddeb39f8..dbadc7e28830 100644 --- a/orchagent/saihelper.cpp +++ b/orchagent/saihelper.cpp @@ -41,6 +41,7 @@ sai_mirror_api_t* sai_mirror_api; sai_fdb_api_t* sai_fdb_api; sai_dtel_api_t* sai_dtel_api; sai_bmtor_api_t* sai_bmtor_api; +sai_samplepacket_api_t* sai_samplepacket_api; extern sai_object_id_t gSwitchId; extern bool gSairedisRecord; @@ -130,6 +131,7 @@ void initSaiApi() sai_api_query(SAI_API_ACL, (void **)&sai_acl_api); sai_api_query(SAI_API_DTEL, (void **)&sai_dtel_api); sai_api_query((sai_api_t)SAI_API_BMTOR, (void **)&sai_bmtor_api); + sai_api_query(SAI_API_SAMPLEPACKET, (void **)&sai_samplepacket_api); sai_log_set(SAI_API_SWITCH, SAI_LOG_LEVEL_NOTICE); sai_log_set(SAI_API_BRIDGE, SAI_LOG_LEVEL_NOTICE); @@ -156,6 +158,7 @@ void initSaiApi() sai_log_set(SAI_API_ACL, SAI_LOG_LEVEL_NOTICE); sai_log_set(SAI_API_DTEL, SAI_LOG_LEVEL_NOTICE); sai_log_set((sai_api_t)SAI_API_BMTOR, SAI_LOG_LEVEL_NOTICE); + sai_log_set(SAI_API_SAMPLEPACKET, SAI_LOG_LEVEL_NOTICE); } void initSaiRedis(const string &record_location) diff --git a/orchagent/sfloworch.cpp b/orchagent/sfloworch.cpp new file mode 100644 index 000000000000..3c00c23f9f4d --- /dev/null +++ b/orchagent/sfloworch.cpp @@ -0,0 +1,347 @@ +#include "sai.h" +#include "sfloworch.h" +#include "tokenize.h" + +using namespace std; +using namespace swss; + +extern sai_samplepacket_api_t* sai_samplepacket_api; +extern sai_port_api_t* sai_port_api; +extern sai_object_id_t gSwitchId; +extern PortsOrch* gPortsOrch; + +SflowOrch::SflowOrch(DBConnector* db, vector &tableNames) : + Orch(db, tableNames) +{ + SWSS_LOG_ENTER(); + m_sflowStatus = false; +} + +bool SflowOrch::sflowCreateSession(uint32_t rate, SflowSession &session) +{ + sai_attribute_t attr; + sai_object_id_t session_id = SAI_NULL_OBJECT_ID; + sai_status_t sai_rc; + + attr.id = SAI_SAMPLEPACKET_ATTR_SAMPLE_RATE; + attr.value.u32 = rate; + + sai_rc = sai_samplepacket_api->create_samplepacket(&session_id, gSwitchId, + 1, &attr); + if (sai_rc != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to create sample packet session with rate %d", rate); + return false; + } + session.m_sample_id = session_id; + session.ref_count = 0; + return true; +} + +bool SflowOrch::sflowDestroySession(SflowSession &session) +{ + sai_status_t sai_rc; + + sai_rc = sai_samplepacket_api->remove_samplepacket(session.m_sample_id); + if (sai_rc != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to destroy sample packet session with id %lx", + session.m_sample_id); + return false; + } + return true; +} + +bool SflowOrch::sflowUpdateRate(sai_object_id_t port_id, uint32_t rate) +{ + auto port_info = m_sflowPortInfoMap.find(port_id); + auto session = m_sflowRateSampleMap.find(rate); + SflowSession new_session; + uint32_t old_rate = sflowSessionGetRate(port_info->second.m_sample_id); + + if (session == m_sflowRateSampleMap.end()) + { + if (!sflowCreateSession(rate, new_session)) + { + SWSS_LOG_ERROR("Creating sflow session with rate %d failed", rate); + return false; + } + m_sflowRateSampleMap[rate] = new_session; + } + else + { + new_session = session->second; + } + + if (port_info->second.admin_state) + { + if (!sflowAddPort(new_session.m_sample_id, port_id)) + { + return false; + } + } + port_info->second.m_sample_id = new_session.m_sample_id; + + m_sflowRateSampleMap[rate].ref_count++; + m_sflowRateSampleMap[old_rate].ref_count--; + if (m_sflowRateSampleMap[old_rate].ref_count == 0) + { + if (!sflowDestroySession(m_sflowRateSampleMap[old_rate])) + { + SWSS_LOG_ERROR("Failed to clean old session %lx with rate %d", + m_sflowRateSampleMap[old_rate].m_sample_id, old_rate); + } + else + { + m_sflowRateSampleMap.erase(old_rate); + } + } + return true; +} + +bool SflowOrch::sflowAddPort(sai_object_id_t sample_id, sai_object_id_t port_id) +{ + sai_attribute_t attr; + sai_status_t sai_rc; + + attr.id = SAI_PORT_ATTR_INGRESS_SAMPLEPACKET_ENABLE; + attr.value.oid = sample_id; + sai_rc = sai_port_api->set_port_attribute(port_id, &attr); + + if (sai_rc != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to set session %lx on port %lx", sample_id, port_id); + return false; + } + return true; +} + +bool SflowOrch::sflowDelPort(sai_object_id_t port_id) +{ + sai_attribute_t attr; + sai_status_t sai_rc; + + attr.id = SAI_PORT_ATTR_INGRESS_SAMPLEPACKET_ENABLE; + attr.value.oid = SAI_NULL_OBJECT_ID; + sai_rc = sai_port_api->set_port_attribute(port_id, &attr); + + if (sai_rc != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to delete session on port %lx", port_id); + return false; + } + return true; +} + +void SflowOrch::sflowExtractInfo(vector &fvs, bool &admin, uint32_t &rate) +{ + for (auto i : fvs) + { + if (fvField(i) == "admin_state") + { + if (fvValue(i) == "up") + { + admin = true; + } + else if (fvValue(i) == "down") + { + admin = false; + } + } + else if (fvField(i) == "sample_rate") + { + if (fvValue(i) != "error") + { + rate = (uint32_t)stoul(fvValue(i)); + } + else + { + rate = 0; + } + } + } +} + +void SflowOrch::sflowStatusSet(Consumer &consumer) +{ + auto it = consumer.m_toSync.begin(); + + while (it != consumer.m_toSync.end()) + { + auto tuple = it->second; + string op = kfvOp(tuple); + uint32_t rate = 0; + + if (op == SET_COMMAND) + { + sflowExtractInfo(kfvFieldsValues(tuple), m_sflowStatus, rate); + } + else if (op == DEL_COMMAND) + { + m_sflowStatus = false; + } + it = consumer.m_toSync.erase(it); + } +} + +uint32_t SflowOrch::sflowSessionGetRate(sai_object_id_t m_sample_id) +{ + for (auto it: m_sflowRateSampleMap) + { + if (it.second.m_sample_id == m_sample_id) + { + return it.first; + } + } + return 0; +} + +bool SflowOrch::handleSflowSessionDel(sai_object_id_t port_id) +{ + auto sflowInfo = m_sflowPortInfoMap.find(port_id); + + if (sflowInfo != m_sflowPortInfoMap.end()) + { + uint32_t rate = sflowSessionGetRate(sflowInfo->second.m_sample_id); + if (sflowInfo->second.admin_state) + { + if (!sflowDelPort(port_id)) + { + return false; + } + sflowInfo->second.admin_state = false; + } + + m_sflowPortInfoMap.erase(port_id); + m_sflowRateSampleMap[rate].ref_count--; + if (m_sflowRateSampleMap[rate].ref_count == 0) + { + if (!sflowDestroySession(m_sflowRateSampleMap[rate])) + { + return false; + } + m_sflowRateSampleMap.erase(rate); + } + } + return true; +} + +void SflowOrch::doTask(Consumer &consumer) +{ + SWSS_LOG_ENTER(); + Port port; + string table_name = consumer.getTableName(); + + if (table_name == APP_SFLOW_TABLE_NAME) + { + sflowStatusSet(consumer); + return; + } + if (!gPortsOrch->allPortsReady()) + { + return; + } + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + auto tuple = it->second; + string op = kfvOp(tuple); + string alias = kfvKey(tuple); + + gPortsOrch->getPort(alias, port); + if (op == SET_COMMAND) + { + bool admin_state = m_sflowStatus; + uint32_t rate = 0; + + if (!m_sflowStatus) + { + return; + } + auto sflowInfo = m_sflowPortInfoMap.find(port.m_port_id); + if (sflowInfo != m_sflowPortInfoMap.end()) + { + rate = sflowSessionGetRate(sflowInfo->second.m_sample_id); + admin_state = sflowInfo->second.admin_state; + } + + sflowExtractInfo(kfvFieldsValues(tuple), admin_state, rate); + if (sflowInfo == m_sflowPortInfoMap.end()) + { + if (rate == 0) + { + it++; + continue; + } + + SflowPortInfo port_info; + auto session_info = m_sflowRateSampleMap.find(rate); + if (session_info != m_sflowRateSampleMap.end()) + { + port_info.m_sample_id = session_info->second.m_sample_id; + } + else + { + SflowSession session; + if (!sflowCreateSession(rate, session)) + { + it++; + continue; + } + m_sflowRateSampleMap[rate] = session; + port_info.m_sample_id = session.m_sample_id; + } + if (admin_state) + { + if (!sflowAddPort(port_info.m_sample_id, port.m_port_id)) + { + it++; + continue; + } + } + port_info.admin_state = admin_state; + m_sflowPortInfoMap[port.m_port_id] = port_info; + m_sflowRateSampleMap[rate].ref_count++; + } + else + { + if (rate != sflowSessionGetRate(sflowInfo->second.m_sample_id)) + { + if (!sflowUpdateRate(port.m_port_id, rate)) + { + it++; + continue; + } + } + if (admin_state != sflowInfo->second.admin_state) + { + bool ret = false; + if (admin_state) + { + ret = sflowAddPort(sflowInfo->second.m_sample_id, port.m_port_id); + } + else + { + ret = sflowDelPort(port.m_port_id); + } + if (!ret) + { + it++; + continue; + } + sflowInfo->second.admin_state = admin_state; + } + } + } + else if (op == DEL_COMMAND) + { + if (!handleSflowSessionDel(port.m_port_id)) + { + it++; + continue; + } + } + it = consumer.m_toSync.erase(it); + } +} diff --git a/orchagent/sfloworch.h b/orchagent/sfloworch.h new file mode 100644 index 000000000000..ea63c092a435 --- /dev/null +++ b/orchagent/sfloworch.h @@ -0,0 +1,47 @@ +#pragma once + +#include +#include + +#include "orch.h" +#include "portsorch.h" + +struct SflowPortInfo +{ + bool admin_state; + sai_object_id_t m_sample_id; +}; + +struct SflowSession +{ + sai_object_id_t m_sample_id; + uint32_t ref_count; +}; + +/* SAI Port to Sflow Port Info Map */ +typedef std::map SflowPortInfoMap; + +/* Sample-rate(unsigned int) to Sflow session map */ +typedef std::map SflowRateSampleMap; + +class SflowOrch : public Orch +{ +public: + SflowOrch(DBConnector* db, std::vector &tableNames); + +private: + SflowPortInfoMap m_sflowPortInfoMap; + SflowRateSampleMap m_sflowRateSampleMap; + bool m_sflowStatus; + + virtual void doTask(Consumer& consumer); + bool sflowCreateSession(uint32_t rate, SflowSession &session); + bool sflowDestroySession(SflowSession &session); + bool sflowAddPort(sai_object_id_t sample_id, sai_object_id_t port_id); + bool sflowDelPort(sai_object_id_t port_id); + void sflowStatusSet(Consumer &consumer); + bool sflowUpdateRate(sai_object_id_t port_id, uint32_t rate); + uint32_t sflowSessionGetRate(sai_object_id_t sample_id); + bool handleSflowSessionDel(sai_object_id_t port_id); + void sflowExtractInfo(std::vector &fvs, bool &admin, uint32_t &rate); +}; diff --git a/tests/test_sflow.py b/tests/test_sflow.py new file mode 100644 index 000000000000..1e064d699e31 --- /dev/null +++ b/tests/test_sflow.py @@ -0,0 +1,130 @@ +from swsscommon import swsscommon + +import time +import os + +class TestSflow(object): + def setup_sflow(self, dvs): + self.pdb = swsscommon.DBConnector(0, dvs.redis_sock, 0) + self.adb = swsscommon.DBConnector(1, dvs.redis_sock, 0) + ptbl = swsscommon.ProducerStateTable(self.pdb, "SFLOW_TABLE") + fvs = swsscommon.FieldValuePairs([("admin_state", "up")]) + ptbl.set("global", fvs) + + time.sleep(1) + + def test_SflowDisble(self, dvs, testlog): + self.setup_sflow(dvs) + ptbl = swsscommon.ProducerStateTable(self.pdb, "SFLOW_SESSION_TABLE") + gtbl = swsscommon.ProducerStateTable(self.pdb, "SFLOW_TABLE") + fvs = swsscommon.FieldValuePairs([("admin_state", "down")]) + gtbl.set("global", fvs) + + time.sleep(1) + fvs = swsscommon.FieldValuePairs([("admin_state", "up"),("sample_rate","1000")]) + ptbl.set("Ethernet0", fvs) + + time.sleep(1) + + + atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_PORT") + (status, fvs) = atbl.get(dvs.asicdb.portnamemap["Ethernet0"]) + + assert status == True + + sample_session = "" + for fv in fvs: + if fv[0] == "SAI_PORT_ATTR_INGRESS_SAMPLEPACKET_ENABLE": + sample_session = fv[1] + + assert sample_session == "" + + fvs = swsscommon.FieldValuePairs([("admin_state", "up")]) + gtbl.set("global", fvs) + + time.sleep(1) + + atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_PORT") + (status, fvs) = atbl.get(dvs.asicdb.portnamemap["Ethernet0"]) + + assert status == True + + sample_session = "" + for fv in fvs: + if fv[0] == "SAI_PORT_ATTR_INGRESS_SAMPLEPACKET_ENABLE": + sample_session = fv[1] + + assert sample_session != "oid:0x0" + assert sample_session != "" + + atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_SAMPLEPACKET") + (status, fvs) = atbl.get(sample_session) + + assert status == True + + for fv in fvs: + if fv[0] == "SAI_SAMPLEPACKET_ATTR_SAMPLE_RATE": + assert fv[1] == "1000" + + ptbl._del("Ethernet0") + + def test_InterfaceSet(self, dvs, testlog): + self.setup_sflow(dvs) + ptbl = swsscommon.ProducerStateTable(self.pdb, "SFLOW_SESSION_TABLE") + gtbl = swsscommon.ProducerStateTable(self.pdb, "SFLOW_TABLE") + fvs = swsscommon.FieldValuePairs([("admin_state", "up"),("sample_rate","1000")]) + ptbl.set("Ethernet0", fvs) + + time.sleep(1) + + atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_PORT") + (status, fvs) = atbl.get(dvs.asicdb.portnamemap["Ethernet0"]) + + assert status == True + + sample_session = "" + for fv in fvs: + if fv[0] == "SAI_PORT_ATTR_INGRESS_SAMPLEPACKET_ENABLE": + sample_session = fv[1] + + assert sample_session != "" + + atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_SAMPLEPACKET") + (status, fvs) = atbl.get(sample_session) + + assert status == True + + for fv in fvs: + if fv[0] == "SAI_SAMPLEPACKET_ATTR_SAMPLE_RATE": + assert fv[1] == "1000" + + ptbl._del("Ethernet0") + + def test_ConfigDel(self, dvs, testlog): + self.setup_sflow(dvs) + ptbl = swsscommon.ProducerStateTable(self.pdb, "SFLOW_SESSION_TABLE") + fvs = swsscommon.FieldValuePairs([("admin_state", "up"),("sample_rate","1000")]) + ptbl.set("Ethernet0", fvs) + + time.sleep(1) + + ptbl._del("Ethernet0") + + time.sleep(1) + + atbl = swsscommon.Table(self.adb, "ASIC_STATE:SAI_OBJECT_TYPE_PORT") + (status, fvs) = atbl.get(dvs.asicdb.portnamemap["Ethernet0"]) + + assert status == True + + sample_session = "" + speed = "" + + for fv in fvs: + if fv[0] == "SAI_PORT_ATTR_INGRESS_SAMPLEPACKET_ENABLE": + sample_session = fv[1] + elif fv[0] == "SAI_PORT_ATTR_SPEED": + speed = fv[1] + + assert speed != "" + assert sample_session == "oid:0x0"