Skip to content

Commit

Permalink
Parameterize flush by performputs implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Nov 24, 2020
1 parent 58b0d87 commit 74df9b9
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 72 deletions.
23 changes: 20 additions & 3 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -982,8 +982,13 @@ namespace detail

~BufferedActions( );

adios2::Engine & getEngine( );
adios2::Engine & requireActiveStep( );
void
finalize();

adios2::Engine &
getEngine();
adios2::Engine &
requireActiveStep();

template < typename BA > void enqueue( BA && ba );

Expand All @@ -998,7 +1003,14 @@ namespace detail
* e.g. ending a step.)
*/
void
flush( bool performDatasetPutGets, bool writeAttributes = false );
flush( bool writeAttributes = false );

template< typename F >
void
flush(
F && performPutsGets,
bool writeAttributes,
bool flushUnconditionally );

/**
* @brief Begin or end an ADIOS step.
Expand Down Expand Up @@ -1089,6 +1101,11 @@ namespace detail
auxiliary::Option< AttributeMap_t > m_availableAttributes;
auxiliary::Option< AttributeMap_t > m_availableVariables;

/*
* finalize() will set this true to avoid running twice.
*/
bool finalized = false;

void
configure_IO( ADIOS2IOHandlerImpl & impl );
};
Expand Down
161 changes: 92 additions & 69 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,7 @@ ADIOS2IOHandlerImpl::flush()
{
if ( m_dirty.find( p.first ) != m_dirty.end( ) )
{
p.second->flush(
/* performDatasetPutGets = */ true,
/* writeAttributes = */ false );
p.second->flush( /* writeAttributes = */ false );
}
else
{
Expand Down Expand Up @@ -380,12 +378,15 @@ ADIOS2IOHandlerImpl::closeFile(
if ( it != m_fileData.end( ) )
{
/*
* Do not perform Puts and Gets manually, engine will do that
* upon being closed.
* No need to finalize unconditionally, destructor will take care
* of it.
*/
it->second->flush(
/* performDatasetPutGets = */ false,
/* writeAttributes = */ true );
[]( detail::BufferedActions & ba, adios2::Engine & ) {
ba.finalize();
},
/* writeAttributes = */ true,
/* flushUnconditinoally = */ false );
m_fileData.erase( it );
}
}
Expand Down Expand Up @@ -1527,6 +1528,16 @@ namespace detail

BufferedActions::~BufferedActions()
{
finalize();
}

void
BufferedActions::finalize()
{
if( finalized )
{
return;
}
// if write accessing, ensure that the engine is opened
// and that all attributes are written
// (attributes are written upon closing a step or a file
Expand Down Expand Up @@ -1559,9 +1570,12 @@ namespace detail
m_ADIOS.RemoveIO( m_IOName );
}
}
finalized = true;
}

void BufferedActions::configure_IO(ADIOS2IOHandlerImpl& impl){
void
BufferedActions::configure_IO( ADIOS2IOHandlerImpl & impl )
{
( void )impl;
static std::set< std::string > streamingEngines = {
"sst", "insitumpi", "inline", "staging"
Expand Down Expand Up @@ -1784,10 +1798,45 @@ namespace detail
}

void
BufferedActions::flush( bool performDatasetPutGets, bool writeAttributes )
BufferedActions::flush( bool writeAttributes )
{
flush(
[]( BufferedActions & ba, adios2::Engine & eng ) {
switch( ba.m_mode )
{
case adios2::Mode::Write:
eng.PerformPuts();
break;
case adios2::Mode::Read:
eng.PerformGets();
break;
case adios2::Mode::Append:
// TODO order?
eng.PerformGets();
eng.PerformPuts();
break;
default:
break;
}
},
writeAttributes,
/* flushUnconditionally = */ false );
}

template< typename F >
void
BufferedActions::flush(
F && performPutGets,
bool writeAttributes,
bool flushUnconditionally )
{
if( streamStatus == StreamStatus::StreamOver )
{
if( flushUnconditionally )
{
throw std::runtime_error(
"[ADIOS2] Cannot access engine since stream is over." );
}
return;
}
auto & eng = getEngine();
Expand All @@ -1797,8 +1846,13 @@ namespace detail
if( streamStatus == StreamStatus::OutsideOfStep )
{
if( m_buffer.empty() &&
( !writeAttributes || m_attributeWrites.empty() ) )
( !writeAttributes || m_attributeWrites.empty() ) &&
m_attributeReads.empty() )
{
if( flushUnconditionally )
{
performPutGets( *this, eng );
}
return;
}
else
Expand All @@ -1817,41 +1871,24 @@ namespace detail
pair.second.run( *this );
}
}
if( performDatasetPutGets )

for( auto & ba : m_buffer )
{
for( auto & ba : m_buffer )
{
ba->run( *this );
}
// Flush() does not necessarily perform
// deferred actions....
switch ( m_mode )
{
case adios2::Mode::Write:
eng.PerformPuts( );
break;
case adios2::Mode::Read:
eng.PerformGets( );
break;
case adios2::Mode::Append:
// TODO order?
eng.PerformGets();
eng.PerformPuts();
break;
default:
break;
}
m_buffer.clear();
ba->run( *this );
}

for( BufferedAttributeRead & task : m_attributeReads )
{
task.run( *this );
}
m_attributeReads.clear();
if( writeAttributes )
{
m_attributeWrites.clear();
}
performPutGets( *this, eng );

m_buffer.clear();

for( BufferedAttributeRead & task : m_attributeReads )
{
task.run( *this );
}
m_attributeReads.clear();
if( writeAttributes )
{
m_attributeWrites.clear();
}
}

Expand All @@ -1860,9 +1897,7 @@ namespace detail
{
if( useAdiosSteps == Steps::DontUseSteps )
{
flush(
/* performDatasetPutGets = */ true,
/* writeAttributes = */ false );
flush( /* writeAttributes = */ false );
return AdvanceStatus::OK;
}
switch( mode )
Expand All @@ -1882,22 +1917,12 @@ namespace detail
{
getEngine().BeginStep();
}
/*
* Do not perform Puts and Gets manually,
* engine will do that upon EndStep.
*/
flush(
/* performDatasetPutGets = */ false,
/* writeAttributes = */ true );
getEngine().EndStep();
m_buffer.clear();
/*
* Flush a second time to read attributes, now that they've been
* preloaded.
*/
flush(
/* performDatasetPutGets = */ true,
/* writeAttributes = */ false );
[]( BufferedActions &, adios2::Engine & eng ) {
eng.EndStep();
},
/* writeAttributes = */ true,
/* flushUnconditinoally = */ true );
streamStatus = StreamStatus::OutsideOfStep;
return AdvanceStatus::OK;
}
Expand All @@ -1911,15 +1936,13 @@ namespace detail
// return status is stored in m_lastStepStatus
if( streamStatus != StreamStatus::DuringStep )
{
/*
* Do not perform Puts and Gets,
* Engine is outside of a step anyway.
*/
flush(
/* performDatasetPutGets = */ false,
/* writeAttributes = */ false );
adiosStatus = getEngine().BeginStep();
m_buffer.clear();
[ &adiosStatus ](
BufferedActions &, adios2::Engine & engine ) {
adiosStatus = engine.BeginStep();
},
/* writeAttributes = */ false,
/* flushUnconditinoally = */ true );
if( adiosStatus == adios2::StepStatus::OK &&
m_mode == adios2::Mode::Read )
{
Expand Down

0 comments on commit 74df9b9

Please sign in to comment.