Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mirror] Observe lag member status change #1846

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
129 changes: 97 additions & 32 deletions orchagent/mirrororch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ MirrorOrch::MirrorOrch(TableConnector stateDbConnector, TableConnector confDbCon
m_neighOrch(neighOrch),
m_fdbOrch(fdbOrch),
m_policerOrch(policerOrch),
m_mirrorTable(stateDbConnector.first, stateDbConnector.second)
m_mirrorTable(stateDbConnector.first, stateDbConnector.second),
m_applDb(make_shared<DBConnector>("APPL_DB", 0)),
m_applLagMemberTable(make_shared<Table>(m_applDb.get(), APP_LAG_MEMBER_TABLE_NAME))
{
m_portsOrch->attach(this);
m_neighOrch->attach(this);
Expand Down Expand Up @@ -164,6 +166,12 @@ void MirrorOrch::update(SubjectType type, void *cntx)
updateLagMember(*update);
break;
}
case SUBJECT_TYPE_LAG_MEMBER_STATUS_CHANGE:
{
LagMemberStatusUpdate *update = static_cast<LagMemberStatusUpdate *>(cntx);
updateLagMemberStatus(*update);
break;
}
case SUBJECT_TYPE_VLAN_MEMBER_CHANGE:
{
VlanMemberUpdate *update = static_cast<VlanMemberUpdate *>(cntx);
Expand Down Expand Up @@ -615,12 +623,14 @@ bool MirrorOrch::getNeighborInfo(const string& name, MirrorEntry& session)
}
else
{
// Get the first member of the LAG
Port member;
string first_member_alias = *session.neighborInfo.port.m_members.begin();
m_portsOrch->getPort(first_member_alias, member);
Port lmp;
if (!selectEnabledLagMember(session.neighborInfo.port, lmp))
{
session.neighborInfo.portId = SAI_NULL_OBJECT_ID;
return false;
}

session.neighborInfo.portId = member.m_port_id;
session.neighborInfo.portId = lmp.m_port_id;
}

return true;
Expand Down Expand Up @@ -1270,6 +1280,7 @@ void MirrorOrch::updateNeighbor(const NeighborUpdate& update)
auto& session = it->second;

// Check if the session's destination IP matches the neighbor's update IP
// (destination IP in directly connected subnet),
// or if the session's next hop IP matches the neighbor's update IP
if (session.dstIp != update.entry.ip_address &&
session.nexthopInfo.nexthop.ip_address != update.entry.ip_address)
Expand Down Expand Up @@ -1339,6 +1350,36 @@ void MirrorOrch::updateFdb(const FdbUpdate& update)
}
}

bool MirrorOrch::selectEnabledLagMember(const Port &lag, Port &port)
{
// Select a LAG member of enabled status using first-fit strategy
for (const auto &member : lag.m_members)
{
// Get member oper status
string status;
string key = lag.m_alias + m_applLagMemberTable->getTableNameSeparator() + member;
if (!m_applLagMemberTable->hget(key, "status", status))
{
continue;
}

if (status == "enabled")
{
Port p;
if (m_portsOrch->getPort(member, p))
{
port = p;
return true;
}

SWSS_LOG_ERROR("Failed to get Port object for lag %s member %s",
lag.m_alias.c_str(),
member.c_str());
}
}
return false;
}

void MirrorOrch::updateLagMember(const LagMemberUpdate& update)
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -1377,43 +1418,67 @@ void MirrorOrch::updateLagMember(const LagMemberUpdate& update)
continue;
}

if (update.add)
if (!update.add)
{
// Activate mirror session if it was deactivated due to the reason
// that previously there was no member in the LAG. If the mirror
// session is already activated, no further action is needed.
if (!session.status)
if (session.status && session.neighborInfo.portId == update.member.m_port_id)
{
assert(!update.lag.m_members.empty());
const string& member_name = *update.lag.m_members.begin();
Port member;
m_portsOrch->getPort(member_name, member);
Port lmp;
if (selectEnabledLagMember(update.lag, lmp))
{
session.neighborInfo.portId = lmp.m_port_id;
updateSessionDstPort(name, session);
}
else
{
session.neighborInfo.portId = SAI_NULL_OBJECT_ID;
deactivateSession(name, session);
}
}
}
}
}

session.neighborInfo.portId = member.m_port_id;
void MirrorOrch::updateLagMemberStatus(const LagMemberStatusUpdate& update)
{
SWSS_LOG_ENTER();

for (auto it = m_syncdMirrors.begin(); it != m_syncdMirrors.end(); it++)
{
const auto &name = it->first;
auto &session = it->second;

// Pre-check:
// Neighbor's local counterpart is LAG
// Local LAG counterpart matches the update LAG
if (session.neighborInfo.port.m_type != Port::LAG ||
session.neighborInfo.port != update.lag)
{
continue;
}

if (update.enabled)
{
if (!session.status)
{
session.neighborInfo.portId = update.member.m_port_id;
activateSession(name, session);
}
}
else
{
// If LAG is empty, deactivate session
if (update.lag.m_members.empty())
if (session.status && session.neighborInfo.portId == update.member.m_port_id)
{
if (session.status)
Port lmp;
if (selectEnabledLagMember(update.lag, lmp))
{
session.neighborInfo.portId = lmp.m_port_id;
updateSessionDstPort(name, session);
}
else
{
session.neighborInfo.portId = SAI_NULL_OBJECT_ID;
deactivateSession(name, session);
}
session.neighborInfo.portId = SAI_OBJECT_TYPE_NULL;
}
// Switch to a new member of the LAG
else
{
const string& member_name = *update.lag.m_members.begin();
Port member;
m_portsOrch->getPort(member_name, member);

session.neighborInfo.portId = member.m_port_id;
// The destination MAC remains the same
updateSessionDstPort(name, session);
}
}
}
Expand Down Expand Up @@ -1446,7 +1511,7 @@ void MirrorOrch::updateVlanMember(const VlanMemberUpdate& update)
}

// Deactivate session. Wait for FDB event to activate session
session.neighborInfo.portId = SAI_OBJECT_TYPE_NULL;
session.neighborInfo.portId = SAI_NULL_OBJECT_ID;
deactivateSession(name, session);
}
}
Expand Down
6 changes: 6 additions & 0 deletions orchagent/mirrororch.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class MirrorOrch : public Orch, public Observer, public Subject
// session_name -> VLAN | monitor_port_alias | next_hop_ip
map<string, string> m_recoverySessionMap;

shared_ptr<DBConnector> m_applDb = nullptr;
shared_ptr<Table> m_applLagMemberTable = nullptr;

task_process_status createEntry(const string&, const vector<FieldValueTuple>&);
task_process_status deleteEntry(const string&);

Expand All @@ -125,6 +128,7 @@ class MirrorOrch : public Orch, public Observer, public Subject
void updateNeighbor(const NeighborUpdate&);
void updateFdb(const FdbUpdate&);
void updateLagMember(const LagMemberUpdate&);
void updateLagMemberStatus(const LagMemberStatusUpdate&);
void updateVlanMember(const VlanMemberUpdate&);

bool checkPortExistsInSrcPortList(const string& port, const string& srcPortList);
Expand All @@ -134,6 +138,8 @@ class MirrorOrch : public Orch, public Observer, public Subject
sai_object_id_t sessionId);
bool configurePortMirrorSession(const string&, MirrorEntry&, bool enable);

bool selectEnabledLagMember(const Port &lag, Port &port);

void doTask(Consumer& consumer);
};

Expand Down
1 change: 1 addition & 0 deletions orchagent/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum SubjectType
SUBJECT_TYPE_NEIGH_CHANGE,
SUBJECT_TYPE_FDB_CHANGE,
SUBJECT_TYPE_LAG_MEMBER_CHANGE,
SUBJECT_TYPE_LAG_MEMBER_STATUS_CHANGE,
SUBJECT_TYPE_VLAN_MEMBER_CHANGE,
SUBJECT_TYPE_MIRROR_SESSION_CHANGE,
SUBJECT_TYPE_INT_SESSION_CHANGE,
Expand Down
6 changes: 6 additions & 0 deletions orchagent/portsorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3784,6 +3784,9 @@ void PortsOrch::doLagMemberTask(Consumer &consumer)
if (setCollectionOnLagMember(port, true) &&
setDistributionOnLagMember(port, true))
{
LagMemberStatusUpdate update = { lag, port, true };
notify(SUBJECT_TYPE_LAG_MEMBER_STATUS_CHANGE, static_cast<void *>(&update));

it = consumer.m_toSync.erase(it);
}
else
Expand All @@ -3801,6 +3804,9 @@ void PortsOrch::doLagMemberTask(Consumer &consumer)
if (setDistributionOnLagMember(port, false) &&
setCollectionOnLagMember(port, false))
{
LagMemberStatusUpdate update = { lag, port, false };
notify(SUBJECT_TYPE_LAG_MEMBER_STATUS_CHANGE, static_cast<void *>(&update));

it = consumer.m_toSync.erase(it);
}
else
Expand Down
7 changes: 7 additions & 0 deletions orchagent/portsorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ struct LagMemberUpdate
bool add;
};

struct LagMemberStatusUpdate
{
Port lag;
Port member;
bool enabled;
};

struct VlanMemberUpdate
{
Port vlan;
Expand Down
3 changes: 2 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1628,7 +1628,8 @@ def dvs_route(request, dvs) -> DVSRoute:
@pytest.yield_fixture(scope="class")
def dvs_lag_manager(request, dvs):
request.cls.dvs_lag = dvs_lag.DVSLag(dvs.get_asic_db(),
dvs.get_config_db())
dvs.get_config_db(),
dvs.get_app_db())


@pytest.yield_fixture(scope="class")
Expand Down
9 changes: 8 additions & 1 deletion tests/dvslib/dvs_lag.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from swsscommon import swsscommon

class DVSLag(object):
def __init__(self, adb, cdb):
def __init__(self, adb, cdb, pdb):
self.asic_db = adb
self.config_db = cdb
self.appl_db = pdb

def create_port_channel(self, lag_id, admin_status="up", mtu="1500"):
lag = "PortChannel{}".format(lag_id)
Expand All @@ -27,3 +30,7 @@ def get_and_verify_port_channel_members(self, expected_num):
def get_and_verify_port_channel(self, expected_num):
return self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_LAG", expected_num)

def set_port_channel_member_status(self, lag_id, interface, status):
fvs = swsscommon.FieldValuePairs([("status", status)])
tbl = swsscommon.ProducerStateTable(self.appl_db.db_connection, "LAG_MEMBER_TABLE")
tbl.set("PortChannel{}:{}".format(lag_id, interface), fvs)
Loading