Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New p4orch development changes #3066

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cfgmgr/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CFLAGS_SAI = -I /usr/include/sai
LIBNL_CFLAGS = -I/usr/include/libnl3
LIBNL_LIBS = -lnl-genl-3 -lnl-route-3 -lnl-3
SAIMETA_LIBS = -lsaimeta -lsaimetadata -lzmq
COMMON_LIBS = -lswsscommon
COMMON_LIBS = -lswsscommon -lpthread

bin_PROGRAMS = vlanmgrd teammgrd portmgrd intfmgrd buffermgrd vrfmgrd nbrmgrd vxlanmgrd sflowmgrd natmgrd coppmgrd tunnelmgrd macsecmgrd fabricmgrd

Expand Down
2 changes: 1 addition & 1 deletion orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ class Orch
void addExecutor(Executor* executor);
Executor *getExecutor(std::string executorName);

ResponsePublisher m_publisher;
ResponsePublisher m_publisher{"APPL_STATE_DB"};
private:
void addConsumer(swss::DBConnector *db, std::string tableName, int pri = default_orch_pri);
};
Expand Down
2 changes: 2 additions & 0 deletions orchagent/p4orch/p4orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ void P4Orch::doTask(Consumer &consumer)
{
manager->drain();
}

m_publisher.flush();
}

void P4Orch::doTask(swss::SelectableTimer &timer)
Expand Down
3 changes: 3 additions & 0 deletions orchagent/p4orch/p4orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,8 @@ class P4Orch : public Orch
// Notification consumer for port state change
swss::NotificationConsumer *m_portStatusNotificationConsumer;

// Sepcial publisher that writes to APPL DB instead of APPL STATE DB.
ResponsePublisher m_publisher{"APPL_DB", /*bool buffered=*/true, /*db_write_thread=*/true};

friend class p4orch::test::WcmpManagerTest;
};
1 change: 1 addition & 0 deletions orchagent/p4orch/tests/return_code_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ TEST(ReturnCodeTest, SaiCodeToReturnCodeMapping)
{SAI_STATUS_TABLE_FULL, StatusCode::SWSS_RC_FULL},
{SAI_STATUS_NOT_IMPLEMENTED, StatusCode::SWSS_RC_UNIMPLEMENTED},
{SAI_STATUS_OBJECT_IN_USE, StatusCode::SWSS_RC_IN_USE},
{SAI_STATUS_NOT_EXECUTED, StatusCode::SWSS_RC_NOT_EXECUTED},
{SAI_STATUS_FAILURE, StatusCode::SWSS_RC_UNKNOWN},
{SAI_STATUS_INVALID_ATTRIBUTE_0, StatusCode::SWSS_RC_INVALID_PARAM},
{SAI_STATUS_INVALID_ATTRIBUTE_10, StatusCode::SWSS_RC_INVALID_PARAM},
Expand Down
120 changes: 103 additions & 17 deletions orchagent/response_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,45 @@ void RecordResponse(const std::string &response_channel, const std::string &key,

} // namespace

ResponsePublisher::ResponsePublisher(bool buffered)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please provide the details of response_publisher in the description? What was the behavior before and whats the new change? Is it breaking any previous implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the PR description.

: m_db(std::make_unique<swss::DBConnector>("APPL_STATE_DB", 0)),
m_pipe(std::make_unique<swss::RedisPipeline>(m_db.get())), m_buffered(buffered)
ResponsePublisher::ResponsePublisher(const std::string &dbName, bool buffered, bool db_write_thread)
: m_db(std::make_unique<swss::DBConnector>(dbName, 0)), m_buffered(buffered)
{
if (m_buffered)
{
m_ntf_pipe = std::make_unique<swss::RedisPipeline>(m_db.get());
m_db_pipe = std::make_unique<swss::RedisPipeline>(m_db.get());
}
else
{
m_ntf_pipe = std::make_unique<swss::RedisPipeline>(m_db.get(), 1);
m_db_pipe = std::make_unique<swss::RedisPipeline>(m_db.get(), 1);
}
if (db_write_thread)
{
m_update_thread = std::unique_ptr<std::thread>(new std::thread(&ResponsePublisher::dbUpdateThread, this));
}
}

void ResponsePublisher::publish(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &intent_attrs, const ReturnCode &status,
const std::vector<swss::FieldValueTuple> &state_attrs, bool replace)
ResponsePublisher::~ResponsePublisher()
{
// Write to the DB only if:
// 1) A write operation is being performed and state attributes are specified.
// 2) A successful delete operation.
if ((intent_attrs.size() && state_attrs.size()) || (status.ok() && !intent_attrs.size()))
if (m_update_thread != nullptr)
{
writeToDB(table, key, state_attrs, intent_attrs.size() ? SET_COMMAND : DEL_COMMAND, replace);
{
std::lock_guard<std::mutex> lock(m_lock);
m_queue.emplace(/*table=*/"", /*key=*/"", /*values =*/std::vector<swss::FieldValueTuple>{}, /*op=*/"",
/*replace=*/false, /*flush=*/false, /*shutdown=*/true);
}
m_signal.notify_one();
m_update_thread->join();
}
}

void ResponsePublisher::publish(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &intent_attrs, const ReturnCode &status,
const std::vector<swss::FieldValueTuple> &state_attrs, bool replace)
{
std::string response_channel = "APPL_DB_" + table + "_RESPONSE_CHANNEL";
swss::NotificationProducer notificationProducer{m_pipe.get(), response_channel, m_buffered};
swss::NotificationProducer notificationProducer{m_ntf_pipe.get(), response_channel, m_buffered};

auto intent_attrs_copy = intent_attrs;
// Add error message as the first field-value-pair.
Expand All @@ -92,6 +111,14 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
// Sends the response to the notification channel.
notificationProducer.send(status.codeStr(), key, intent_attrs_copy);
RecordResponse(response_channel, key, intent_attrs_copy, status.codeStr());

// Write to the DB only if:
// 1) A write operation is being performed and state attributes are specified.
// 2) A successful delete operation.
if ((intent_attrs.size() && state_attrs.size()) || (status.ok() && !intent_attrs.size()))
{
writeToDB(table, key, state_attrs, intent_attrs.size() ? SET_COMMAND : DEL_COMMAND, replace);
}
}

void ResponsePublisher::publish(const std::string &table, const std::string &key,
Expand All @@ -113,7 +140,26 @@ void ResponsePublisher::publish(const std::string &table, const std::string &key
void ResponsePublisher::writeToDB(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &values, const std::string &op, bool replace)
{
swss::Table applStateTable{m_pipe.get(), table, m_buffered};
if (m_update_thread != nullptr)
{
{
std::lock_guard<std::mutex> lock(m_lock);
m_queue.emplace(table, key, values, op, replace, /*flush=*/false, /*shutdown=*/false);
}
m_signal.notify_one();
}
else
{
writeToDBInternal(table, key, values, op, replace);
}
RecordDBWrite(table, key, values, op);
}

void ResponsePublisher::writeToDBInternal(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &values, const std::string &op,
bool replace)
{
swss::Table applStateTable{m_db_pipe.get(), table, m_buffered};

auto attrs = values;
if (op == SET_COMMAND)
Expand All @@ -133,7 +179,6 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
if (!applStateTable.get(key, fv))
{
applStateTable.set(key, attrs);
RecordDBWrite(table, key, attrs, op);
return;
}
for (auto it = attrs.cbegin(); it != attrs.cend();)
Expand All @@ -150,22 +195,63 @@ void ResponsePublisher::writeToDB(const std::string &table, const std::string &k
if (attrs.size())
{
applStateTable.set(key, attrs);
RecordDBWrite(table, key, attrs, op);
}
}
else if (op == DEL_COMMAND)
{
applStateTable.del(key);
RecordDBWrite(table, key, {}, op);
}
}

void ResponsePublisher::flush()
{
m_pipe->flush();
m_ntf_pipe->flush();
if (m_update_thread != nullptr)
{
{
std::lock_guard<std::mutex> lock(m_lock);
m_queue.emplace(/*table=*/"", /*key=*/"", /*values =*/std::vector<swss::FieldValueTuple>{}, /*op=*/"",
/*replace=*/false, /*flush=*/true, /*shutdown=*/false);
}
m_signal.notify_one();
}
else
{
m_db_pipe->flush();
}
}

void ResponsePublisher::setBuffered(bool buffered)
{
m_buffered = buffered;
}

void ResponsePublisher::dbUpdateThread()
{
while (true)
{
entry e;
{
std::unique_lock<std::mutex> lock(m_lock);
while (m_queue.empty())
{
m_signal.wait(lock);
}

e = m_queue.front();
m_queue.pop();
}
if (e.shutdown)
{
break;
}
if (e.flush)
{
m_db_pipe->flush();
}
else
{
writeToDBInternal(e.table, e.key, e.values, e.op, e.replace);
}
}
}
41 changes: 38 additions & 3 deletions orchagent/response_publisher.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#pragma once

#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

Expand All @@ -17,9 +21,9 @@
class ResponsePublisher : public ResponsePublisherInterface
{
public:
explicit ResponsePublisher(bool buffered = false);
explicit ResponsePublisher(const std::string &dbName, bool buffered = false, bool db_write_thread = false);

virtual ~ResponsePublisher() = default;
virtual ~ResponsePublisher();

// Intent attributes are the attributes sent in the notification into the
// redis channel.
Expand Down Expand Up @@ -57,8 +61,39 @@ class ResponsePublisher : public ResponsePublisherInterface
void setBuffered(bool buffered);

private:
struct entry
Copy link
Contributor

@qiluo-msft qiluo-msft May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entry

Where can I find HLD? #Closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just internal implementation details. We don't have HLD for this.

The overall design is to put the DB update operation into a different thread for the response publisher.

In the detailed implementation here, we use a FIFO queue to store the DB update events. The main thread will queue up the event into the queue, and the "DB update thread" will read from the queue and process the DB update.
The "entry" struct here is the "DB update event".

Let me know if you need more details on this.

{
std::string table;
std::string key;
std::vector<swss::FieldValueTuple> values;
std::string op;
bool replace;
bool flush;
bool shutdown;

entry()
{
}

entry(const std::string &table, const std::string &key, const std::vector<swss::FieldValueTuple> &values,
const std::string &op, bool replace, bool flush, bool shutdown)
: table(table), key(key), values(values), op(op), replace(replace), flush(flush), shutdown(shutdown)
{
}
};

void dbUpdateThread();
void writeToDBInternal(const std::string &table, const std::string &key,
const std::vector<swss::FieldValueTuple> &values, const std::string &op, bool replace);

std::unique_ptr<swss::DBConnector> m_db;
std::unique_ptr<swss::RedisPipeline> m_pipe;
std::unique_ptr<swss::RedisPipeline> m_ntf_pipe;
std::unique_ptr<swss::RedisPipeline> m_db_pipe;

bool m_buffered{false};
// Thread to write to DB.
std::unique_ptr<std::thread> m_update_thread;
std::queue<entry> m_queue;
mutable std::mutex m_lock;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutable

why mutable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a common practice for mutex object.

The key word "mutable" means that the variable can be changed in a const method.
If we have a method to read a variable that is protected by a mutex, it is nature to declare the method as const since it only does read operation. But the mutex object needs to be mutable for the read method to get the lock.

In this case, we might not really need it to be mutable. But we should follow the common practice.

Copy link
Contributor

@qiluo-msft qiluo-msft May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might not really need it to be mutable -> let's just remove it.

Thanks for the explanation! (not a blocking issue)

std::condition_variable m_signal;
};
36 changes: 19 additions & 17 deletions orchagent/return_code.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,24 @@ class ReturnCode
ReturnCode(const sai_status_t &status, const std::string &message = "")
: stream_(std::ios_base::out | std::ios_base::ate), is_sai_(true)
{
if (m_saiStatusCodeLookup.find(status) == m_saiStatusCodeLookup.end())
// Non-ranged SAI codes that are not included in this lookup map will map to
// SWSS_RC_UNKNOWN. This includes the general SAI failure:
// SAI_STATUS_FAILURE.
static const auto *const saiStatusCodeLookup = new std::unordered_map<sai_status_t, StatusCode>({
{SAI_STATUS_SUCCESS, StatusCode::SWSS_RC_SUCCESS},
{SAI_STATUS_NOT_SUPPORTED, StatusCode::SWSS_RC_UNIMPLEMENTED},
{SAI_STATUS_NO_MEMORY, StatusCode::SWSS_RC_NO_MEMORY},
{SAI_STATUS_INSUFFICIENT_RESOURCES, StatusCode::SWSS_RC_FULL},
{SAI_STATUS_INVALID_PARAMETER, StatusCode::SWSS_RC_INVALID_PARAM},
{SAI_STATUS_ITEM_ALREADY_EXISTS, StatusCode::SWSS_RC_EXISTS},
{SAI_STATUS_ITEM_NOT_FOUND, StatusCode::SWSS_RC_NOT_FOUND},
{SAI_STATUS_TABLE_FULL, StatusCode::SWSS_RC_FULL},
{SAI_STATUS_NOT_IMPLEMENTED, StatusCode::SWSS_RC_UNIMPLEMENTED},
{SAI_STATUS_OBJECT_IN_USE, StatusCode::SWSS_RC_IN_USE},
{SAI_STATUS_NOT_EXECUTED, StatusCode::SWSS_RC_NOT_EXECUTED},
});

if (saiStatusCodeLookup->find(status) == saiStatusCodeLookup->end())
{
// Check for ranged SAI codes.
if (SAI_RANGED_STATUS_IS_INVALID_ATTRIBUTE(status))
Expand Down Expand Up @@ -207,7 +224,7 @@ class ReturnCode
}
else
{
status_ = m_saiStatusCodeLookup[status];
status_ = saiStatusCodeLookup->at(status);
}
stream_ << message;
}
Expand Down Expand Up @@ -298,21 +315,6 @@ class ReturnCode
}

private:
// Non-ranged SAI codes that are not included in this lookup map will map to
// SWSS_RC_UNKNOWN. This includes the general SAI failure: SAI_STATUS_FAILURE.
std::unordered_map<sai_status_t, StatusCode> m_saiStatusCodeLookup = {
{SAI_STATUS_SUCCESS, StatusCode::SWSS_RC_SUCCESS},
{SAI_STATUS_NOT_SUPPORTED, StatusCode::SWSS_RC_UNIMPLEMENTED},
{SAI_STATUS_NO_MEMORY, StatusCode::SWSS_RC_NO_MEMORY},
{SAI_STATUS_INSUFFICIENT_RESOURCES, StatusCode::SWSS_RC_FULL},
{SAI_STATUS_INVALID_PARAMETER, StatusCode::SWSS_RC_INVALID_PARAM},
{SAI_STATUS_ITEM_ALREADY_EXISTS, StatusCode::SWSS_RC_EXISTS},
{SAI_STATUS_ITEM_NOT_FOUND, StatusCode::SWSS_RC_NOT_FOUND},
{SAI_STATUS_TABLE_FULL, StatusCode::SWSS_RC_FULL},
{SAI_STATUS_NOT_IMPLEMENTED, StatusCode::SWSS_RC_UNIMPLEMENTED},
{SAI_STATUS_OBJECT_IN_USE, StatusCode::SWSS_RC_IN_USE},
};

StatusCode status_;
std::stringstream stream_;
// Whether the ReturnCode is generated from a SAI status code or not.
Expand Down
5 changes: 4 additions & 1 deletion tests/mock_tests/fake_response_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
* when needed to test code that uses response publisher. */
std::unique_ptr<MockResponsePublisher> gMockResponsePublisher;

ResponsePublisher::ResponsePublisher(bool buffered) : m_db(std::make_unique<swss::DBConnector>("APPL_STATE_DB", 0)), m_buffered(buffered) {}
ResponsePublisher::ResponsePublisher(const std::string& dbName, bool buffered, bool db_write_thread) :
m_db(std::make_unique<swss::DBConnector>(dbName, 0)), m_buffered(buffered) {}

ResponsePublisher::~ResponsePublisher() {}

void ResponsePublisher::publish(
const std::string& table, const std::string& key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TEST(ResponsePublisher, TestPublish)
DBConnector conn{"APPL_STATE_DB", 0};
Table stateTable{&conn, "SOME_TABLE"};
std::string value;
ResponsePublisher publisher{};
ResponsePublisher publisher{"APPL_STATE_DB"};

publisher.publish("SOME_TABLE", "SOME_KEY", {{"field", "value"}}, ReturnCode(SAI_STATUS_SUCCESS));
ASSERT_TRUE(stateTable.hget("SOME_KEY", "field", value));
Expand All @@ -21,7 +21,7 @@ TEST(ResponsePublisher, TestPublishBuffered)
DBConnector conn{"APPL_STATE_DB", 0};
Table stateTable{&conn, "SOME_TABLE"};
std::string value;
ResponsePublisher publisher{};
ResponsePublisher publisher{"APPL_STATE_DB"};

publisher.setBuffered(true);

Expand Down
Loading
Loading