Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
balancer temporary disabled. to be reworked together with lock-free s…
Browse files Browse the repository at this point in the history
…tats tables
Pavel Kraynyukhov committed May 23, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 61f2d07 commit 0771b47
Showing 2 changed files with 39 additions and 59 deletions.
84 changes: 31 additions & 53 deletions include/IOWorker.h
Original file line number Diff line number Diff line change
@@ -31,6 +31,8 @@
#include <abstract/Worker.h>
#include <sys/mutex.h>
#include <time.h>
#include <ConnectionsInQueue.h>
#include <ext/tsl/robin_map.h>


namespace LAppS
@@ -50,22 +52,22 @@ namespace LAppS
std::atomic<bool> mCanStop;

mutable itc::sys::mutex mConnectionsMutex;
itc::sys::mutex mInboundMutex;
itc::sys::mutex mDCQMutex;
//itc::sys::mutex mDCQMutex;

LAppS::Shakespeer<TLSEnable,StatsEnable> mShakespeer;
SharedEPollType mEPoll;

std::queue<itc::CSocketSPtr> mInboundConnections;
std::map<int,WSSPtr> mConnections;
std::queue<int32_t> mDCQueue;
tsl::robin_map<int,WSSPtr> mConnections;
//std::queue<int32_t> mDCQueue;
itc::cfifo<int32_t> mDCQueue;

std::vector<epoll_event> mEvents;

std::atomic<bool> haveConnections;
std::atomic<bool> haveNewConnections;
std::atomic<bool> haveDisconnects;

itc::sys::Nap nap;


bool error_bit(const uint32_t event) const
{
@@ -144,10 +146,10 @@ namespace LAppS

explicit IOWorker(const size_t id, const size_t maxConnections,const bool auto_fragment)
: Worker(id,maxConnections,auto_fragment), enableTLS(), enableStatsUpdate(),
mMayRun{true}, mCanStop{false}, mConnectionsMutex(), mInboundMutex(),
mDCQMutex(), mShakespeer(), mEPoll(std::make_shared<ePoll>()),
mInboundConnections(), mConnections(), mDCQueue(), mEvents(1000),
haveConnections{false},haveNewConnections{false},haveDisconnects{false}
mMayRun{true}, mCanStop{false}, mConnectionsMutex(),
/*mDCQMutex(),*/ mShakespeer(), mEPoll(std::make_shared<ePoll>()),
mConnections(), mDCQueue(20), mEvents(1000),
haveConnections{false},haveDisconnects{false}
{
mConnections.clear();
}
@@ -156,29 +158,10 @@ namespace LAppS
IOWorker(const IOWorker&)=delete;
IOWorker(IOWorker&)=delete;

void onUpdate(const ::itc::TCPListener::value_type& socketsptr)
{
ITCSyncLock sync(mInboundMutex);
mInboundConnections.push(std::move(socketsptr));
updateStats();
haveNewConnections.store(true);
}

void onUpdate(const std::vector<::itc::TCPListener::value_type>& socketsptr)
{
ITCSyncLock sync(mInboundMutex);
for(size_t i=0;i<socketsptr.size();++i)
{
mInboundConnections.push(std::move(socketsptr[i]));
}
updateStats();
haveNewConnections.store(true);
}

void disconnect(const int32_t fd)
{
ITCSyncLock sync(mDCQMutex);
mDCQueue.push(fd);
//ITCSyncLock sync(mDCQMutex);
mDCQueue.send(fd);
haveDisconnects.store(true);
}

@@ -211,17 +194,17 @@ namespace LAppS
processInbound();
if(haveDisconnects)
{
ITCSyncLock syncdcq(mDCQMutex);
//ITCSyncLock syncdcq(mDCQMutex);
ITCSyncLock sync(mConnectionsMutex);
while(!mDCQueue.empty())
{
const int32_t fd=std::move(mDCQueue.front());
mDCQueue.pop();
const int32_t fd=std::move(mDCQueue.recv());
//mDCQueue.pop();
deleteConnection(fd);
}
haveDisconnects.store(false);
}
if(haveConnections)
if(haveConnections.load())
{
try{
int ret=mEPoll->poll(mEvents,1);
@@ -250,8 +233,8 @@ namespace LAppS
}
else
{
itc::sys::Nap nap;
nap.usleep(1000);
processInbound();
}
}
mCanStop.store(true);
@@ -286,7 +269,7 @@ namespace LAppS

void updateStats()
{
mStats.mConnections=mConnections.size()+mInboundConnections.size();
mStats.mConnections=mConnections.size();
}

private:
@@ -295,16 +278,15 @@ namespace LAppS
**/
void processInbound()
{
if(haveNewConnections&&mConnectionsMutex.try_lock())
if(!LAppS::CInQ::getInstance()->empty())
{
ITCSyncLock sync(mInboundMutex);

while(!mInboundConnections.empty())
{
try{
auto current=mkWebSocket(mInboundConnections.front());
try{
itc::CSocketSPtr sockt;
if(LAppS::CInQ::getInstance()->try_recv(sockt))
{
auto current=mkWebSocket(sockt);
int fd=current->getfd();

ITCSyncLock sync(mConnectionsMutex);
if(mConnections.size()<mMaxConnections)
{
itc::getLog()->info(
@@ -325,16 +307,12 @@ namespace LAppS
"Too many connections, new connection from %s on fd %d is rejected",
current->getPeerAddress().c_str(),fd
);
}
}catch(const std::exception& e)
{
itc::getLog()->error(__FILE__,__LINE__,"Connection became invalid before handshake. Exception: %s",e.what());
}
}

mInboundConnections.pop();
}catch(const std::exception& e)
{
itc::getLog()->error(__FILE__,__LINE__,"Connection became invalid before handshake. Exception: %s",e.what());
}
mConnectionsMutex.unlock();
haveNewConnections.store(false);
}
}

14 changes: 8 additions & 6 deletions include/wsServer.h
Original file line number Diff line number Diff line change
@@ -52,9 +52,11 @@
#include <TLSServerContext.h>
#include <abstract/Worker.h>
#include <IOWorker.h>
#include <Balancer.h>
//#include <Balancer.h>
#include <ConnectionsInQueue.h>
#include <Deployer.h>
#include <NetworkACL.h>
#include <WSWorkersPool.h>

// libressl
#include <tls.h>
@@ -70,7 +72,7 @@ namespace LAppS
using WorkersPool=itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>;
using DeployerType=LAppS::Deployer<TLSEnable,StatsEnable>;
using DeployerThread=itc::sys::CancelableThread<DeployerType>;
using BalancerThread=itc::sys::CancelableThread<Balancer<TLSEnable,StatsEnable>>;
// using BalancerThread=itc::sys::CancelableThread<Balancer<TLSEnable,StatsEnable>>;

itc::utils::Bool2Type<TLSEnable> enableTLS;
itc::utils::Bool2Type<StatsEnable> enableStatsUpdate;
@@ -80,7 +82,7 @@ namespace LAppS
std::shared_ptr<LAppS::NetworkACL> mNACL;
DeployerThread mDeployer;
std::vector<TCPListenerThreadSPtr> mListenersPool;
BalancerThread mBalancer;
// BalancerThread mBalancer;

void loadNACL()
{
@@ -121,7 +123,7 @@ namespace LAppS
std::make_shared<::itc::TCPListener>(
LAppSConfig::getInstance()->getWSConfig()["ip"],
LAppSConfig::getInstance()->getWSConfig()["port"],
mBalancer.getRunnable(),
LAppS::CInQ::getInstance(),
[this](const uint32_t address)
{
switch(this->mNACL->mPolicy)
@@ -157,8 +159,8 @@ namespace LAppS
wsServer()
: enableTLS(), enableStatsUpdate(), mConnectionWeight(
LAppSConfig::getInstance()->getWSConfig()["connection_weight"]
), mWorkers(1), mAllStats{0,0,0,0,0,0,0,0}, mDeployer(std::make_shared<DeployerType>()),
mBalancer(std::make_shared<Balancer<TLSEnable, StatsEnable>>(mConnectionWeight))
), mWorkers(1), mAllStats{0,0,0,0,0,0,0,0}, mDeployer(std::make_shared<DeployerType>())//,
// mBalancer(std::make_shared<Balancer<TLSEnable, StatsEnable>>(mConnectionWeight))
{
itc::getLog()->info(__FILE__,__LINE__,"Starting WS Server");

0 comments on commit 0771b47

Please sign in to comment.