From 567ecb3f8274d63a7c7689826b381a366cb87433 Mon Sep 17 00:00:00 2001 From: Zang MingJie Date: Tue, 16 Nov 2021 22:04:32 +0800 Subject: [PATCH] Migrate TCP::mPendingPackets to Pool interface --- src/channel/Channel.cpp | 2 +- src/channel/ChannelContext.cpp | 2 +- src/lib/support/Pool.cpp | 13 ++-- src/lib/support/Pool.h | 3 - src/lib/support/PoolWrapper.h | 112 +++++++++++++++++++++++++++++++++ src/messaging/ExchangeMgr.cpp | 2 +- src/transport/raw/TCP.cpp | 95 ++++++++++------------------ src/transport/raw/TCP.h | 31 ++++----- 8 files changed, 168 insertions(+), 92 deletions(-) create mode 100644 src/lib/support/PoolWrapper.h diff --git a/src/channel/Channel.cpp b/src/channel/Channel.cpp index 151b10ab78ed74..5995d686756230 100644 --- a/src/channel/Channel.cpp +++ b/src/channel/Channel.cpp @@ -32,7 +32,7 @@ ChannelState ChannelHandle::GetState() const ExchangeContext * ChannelHandle::NewExchange(ExchangeDelegate * delegate) { - assert(mAssociation != nullptr); + VerifyOrDie(mAssociation != nullptr); return mAssociation->mChannelContext->NewExchange(delegate); } diff --git a/src/channel/ChannelContext.cpp b/src/channel/ChannelContext.cpp index 3e34a095cfc9a0..8455dfef95841f 100644 --- a/src/channel/ChannelContext.cpp +++ b/src/channel/ChannelContext.cpp @@ -37,7 +37,7 @@ void ChannelContext::Start(const ChannelBuilder & builder) ExchangeContext * ChannelContext::NewExchange(ExchangeDelegate * delegate) { - assert(GetState() == ChannelState::kReady); + VerifyOrDie(GetState() == ChannelState::kReady); return mExchangeManager->NewContext(GetReadyVars().mSession, delegate); } diff --git a/src/lib/support/Pool.cpp b/src/lib/support/Pool.cpp index ff8789e3c0a956..ddb574a3bec0e1 100644 --- a/src/lib/support/Pool.cpp +++ b/src/lib/support/Pool.cpp @@ -17,10 +17,9 @@ * limitations under the License. */ +#include #include -#include - namespace chip { namespace internal { @@ -68,20 +67,20 @@ void StaticAllocatorBitmap::Deallocate(void * element) size_t offset = index - (word * kBitChunkSize); // ensure the element is in the pool - assert(index < Capacity()); + VerifyOrDie(index < Capacity()); auto value = mUsage[word].fetch_and(~(kBit1 << offset)); - nlASSERT((value & (kBit1 << offset)) != 0); // assert fail when free an unused slot + VerifyOrDie((value & (kBit1 << offset)) != 0); // assert fail when free an unused slot DecreaseUsage(); } size_t StaticAllocatorBitmap::IndexOf(void * element) { std::ptrdiff_t diff = static_cast(element) - static_cast(mElements); - assert(diff >= 0); - assert(static_cast(diff) % mElementSize == 0); + VerifyOrDie(diff >= 0); + VerifyOrDie(static_cast(diff) % mElementSize == 0); auto index = static_cast(diff) / mElementSize; - assert(index < Capacity()); + VerifyOrDie(index < Capacity()); return index; } diff --git a/src/lib/support/Pool.h b/src/lib/support/Pool.h index 47df3efd9ef729..e99e58725c42b2 100644 --- a/src/lib/support/Pool.h +++ b/src/lib/support/Pool.h @@ -22,9 +22,6 @@ #pragma once -#include - -#include #include #include #include diff --git a/src/lib/support/PoolWrapper.h b/src/lib/support/PoolWrapper.h new file mode 100644 index 00000000000000..9bd3699ae769fc --- /dev/null +++ b/src/lib/support/PoolWrapper.h @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2021 Project CHIP Authors + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +namespace chip { + +/// Provides an interface over a pool implementation which doesn't expose the size and the actual type of the pool. +template +class PoolInterface +{ +public: + // For convenient use in PoolImpl + using Interface = std::tuple; + + virtual ~PoolInterface() {} + + virtual U * CreateObject(ConstructorArguments &&... args) = 0; + virtual void ReleaseObject(U * element) = 0; + virtual void ResetObject(U * element, ConstructorArguments &&... args) = 0; + + template + bool ForEachActiveObject(Function && function) + { + auto proxy = [&](U * target) -> bool { return function(target); }; + return ForEachActiveObjectInner( + &proxy, [](void * context, U * target) -> bool { return (*static_cast(context))(target); }); + } + +protected: + using Lambda = bool (*)(void *, U *); + virtual bool ForEachActiveObjectInner(void * context, Lambda lambda) = 0; +}; + +template +class PoolProxy; + +template +class PoolProxy> : public PoolInterface +{ +public: + static_assert(std::is_base_of::value, "Interface type is not derived from Pool type"); + + PoolProxy() {} + virtual ~PoolProxy() override {} + + virtual U * CreateObject(ConstructorArguments &&... args) override + { + return Impl().CreateObject(std::forward(args)...); + } + + virtual void ReleaseObject(U * element) override { Impl().ReleaseObject(static_cast(element)); } + + virtual void ResetObject(U * element, ConstructorArguments &&... args) override + { + return Impl().ResetObject(static_cast(element), std::forward(args)...); + } + +protected: + virtual bool ForEachActiveObjectInner(void * context, + typename PoolInterface::Lambda lambda) override + { + return Impl().ForEachActiveObject([&](T * target) { return lambda(context, static_cast(target)); }); + } + + virtual BitMapObjectPool & Impl() = 0; +}; + +/* + * @brief + * Define a implementation of a pool which derive and expose PoolInterface's. + * + * @tparam T a subclass of element to be allocated. + * @tparam N a positive integer max number of elements the pool provides. + * @tparam Interfaces a list of parameters which defines PoolInterface's. each interface is defined by a + * std::tuple. The PoolImpl is derived from every + * PoolInterface, the PoolImpl can be converted to the interface type + * and passed around + */ +template +class PoolImpl : public PoolProxy... +{ +public: + PoolImpl() {} + virtual ~PoolImpl() override {} + +protected: + virtual BitMapObjectPool & Impl() override { return mImpl; } + +private: + BitMapObjectPool mImpl; +}; + +} // namespace chip diff --git a/src/messaging/ExchangeMgr.cpp b/src/messaging/ExchangeMgr.cpp index e0b53dd6e17d30..0826c118b71b5c 100644 --- a/src/messaging/ExchangeMgr.cpp +++ b/src/messaging/ExchangeMgr.cpp @@ -100,7 +100,7 @@ CHIP_ERROR ExchangeManager::Shutdown() mContextPool.ForEachActiveObject([](auto * ec) { // There should be no active object in the pool - assert(false); + VerifyOrDie(false); return true; }); diff --git a/src/transport/raw/TCP.cpp b/src/transport/raw/TCP.cpp index 07d6290c7a0fe5..1d76e430a92258 100644 --- a/src/transport/raw/TCP.cpp +++ b/src/transport/raw/TCP.cpp @@ -59,11 +59,6 @@ TCPBase::~TCPBase() } CloseActiveConnections(); - - for (size_t i = 0; i < mPendingPacketsSize; i++) - { - mPendingPackets[i].packetBuffer = nullptr; - } } void TCPBase::CloseActiveConnections() @@ -207,53 +202,39 @@ CHIP_ERROR TCPBase::SendMessage(const Transport::PeerAddress & address, System:: CHIP_ERROR TCPBase::SendAfterConnect(const PeerAddress & addr, System::PacketBufferHandle && msg) { // This will initiate a connection to the specified peer - CHIP_ERROR err = CHIP_NO_ERROR; - PendingPacket * packet = nullptr; - bool alreadyConnecting = false; - Inet::TCPEndPoint * endPoint = nullptr; + bool alreadyConnecting = false; // Iterate through the ENTIRE array. If a pending packet for // the address already exists, this means a connection is pending and // does NOT need to be re-established. - for (size_t i = 0; i < mPendingPacketsSize; i++) - { - if (mPendingPackets[i].packetBuffer.IsNull()) - { - if (packet == nullptr) - { - // found a slot to store the packet into - packet = mPendingPackets + i; - } - } - else if (mPendingPackets[i].peerAddress == addr) + mPendingPackets.ForEachActiveObject([&](PendingPacket * pending) { + if (pending->mPeerAddress == addr) { // same destination exists. alreadyConnecting = true; - - // ensure packets are ORDERED - if (packet != nullptr) - { - packet->peerAddress = addr; - packet->packetBuffer = std::move(mPendingPackets[i].packetBuffer); - packet = mPendingPackets + i; - } + pending->mPacketBuffer->AddToEnd(std::move(msg)); + return false; } - } - - VerifyOrExit(packet != nullptr, err = CHIP_ERROR_NO_MEMORY); + return true; + }); // If already connecting, buffer was just enqueued for more sending - VerifyOrExit(!alreadyConnecting, err = CHIP_NO_ERROR); + if (alreadyConnecting) + { + return CHIP_NO_ERROR; + } // Ensures sufficient active connections size exist - VerifyOrExit(mUsedEndPointCount < mActiveConnectionsSize, err = CHIP_ERROR_NO_MEMORY); + VerifyOrReturnError(mUsedEndPointCount < mActiveConnectionsSize, CHIP_ERROR_NO_MEMORY); + Inet::TCPEndPoint * endPoint = nullptr; #if INET_CONFIG_ENABLE_TCP_ENDPOINT err = mListenSocket->GetEndPointManager().NewEndPoint(&endPoint); + auto EndPointDeletor = [](Inet::TCPEndPoint * e) { e->Free(); }; + std::unique_ptr endPointHolder(endPoint, EndPointDeletor); #else - err = CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE; + return CHIP_ERROR_UNSUPPORTED_CHIP_FEATURE; #endif - SuccessOrExit(err); endPoint->mAppState = reinterpret_cast(this); endPoint->OnDataReceived = OnTcpReceive; @@ -263,23 +244,17 @@ CHIP_ERROR TCPBase::SendAfterConnect(const PeerAddress & addr, System::PacketBuf endPoint->OnAcceptError = OnAcceptError; endPoint->OnPeerClose = OnPeerClosed; - err = endPoint->Connect(addr.GetIPAddress(), addr.GetPort(), addr.GetInterface()); - SuccessOrExit(err); + ReturnErrorOnFailure(endPoint->Connect(addr.GetIPAddress(), addr.GetPort(), addr.GetInterface())); // enqueue the packet once the connection succeeds - packet->peerAddress = addr; - packet->packetBuffer = std::move(msg); + VerifyOrReturnError(mPendingPackets.CreateObject(addr, std::move(msg)) != nullptr, CHIP_ERROR_NO_MEMORY); mUsedEndPointCount++; -exit: - if (err != CHIP_NO_ERROR) - { - if (endPoint != nullptr) - { - endPoint->Free(); - } - } - return err; +#if INET_CONFIG_ENABLE_TCP_ENDPOINT + endPointHolder.release(); +#endif + + return CHIP_NO_ERROR; } CHIP_ERROR TCPBase::ProcessReceivedBuffer(Inet::TCPEndPoint * endPoint, const PeerAddress & peerAddress, @@ -391,22 +366,20 @@ void TCPBase::OnConnectionComplete(Inet::TCPEndPoint * endPoint, CHIP_ERROR inet PeerAddress addr = PeerAddress::TCP(ipAddress, port, interfaceId); // Send any pending packets - for (size_t i = 0; i < tcp->mPendingPacketsSize; i++) - { - if ((tcp->mPendingPackets[i].peerAddress != addr) || (tcp->mPendingPackets[i].packetBuffer.IsNull())) + tcp->mPendingPackets.ForEachActiveObject([&](PendingPacket * pending) { + if (pending->mPeerAddress == addr) { - continue; - } - foundPendingPacket = true; + foundPendingPacket = true; + System::PacketBufferHandle buffer = std::move(pending->mPacketBuffer); + tcp->mPendingPackets.ReleaseObject(pending); - System::PacketBufferHandle buffer = std::move(tcp->mPendingPackets[i].packetBuffer); - tcp->mPendingPackets[i].peerAddress = PeerAddress::Uninitialized(); - - if ((inetErr == CHIP_NO_ERROR) && (err == CHIP_NO_ERROR)) - { - err = endPoint->Send(std::move(buffer)); + if ((inetErr == CHIP_NO_ERROR) && (err == CHIP_NO_ERROR)) + { + err = endPoint->Send(std::move(buffer)); + } } - } + return true; + }); if (err == CHIP_NO_ERROR) { diff --git a/src/transport/raw/TCP.h b/src/transport/raw/TCP.h index 0a0cb2bfbf4418..16cc64c5b6cc88 100644 --- a/src/transport/raw/TCP.h +++ b/src/transport/raw/TCP.h @@ -33,6 +33,7 @@ #include #include #include +#include #include namespace chip { @@ -84,8 +85,12 @@ class TcpListenParameters */ struct PendingPacket { - PeerAddress peerAddress; // where the packet is being sent to - System::PacketBufferHandle packetBuffer; // what data needs to be sent + PendingPacket(const PeerAddress & peerAddress, System::PacketBufferHandle && packetBuffer) : + mPeerAddress(peerAddress), mPacketBuffer(std::move(packetBuffer)) + {} + + PeerAddress mPeerAddress; // where the packet is being sent to + System::PacketBufferHandle mPacketBuffer; // what data needs to be sent }; /** Implements a transport using TCP. */ @@ -128,20 +133,11 @@ class DLL_EXPORT TCPBase : public Base }; public: - TCPBase(ActiveConnectionState * activeConnectionsBuffer, size_t bufferSize, PendingPacket * packetBuffers, - size_t packetsBuffersSize) : - mActiveConnections(activeConnectionsBuffer), - mActiveConnectionsSize(bufferSize), mPendingPackets(packetBuffers), mPendingPacketsSize(packetsBuffersSize) + using PendingPacketPoolType = PoolInterface; + TCPBase(ActiveConnectionState * activeConnectionsBuffer, size_t bufferSize, PendingPacketPoolType & packetBuffers) : + mActiveConnections(activeConnectionsBuffer), mActiveConnectionsSize(bufferSize), mPendingPackets(packetBuffers) { // activeConnectionsBuffer must be initialized by the caller. - for (size_t i = 0; i < mPendingPacketsSize; ++i) - { - mPendingPackets[i].peerAddress = PeerAddress::Uninitialized(); - // In the typical case, the TCPBase constructor is invoked from the TCP constructor on its mPendingPackets, - // which has not yet been initialized. That means we can't do a normal move assignment or construction of - // the PacketBufferHandle, since that would call PacketBuffer::Free on the uninitialized data. - new (&mPendingPackets[i].packetBuffer) System::PacketBufferHandle(); - } } ~TCPBase() override; @@ -266,15 +262,14 @@ class DLL_EXPORT TCPBase : public Base const size_t mActiveConnectionsSize; // Data to be sent when connections succeed - PendingPacket * mPendingPackets; - const size_t mPendingPacketsSize; + PendingPacketPoolType & mPendingPackets; }; template class TCP : public TCPBase { public: - TCP() : TCPBase(mConnectionsBuffer, kActiveConnectionsSize, mPendingPackets, kPendingPacketSize) + TCP() : TCPBase(mConnectionsBuffer, kActiveConnectionsSize, mPendingPackets) { for (size_t i = 0; i < kActiveConnectionsSize; ++i) { @@ -285,7 +280,7 @@ class TCP : public TCPBase private: friend class TCPTest; TCPBase::ActiveConnectionState mConnectionsBuffer[kActiveConnectionsSize]; - PendingPacket mPendingPackets[kPendingPacketSize]; + PoolImpl mPendingPackets; }; } // namespace Transport