Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug where a node can be stopped when it hasn't started #164

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/cmake/modules/FindCSP.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# CSP_BASELIB_STATIC_LIBRARY
# CSP_BASKETLIB_LIBRARY
# CSP_BASKETLIB_STATIC_LIBRARY
# CSP_TESTLIB_LIBRARY
# CSP_MATH_LIBRARY
# CSP_MATH_STATIC_LIBRARY
# CSP_STATS_LIBRARY
Expand Down Expand Up @@ -74,6 +75,8 @@ find_library(CSP_BASELIB_STATIC_LIBRARY NAMES libbaselibimpl_static.a PATHS "${_
find_library(CSP_BASKETLIB_LIBRARY NAMES _cspbasketlibimpl.so PATHS "${__csp_lib_path}" NO_DEFAULT_PATH)
find_library(CSP_BASKETLIB_STATIC_LIBRARY NAMES libbasketlibimpl_static.a PATHS "${__csp_lib_path}" NO_DEFAULT_PATH)

find_library(CSP_TESTLIB_LIBRARY NAMES _csptestlibimpl.so PATHS "${__csp_lib_path}" NO_DEFAULT_PATH)

find_library(CSP_MATH_LIBRARY NAMES _cspmathimpl.so PATHS "${__csp_lib_path}" NO_DEFAULT_PATH)
find_library(CSP_MATH_STATIC_LIBRARY NAMES libmathimpl_static.a PATHS "${__csp_lib_path}" NO_DEFAULT_PATH)

Expand Down
2 changes: 1 addition & 1 deletion cpp/csp/engine/AdapterManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ ManagedSimInputAdapter::ManagedSimInputAdapter( csp::Engine * engine,
{
}

AdapterManager::AdapterManager( csp::Engine * engine ) : m_engine( engine ), m_statusAdapter( nullptr )
AdapterManager::AdapterManager( csp::Engine * engine ) : m_engine( engine ), m_statusAdapter( nullptr ), m_started( false )
{
if( !m_engine -> isRootEngine() )
CSP_THROW( NotImplemented, "AdapterManager support is not currently available in dynamic graphs" );
Expand Down
4 changes: 4 additions & 0 deletions cpp/csp/engine/AdapterManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class AdapterManager : public EngineOwned

DateTime starttime() const { return m_starttime; }
DateTime endtime() const { return m_endtime; }

void setStarted() { m_started = true; }
bool started() const { return m_started; }

StatusAdapter *createStatusAdapter( CspTypePtr &type, PushMode pushMode );
void pushStatus( int64_t level, int64_t errCode, const std::string &errMsg, PushBatch *batch = nullptr ) const;
Expand All @@ -134,6 +137,7 @@ class AdapterManager : public EngineOwned
DateTime m_starttime;
DateTime m_endtime;
StatusAdapter * m_statusAdapter;
bool m_started;
};

inline void AdapterManager::scheduleTimerCB( DateTime next )
Expand Down
3 changes: 2 additions & 1 deletion cpp/csp/engine/Consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ namespace csp

Consumer::Consumer( Engine * engine ) : m_engine( engine ),
m_next( nullptr ),
m_rank( -1 )
m_rank( -1 ),
m_started( false )
{
}

Expand Down
4 changes: 4 additions & 0 deletions cpp/csp/engine/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class Consumer
DateTime now() const { return rootEngine() -> now(); }
uint64_t cycleCount() const { return rootEngine() -> cycleCount(); }

void setStarted() { m_started = true; }
bool started() const { return m_started; }

//called when input timeseries has an event, schedules in
//step propagation. See if we can do better than virtual per tick...
virtual void handleEvent( InputId id )
Expand Down Expand Up @@ -126,6 +129,7 @@ class Consumer
Consumer * m_next;

int32_t m_rank;
bool m_started;
};

};
Expand Down
47 changes: 38 additions & 9 deletions cpp/csp/engine/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,36 @@ void Engine::start()
auto start = std::max( m_rootEngine -> now(), m_rootEngine -> startTime() );
auto end = m_rootEngine -> endTime();


//start up managers
for( auto & manager : m_adapterManagers )
{
manager -> start( start, end );
manager -> setStarted();
}

//start up output adapters
for( auto & adapter : m_outputAdapters )
{
adapter -> start();
adapter -> setStarted();
}

for( auto & entry : m_graphOutputs )
{
if( entry.second -> engine() == this )
entry.second -> start();
auto & graphOutputAdapter = entry.second;
if( graphOutputAdapter -> engine() == this )
{
graphOutputAdapter -> start();
graphOutputAdapter -> setStarted();
}
}

//start up input adapters
for( auto & adapter : m_inputAdapters )
{
adapter -> start( start, end );
adapter -> setStarted();
}

//see registerOwnedObject( AdapterManager ) above, we register adapter managers with root. At this point we dont
//need the list of mgrs created in a dynamic engine anymore, so we clear out the mem ( and effetively take them out of the stop() list for dynamic shutdown )
Expand All @@ -182,28 +194,45 @@ void Engine::start()

//startup nodes
for( auto & node : m_nodes )
{
node -> start();
node -> setStarted();
}
}

void Engine::stop()
{
// Ensure we only stop nodes/adapters that have started in the case an exception occurs during startup
for( auto & node : m_nodes )
node -> stop();
{
if( node -> started() )
node -> stop();
}

for( auto & adapter : m_inputAdapters )
adapter -> stop();
{
if( adapter -> started() )
adapter -> stop();
}

for( auto & entry : m_graphOutputs )
{
if( entry.second -> engine() == this )
entry.second -> stop();
auto & graphOutputAdapter = entry.second;
if( graphOutputAdapter -> started() && graphOutputAdapter -> engine() == this )
graphOutputAdapter -> stop();
}

for( auto & adapter : m_outputAdapters )
adapter -> stop();
{
if( adapter -> started() )
adapter -> stop();
}

for( auto & manager : m_adapterManagers )
manager -> stop();
{
if( manager -> started() )
manager -> stop();
}
}

}
3 changes: 2 additions & 1 deletion cpp/csp/engine/InputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ namespace csp
{

InputAdapter::InputAdapter( Engine *engine, const CspTypePtr &type, PushMode pushMode ) : m_rootEngine( engine -> rootEngine() ),
m_pushMode( pushMode )
m_pushMode( pushMode ),
m_started( false )
{
if( pushMode == PushMode::BURST )
init( CspArrayType::create( type ) );
Expand Down
4 changes: 4 additions & 0 deletions cpp/csp/engine/InputAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class InputAdapter : public TimeSeriesProvider, public EngineOwned

PushMode pushMode() const { return m_pushMode; }

void setStarted() { m_started = true; }
bool started() const { return m_started; }

//if adapter is BURST this will return the type of the data, rather than the BURST vector<Data>
const CspType * dataType() const
{
Expand All @@ -46,6 +49,7 @@ class InputAdapter : public TimeSeriesProvider, public EngineOwned
protected:
RootEngine * m_rootEngine;
PushMode m_pushMode;
bool m_started;
};

template<typename T>
Expand Down
6 changes: 5 additions & 1 deletion cpp/csp/python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ target_link_libraries(cspmathimpl cspimpl mathimpl)
add_library(cspstatsimpl SHARED cspstatsimpl.cpp)
target_link_libraries(cspstatsimpl cspimpl statsimpl)

## Testlib c++ module
add_library(csptestlibimpl SHARED csptestlibimpl.cpp)
target_link_libraries(csptestlibimpl cspimpl)

## NumPy stats c++ module
add_library(npstatsimpl STATIC npstatsimpl.cpp)
add_library(cspnpstatsimpl SHARED cspnpstatsimpl.cpp)
target_link_libraries(cspnpstatsimpl cspimpl npstatsimpl)
target_include_directories(npstatsimpl PRIVATE ${NUMPY_INCLUDE_DIRS})
target_include_directories(cspnpstatsimpl PRIVATE ${NUMPY_INCLUDE_DIRS})

install(TARGETS csptypesimpl cspimpl cspbaselibimpl cspbasketlibimpl cspmathimpl cspstatsimpl cspnpstatsimpl
install(TARGETS csptypesimpl cspimpl cspbaselibimpl cspbasketlibimpl cspmathimpl cspstatsimpl csptestlibimpl cspnpstatsimpl
PUBLIC_HEADER DESTINATION include/csp/python
RUNTIME DESTINATION bin/
LIBRARY DESTINATION lib/
Expand Down
97 changes: 97 additions & 0 deletions cpp/csp/python/csptestlibimpl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include <Python.h>
#include <csp/engine/CppNode.h>
#include <csp/python/Conversions.h>
#include <csp/python/InitHelper.h>
#include <csp/python/PyCppNode.h>
#include <csp/python/PyObjectPtr.h>

namespace csp::cppnodes
{

// Expose C++ nodes for testing in Python
// Keep nodes for a specific test to a single namespace under testing

namespace testing
{

namespace stop_start_test
{

using namespace csp::python;

void setStatus( const DialectGenericType & obj_, const std::string & name )
{
PyObjectPtr obj = PyObjectPtr::own( toPython( obj_ ) );
PyObjectPtr attr = PyObjectPtr::own( PyUnicode_FromString( name.c_str() ) );
PyObject_SetAttr( obj.get(), attr.get(), Py_True );
}

DECLARE_CPPNODE( start_n1_set_value )
{
INIT_CPPNODE( start_n1_set_value ) {}

SCALAR_INPUT( DialectGenericType, obj_ );

START()
{
setStatus( obj_, "n1_started" );
}
INVOKE() {}

STOP()
{
setStatus( obj_, "n1_stopped" );
}
};
EXPORT_CPPNODE( start_n1_set_value );

DECLARE_CPPNODE( start_n2_throw )
{
INIT_CPPNODE( start_n2_throw ) {}

SCALAR_INPUT( DialectGenericType, obj_ );

START()
{
CSP_THROW( ValueError, "n2 start failed" );
}
INVOKE() {}

STOP()
{
setStatus( obj_, "n2_stopped" );
}
};
EXPORT_CPPNODE( start_n2_throw );

}

}

}

// Test nodes
REGISTER_CPPNODE( csp::cppnodes::testing::stop_start_test, start_n1_set_value );
REGISTER_CPPNODE( csp::cppnodes::testing::stop_start_test, start_n2_throw );

static PyModuleDef _csptestlibimpl_module = {
PyModuleDef_HEAD_INIT,
"_csptestlibimpl",
"_csptestlibimpl c++ module",
-1,
NULL, NULL, NULL, NULL, NULL
};

PyMODINIT_FUNC PyInit__csptestlibimpl(void)
{
PyObject* m;

m = PyModule_Create( &_csptestlibimpl_module);
if( m == NULL )
return NULL;

if( !csp::python::InitHelper::instance().execute( m ) )
return NULL;

return m;
}
20 changes: 13 additions & 7 deletions csp/impl/wiring/node_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,17 +841,23 @@ def _parse_impl(self):
self._startblock = [self.visit(node) for node in self._startblock]

init_block = node_proxy + ts_in_proxies + ts_out_proxies + ts_vars

# Yield before start block so we can setup stack frame before executing
startblock = [ast.Expr(value=ast.Yield(value=None))] + self._stateblock + self._startblock

start_and_body = startblock + [ast.While(test=ast.NameConstant(value=True), orelse=[], body=innerbody)]
startblock = self._stateblock + self._startblock
body = [ast.While(test=ast.NameConstant(value=True), orelse=[], body=innerbody)]

if self._stopblock:
self._stopblock = [self.visit(node) for node in self._stopblock]
# For stop we wrap start and body in a try / finally ( not the init block, if that fails it's unrecoverable )
start_and_body = [ast.Try(body=start_and_body, finalbody=self._stopblock, handlers=[], orelse=[])]

# For stop we wrap the body of a node in a try / finally
# If the init block fails it's unrecoverable, and if the start block raises we don't want to stop that specific node
start_and_body = startblock + [ast.Try(body=body, finalbody=self._stopblock, handlers=[], orelse=[])]

else:
start_and_body = startblock + body

# Yield before start block so we can setup stack frame before executing
# However, this initial yield shouldn't be within the try-finally block, since if a node does not start, it's stop() logic should not be invoked
# This avoids an issue where one node raises an exception upon start(), and then other nodes execute their stop() without having ever started
start_and_body = [ast.Expr(value=ast.Yield(value=None))] + start_and_body
newbody = init_block + start_and_body

newfuncdef = ast.FunctionDef(name=self._name, body=newbody, returns=None)
Expand Down
Loading