Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multithreaded CPU using single GPU in demo loop #774

Merged
merged 6 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion app/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ if(CELERITAS_BUILD_DEMOS)
demo-loop/demo-loop.cc
demo-loop/RunnerInputIO.json.cc
demo-loop/Runner.cc
demo-loop/RunnerOutput.cc
demo-loop/Transporter.cc
)

Expand Down Expand Up @@ -306,8 +307,15 @@ if(CELERITAS_BUILD_DEMOS)
"CELER_LOG=debug"
"CELER_DISABLE_DEVICE=1"
"CELER_DISABLE_PARALLEL=1"
${_omp_env}
)
if(CELERITAS_USE_OpenMP)
# TODO: update when OpenMP nested parallelism is enabled
if(CELERITAS_USE_ROOT)
list(APPEND _env "OMP_NUM_THREADS=1")
sethrj marked this conversation as resolved.
Show resolved Hide resolved
else()
list(APPEND _env "OMP_NUM_THREADS=4")
endif()
endif()
if(NOT CELERITAS_CORE_GEO STREQUAL "VecGeom")
list(APPEND _env "CELER_DISABLE_VECGEOM=1")
endif()
Expand Down
105 changes: 75 additions & 30 deletions app/demo-loop/Runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Runner::Runner(RunnerInput const& inp, SPOutputRegistry output)
this->build_diagnostics(inp);
this->build_step_collectors(inp);
this->build_transporter_input(inp);
this->build_primaries(inp);
this->build_events(inp);
use_device_ = inp.use_device;

if (root_manager_)
Expand All @@ -90,7 +90,8 @@ Runner::Runner(RunnerInput const& inp, SPOutputRegistry output)
write_to_root(*core_params_, root_manager_.get());
}

CELER_ENSURE(core_params_);
CELER_ASSERT(core_params_);
transporters_.resize(this->num_streams());
}

//---------------------------------------------------------------------------//
Expand All @@ -99,33 +100,36 @@ Runner::Runner(RunnerInput const& inp, SPOutputRegistry output)
*
* This will partition the input primaries among all the streams.
*/
auto Runner::operator()(StreamId stream_id) const -> RunnerResult
auto Runner::operator()(RunStreamEvent ids) -> RunnerResult
{
CELER_EXPECT(stream_id < this->num_streams());
CELER_EXPECT(ids.stream < this->num_streams());
CELER_EXPECT(ids.event < this->num_events());

auto transport = [this, stream_id]() -> std::unique_ptr<TransporterBase> {
// Thread-local transporter input
TransporterInput local_trans_inp = *transporter_input_;
local_trans_inp.stream_id = stream_id;
auto& transport = transporters_[ids.stream.get()];
if (!transport)
{
transport = [this, ids]() -> std::unique_ptr<TransporterBase> {
// Thread-local transporter input
TransporterInput local_trans_inp = *transporter_input_;
local_trans_inp.stream_id = ids.stream;

if (use_device_)
{
CELER_VALIDATE(celeritas::device(),
<< "CUDA device is unavailable but GPU run was "
"requested");
return std::make_unique<Transporter<MemSpace::device>>(
std::move(local_trans_inp));
}
else
{
return std::make_unique<Transporter<MemSpace::host>>(
std::move(local_trans_inp));
}
}();
if (use_device_)
{
CELER_VALIDATE(celeritas::device(),
<< "CUDA device is unavailable but GPU run was "
"requested");
return std::make_unique<Transporter<MemSpace::device>>(
std::move(local_trans_inp));
}
else
{
return std::make_unique<Transporter<MemSpace::host>>(
std::move(local_trans_inp));
}
}();
}

// TODO: partition primaries among streams
CELER_ASSERT(stream_id == StreamId{0});
return (*transport)(make_span(primaries_));
return (*transport)(make_span(events_[ids.event.get()]));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For backward compatibility (so that we can keep comparing against our old regression results), can you add the ability to transport all events simultaneously? Maybe another operator() with no arguments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was thinking the same, will do.

}

//---------------------------------------------------------------------------//
Expand All @@ -137,6 +141,15 @@ StreamId::size_type Runner::num_streams() const
return core_params_->max_streams();
}

//---------------------------------------------------------------------------//
/*!
 * Total number of events available for transport.
 *
 * Events are loaded once during construction (see \c build_events ), either
 * from a HepMC3 file or from the primary generator.
 */
size_type Runner::num_events() const
{
    // events_.size() is std::size_t; convert explicitly to the celeritas
    // size type to avoid an implicit narrowing conversion
    return static_cast<size_type>(events_.size());
}

//---------------------------------------------------------------------------//
void Runner::setup_globals(RunnerInput const& inp) const
{
Expand Down Expand Up @@ -300,6 +313,12 @@ void Runner::build_core_params(RunnerInput const& inp,
return std::make_shared<TrackInitParams>(std::move(input));
}();

// Store the number of simultaneous threads/tasks per process
params.max_streams = get_num_streams();
CELER_VALIDATE(inp.mctruth_filename.empty() || params.max_streams == 1,
<< "MC truth output is only supported with a single "
"stream.");

core_params_ = std::make_shared<CoreParams>(std::move(params));
}

Expand All @@ -323,11 +342,11 @@ void Runner::build_transporter_input(RunnerInput const& inp)

//---------------------------------------------------------------------------//
/*!
* Construct on all threads from a JSON input and shared output manager.
* Read events from a HepMC3 file or build using a primary generator.
*/
void Runner::build_primaries(RunnerInput const& inp)
void Runner::build_events(RunnerInput const& inp)
{
ScopedMem record_mem("Runner.build_primaries");
ScopedMem record_mem("Runner.build_events");
if (inp.primary_gen_options)
{
std::mt19937 rng;
Expand All @@ -336,7 +355,7 @@ void Runner::build_primaries(RunnerInput const& inp)
auto event = generate_event(rng);
while (!event.empty())
{
primaries_.insert(primaries_.end(), event.begin(), event.end());
events_.push_back(event);
event = generate_event(rng);
}
}
Expand All @@ -347,7 +366,7 @@ void Runner::build_primaries(RunnerInput const& inp)
auto event = read_event();
while (!event.empty())
{
primaries_.insert(primaries_.end(), event.begin(), event.end());
events_.push_back(event);
event = read_event();
}
}
Expand Down Expand Up @@ -429,5 +448,31 @@ void Runner::build_diagnostics(RunnerInput const& inp)
}
}

//---------------------------------------------------------------------------//
/*!
 * Get the number of streams from the OMP_NUM_THREADS environment variable.
 *
 * The value of OMP_NUM_THREADS should be a list of positive integers, each of
 * which sets the number of threads for the parallel region at the
 * corresponding nested level. The number of streams is set to the first value
 * in the list. If the variable is unset or empty, a single stream is used.
 *
 * \note For a multithreaded CPU run, if OMP_NUM_THREADS is set to a single
 * value, the number of threads for each nested parallel region will be set to
 * that value.
 */
int get_num_streams()
{
    std::string const& nt_str = celeritas::getenv("OMP_NUM_THREADS");
    if (nt_str.empty())
    {
        // Variable is unset or empty: default to a single stream
        return 1;
    }

    // Extract the leading decimal token (the first entry of a possibly
    // comma-separated list) so that std::stoi cannot throw
    // std::invalid_argument on non-numeric input; capping the digit count
    // also prevents std::out_of_range from escaping as an unhandled
    // exception.
    std::string const leading
        = nt_str.substr(0, nt_str.find_first_not_of("0123456789"));
    CELER_VALIDATE(!leading.empty() && leading.size() <= 9,
                   << "failed to parse OMP_NUM_THREADS='" << nt_str
                   << "' as a positive integer");
    int num_threads = std::stoi(leading);
    CELER_VALIDATE(num_threads > 0,
                   << "nonpositive num_streams=" << num_threads);
    return num_threads;
}

//---------------------------------------------------------------------------//
} // namespace demo_loop
73 changes: 37 additions & 36 deletions app/demo-loop/Runner.hh
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
#pragma once

#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "corecel/Types.hh"
#include "corecel/sys/ThreadId.hh"
#include "celeritas/phys/Primary.hh"

#include "Transporter.hh"

namespace celeritas
{
class CoreParams;
Expand All @@ -29,47 +29,23 @@ namespace demo_loop
{
//---------------------------------------------------------------------------//
struct RunnerInput;
struct TransporterInput;

//---------------------------------------------------------------------------//
/*!
* Simulation timing results.
*
* TODO: maybe a timer diagnostic class could help out here?
* or another OutputRegistry.
*/
struct RunTimingResult
{
using real_type = celeritas::real_type;
using VecReal = std::vector<real_type>;
using MapStrReal = std::unordered_map<std::string, real_type>;

VecReal steps; //!< Real time per step
real_type total{}; //!< Total simulation time
real_type setup{}; //!< One-time initialization cost
MapStrReal actions{}; //!< Accumulated action timing
};

//---------------------------------------------------------------------------//
/*!
* Tallied result and timing from transporting a set of primaries.
*
* TODO: these should be migrated to OutputInterface classes.
* Results from transporting all events.
*/
struct RunnerResult
struct SimulationResult
{
//!@{
//! \name Type aliases
using size_type = celeritas::size_type;
using VecCount = std::vector<size_type>;
using real_type = celeritas::real_type;
//!@}

//// DATA ////

VecCount initializers; //!< Num starting track initializers
VecCount active; //!< Num tracks active at beginning of step
VecCount alive; //!< Num living tracks at end of step
RunTimingResult time; //!< Timing information
real_type total_time{}; //!< Total simulation time
real_type setup_time{}; //!< One-time initialization cost
std::vector<TransporterResult> events; //< Results tallied for each event
};

//---------------------------------------------------------------------------//
Expand All @@ -84,32 +60,51 @@ class Runner
public:
//!@{
//! \name Type aliases
using EventId = celeritas::EventId;
using StreamId = celeritas::StreamId;
using size_type = celeritas::size_type;
using Input = RunnerInput;
using RunnerResult = TransporterResult;
using SPOutputRegistry = std::shared_ptr<celeritas::OutputRegistry>;
//!@}

//! ID of the stream and event to be run
struct RunStreamEvent
{
StreamId stream{};
EventId event{};
};

public:
// Construct on all threads from a JSON input and shared output manager
Runner(RunnerInput const& inp, SPOutputRegistry output);

// Run on a single stream/thread, returning the transport result
RunnerResult operator()(StreamId s) const;
RunnerResult operator()(RunStreamEvent);
sethrj marked this conversation as resolved.
Show resolved Hide resolved

// Number of streams supported
StreamId::size_type num_streams() const;

// Total number of events
size_type num_events() const;

private:
//// TYPES ////

using UPTransporterBase = std::unique_ptr<TransporterBase>;
using VecEvent = std::vector<std::vector<celeritas::Primary>>;

//// DATA ////

std::shared_ptr<celeritas::CoreParams> core_params_;
std::shared_ptr<celeritas::RootFileManager> root_manager_;
std::shared_ptr<celeritas::StepCollector> step_collector_;

// Transporter inputs
// Transporter inputs and stream-local transporters
bool use_device_{};
std::shared_ptr<TransporterInput> transporter_input_;
std::vector<celeritas::Primary> primaries_;
VecEvent events_;
std::vector<UPTransporterBase> transporters_;

//// HELPER FUNCTIONS ////

Expand All @@ -118,8 +113,14 @@ class Runner
void build_step_collectors(RunnerInput const&);
void build_diagnostics(RunnerInput const&);
void build_transporter_input(RunnerInput const&);
void build_primaries(RunnerInput const&);
void build_events(RunnerInput const&);
};

//---------------------------------------------------------------------------//
// FREE FUNCTIONS
//---------------------------------------------------------------------------//
// Get the number of streams from the OMP_NUM_THREADS environment variable
int get_num_streams();

//---------------------------------------------------------------------------//
} // namespace demo_loop
Loading