From 18caa6365330d99dd9c88065a0e2206b6ae275cc Mon Sep 17 00:00:00 2001 From: Rob Ambalu Date: Tue, 25 Nov 2025 15:08:07 -0500 Subject: [PATCH] PushPullInputAdapter - improve message passing queue to use shared SRMWLockFree queue instead of individual mutex locked std::queue Signed-off-by: Rob Ambalu --- cpp/csp/engine/CMakeLists.txt | 1 + cpp/csp/engine/PushPullEvent.h | 34 ++++++++ cpp/csp/engine/PushPullInputAdapter.cpp | 96 ++++++++++++++--------- cpp/csp/engine/PushPullInputAdapter.h | 60 +++++++------- cpp/csp/engine/RootEngine.h | 10 ++- cpp/csp/python/PyPushPullInputAdapter.cpp | 2 +- 6 files changed, 128 insertions(+), 75 deletions(-) create mode 100644 cpp/csp/engine/PushPullEvent.h diff --git a/cpp/csp/engine/CMakeLists.txt b/cpp/csp/engine/CMakeLists.txt index ce035b691..b14a88b5c 100644 --- a/cpp/csp/engine/CMakeLists.txt +++ b/cpp/csp/engine/CMakeLists.txt @@ -41,6 +41,7 @@ set(ENGINE_PUBLIC_HEADERS OutputAdapter.h PendingPushEvents.h Profiler.h + PushPullEvent.h PushEvent.h PullInputAdapter.h PushInputAdapter.h diff --git a/cpp/csp/engine/PushPullEvent.h b/cpp/csp/engine/PushPullEvent.h new file mode 100644 index 000000000..c40968632 --- /dev/null +++ b/cpp/csp/engine/PushPullEvent.h @@ -0,0 +1,34 @@ +#ifndef _IN_CSP_ENGINE_PUSHPULLEVENT_H +#define _IN_CSP_ENGINE_PUSHPULLEVENT_H + +namespace csp +{ + +class PushPullInputAdapter; + +struct PushPullEvent +{ + PushPullEvent( PushPullInputAdapter *adapter_, DateTime time_ ) : time( time_ ), + adapter( adapter_ ), + next( nullptr ) + {} + + DateTime time; + PushPullInputAdapter * adapter; + PushPullEvent * next; +}; + +template +struct TypedPushPullEvent : public PushPullEvent +{ + TypedPushPullEvent( PushPullInputAdapter *adapter, DateTime time, + T &&d ) : PushPullEvent( adapter, time ), + data( std::forward( d ) ) + {} + + typename std::remove_reference::type data; +}; + +} + +#endif diff --git a/cpp/csp/engine/PushPullInputAdapter.cpp b/cpp/csp/engine/PushPullInputAdapter.cpp index 5dd1c86aa..1c7635b57 100644 --- a/cpp/csp/engine/PushPullInputAdapter.cpp +++ b/cpp/csp/engine/PushPullInputAdapter.cpp @@ -6,51 +6,66 @@ PushPullInputAdapter::PushPullInputAdapter( Engine *engine, CspTypePtr &type, Pu PushGroup *group, bool adjustOutOfOrderTime ) : PushInputAdapter(engine, type, pushMode, group), m_nextPullEvent(nullptr), + m_tailEvent(nullptr), m_notifiedEndOfPull(false), m_adjustOutOfOrderTime(adjustOutOfOrderTime) +{ +} + +PushPullInputAdapter::~PushPullInputAdapter() { //free up any unused events - while( m_nextPullEvent ) + PushPullEvent * event = nextPullEvent(); + while( event ) { - delete m_nextPullEvent; - m_nextPullEvent = nextPullEvent(); + delete event; + event = nextPullEvent(); } } void PushPullInputAdapter::start( DateTime start, DateTime end ) { - m_nextPullEvent = nextPullEvent(); - if( m_nextPullEvent ) - { - m_timerHandle = rootEngine() -> scheduleCallback( m_nextPullEvent -> time, - [this]() { return processNextPullEvent() ? nullptr : this; } ); - } + auto * nextEvent = nextPullEvent(); + if( nextEvent ) + scheduleNextPullEvent( nextEvent ); } void PushPullInputAdapter::stop() { rootEngine() -> cancelCallback( m_timerHandle ); //shouldnt need to lock at this point - m_threadQueue.emplace( nullptr ); + auto * replayCompleteEvent = new PushPullEvent( this, DateTime::NONE() ); + rootEngine() -> pushPullEventQueue().push( replayCompleteEvent ); +} + +void PushPullInputAdapter::scheduleNextPullEvent( PushPullEvent * nextEvent ) +{ + //Note that we make nextEvent mutable in the lambda since we need to be able to update it in processNextPullEvent + //which can return false to force a rescheduled re-attempt with a new event pointer + m_timerHandle = rootEngine() -> scheduleCallback( nextEvent -> time, + [this, nextEvent]() mutable + { + return processNextPullEvent( nextEvent ) ? nullptr : this; + } ); } -bool PushPullInputAdapter::processNextPullEvent() +bool PushPullInputAdapter::processNextPullEvent( PushPullEvent *& nextEvent ) { bool consumed = switchCspType( dataType(), - [ this ]( auto tag ) + [ this, &nextEvent ]( auto tag ) { using T = typename decltype(tag)::type; - TypedPullDataEvent *tevent = static_cast *>( m_nextPullEvent ); + TypedPushPullEvent *tevent = static_cast *>( nextEvent ); bool consumed = consumeTick( tevent -> data ); assert( consumed ); delete tevent; - while( ( m_nextPullEvent = nextPullEvent() ) && - m_nextPullEvent -> time == rootEngine() -> now() ) + while( ( nextEvent = nextPullEvent() ) && + nextEvent -> time == rootEngine() -> now() ) { - tevent = static_cast *>( m_nextPullEvent ); + tevent = static_cast *>( nextEvent ); consumed = consumeTick( tevent -> data ); if( !consumed ) return false; @@ -60,38 +75,41 @@ bool PushPullInputAdapter::processNextPullEvent() return true; } ); - if( consumed && m_nextPullEvent ) - { - m_timerHandle = rootEngine() -> scheduleCallback( m_nextPullEvent->time, - [this]() { return processNextPullEvent() ? nullptr : this; } ); - } + if( consumed && nextEvent ) + scheduleNextPullEvent( nextEvent ); return consumed; } -PushPullInputAdapter::PullDataEvent * PushPullInputAdapter::nextPullEvent() +PushPullEvent * PushPullInputAdapter::nextPullEvent() { - //spin while we wait for data - while( m_poppedPullEvents.empty() ) + while( m_nextPullEvent == nullptr ) { - std::lock_guard g( m_queueMutex ); - m_threadQueue.swap( m_poppedPullEvents ); + //Any PushPullInputAdapter instance can update events on any other adapter + PushPullEvent * event = rootEngine() -> pushPullEventQueue().popAll(); + while( event ) + { + PushPullEvent * next = event -> next; + event -> adapter -> setNextPushPullEvent( event ); + event = next; + } } - auto * event = m_poppedPullEvents.front(); - m_poppedPullEvents.pop(); + //DateTime of None is the sentinel value for replay complete + if( m_nextPullEvent -> time.isNone() ) + return nullptr; + + auto * event = m_nextPullEvent; + m_nextPullEvent = m_nextPullEvent -> next; - if( event ) - { - //Always force time before start to start. There are two possibilities: - //- User asked to replay from EARLIEST, so they should get all ticks replayed and we cant replay before starttime - //- User asked to replay from STARTTIME in which case, if the adapter is written correctly, we shouldnt get ticks before starttime - if( unlikely( event -> time < rootEngine() -> startTime() ) ) - event -> time = rootEngine() -> startTime(); - - if( m_adjustOutOfOrderTime ) - event -> time = std::max( event -> time, rootEngine() -> now() ); - } + //Always force time before start to start. There are two possibilities: + //- User asked to replay from EARLIEST, so they should get all ticks replayed and we cant replay before starttime + //- User asked to replay from STARTTIME in which case, if the adapter is written correctly, we shouldnt get ticks before starttime + if( unlikely( event -> time < rootEngine() -> startTime() ) ) + event -> time = rootEngine() -> startTime(); + + if( m_adjustOutOfOrderTime ) + event -> time = std::max( event -> time, rootEngine() -> now() ); return event; } diff --git a/cpp/csp/engine/PushPullInputAdapter.h b/cpp/csp/engine/PushPullInputAdapter.h index 2cd46bee7..b3fa9f756 100644 --- a/cpp/csp/engine/PushPullInputAdapter.h +++ b/cpp/csp/engine/PushPullInputAdapter.h @@ -2,18 +2,19 @@ #define _IN_CSP_ENGINE_PUSHPULLINPUTADAPTER_H #include -#include +#include namespace csp { + //A variation of PushInputAdapter that lets you schedule historical data as well. Used for adapters //that can replay history and switch to realtime seamlessly ( ie kafka ) - class PushPullInputAdapter : public PushInputAdapter { public: PushPullInputAdapter( Engine * engine, CspTypePtr & type, PushMode pushMode, PushGroup * group = nullptr, bool adjustOutOfOrderTime = false ); + ~PushPullInputAdapter(); template void pushTick( bool live, DateTime time, T &&value, PushBatch *batch = nullptr ); @@ -24,38 +25,35 @@ class PushPullInputAdapter : public PushInputAdapter void stop() override; protected: - - struct PullDataEvent - { - DateTime time; - }; - virtual PullDataEvent * nextPullEvent(); + virtual PushPullEvent * nextPullEvent(); bool flaggedLive() const { return m_notifiedEndOfPull; } -private: - template - struct TypedPullDataEvent : public PullDataEvent + void setNextPushPullEvent( PushPullEvent * event ) { - TypedPullDataEvent( DateTime t, T && d ) : PullDataEvent{ t }, - data( std::forward( d ) ) - {} - - typename std::remove_reference::type data; - }; - - bool processNextPullEvent(); + if( !m_nextPullEvent ) + m_nextPullEvent = event; + else + { + assert( m_tailEvent ); + assert( m_nextPullEvent ); + m_tailEvent -> next = event; + } - using QueueT = std::queue; - std::mutex m_queueMutex; - QueueT m_threadQueue; - QueueT m_poppedPullEvents; + m_tailEvent = event; + event -> next = nullptr; + } + +private: + bool processNextPullEvent( PushPullEvent *& nextEvent ); + void scheduleNextPullEvent( PushPullEvent * nextEvent ); + Scheduler::Handle m_timerHandle; - PullDataEvent * m_nextPullEvent; + PushPullEvent * m_nextPullEvent; + PushPullEvent * m_tailEvent; bool m_notifiedEndOfPull; //flagged when we're done pushing pull values bool m_adjustOutOfOrderTime; - }; inline void PushPullInputAdapter::flagReplayComplete() @@ -63,8 +61,8 @@ inline void PushPullInputAdapter::flagReplayComplete() if( unlikely( !m_notifiedEndOfPull ) ) { m_notifiedEndOfPull = true; - std::lock_guard g( m_queueMutex ); - m_threadQueue.emplace( nullptr ); + auto * replayCompleteEvent = new PushPullEvent( this, DateTime::NONE() ); + rootEngine() -> pushPullEventQueue().push( replayCompleteEvent ); } } @@ -84,12 +82,8 @@ inline void PushPullInputAdapter::pushTick( bool live, DateTime time, T &&value, if( unlikely( m_notifiedEndOfPull ) ) CSP_THROW( RuntimeException, "PushPullInputAdapter tried to push a sim tick after live tick" ); - //TBD allocators - PullDataEvent * event = new TypedPullDataEvent( time, std::forward(value) ); - { - std::lock_guard g( m_queueMutex ); - m_threadQueue.emplace( event ); - } + PushPullEvent * event = new TypedPushPullEvent( this, time, std::forward(value) ); + rootEngine() -> pushPullEventQueue().push( event ); } } diff --git a/cpp/csp/engine/RootEngine.h b/cpp/csp/engine/RootEngine.h index 746704580..d1e0be342 100644 --- a/cpp/csp/engine/RootEngine.h +++ b/cpp/csp/engine/RootEngine.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -35,7 +36,8 @@ class EndCycleListener class RootEngine : public Engine { - using PushEventQueue = SRMWLockFreeQueue; + using PushEventQueue = SRMWLockFreeQueue; + using PushPullEventQueue = SRMWLockFreeQueue; public: RootEngine( const Dictionary & ); @@ -87,6 +89,8 @@ class RootEngine : public Engine bool interrupted() const; + PushPullEventQueue & pushPullEventQueue() { return m_pushPullEventQueue; } + protected: enum State { NONE, STARTING, RUNNING, SHUTDOWN, DONE }; using EndCycleListeners = std::vector; @@ -129,7 +133,9 @@ class RootEngine : public Engine bool m_inRealtime; int m_initSignalCount; - PushEventQueue m_pushEventQueue; + PushEventQueue m_pushEventQueue; + //This queue is managed entirely from the PushPullInputAdapter + PushPullEventQueue m_pushPullEventQueue; std::exception_ptr m_exception_ptr; std::mutex m_exception_mutex; diff --git a/cpp/csp/python/PyPushPullInputAdapter.cpp b/cpp/csp/python/PyPushPullInputAdapter.cpp index 1fa12538e..784822f2f 100644 --- a/cpp/csp/python/PyPushPullInputAdapter.cpp +++ b/cpp/csp/python/PyPushPullInputAdapter.cpp @@ -22,7 +22,7 @@ class PyPushPullInputAdapter : public PushPullInputAdapter } //override nextPullEvent so we can release GIL while we wait - PushPullInputAdapter::PullDataEvent * nextPullEvent() override + PushPullEvent * nextPullEvent() override { ReleaseGIL release; return PushPullInputAdapter::nextPullEvent();