Skip to content

Commit

Permalink
Fix liveness timer not firing for subscriptions (#21693)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrjerryjohns authored and pull[bot] committed Jun 23, 2023
1 parent 779bd5b commit 1446168
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 32 deletions.
33 changes: 24 additions & 9 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,16 @@ void ReadClient::OnUnsolicitedReportData(Messaging::ExchangeContext * apExchange
Status status = Status::Success;
mExchange.Grab(apExchangeContext);

//
// Let's update the session we're tracking in our SessionHolder to that associated with the message that was just received.
// This CAN be different from the one we were tracking before, since the server is permitted to send exchanges on any valid
// session to us, of which there could be multiple.
//
// Since receipt of a message is proof of a working session on the peer, it's always best to update to that if possible
// to maximize our chances of success later.
//
mReadPrepareParams.mSessionHolder.Grab(mExchange->GetSessionHandle());

CHIP_ERROR err = ProcessReportData(std::move(aPayload));
if (err != CHIP_NO_ERROR)
{
Expand Down Expand Up @@ -576,14 +586,14 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload)
{
MoveToState(ClientState::AwaitingSubscribeResponse);
}
else if (IsSubscriptionActive())
else if (IsSubscriptionActive() && err == CHIP_NO_ERROR)
{
//
// Only refresh the liveness check timer if we've successfully established
// a subscription and have a valid value for mMaxInterval which the function
// relies on.
//
RefreshLivenessCheckTimer();
err = RefreshLivenessCheckTimer();
}
}

Expand Down Expand Up @@ -753,7 +763,11 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea
void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout)
{
mLivenessTimeoutOverride = aLivenessTimeout;
RefreshLivenessCheckTimer();
auto err = RefreshLivenessCheckTimer();
if (err != CHIP_NO_ERROR)
{
Close(err);
}
}

CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
Expand Down Expand Up @@ -784,11 +798,6 @@ CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer(
timeout, OnLivenessTimeoutCallback, this);

if (err != CHIP_NO_ERROR)
{
Close(err);
}

return err;
}

Expand Down Expand Up @@ -854,7 +863,7 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP

mNumRetries = 0;

RefreshLivenessCheckTimer();
ReturnErrorOnFailure(RefreshLivenessCheckTimer());

return CHIP_NO_ERROR;
}
Expand All @@ -874,13 +883,19 @@ CHIP_ERROR ReadClient::SendSubscribeRequest(const ReadPrepareParams & aReadPrepa
{
VerifyOrReturnError(aReadPrepareParams.mMinIntervalFloorSeconds <= aReadPrepareParams.mMaxIntervalCeilingSeconds,
CHIP_ERROR_INVALID_ARGUMENT);

return SendSubscribeRequestImpl(aReadPrepareParams);
}

CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadPrepareParams)
{
VerifyOrReturnError(ClientState::Idle == mState, CHIP_ERROR_INCORRECT_STATE);

if (&aReadPrepareParams != &mReadPrepareParams)
{
mReadPrepareParams.mSessionHolder = aReadPrepareParams.mSessionHolder;
}

mMinIntervalFloorSeconds = aReadPrepareParams.mMinIntervalFloorSeconds;

// Todo: Remove the below, Update span in ReadPrepareParams
Expand Down
6 changes: 6 additions & 0 deletions src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,12 @@ class ReadClient : public Messaging::ExchangeDelegate

ReadClient * mpNext = nullptr;
InteractionModelEngine * mpImEngine = nullptr;

//
// This stores the params associated with the interaction in a specific set of cases:
// 1. Stores all parameters when used with subscriptions initiated using SendAutoResubscribeRequest.
// 2. Stores just the SessionHolder when used with any subscriptions.
//
ReadPrepareParams mReadPrepareParams;
uint32_t mNumRetries = 0;

Expand Down
8 changes: 4 additions & 4 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1752,7 +1752,7 @@ void TestReadInteraction::TestSubscribeWildcard(nlTestSuite * apSuite, void * ap
readPrepareParams.mAttributePathParamsListSize = 2;

readPrepareParams.mMinIntervalFloorSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 1;
printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);

{
Expand Down Expand Up @@ -1857,7 +1857,7 @@ void TestReadInteraction::TestSubscribePartialOverlap(nlTestSuite * apSuite, voi
readPrepareParams.mAttributePathParamsListSize = 1;

readPrepareParams.mMinIntervalFloorSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 1;
printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);

{
Expand Down Expand Up @@ -1933,7 +1933,7 @@ void TestReadInteraction::TestSubscribeSetDirtyFullyOverlap(nlTestSuite * apSuit
readPrepareParams.mAttributePathParamsListSize = 1;

readPrepareParams.mMinIntervalFloorSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 1;
printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);

{
Expand Down Expand Up @@ -2059,7 +2059,7 @@ void TestReadInteraction::TestSubscribeInvalidAttributePathRoundtrip(nlTestSuite

readPrepareParams.mSessionHolder.Grab(ctx.GetSessionBobToAlice());
readPrepareParams.mMinIntervalFloorSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 1;
printf("\nSend subscribe request message to Node: %" PRIu64 "\n", chip::kTestDeviceNodeId);

{
Expand Down
123 changes: 104 additions & 19 deletions src/controller/tests/data_model/TestRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ class TestReadInteraction : public app::ReadHandler::ApplicationCallback
static void TestReadAttributeError(nlTestSuite * apSuite, void * apContext);
static void TestReadAttributeTimeout(nlTestSuite * apSuite, void * apContext);
static void TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext);
static void TestResubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext);
static void TestReadEventResponse(nlTestSuite * apSuite, void * apContext);
static void TestReadFabricScopedWithoutFabricFilter(nlTestSuite * apSuite, void * apContext);
static void TestReadFabricScopedWithFabricFilter(nlTestSuite * apSuite, void * apContext);
Expand Down Expand Up @@ -1488,20 +1489,12 @@ class TestResubscriptionCallback : public app::ReadClient::Callback
mLastError = aError;
}

void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override
{
mOnSubscriptionEstablishedCount++;

//
// Set the liveness timeout to a super small number that isn't 0 to
// force the liveness timeout to fire.
//
mpReadClient->OverrideLivenessTimeout(System::Clock::Milliseconds32(10));
}
void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { mOnSubscriptionEstablishedCount++; }

CHIP_ERROR OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override
{
mOnResubscriptionsAttempted++;
mLastError = aTerminationCause;
return apReadClient->ScheduleResubscription(apReadClient->ComputeTimeTillNextSubscription(), NullOptional, false);
}

Expand Down Expand Up @@ -1532,11 +1525,13 @@ class TestResubscriptionCallback : public app::ReadClient::Callback
// TODO: This does not validate the CASE establishment pathways since we're limited by the PASE-centric TestContext.
//
//
void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext)
void TestReadInteraction::TestResubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
auto sessionHandle = ctx.GetSessionBobToAlice();

ctx.SetMRPMode(Test::MessagingContext::MRPMode::kResponsive);

{
TestResubscriptionCallback callback;
app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback,
Expand All @@ -1556,27 +1551,115 @@ void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, v

readPrepareParams.mMaxIntervalCeilingSeconds = 1;

readClient.SendAutoResubscribeRequest(std::move(readPrepareParams));
auto err = readClient.SendAutoResubscribeRequest(std::move(readPrepareParams));
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

//
// Drive servicing IO till we have established a subscription at least 2 times.
// Drive servicing IO till we have established a subscription.
//
ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(2),
[&]() { return callback.mOnSubscriptionEstablishedCount > 1; });
ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
[&]() { return callback.mOnSubscriptionEstablishedCount >= 1; });
NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1);
NL_TEST_ASSERT(apSuite, callback.mOnError == 0);
NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 0);

NL_TEST_ASSERT(apSuite, callback.mOnDone == 0);
//
// Disable packet transmission, and drive IO till we have reported a re-subscription attempt.
//
// 1.5s should cover the liveness timeout in the client of 1s max interval + 50ms ACK timeout.
//
ctx.GetLoopback().mNumMessagesToDrop = Test::LoopbackTransport::kUnlimitedMessageCount;
ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(1500),
[&]() { return callback.mOnResubscriptionsAttempted > 0; });

NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1);
NL_TEST_ASSERT(apSuite, callback.mLastError == CHIP_ERROR_TIMEOUT);

ctx.GetLoopback().mNumMessagesToDrop = 0;
callback.ClearCounters();

//
// With re-sub enabled, we shouldn't encounter any errors.
// Drive servicing IO till we have established a subscription.
//
ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
[&]() { return callback.mOnSubscriptionEstablishedCount == 1; });
NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1);

//
// With re-sub enabled, we shouldn't have encountered any errors
//
NL_TEST_ASSERT(apSuite, callback.mOnError == 0);
NL_TEST_ASSERT(apSuite, callback.mOnDone == 0);
}

ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault);

app::InteractionModelEngine::GetInstance()->ShutdownActiveReads();
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}

//
// This validates a vanilla subscription with re-susbcription disabled timing out correctly on the client
// side and triggering the OnError callback with the right error code.
//
void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
auto sessionHandle = ctx.GetSessionBobToAlice();

ctx.SetMRPMode(Test::MessagingContext::MRPMode::kResponsive);

{
TestResubscriptionCallback callback;
app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback,
app::ReadClient::InteractionType::Subscribe);

callback.SetReadClient(&readClient);

app::ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());

app::AttributePathParams attributePathParams[1];
readPrepareParams.mpAttributePathParamsList = attributePathParams;
readPrepareParams.mAttributePathParamsListSize = ArraySize(attributePathParams);
attributePathParams[0].mClusterId = app::Clusters::TestCluster::Id;
attributePathParams[0].mAttributeId = app::Clusters::TestCluster::Attributes::Boolean::Id;

//
// We should have attempted just one re-subscription.
// Request a max interval that's very small to reduce time to discovering a liveness failure.
//
NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1);
readPrepareParams.mMaxIntervalCeilingSeconds = 1;

auto err = readClient.SendRequest(readPrepareParams);
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

//
// Drive servicing IO till we have established a subscription.
//
ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(2000),
[&]() { return callback.mOnSubscriptionEstablishedCount >= 1; });
NL_TEST_ASSERT(apSuite, callback.mOnSubscriptionEstablishedCount == 1);

//
// Request we drop all further messages.
//
ctx.GetLoopback().mNumMessagesToDrop = Test::LoopbackTransport::kUnlimitedMessageCount;

//
// Drive IO until we get an error on the subscription, which should be caused
// by the liveness timer firing within ~1s of the establishment of the subscription.
//
// 1.5s should cover the liveness timeout in the client of 1s max interval + 50ms ACK timeout.
//
ctx.GetIOContext().DriveIOUntil(System::Clock::Milliseconds32(1500), [&]() { return callback.mOnError >= 1; });

NL_TEST_ASSERT(apSuite, callback.mOnError == 1);
NL_TEST_ASSERT(apSuite, callback.mLastError == CHIP_ERROR_TIMEOUT);
NL_TEST_ASSERT(apSuite, callback.mOnDone == 1);
NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 0);
}

ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault);

app::InteractionModelEngine::GetInstance()->ShutdownActiveReads();
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}
Expand Down Expand Up @@ -1645,6 +1728,7 @@ void TestReadInteraction::TestReadHandler_MultipleSubscriptions(nlTestSuite * ap
NL_TEST_ASSERT(apSuite, gTestReadInteraction.mNumActiveSubscriptions == 0);
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);

ctx.SetMRPMode(Test::MessagingContext::MRPMode::kDefault);
app::InteractionModelEngine::GetInstance()->UnregisterReadHandlerAppCallback();
}

Expand Down Expand Up @@ -4334,6 +4418,7 @@ const nlTest sTests[] =
NL_TEST_DEF("TestReadAttribute_ManyDataValues", TestReadInteraction::TestReadAttribute_ManyDataValues),
NL_TEST_DEF("TestReadAttribute_ManyDataValuesWrongPath", TestReadInteraction::TestReadAttribute_ManyDataValuesWrongPath),
NL_TEST_DEF("TestReadAttribute_ManyErrors", TestReadInteraction::TestReadAttribute_ManyErrors),
NL_TEST_DEF("TestResubscribeAttributeTimeout", TestReadInteraction::TestResubscribeAttributeTimeout),
NL_TEST_DEF("TestSubscribeAttributeTimeout", TestReadInteraction::TestSubscribeAttributeTimeout),
NL_TEST_SENTINEL()
};
Expand Down
23 changes: 23 additions & 0 deletions src/messaging/tests/MessagingContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

#include "MessagingContext.h"
#include "system/SystemClock.h"

#include <credentials/tests/CHIPCert_unit_test_vectors.h>
#include <lib/support/CodeUtils.h>
Expand Down Expand Up @@ -99,6 +100,28 @@ void MessagingContext::ShutdownAndRestoreExisting(MessagingContext & existing)
existing.mTransport->SetSessionManager(&existing.GetSecureSessionManager());
}

void MessagingContext::SetMRPMode(MRPMode mode)
{
if (mode == MRPMode::kDefault)
{
mSessionBobToAlice->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
mSessionAliceToBob->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
mSessionCharlieToDavid->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
mSessionDavidToCharlie->AsSecureSession()->SetRemoteMRPConfig(GetDefaultMRPConfig());
}
else
{
mSessionBobToAlice->AsSecureSession()->SetRemoteMRPConfig(
ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
mSessionAliceToBob->AsSecureSession()->SetRemoteMRPConfig(
ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
mSessionCharlieToDavid->AsSecureSession()->SetRemoteMRPConfig(
ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
mSessionDavidToCharlie->AsSecureSession()->SetRemoteMRPConfig(
ReliableMessageProtocolConfig(System::Clock::Milliseconds32(10), System::Clock::Milliseconds32(10)));
}
}

CHIP_ERROR MessagingContext::CreateSessionBobToAlice()
{
return mSessionManager.InjectPaseSessionWithTestKey(mSessionBobToAlice, kBobKeyId, GetAliceFabric()->GetNodeId(), kAliceKeyId,
Expand Down
13 changes: 13 additions & 0 deletions src/messaging/tests/MessagingContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ class PlatformMemoryUser
class MessagingContext : public PlatformMemoryUser
{
public:
enum MRPMode
{
kDefault = 1, // This adopts the default MRP values for idle/active as per the spec.
// i.e IDLE = 4s, ACTIVE = 300ms

kResponsive = 2, // This adopts values that are better suited for loopback tests that
// don't actually go over a network interface, and are tuned much lower
// to permit more responsive tests.
// i.e IDLE = 10ms, ACTIVE = 10ms
};

MessagingContext() :
mInitialized(false), mAliceAddress(Transport::PeerAddress::UDP(GetAddress(), CHIP_PORT + 1)),
mBobAddress(Transport::PeerAddress::UDP(GetAddress(), CHIP_PORT))
Expand Down Expand Up @@ -129,6 +140,8 @@ class MessagingContext : public PlatformMemoryUser
void ExpireSessionAliceToBob();
void ExpireSessionBobToFriends();

void SetMRPMode(MRPMode mode);

SessionHandle GetSessionBobToAlice();
SessionHandle GetSessionAliceToBob();
SessionHandle GetSessionCharlieToDavid();
Expand Down
Loading

0 comments on commit 1446168

Please sign in to comment.