Skip to content

Commit

Permalink
[thread host] implement join for rcp host
Browse files Browse the repository at this point in the history
  • Loading branch information
Irving-cl committed Dec 5, 2024
1 parent 5102c52 commit 437dcd5
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 3 deletions.
91 changes: 88 additions & 3 deletions src/ncp/rcp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ void RcpHost::Deinit(void)
mThreadEnabledStateChangedCallbacks.clear();
mResetHandlers.clear();

mJoinReceiver = nullptr;
mSetThreadEnabledReceiver = nullptr;
mScheduleMigrationReceiver = nullptr;
mDetachGracefullyCallbacks.clear();
Expand All @@ -344,6 +345,18 @@ void RcpHost::HandleStateChanged(otChangedFlags aFlags)
}

mThreadHelper->StateChangedCallback(aFlags);

if ((aFlags & OT_CHANGED_THREAD_ROLE) && IsAttached() && mJoinReceiver != nullptr)
{
otOperationalDatasetTlvs datasetTlvs;

GetDatasetActiveTlvs(datasetTlvs);
if (datasetTlvs.mLength != 0)
{
otbrLogInfo("Join succeeded");
SafeInvokeAndClear(mJoinReceiver, OT_ERROR_NONE, "Join succeeded");
}
}
}

void RcpHost::Update(MainloopContext &aMainloop)
Expand Down Expand Up @@ -438,12 +451,83 @@ const char *RcpHost::GetThreadVersion(void)
return version;
}

static bool areDatasetsEqual(const otOperationalDatasetTlvs &aLhs, const otOperationalDatasetTlvs &aRhs)
{
bool result = false;

otOperationalDataset lhsDataset;
otOperationalDataset rhsDataset;
otOperationalDatasetTlvs lhsNormalizedTlvs;
otOperationalDatasetTlvs rhsNormalizedTlvs;

// Sort the TLVs in the TLV byte arrays by leveraging the deterministic nature of the two OT APIs
SuccessOrExit(otDatasetParseTlvs(&aLhs, &lhsDataset));
SuccessOrExit(otDatasetParseTlvs(&aRhs, &rhsDataset));
otDatasetConvertToTlvs(&lhsDataset, &lhsNormalizedTlvs);
otDatasetConvertToTlvs(&rhsDataset, &rhsNormalizedTlvs);

result = (lhsNormalizedTlvs.mLength == rhsNormalizedTlvs.mLength) &&
(memcmp(lhsNormalizedTlvs.mTlvs, rhsNormalizedTlvs.mTlvs, lhsNormalizedTlvs.mLength) == 0);

exit:
return result;
}

void RcpHost::Join(const otOperationalDatasetTlvs &aActiveOpDatasetTlvs, const AsyncResultReceiver &aReceiver)
{
OT_UNUSED_VARIABLE(aActiveOpDatasetTlvs);
otError error = OT_ERROR_NONE;
std::string errorMsg;
bool receiveResultHere = true;
otOperationalDatasetTlvs curDatasetTlvs;

VerifyOrExit(mInstance != nullptr, error = OT_ERROR_INVALID_STATE, errorMsg = "OT is not initialized");
VerifyOrExit(mThreadEnabledState != ThreadEnabledState::kStateDisabling, error = OT_ERROR_BUSY,
errorMsg = "Thread is disabling");
VerifyOrExit(mThreadEnabledState == ThreadEnabledState::kStateEnabled, error = OT_ERROR_INVALID_STATE,
errorMsg = "Thread is disabled");

// TODO: Implement Join under RCP mode.
mTaskRunner.Post([aReceiver](void) { aReceiver(OT_ERROR_NOT_IMPLEMENTED, "Not implemented!"); });
otbrLogInfo("Start joining...");

error = otDatasetGetActiveTlvs(mInstance, &curDatasetTlvs);
if (error == OT_ERROR_NONE && areDatasetsEqual(aActiveOpDatasetTlvs, curDatasetTlvs) && IsAttached())
{
// Do not leave and re-join if this device has already joined the same network. This can help elimilate
// unnecessary connectivity and topology disruption and save the time for re-joining. It's more useful for use
// cases where Thread networks are dynamically brought up and torn down (e.g. Thread on mobile phones).
errorMsg = "Already Joined the target network";
ExitNow();
}

if (GetDeviceRole() != OT_DEVICE_ROLE_DISABLED)
{
ThreadDetachGracefully([aActiveOpDatasetTlvs, aReceiver, this] {
ConditionalErasePersistentInfo(true);
Join(aActiveOpDatasetTlvs, aReceiver);
});
receiveResultHere = false;
ExitNow();
}

SuccessOrExit(error = otDatasetSetActiveTlvs(mInstance, &aActiveOpDatasetTlvs),
errorMsg = "Failed to set Active Operational Dataset");

// TODO(b/273160198): check how we can implement join as a child
SuccessOrExit(error = otIp6SetEnabled(mInstance, true), errorMsg = "Failed to bring up Thread interface");
SuccessOrExit(error = otThreadSetEnabled(mInstance, true), errorMsg = "Failed to bring up Thread stack");

// Abort an ongoing join()
if (mJoinReceiver != nullptr)
{
SafeInvoke(mJoinReceiver, OT_ERROR_ABORT, "Join() is aborted");
}
mJoinReceiver = aReceiver;
receiveResultHere = false;

exit:
if (receiveResultHere)
{
mTaskRunner.Post([aReceiver, error, errorMsg](void) { aReceiver(error, errorMsg); });
}
}

void RcpHost::Leave(bool aEraseDataset, const AsyncResultReceiver &aReceiver)
Expand Down Expand Up @@ -636,6 +720,7 @@ void RcpHost::ThreadDetachGracefullyCallback(void *aContext)

void RcpHost::ThreadDetachGracefullyCallback(void)
{
SafeInvokeAndClear(mJoinReceiver, OT_ERROR_ABORT, "Aborted by leave/disable operation");
SafeInvokeAndClear(mScheduleMigrationReceiver, OT_ERROR_ABORT, "Aborted by leave/disable operation");

for (auto &callback : mDetachGracefullyCallbacks)
Expand Down
1 change: 1 addition & 0 deletions src/ncp/rcp_host.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ class RcpHost : public MainloopProcessor, public ThreadHost, public OtNetworkPro
std::vector<ThreadEnabledStateCallback> mThreadEnabledStateChangedCallbacks;
bool mEnableAutoAttach = false;
ThreadEnabledState mThreadEnabledState;
AsyncResultReceiver mJoinReceiver;
AsyncResultReceiver mSetThreadEnabledReceiver;
AsyncResultReceiver mScheduleMigrationReceiver;
std::vector<DetachGracefullyCallback> mDetachGracefullyCallbacks;
Expand Down
98 changes: 98 additions & 0 deletions tests/gtest/test_rcp_host_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,101 @@ TEST(RcpHostApi, StateChangesCorrectlyAfterScheduleMigration)

host.Deinit();
}

TEST(RcpHostApi, StateChangesCorrectlyAfterJoin)
{
otError error = OT_ERROR_NONE;
std::string errorMsg = "";
bool resultReceived = false;
otbr::MainloopContext mainloop;
otbr::Ncp::ThreadHost::AsyncResultReceiver receiver = [&resultReceived, &error,
&errorMsg](otError aError, const std::string &aErrorMsg) {
resultReceived = true;
error = aError;
errorMsg = aErrorMsg;
};
otbr::Ncp::RcpHost host("wpan0", std::vector<const char *>(), /* aBackboneInterfaceName */ "", /* aDryRun */ false,
/* aEnableAutoAttach */ false);

otOperationalDataset dataset;
(void)dataset;
otOperationalDatasetTlvs datasetTlvs;

// 1. Call Join when host hasn't been initialized.
otbr::MainloopManager::GetInstance().RemoveMainloopProcessor(
&host); // Temporarily remove RcpHost because it's not initialized yet.
host.Join(datasetTlvs, receiver);
MainloopProcessUntil(mainloop, /* aTimeoutSec */ 0, [&resultReceived]() { return resultReceived; });
EXPECT_EQ(error, OT_ERROR_INVALID_STATE);
EXPECT_STREQ(errorMsg.c_str(), "OT is not initialized");
otbr::MainloopManager::GetInstance().AddMainloopProcessor(&host);

host.Init();
OT_UNUSED_VARIABLE(otDatasetCreateNewNetwork(ot::FakePlatform::CurrentInstance(), &dataset));
otDatasetConvertToTlvs(&dataset, &datasetTlvs);

// 2. Call Join when Thread is not enabled.
error = OT_ERROR_NONE;
resultReceived = false;
host.Join(datasetTlvs, receiver);
MainloopProcessUntil(mainloop, /* aTimeoutSec */ 0, [&resultReceived]() { return resultReceived; });
EXPECT_EQ(error, OT_ERROR_INVALID_STATE);
EXPECT_STREQ(errorMsg.c_str(), "Thread is disabled");

// 3. Call two consecutive Join. The first one should be aborted. The second one should succeed.
error = OT_ERROR_NONE;
resultReceived = false;
host.SetThreadEnabled(true, receiver);
MainloopProcessUntil(mainloop, /* aTimeoutSec */ 0, [&resultReceived]() { return resultReceived; });
otError error_ = OT_ERROR_NONE;
std::string errorMsg_ = "";
bool resultReceived_ = false;
error = OT_ERROR_NONE;
resultReceived = false;
host.Join(datasetTlvs, [&resultReceived_, &error_, &errorMsg_](otError aError, const std::string &aErrorMsg) {
error_ = aError;
errorMsg_ = aErrorMsg;
resultReceived_ = true;
});
host.Join(datasetTlvs, receiver);

MainloopProcessUntil(mainloop, /* aTimeoutSec */ 0,
[&resultReceived, &resultReceived_]() { return resultReceived && resultReceived_; });
EXPECT_EQ(error_, OT_ERROR_ABORT);
EXPECT_STREQ(errorMsg_.c_str(), "Aborted by leave/disable operation"); // The second Join will trigger Leave first.
EXPECT_EQ(error, OT_ERROR_NONE);
EXPECT_STREQ(errorMsg.c_str(), "Join succeeded");
EXPECT_EQ(host.GetDeviceRole(), OT_DEVICE_ROLE_LEADER);

// 4. Call Join with the same dataset.
error = OT_ERROR_NONE;
resultReceived = false;
host.Join(datasetTlvs, receiver);
MainloopProcessUntil(mainloop, /* aTimeoutSec */ 0, [&resultReceived]() { return resultReceived; });
EXPECT_EQ(error, OT_ERROR_NONE);
EXPECT_STREQ(errorMsg.c_str(), "Already Joined the target network");

// 5. Call Disable right after Join.
error = OT_ERROR_NONE;
resultReceived = false;
error_ = OT_ERROR_NONE;
resultReceived_ = false;

OT_UNUSED_VARIABLE(otDatasetCreateNewNetwork(ot::FakePlatform::CurrentInstance(), &dataset));
otDatasetConvertToTlvs(&dataset, &datasetTlvs); // Use a different dataset.

host.Join(datasetTlvs, [&resultReceived_, &error_, &errorMsg_](otError aError, const std::string &aErrorMsg) {
error_ = aError;
errorMsg_ = aErrorMsg;
resultReceived_ = true;
});
host.SetThreadEnabled(false, receiver);

MainloopProcessUntil(mainloop, /* aTimeoutSec */ 0,
[&resultReceived, &resultReceived_]() { return resultReceived && resultReceived_; });
EXPECT_EQ(error_, OT_ERROR_BUSY);
EXPECT_STREQ(errorMsg_.c_str(), "Thread is disabling");
EXPECT_EQ(error, OT_ERROR_NONE);

host.Deinit();
}

0 comments on commit 437dcd5

Please sign in to comment.