Skip to content

Commit

Permalink
Fix resetting of CUDA streams when running through accel (#927)
Browse files Browse the repository at this point in the history
* Add output for creating and destroying streas
* Fix accidental destruction of streams when setting device locally
* Implement a hackish way to reset streams at the end of a geant4 run
* Use async copies for detector step collection and track sorting
* Use `activate_device_local` instead of initializing another device
* Require that activate_device be set only once
* Use stream synchronize in action sequence for diagnostics
* Mark stream as possibly unused
  • Loading branch information
sethrj committed Sep 7, 2023
1 parent c95fc54 commit 4ef9c46
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 71 deletions.
57 changes: 23 additions & 34 deletions src/accel/SharedParams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,23 @@ SharedParams::SharedParams(SetupOptions const& options)
ScopedMem record_mem("SharedParams.construct");
ScopedTimeLog scoped_time;

// Initialize device and other "global" data
SharedParams::initialize_device(options);
// Initialize CUDA (CUDA environment variables control the preferred
// device)
celeritas::activate_device();

if (celeritas::device() && CELERITAS_CORE_GEO == CELERITAS_CORE_GEO_VECGEOM)
{
// Heap size must be set before creating VecGeom device instance; and
// let's just set the stack size as well
if (options.cuda_stack_size > 0)
{
celeritas::set_cuda_stack_size(options.cuda_stack_size);
}
if (options.cuda_heap_size > 0)
{
celeritas::set_cuda_heap_size(options.cuda_heap_size);
}
}

// Construct core data
this->initialize_core(options);
Expand All @@ -184,11 +199,9 @@ SharedParams::SharedParams(SetupOptions const& options)
* properties) in single-thread mode has "thread" storage in a multithreaded
* application. It must be initialized on all threads.
*/
void SharedParams::InitializeWorker(SetupOptions const& options)
void SharedParams::InitializeWorker(SetupOptions const&)
{
CELER_LOG_LOCAL(status) << "Initializing worker thread";
ScopedTimeLog scoped_time;
return SharedParams::initialize_device(options);
celeritas::activate_device_local();
}

//---------------------------------------------------------------------------//
Expand All @@ -209,37 +222,13 @@ void SharedParams::Finalize()
CELER_LOG_LOCAL(debug) << "Resetting shared parameters";
*this = {};

CELER_ENSURE(!*this);
}

//---------------------------------------------------------------------------//
/*!
* Initialize GPU device on each thread.
*
* This is thread safe and must be called from every worker thread.
*/
void SharedParams::initialize_device(SetupOptions const& options)
{
if (Device::num_devices() == 0)
if (celeritas::device())
{
// No GPU is enabled so no global initialization is needed
return;
// Reset streams before the static destructor does
celeritas::device().create_streams(0);
}

// Initialize CUDA (you'll need to use CUDA environment variables to
// control the preferred device)
celeritas::activate_device(Device{0});

// Heap size must be set before creating VecGeom device instance; and
// let's just set the stack size as well
if (options.cuda_stack_size > 0)
{
celeritas::set_cuda_stack_size(options.cuda_stack_size);
}
if (options.cuda_heap_size > 0)
{
celeritas::set_cuda_heap_size(options.cuda_heap_size);
}
CELER_ENSURE(!*this);
}

//---------------------------------------------------------------------------//
Expand Down
1 change: 0 additions & 1 deletion src/accel/SharedParams.hh
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ class SharedParams

//// HELPER FUNCTIONS ////

static void initialize_device(SetupOptions const& options);
void initialize_core(SetupOptions const& options);
void try_output() const;
};
Expand Down
11 changes: 10 additions & 1 deletion src/celeritas/global/detail/ActionSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
#include "corecel/Types.hh"
#include "corecel/cont/EnumArray.hh"
#include "corecel/cont/Range.hh"
#include "corecel/sys/Device.hh"
#include "corecel/sys/ScopedProfiling.hh"
#include "corecel/sys/Stopwatch.hh"
#include "corecel/sys/Stream.hh"
#include "celeritas/global/ActionInterface.hh"

#include "../ActionRegistry.hh"
#include "../CoreState.hh"

namespace celeritas
{
Expand Down Expand Up @@ -90,6 +93,12 @@ void ActionSequence::begin_run(CoreParams const& params, CoreState<M>& state)
template<MemSpace M>
void ActionSequence::execute(CoreParams const& params, CoreState<M>& state)
{
[[maybe_unused]] Stream::StreamT stream = nullptr;
if (M == MemSpace::device && options_.sync)
{
stream = celeritas::device().stream(state.stream_id()).get();
}

ScopedProfiling profile_this{"step"};
if (M == MemSpace::host || options_.sync)
{
Expand All @@ -101,7 +110,7 @@ void ActionSequence::execute(CoreParams const& params, CoreState<M>& state)
actions_[i]->execute(params, state);
if (M == MemSpace::device)
{
CELER_DEVICE_CALL_PREFIX(DeviceSynchronize());
CELER_DEVICE_CALL_PREFIX(StreamSynchronize(stream));
}
accum_time_[i] += get_time();
}
Expand Down
15 changes: 12 additions & 3 deletions src/celeritas/track/detail/TrackSortUtils.cu
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,26 @@ void count_tracks_per_action(
auto start = device_pointer_cast(make_observer(offsets.data()));
thrust::fill(start, start + offsets.size(), ThreadId{});
CELER_DEVICE_CHECK_ERROR();
auto* stream = celeritas::device().stream(states.stream_id).get();
CELER_LAUNCH_KERNEL(tracks_per_action,
celeritas::device().default_block_size(),
states.size(),
celeritas::device().stream(states.stream_id).get(),
stream,
states,
offsets,
states.size(),
order);

Span<ThreadId> sout = out[AllItems<ThreadId, MemSpace::host>{}];
Copier<ThreadId, MemSpace::host> copy_to_host{sout};
copy_to_host(MemSpace::device, offsets);
CELER_DEVICE_CALL_PREFIX(
MemcpyAsync(sout.data(),
offsets.data(),
offsets.size() * sizeof(ThreadId),
CELER_DEVICE_PREFIX(MemcpyDeviceToHost),
stream));

// Copies must be complete before backfilling
CELER_DEVICE_CALL_PREFIX(StreamSynchronize(stream));
backfill_action_count(sout, states.size());
}
}
Expand Down
21 changes: 17 additions & 4 deletions src/celeritas/user/DetectorSteps.cu
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include <thrust/execution_policy.h>
#include <thrust/iterator/counting_iterator.h>

#include "corecel/Assert.hh"
#include "corecel/Macros.hh"
#include "corecel/data/Collection.hh"
#include "corecel/data/Copier.hh"
#include "corecel/sys/Device.hh"
Expand Down Expand Up @@ -119,7 +121,10 @@ struct HasDetector

//---------------------------------------------------------------------------//
template<class T>
void copy_field(std::vector<T>* dst, StateRef<T> const& src, size_type num_valid)
void copy_field(std::vector<T>* dst,
StateRef<T> const& src,
size_type num_valid,
Stream::StreamT stream)
{
if (src.empty() || num_valid == 0)
{
Expand All @@ -130,8 +135,12 @@ void copy_field(std::vector<T>* dst, StateRef<T> const& src, size_type num_valid
dst->resize(num_valid);

// Copy all items from valid threads
Copier<T, MemSpace::host> copy{{dst->data(), num_valid}};
copy(MemSpace::device, {src.data().get(), num_valid});
CELER_DEVICE_CALL_PREFIX(
MemcpyAsync(dst->data(),
src.data().get(),
num_valid * sizeof(T),
CELER_DEVICE_PREFIX(MemcpyDeviceToHost),
stream));
}

//---------------------------------------------------------------------------//
Expand Down Expand Up @@ -165,8 +174,9 @@ void copy_steps<MemSpace::device>(
gather_step(state, num_valid);

// Resize and copy if the fields are present
auto* stream = celeritas::device().stream(state.stream_id).get();
#define DS_ASSIGN(FIELD) \
copy_field(&(output->FIELD), state.scratch.FIELD, num_valid)
copy_field(&(output->FIELD), state.scratch.FIELD, num_valid, stream)

DS_ASSIGN(detector);
DS_ASSIGN(track_id);
Expand All @@ -187,6 +197,9 @@ void copy_steps<MemSpace::device>(
DS_ASSIGN(energy_deposition);
#undef DS_ASSIGN

// Copies must be complete before returning
CELER_DEVICE_CALL_PREFIX(StreamSynchronize(stream));

CELER_ENSURE(output->detector.size() == num_valid);
CELER_ENSURE(output->track_id.size() == num_valid);
}
Expand Down
51 changes: 23 additions & 28 deletions src/corecel/sys/Device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@ namespace
{
//---------------------------------------------------------------------------//
// HELPER FUNCTIONS
//---------------------------------------------------------------------------//
std::mutex& device_setter_mutex()
{
static std::mutex m;
return m;
}

//---------------------------------------------------------------------------//
/*!
* Active CUDA device for Celeritas calls on the local process.
Expand All @@ -50,9 +43,9 @@ std::mutex& device_setter_mutex()
* and
* https://github.com/celeritas-project/celeritas/pull/149#discussion_r578000062
*
* We might need to add a "thread_local" annotation corresponding to a
* multithreaded celeritas option. This class will always be thread safe to
* read (if the instance isn't being modified by other threads).
* The device should be *activated* by the main thread, and \c
* activate_device_local should be called on other threads to set up the
* local CUDA context.
*/
Device& global_device()
{
Expand Down Expand Up @@ -142,6 +135,8 @@ Device::Device(int id)
{
CELER_EXPECT(id >= 0 && id < Device::num_devices());

CELER_LOG_LOCAL(debug) << "Constructing device ID " << id;

unsigned int max_threads_per_block = 0;
#if CELER_USE_DEVICE
# if CELERITAS_USE_CUDA
Expand Down Expand Up @@ -225,8 +220,8 @@ Device::Device(int id)
*/
StreamId::size_type Device::num_streams() const
{
CELER_EXPECT(streams_);

if (!streams_)
return 0;
return streams_->size();
}

Expand Down Expand Up @@ -270,19 +265,23 @@ Device const& device()

//---------------------------------------------------------------------------//
/*!
* Activate the given device.
* Activate the global celeritas device.
*
* The given device must be set (true result) unless no device has yet been
* enabled -- this allows \c make_device to create "null" devices
* when CUDA is disabled.
*
* \note This function is thread safe, and even though the global device is
* shared across threads, it should be called from each thread to correctly
* initialize CUDA.
* This function may be called once only, because the global device propagates
* into local states (e.g. where memory is allocated) all over Celeritas.
*/
void activate_device(Device&& device)
{
CELER_EXPECT(device || !global_device());
static std::mutex m;
std::lock_guard<std::mutex> scoped_lock{m};
Device& d = global_device();
CELER_VALIDATE(!d,
<< "celeritas::activate_device may be called only once per "
"application");

if (!device)
return;
Expand All @@ -291,14 +290,8 @@ void activate_device(Device&& device)
<< device.device_id() << " of "
<< Device::num_devices();
ScopedTimeLog scoped_time(&self_logger(), 1.0);
Device& d = global_device();
{
// Lock *after* getting the pointer to the global_device, because
// the global_device function (in debug mode) also uses this lock.
std::lock_guard<std::mutex> scoped_lock{device_setter_mutex()};
CELER_DEVICE_CALL_PREFIX(SetDevice(device.device_id()));
d = std::move(device);
}
CELER_DEVICE_CALL_PREFIX(SetDevice(device.device_id()));
d = std::move(device);

// Call cudaFree to wake up the device, making other timers more accurate
CELER_DEVICE_CALL_PREFIX(Free(nullptr));
Expand Down Expand Up @@ -336,13 +329,15 @@ void activate_device(MpiCommunicator const& comm)
* See
* https://developer.nvidia.com/blog/cuda-pro-tip-always-set-current-device-avoid-multithreading-bugs
*
* \pre activate_device was called to set \c device()
* \pre activate_device was called or no device is intended to be used
*/
void activate_device_local()
{
if (device())
Device& d = global_device();
if (d)
{
CELER_DEVICE_CALL_PREFIX(SetDevice(device().device_id()));
CELER_LOG_LOCAL(debug) << "Activating device " << d.device_id();
CELER_DEVICE_CALL_PREFIX(SetDevice(d.device_id()));
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/corecel/sys/Stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <iostream>

#include "corecel/Assert.hh"
#include "corecel/io/Logger.hh"
#include "celeritas/Types.hh"

namespace celeritas
Expand All @@ -22,6 +23,13 @@ namespace celeritas
Stream::Stream()
{
CELER_DEVICE_CALL_PREFIX(StreamCreate(&stream_));
#if CUDART_VERSION >= 12000
unsigned long long stream_id = -1;
CELER_CUDA_CALL(cudaStreamGetId(stream_, &stream_id));
CELER_LOG_LOCAL(debug) << "Created stream ID " << stream_id;
#else
CELER_LOG_LOCAL(debug) << "Created stream " << static_cast<void*>(stream_);
#endif
}

//---------------------------------------------------------------------------//
Expand All @@ -35,6 +43,8 @@ Stream::~Stream()
try
{
CELER_DEVICE_CALL_PREFIX(StreamDestroy(stream_));
CELER_LOG_LOCAL(debug)
<< "Destroyed stream " << static_cast<void*>(stream_);
}
catch (RuntimeError const& e)
{
Expand Down

0 comments on commit 4ef9c46

Please sign in to comment.