Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Per listener Balancer. Lock-free WorkersStats
Browse files Browse the repository at this point in the history
Pavel Kraynyukhov committed Jun 2, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent cd8e4a8 commit e00f34e
Showing 6 changed files with 250 additions and 321 deletions.
119 changes: 47 additions & 72 deletions include/Balancer.h
Original file line number Diff line number Diff line change
@@ -26,103 +26,78 @@

#include <TCPListener.h>
#include <WSWorkersPool.h>
#include <tsbqueue.h>
#include <sys/CancelableThread.h>
#include <atomic>
#include <memory>
#include <abstract/Runnable.h>
#include <ext/tsl/robin_map.h>

namespace LAppS
{


template <bool TLSEnable=true, bool StatsEnable=true>
class Balancer
: public ::itc::TCPListener::ViewType,
public ::itc::abstract::IRunnable
: public ::itc::TCPListener::ViewType
{
private:
using WorkersPool=itc::Singleton<LAppS::WSWorkersPool<TLSEnable,StatsEnable>>;

float mConnectionWeight;
std::atomic<bool> mMayRun;
itc::tsbqueue<::itc::TCPListener::value_type> mInbound;
std::vector<std::shared_ptr<::abstract::Worker>> mWorkersCache;

public:
void onUpdate(const ::itc::TCPListener::value_type& data)
{
mInbound.send(std::move(data));
}

void onUpdate(const std::vector<::itc::TCPListener::value_type>& data)
{
mInbound.send(std::move(data));
}

Balancer(const float connw=0.7):mConnectionWeight(connw),mMayRun{true},mWorkersCache(0)
{
WorkersPool::getInstance()->getWorkers(mWorkersCache);
}
void shutdown()
{
mMayRun.store(false);
}
void onCancel()
{
shutdown();
}
~Balancer()
{
this->shutdown();
}
void execute()
const size_t selectWorker(const tsl::robin_map<size_t,IOStats>& current_stats)
{
while(mMayRun)
if(current_stats.begin()!=current_stats.end())
{
try {
auto inbound_connection=std::move(mInbound.recv());
auto it=current_stats.begin();
size_t choosen=it.key();
IOStats stats_detail=it.value();

if(mWorkersCache.size()!=WorkersPool::getInstance()->size())
WorkersPool::getInstance()->getWorkers(mWorkersCache);

if(mWorkersCache.size()>0)
std::for_each(
current_stats.begin(),current_stats.end(),
[this,&stats_detail,&choosen](const auto& kv)
{
size_t chosen=0;
auto stats=mWorkersCache[0]->getMinStats();
for(size_t i=1;i<mWorkersCache.size();++i)
// no connections limit check, as it is the workers job to announce 403 Forbidden to the client
size_t m1=(kv.second.mConnections+kv.second.mInQueueDepth)*mConnectionWeight+kv.second.mEventQSize;
size_t m2=(stats_detail.mConnections+stats_detail.mInQueueDepth)*mConnectionWeight+stats_detail.mEventQSize;
if((m1)<(m2))
{
auto stats2=mWorkersCache[i]->getMinStats();
if(stats.mConnections>stats2.mConnections) // candidate i
{
chosen=i;
if(stats.mEventQSize > stats2.mEventQSize)
{
stats=stats2;
chosen=i;
}
}
else
{
if(stats.mEventQSize > stats2.mConnections*mConnectionWeight)
{
stats=stats2;
chosen=i;
}
}
choosen=kv.first;
stats_detail=kv.second;
}
mWorkersCache[chosen]->update(std::move(inbound_connection));
}else
{
mMayRun.store(false);
}
);
return choosen;
}
throw std::system_error(EINVAL,std::system_category(),"No workers are available");
}
public:
void onUpdate(const ::itc::TCPListener::value_type& data)
{
tsl::robin_map<size_t,IOStats> current_stats;
LAppS::WStats::getInstance()->getStats(current_stats);
try{
size_t choosen=selectWorker(current_stats);
auto worker=WorkersPool::getInstance()->get(choosen);
worker->getRunnable()->enqueue(data);
}catch(const std::exception& e)
{
// ignore
}
}

}catch (const std::exception& e)
void onUpdate(const std::vector<::itc::TCPListener::value_type>& data)
{
std::for_each(
data.begin(),data.end(),
[this](const auto& value)
{
mMayRun.store(false);
this->onUpdate(value);
}
}
);
}

explicit Balancer(const float connw=0.7):mConnectionWeight(connw)
{
}
~Balancer()=default;
};
}
#endif /* __BALANCER_H__ */
66 changes: 0 additions & 66 deletions include/ConnectionsInQueue.h

This file was deleted.

Loading

0 comments on commit e00f34e

Please sign in to comment.