Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
0.7.1RC parallel send, dynamic service start/stop infrastructure
Browse files Browse the repository at this point in the history
Pavel Kraynyukhov committed Jun 7, 2018
1 parent e83c6b0 commit 07fa99a
Showing 12 changed files with 411 additions and 540 deletions.
61 changes: 31 additions & 30 deletions include/Application.h
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@
#include <ApplicationContext.h>
#include <tsbqueue.h>

#include <WSWorkersPool.h>
//#include <WSWorkersPool.h>
#include <Singleton.h>
#include <abstract/Worker.h>

@@ -55,20 +55,19 @@ using json = nlohmann::json;

namespace LAppS
{

template <bool TLSEnable, bool StatsEnable, ApplicationProtocol Tproto>
class Application : public ::abstract::Application
{
private:
typedef std::shared_ptr<::abstract::Worker> WorkerSPtrType;

std::atomic<bool> mMayRun;
std::atomic<bool> mCanStop;
std::string mName;
std::string mTarget;
size_t max_inbound_message_size;
ApplicationContext<TLSEnable,StatsEnable,Tproto> mAppContext;
itc::tsbqueue<TaggedEvent> mEvents;
std::vector<WorkerSPtrType> mWorkers;
itc::tsbqueue<abstract::InEvent> mEvents;

public:

@@ -98,50 +97,53 @@ namespace LAppS
: mMayRun{true},mName(appName), mTarget(target),
max_inbound_message_size(mims),mAppContext(appName,this)
{
itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>::getInstance()->getWorkers(mWorkers);
}

const size_t getMaxMSGSize() const
{
return max_inbound_message_size;
}

auto getWorker(const size_t wid)
const bool try_enqueue(const std::vector<abstract::InEvent>& events)
{
if( wid<mWorkers.size() )
try
{
return mWorkers[wid];
if(mEvents.try_send(events))
return true;
else return false;
}
else
{ // Attempt to refresh local workers;
mWorkers.clear();
itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>::getInstance()->getWorkers(mWorkers);
if( wid < mWorkers.size())
{
return mWorkers[wid];
}
else
{
itc::getLog()->error(__FILE__,__LINE__,"No worker with ID %d is found",wid);
throw std::system_error(EINVAL,std::system_category(),"No requested worker is found. The system is probably going down");
}
catch(const std::exception& e)
{
itc::getLog()->error(__FILE__,__LINE__,"Can't enqueue request to application %s, exception: %s",mName.c_str(),e.what());
return false;
}
}
void enqueue(const abstract::InEvent& event)
{
try {
mEvents.send(event);
}
catch (const std::exception& e)
{
itc::getLog()->error(__FILE__,__LINE__,"Can't enqueue request to application %s, exception: %s",mName.c_str(),e.what());
}
}

void enqueue(const TaggedEvent& event)
void enqueue(const std::vector<abstract::InEvent>& event)
{
try {
mEvents.send(event);
} catch (const std::exception& e)
}
catch (const std::exception& e)
{
itc::getLog()->error(__FILE__,__LINE__,"Can't enqueue request to application %s, exception: %s",mName.c_str(),e.what());
}
}

void enqueueDisconnect(const size_t wid, const int32_t sockfd)
void enqueueDisconnect(const std::shared_ptr<::abstract::WebSocket>& ws)
{
try {
mEvents.send({wid,sockfd,{WebSocketProtocol::CLOSE,nullptr}});
mEvents.send({WebSocketProtocol::CLOSE,ws,nullptr});
} catch (const std::exception& e)
{
itc::getLog()->error(__FILE__,__LINE__,"Can't enqueue close event to application %s, exception: %s",mName.c_str(),e.what());
@@ -154,15 +156,15 @@ namespace LAppS
{
try
{
auto te=std::move(mEvents.recv());
switch(te.event.type)
auto event=std::move(mEvents.recv());
switch(event.opcode)
{
case WebSocketProtocol::OpCode::CLOSE:
mAppContext.onDisconnect(te.wid,te.sockfd);
mAppContext.onDisconnect(event.websocket);
break;
default:
{
const bool exec_result=mAppContext.onMessage(te.wid,te.sockfd,te.event);
const bool exec_result=mAppContext.onMessage(event);
if(!exec_result)
{
mMayRun.store(false);
@@ -176,7 +178,6 @@ namespace LAppS
itc::getLog()->flush();
}
}
mAppContext.clearCache();
mCanStop.store(true);
itc::getLog()->info(__FILE__,__LINE__,"Application instance [%s] main loop is finished.",mName.c_str());
}
38 changes: 17 additions & 21 deletions include/ApplicationContext.h
Original file line number Diff line number Diff line change
@@ -37,7 +37,6 @@ extern "C" {

#include <WebSocket.h>
#include <abstract/Worker.h>
#include <WSWorkersPool.h>
#include <abstract/ApplicationContext.h>

#include <modules/nljson.h>
@@ -54,6 +53,9 @@ namespace LAppS
{
private:
typedef std::shared_ptr<::abstract::Worker> WorkerSPtrType;
typedef WebSocket<TLSEnable,StatsEnable> WSType;
typedef std::shared_ptr<WSType> WSSPtrType;

enum LAppSInMessageType { INVALID, CN, CN_WITH_PARAMS, REQUEST, REQUEST_WITH_PARAMS };
itc::utils::Int2Type<Tproto> mProtocol;
abstract::Application* mParent;
@@ -186,15 +188,15 @@ namespace LAppS
else return INVALID;
}

const bool onMessage(const size_t workerID, const int32_t socketfd, const WSEvent& event, const itc::utils::Int2Type<ApplicationProtocol::RAW>& protocol_is_raw)
const bool onMessage(const abstract::InEvent& event, const itc::utils::Int2Type<ApplicationProtocol::RAW>& protocol_is_raw)
{
cleanLuaStack();

lua_getfield(mLState, LUA_GLOBALSINDEX, mName.c_str());
lua_getfield(mLState,-1,"onMessage");

lua_pushinteger(mLState,(workerID<<32)|socketfd); // handler
lua_pushinteger(mLState,event.type); // opcode
lua_pushinteger(mLState, (lua_Integer)(event.websocket.get()));
lua_pushinteger(mLState, event.opcode);
lua_pushlstring(mLState,(const char*)(event.message->data()),event.message->size());

callAppOnMessage();
@@ -230,21 +232,21 @@ namespace LAppS
lua_setmetatable(mLState, -2);
}

const bool onMessage(const size_t workerID, const int32_t socketfd, const WSEvent& event, const itc::utils::Int2Type<ApplicationProtocol::LAPPS>& protocol_is_lapps)
const bool onMessage(const abstract::InEvent& event, const itc::utils::Int2Type<ApplicationProtocol::LAPPS>& protocol_is_lapps)
{
// exceptions from json and from lua stack MUST be handled in the Application class
// We must prevent possibility to kill app with inappropriate message, therefore
// content errors may not throw the exceptions.

// Protocol violation, LAppS protocol accepts only binary messages (CBOR encoded)
if(event.type != WebSocketProtocol::OpCode::BINARY)
if(event.opcode != WebSocketProtocol::OpCode::BINARY)
return false;

cleanLuaStack();

auto message=std::make_shared<json>(json::from_cbor(*event.message));
auto msg=std::make_shared<json>(json::from_cbor(*event.message));

auto msg_type=getLAppSInMessageType(*message);
auto msg_type=getLAppSInMessageType(*msg);

// "Protocol violation, LAppS protocol accepts only binary messages (CBOR encoded)"
if(msg_type == INVALID)
@@ -253,9 +255,9 @@ namespace LAppS
lua_getfield(mLState, LUA_GLOBALSINDEX, mName.c_str());
lua_getfield(mLState,-1,"onMessage");

lua_pushinteger(mLState,(workerID<<32)|socketfd); // socket handler for ws::send
lua_pushinteger(mLState,(lua_Integer)(event.websocket.get())); // socket handler for ws::send
lua_pushinteger(mLState,msg_type);
pushRequest(message);
pushRequest(msg);

callAppOnMessage(); // handler, type, userdata

@@ -346,33 +348,27 @@ namespace LAppS
mLState=nullptr;
}
}
void clearCache()
{
mustStop.store(true);
workersCache.clear();
}

virtual ~ApplicationContext() noexcept
{
if(mLState)
{
this->shutdown();
}
}
const bool onMessage(const size_t workerID, const int32_t socketfd, const WSEvent& event)
const bool onMessage(const abstract::InEvent& event)
{
if(mustStop) return false;
if(workersCache.empty())
itc::Singleton<WSWorkersPool<TLSEnable,StatsEnable>>::getInstance()->getWorkers(workersCache);
return onMessage(workerID,socketfd,event,mProtocol);
return onMessage(std::move(event),mProtocol);
}

void onDisconnect(const size_t workerID, const int32_t sockfd)
void onDisconnect(const std::shared_ptr<::abstract::WebSocket>& ws)
{
this->cleanLuaStack();

lua_getfield(mLState, LUA_GLOBALSINDEX, mName.c_str());
lua_getfield(mLState,-1,"onDisconnect");
lua_pushinteger(mLState,(workerID<<32)|sockfd);
lua_pushinteger(mLState,(lua_Integer)(ws.get()));

int ret = lua_pcall (mLState, 1, 0, 0);
checkForLuaErrorsOnPcall(ret,"onDisconnect");
Loading

0 comments on commit 07fa99a

Please sign in to comment.