Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/csp/engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ set(ENGINE_PUBLIC_HEADERS
OutputAdapter.h
PendingPushEvents.h
Profiler.h
PushPullEvent.h
PushEvent.h
PullInputAdapter.h
PushInputAdapter.h
Expand Down
34 changes: 34 additions & 0 deletions cpp/csp/engine/PushPullEvent.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#ifndef _IN_CSP_ENGINE_PUSHPULLEVENT_H
#define _IN_CSP_ENGINE_PUSHPULLEVENT_H

namespace csp
{

class PushPullInputAdapter;

struct PushPullEvent
{
PushPullEvent( PushPullInputAdapter *adapter_, DateTime time_ ) : time( time_ ),
adapter( adapter_ ),
next( nullptr )
{}

DateTime time;
PushPullInputAdapter * adapter;
PushPullEvent * next;
};

template<typename T>
struct TypedPushPullEvent : public PushPullEvent
{
TypedPushPullEvent( PushPullInputAdapter *adapter, DateTime time,
T &&d ) : PushPullEvent( adapter, time ),
data( std::forward<T>( d ) )
{}

typename std::remove_reference<T>::type data;
};

}

#endif
96 changes: 57 additions & 39 deletions cpp/csp/engine/PushPullInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,66 @@ PushPullInputAdapter::PushPullInputAdapter( Engine *engine, CspTypePtr &type, Pu
PushGroup *group, bool adjustOutOfOrderTime )
: PushInputAdapter(engine, type, pushMode, group),
m_nextPullEvent(nullptr),
m_tailEvent(nullptr),
m_notifiedEndOfPull(false),
m_adjustOutOfOrderTime(adjustOutOfOrderTime)
{
}

PushPullInputAdapter::~PushPullInputAdapter()
{
//free up any unused events
while( m_nextPullEvent )
PushPullEvent * event = nextPullEvent();
while( event )
{
delete m_nextPullEvent;
m_nextPullEvent = nextPullEvent();
delete event;
event = nextPullEvent();
}
}

void PushPullInputAdapter::start( DateTime start, DateTime end )
{
m_nextPullEvent = nextPullEvent();
if( m_nextPullEvent )
{
m_timerHandle = rootEngine() -> scheduleCallback( m_nextPullEvent -> time,
[this]() { return processNextPullEvent() ? nullptr : this; } );
}
auto * nextEvent = nextPullEvent();
if( nextEvent )
scheduleNextPullEvent( nextEvent );
}

void PushPullInputAdapter::stop()
{
rootEngine() -> cancelCallback( m_timerHandle );
//shouldnt need to lock at this point
m_threadQueue.emplace( nullptr );
auto * replayCompleteEvent = new PushPullEvent( this, DateTime::NONE() );
rootEngine() -> pushPullEventQueue().push( replayCompleteEvent );
}

void PushPullInputAdapter::scheduleNextPullEvent( PushPullEvent * nextEvent )
{
//Note that we make nextEvent mutable in the lambda since we need to be able to update it in processNextPullEvent
//which can return false to force a rescheduled re-attempt with a new event pointer
m_timerHandle = rootEngine() -> scheduleCallback( nextEvent -> time,
[this, nextEvent]() mutable
{
return processNextPullEvent( nextEvent ) ? nullptr : this;
} );
}

bool PushPullInputAdapter::processNextPullEvent()
bool PushPullInputAdapter::processNextPullEvent( PushPullEvent *& nextEvent )
{
bool consumed = switchCspType( dataType(),
[ this ]( auto tag )
[ this, &nextEvent ]( auto tag )
{
using T = typename decltype(tag)::type;
TypedPullDataEvent<T> *tevent = static_cast<TypedPullDataEvent<T> *>( m_nextPullEvent );
TypedPushPullEvent<T> *tevent = static_cast<TypedPushPullEvent<T> *>( nextEvent );

bool consumed = consumeTick( tevent -> data );
assert( consumed );

delete tevent;

while( ( m_nextPullEvent = nextPullEvent() ) &&
m_nextPullEvent -> time == rootEngine() -> now() )
while( ( nextEvent = nextPullEvent() ) &&
nextEvent -> time == rootEngine() -> now() )
{
tevent = static_cast<TypedPullDataEvent<T> *>( m_nextPullEvent );
tevent = static_cast<TypedPushPullEvent<T> *>( nextEvent );
consumed = consumeTick( tevent -> data );
if( !consumed )
return false;
Expand All @@ -60,38 +75,41 @@ bool PushPullInputAdapter::processNextPullEvent()
return true;
} );

if( consumed && m_nextPullEvent )
{
m_timerHandle = rootEngine() -> scheduleCallback( m_nextPullEvent->time,
[this]() { return processNextPullEvent() ? nullptr : this; } );
}
if( consumed && nextEvent )
scheduleNextPullEvent( nextEvent );

return consumed;
}

PushPullInputAdapter::PullDataEvent * PushPullInputAdapter::nextPullEvent()
PushPullEvent * PushPullInputAdapter::nextPullEvent()
{
//spin while we wait for data
while( m_poppedPullEvents.empty() )
while( m_nextPullEvent == nullptr )
{
std::lock_guard<std::mutex> g( m_queueMutex );
m_threadQueue.swap( m_poppedPullEvents );
//Any PushPullInputAdapter instance can update events on any other adapter
PushPullEvent * event = rootEngine() -> pushPullEventQueue().popAll();
while( event )
{
PushPullEvent * next = event -> next;
event -> adapter -> setNextPushPullEvent( event );
event = next;
}
}

auto * event = m_poppedPullEvents.front();
m_poppedPullEvents.pop();
//DateTime of None is the sentinel value for replay complete
if( m_nextPullEvent -> time.isNone() )
return nullptr;

auto * event = m_nextPullEvent;
m_nextPullEvent = m_nextPullEvent -> next;

if( event )
{
//Always force time before start to start. There are two possibilities:
//- User asked to replay from EARLIEST, so they should get all ticks replayed and we cant replay before starttime
//- User asked to replay from STARTTIME in which case, if the adapter is written correctly, we shouldnt get ticks before starttime
if( unlikely( event -> time < rootEngine() -> startTime() ) )
event -> time = rootEngine() -> startTime();

if( m_adjustOutOfOrderTime )
event -> time = std::max( event -> time, rootEngine() -> now() );
}
//Always force time before start to start. There are two possibilities:
//- User asked to replay from EARLIEST, so they should get all ticks replayed and we cant replay before starttime
//- User asked to replay from STARTTIME in which case, if the adapter is written correctly, we shouldnt get ticks before starttime
if( unlikely( event -> time < rootEngine() -> startTime() ) )
event -> time = rootEngine() -> startTime();

if( m_adjustOutOfOrderTime )
event -> time = std::max( event -> time, rootEngine() -> now() );

return event;
}
Expand Down
60 changes: 27 additions & 33 deletions cpp/csp/engine/PushPullInputAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@
#define _IN_CSP_ENGINE_PUSHPULLINPUTADAPTER_H

#include <csp/engine/PushInputAdapter.h>
#include <queue>
#include <csp/engine/PushPullEvent.h>

namespace csp
{

//A variation of PushInputAdapter that lets you schedule historical data as well. Used for adapters
//that can replay history and switch to realtime seamlessly ( ie kafka )

class PushPullInputAdapter : public PushInputAdapter
{
public:
PushPullInputAdapter( Engine * engine, CspTypePtr & type, PushMode pushMode,
PushGroup * group = nullptr, bool adjustOutOfOrderTime = false );
~PushPullInputAdapter();

template<typename T>
void pushTick( bool live, DateTime time, T &&value, PushBatch *batch = nullptr );
Expand All @@ -24,47 +25,44 @@ class PushPullInputAdapter : public PushInputAdapter
void stop() override;

protected:

struct PullDataEvent
{
DateTime time;
};

virtual PullDataEvent * nextPullEvent();
virtual PushPullEvent * nextPullEvent();

bool flaggedLive() const { return m_notifiedEndOfPull; }

private:
template<typename T>
struct TypedPullDataEvent : public PullDataEvent
void setNextPushPullEvent( PushPullEvent * event )
{
TypedPullDataEvent( DateTime t, T && d ) : PullDataEvent{ t },
data( std::forward<T>( d ) )
{}

typename std::remove_reference<T>::type data;
};

bool processNextPullEvent();
if( !m_nextPullEvent )
m_nextPullEvent = event;
else
{
assert( m_tailEvent );
assert( m_nextPullEvent );
m_tailEvent -> next = event;
}

using QueueT = std::queue<PullDataEvent *>;
std::mutex m_queueMutex;
QueueT m_threadQueue;
QueueT m_poppedPullEvents;
m_tailEvent = event;
event -> next = nullptr;
}

private:
bool processNextPullEvent( PushPullEvent *& nextEvent );
void scheduleNextPullEvent( PushPullEvent * nextEvent );

Scheduler::Handle m_timerHandle;
PullDataEvent * m_nextPullEvent;
PushPullEvent * m_nextPullEvent;
PushPullEvent * m_tailEvent;
bool m_notifiedEndOfPull; //flagged when we're done pushing pull values
bool m_adjustOutOfOrderTime;

};

inline void PushPullInputAdapter::flagReplayComplete()
{
if( unlikely( !m_notifiedEndOfPull ) )
{
m_notifiedEndOfPull = true;
std::lock_guard<std::mutex> g( m_queueMutex );
m_threadQueue.emplace( nullptr );
auto * replayCompleteEvent = new PushPullEvent( this, DateTime::NONE() );
rootEngine() -> pushPullEventQueue().push( replayCompleteEvent );
}
}

Expand All @@ -84,12 +82,8 @@ inline void PushPullInputAdapter::pushTick( bool live, DateTime time, T &&value,
if( unlikely( m_notifiedEndOfPull ) )
CSP_THROW( RuntimeException, "PushPullInputAdapter tried to push a sim tick after live tick" );

//TBD allocators
PullDataEvent * event = new TypedPullDataEvent<T>( time, std::forward<T>(value) );
{
std::lock_guard<std::mutex> g( m_queueMutex );
m_threadQueue.emplace( event );
}
PushPullEvent * event = new TypedPushPullEvent<T>( this, time, std::forward<T>(value) );
rootEngine() -> pushPullEventQueue().push( event );
}
}

Expand Down
10 changes: 8 additions & 2 deletions cpp/csp/engine/RootEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <csp/engine/PendingPushEvents.h>
#include <csp/engine/Profiler.h>
#include <csp/engine/PushEvent.h>
#include <csp/engine/PushPullEvent.h>
#include <csp/engine/Scheduler.h>
#include <memory>

Expand All @@ -35,7 +36,8 @@ class EndCycleListener

class RootEngine : public Engine
{
using PushEventQueue = SRMWLockFreeQueue<PushEvent>;
using PushEventQueue = SRMWLockFreeQueue<PushEvent>;
using PushPullEventQueue = SRMWLockFreeQueue<PushPullEvent>;

public:
RootEngine( const Dictionary & );
Expand Down Expand Up @@ -87,6 +89,8 @@ class RootEngine : public Engine

bool interrupted() const;

PushPullEventQueue & pushPullEventQueue() { return m_pushPullEventQueue; }

protected:
enum State { NONE, STARTING, RUNNING, SHUTDOWN, DONE };
using EndCycleListeners = std::vector<EndCycleListener*>;
Expand Down Expand Up @@ -129,7 +133,9 @@ class RootEngine : public Engine
bool m_inRealtime;
int m_initSignalCount;

PushEventQueue m_pushEventQueue;
PushEventQueue m_pushEventQueue;
//This queue is managed entirely from the PushPullInputAdapter
PushPullEventQueue m_pushPullEventQueue;

std::exception_ptr m_exception_ptr;
std::mutex m_exception_mutex;
Expand Down
2 changes: 1 addition & 1 deletion cpp/csp/python/PyPushPullInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class PyPushPullInputAdapter : public PushPullInputAdapter
}

//override nextPullEvent so we can release GIL while we wait
PushPullInputAdapter::PullDataEvent * nextPullEvent() override
PushPullEvent * nextPullEvent() override
{
ReleaseGIL release;
return PushPullInputAdapter::nextPullEvent();
Expand Down
Loading