Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[syncd] Use hash as queue for fdb notifications #1060

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions syncd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ libSyncd_a_SOURCES = \
NotificationHandler.cpp \
NotificationProcessor.cpp \
NotificationQueue.cpp \
NotificationQueueHash.cpp \
PortMap.cpp \
PortMapParser.cpp \
RedisClient.cpp \
Expand Down
30 changes: 28 additions & 2 deletions syncd/NotificationHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ NotificationHandler::NotificationHandler(
memset(&m_switchNotifications, 0, sizeof(m_switchNotifications));

m_notificationQueue = processor->getQueue();

m_notificationQueueHash = processor->getQueueHash();
}

NotificationHandler::~NotificationHandler()
Expand Down Expand Up @@ -133,9 +135,14 @@ void NotificationHandler::onFdbEvent(
{
SWSS_LOG_ENTER();

std::string s = sai_serialize_fdb_event_ntf(count, data);
for (uint32_t i = 0; i < count; i++)
{
std::string key = sai_serialize_fdb_entry(data[i].fdb_entry);

std::string s = sai_serialize_fdb_event_ntf(1, &data[i]);

enqueueNotification(SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT, s);
enqueueNotificationHash(key, SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT, s);
}
}

void NotificationHandler::onPortStateChange(
Expand Down Expand Up @@ -220,3 +227,22 @@ void NotificationHandler::enqueueNotification(
enqueueNotification(op, data, entry);
}

void NotificationHandler::enqueueNotificationHash(
_In_ const std::string& key,
_In_ const std::string& op,
_In_ const std::string& data)
{
SWSS_LOG_ENTER();

SWSS_LOG_INFO("%s %s", op.c_str(), data.c_str());

std::vector<swss::FieldValueTuple> entry;

swss::KeyOpFieldsValuesTuple item(op, data, entry);

if (m_notificationQueueHash->enqueue(key, item))
{
m_processor->signal();
}
}

8 changes: 8 additions & 0 deletions syncd/NotificationHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ extern "C"{
}

#include "NotificationQueue.h"
#include "NotificationQueueHash.h"
#include "NotificationProcessor.h"

#include "swss/table.h"
Expand Down Expand Up @@ -71,12 +72,19 @@ namespace syncd
_In_ const std::string& op,
_In_ const std::string& data);

void enqueueNotificationHash(
_In_ const std::string& key,
_In_ const std::string& op,
_In_ const std::string& data);

private:

sai_switch_notifications_t m_switchNotifications;

std::shared_ptr<NotificationQueue> m_notificationQueue;

std::shared_ptr<NotificationQueueHash> m_notificationQueueHash;

std::shared_ptr<NotificationProcessor> m_processor;
};
}
14 changes: 14 additions & 0 deletions syncd/NotificationProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ NotificationProcessor::NotificationProcessor(
m_runThread = false;

m_notificationQueue = std::make_shared<NotificationQueue>();

m_notificationQueueHash = std::make_shared<NotificationQueueHash>();
}

NotificationProcessor::~NotificationProcessor()
Expand Down Expand Up @@ -620,6 +622,11 @@ void NotificationProcessor::ntf_process_function()
{
processNotification(item);
}

while (m_notificationQueueHash->tryDequeue(item))
{
processNotification(item);
}
}
}

Expand Down Expand Up @@ -661,3 +668,10 @@ std::shared_ptr<NotificationQueue> NotificationProcessor::getQueue() const

return m_notificationQueue;
}

std::shared_ptr<NotificationQueueHash> NotificationProcessor::getQueueHash() const
{
SWSS_LOG_ENTER();

return m_notificationQueueHash;
}
5 changes: 5 additions & 0 deletions syncd/NotificationProcessor.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "NotificationQueue.h"
#include "NotificationQueueHash.h"
#include "VirtualOidTranslator.h"
#include "RedisClient.h"
#include "NotificationProducerBase.h"
Expand Down Expand Up @@ -29,6 +30,8 @@ namespace syncd

std::shared_ptr<NotificationQueue> getQueue() const;

std::shared_ptr<NotificationQueueHash> getQueueHash() const;

void signal();

void startNotificationsProcessingThread();
Expand Down Expand Up @@ -124,6 +127,8 @@ namespace syncd

std::shared_ptr<NotificationQueue> m_notificationQueue;

std::shared_ptr<NotificationQueueHash> m_notificationQueueHash;

std::shared_ptr<std::thread> m_ntf_process_thread;

// condition variable will be used to notify processing thread
Expand Down
78 changes: 78 additions & 0 deletions syncd/NotificationQueueHash.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#include "NotificationQueueHash.h"
#include "sairediscommon.h"

using namespace syncd;

#define NOTIFICATION_QUEUE_DROP_COUNT_INDICATOR (1000)
#define MUTEX std::lock_guard<std::mutex> _lock(m_mutex);

NotificationQueueHash::NotificationQueueHash():
m_dropCount(0)
{
SWSS_LOG_ENTER();

// empty;
}

NotificationQueueHash::~NotificationQueueHash()
{
SWSS_LOG_ENTER();

// empty
}

bool NotificationQueueHash::enqueue(
_In_ const std::string& key,
_In_ const swss::KeyOpFieldsValuesTuple& item)
{
MUTEX;

SWSS_LOG_ENTER();

// if key is already in the hash, then we will override it previous content
// which means we are dropping that item

if (m_queueHash.find(key) != m_queueHash.end())
{
m_dropCount++;

if (!(m_dropCount % NOTIFICATION_QUEUE_DROP_COUNT_INDICATOR))
{
SWSS_LOG_NOTICE("Dropped total of %zu notifications", m_dropCount);
}
}

m_queueHash[key] = item;

return true;
}

bool NotificationQueueHash::tryDequeue(
_Out_ swss::KeyOpFieldsValuesTuple& item)
{
MUTEX;

SWSS_LOG_ENTER();

if (m_queueHash.empty())
{
return false;
}

auto it = m_queueHash.begin();

item = it->second;

m_queueHash.erase(it);

return true;
}

size_t NotificationQueueHash::getQueueSize()
{
MUTEX;

SWSS_LOG_ENTER();

return m_queueHash.size();
}
41 changes: 41 additions & 0 deletions syncd/NotificationQueueHash.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

extern "C" {
#include <saimetadata.h>
}

#include "swss/table.h"

#include <mutex>
#include <unordered_map>

namespace syncd
{
class NotificationQueueHash
{
public:

NotificationQueueHash();

virtual ~NotificationQueueHash();

public:

bool enqueue(
_In_ const std::string& key,
_In_ const swss::KeyOpFieldsValuesTuple& msg);

bool tryDequeue(
_Out_ swss::KeyOpFieldsValuesTuple& msg);

size_t getQueueSize();

private:

std::mutex m_mutex;

std::unordered_map<std::string, swss::KeyOpFieldsValuesTuple> m_queueHash;

size_t m_dropCount;
};
}
2 changes: 2 additions & 0 deletions unittest/syncd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ tests_SOURCES = main.cpp \
MockHelper.cpp \
TestCommandLineOptions.cpp \
TestFlexCounter.cpp \
TestNotificationHandler.cpp \
TestNotificationQueueHash.cpp
TestNotificationQueue.cpp

tests_CXXFLAGS = $(DBGFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS_COMMON)
Expand Down
8 changes: 0 additions & 8 deletions unittest/syncd/MockHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,3 @@ namespace test_syncd {
mock_objectTypeQuery_result = mock_result;
}
}

namespace syncd {
sai_object_type_t VidManager::objectTypeQuery(_In_ sai_object_id_t objectId)
{
SWSS_LOG_ENTER();
return test_syncd::mock_objectTypeQuery_result;
}
}
34 changes: 17 additions & 17 deletions unittest/syncd/TestFlexCounter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ TEST(FlexCounter, addRemoveCounterForFlowCounter)
std::shared_ptr<MockableSaiInterface> sai(new MockableSaiInterface());
FlexCounter fc("test", sai, "COUNTERS_DB");

sai_object_id_t counterVid{100};
sai_object_id_t counterRid{100};
sai_object_id_t counterVid{0x54000000000000};
sai_object_id_t counterRid{0x54000000000000};
std::vector<swss::FieldValueTuple> values;
values.emplace_back(FLOW_COUNTER_ID_LIST, "SAI_COUNTER_STAT_PACKETS,SAI_COUNTER_STAT_BYTES");

Expand Down Expand Up @@ -46,12 +46,12 @@ TEST(FlexCounter, addRemoveCounterForFlowCounter)
std::vector<std::string> keys;
countersTable.getKeys(keys);
EXPECT_EQ(keys.size(), size_t(1));
EXPECT_EQ(keys[0], "oid:0x64");
EXPECT_EQ(keys[0], "oid:0x54000000000000");

std::string value;
countersTable.hget("oid:0x64", "SAI_COUNTER_STAT_PACKETS", value);
countersTable.hget("oid:0x54000000000000", "SAI_COUNTER_STAT_PACKETS", value);
EXPECT_EQ(value, "100");
countersTable.hget("oid:0x64", "SAI_COUNTER_STAT_BYTES", value);
countersTable.hget("oid:0x54000000000000", "SAI_COUNTER_STAT_BYTES", value);
EXPECT_EQ(value, "200");

fc.removeCounter(counterVid);
Expand Down Expand Up @@ -81,8 +81,8 @@ TEST(FlexCounter, addRemoveCounterForMACsecFlow)
std::shared_ptr<MockableSaiInterface> sai(new MockableSaiInterface());
FlexCounter fc("test", sai, "COUNTERS_DB");

sai_object_id_t macsecFlowVid{101};
sai_object_id_t macsecFlowRid{101};
sai_object_id_t macsecFlowVid{0x5a000000000000};
sai_object_id_t macsecFlowRid{0x5a000000000000};
std::vector<swss::FieldValueTuple> values;
values.emplace_back(MACSEC_FLOW_COUNTER_ID_LIST, "SAI_MACSEC_FLOW_STAT_CONTROL_PKTS,SAI_MACSEC_FLOW_STAT_PKTS_UNTAGGED");

Expand Down Expand Up @@ -114,17 +114,17 @@ TEST(FlexCounter, addRemoveCounterForMACsecFlow)
std::vector<std::string> keys;
countersTable.getKeys(keys);
EXPECT_EQ(keys.size(), size_t(1));
EXPECT_EQ(keys[0], "oid:0x65");
EXPECT_EQ(keys[0], "oid:0x5a000000000000");

std::string value;
countersTable.hget("oid:0x65", "SAI_MACSEC_FLOW_STAT_CONTROL_PKTS", value);
countersTable.hget("oid:0x5a000000000000", "SAI_MACSEC_FLOW_STAT_CONTROL_PKTS", value);
//EXPECT_EQ(value, "100");
countersTable.hget("oid:0x65", "SAI_MACSEC_FLOW_STAT_PKTS_UNTAGGED", value);
countersTable.hget("oid:0x5a000000000000", "SAI_MACSEC_FLOW_STAT_PKTS_UNTAGGED", value);
//EXPECT_EQ(value, "200");

fc.removeCounter(macsecFlowVid);
EXPECT_EQ(fc.isEmpty(), true);
countersTable.del("oid:0x65");
countersTable.del("oid:0x5a000000000000");
countersTable.getKeys(keys);

ASSERT_TRUE(keys.empty());
Expand All @@ -136,8 +136,8 @@ TEST(FlexCounter, addRemoveCounterForMACsecSA)
std::shared_ptr<MockableSaiInterface> sai(new MockableSaiInterface());
FlexCounter fc("test", sai, "COUNTERS_DB");

sai_object_id_t macsecSAVid{102};
sai_object_id_t macsecSARid{102};
sai_object_id_t macsecSAVid{0x5c000000000000};
sai_object_id_t macsecSARid{0x5c000000000000};
std::vector<swss::FieldValueTuple> values;
values.emplace_back(MACSEC_SA_COUNTER_ID_LIST, "SAI_MACSEC_SA_STAT_OCTETS_ENCRYPTED,SAI_MACSEC_SA_STAT_OCTETS_PROTECTED");

Expand Down Expand Up @@ -169,17 +169,17 @@ TEST(FlexCounter, addRemoveCounterForMACsecSA)
std::vector<std::string> keys;
countersTable.getKeys(keys);
EXPECT_EQ(keys.size(), size_t(1));
EXPECT_EQ(keys[0], "oid:0x66");
EXPECT_EQ(keys[0], "oid:0x5c000000000000");

std::string value;
countersTable.hget("oid:0x66", "SAI_MACSEC_SA_STAT_OCTETS_ENCRYPTED", value);
countersTable.hget("oid:0x5c000000000000", "SAI_MACSEC_SA_STAT_OCTETS_ENCRYPTED", value);
//EXPECT_EQ(value, "100");
countersTable.hget("oid:0x66", "SAI_MACSEC_SA_STAT_OCTETS_PROTECTED", value);
countersTable.hget("oid:0x5c000000000000", "SAI_MACSEC_SA_STAT_OCTETS_PROTECTED", value);
//EXPECT_EQ(value, "200");

fc.removeCounter(macsecSAVid);
EXPECT_EQ(fc.isEmpty(), true);
countersTable.del("oid:0x66");
countersTable.del("oid:0x5c000000000000");
countersTable.getKeys(keys);

ASSERT_TRUE(keys.empty());
Expand Down
Loading