Skip to content

Commit

Permalink
Add option to limit number of buffered beam slices (#1026)
Browse files Browse the repository at this point in the history
* pdf beam init

* add ref ratio in z for pdf

* add option to limit buffered beam slices

* fix assert

* add more detail to doc
  • Loading branch information
AlexanderSinn authored Nov 1, 2023
1 parent 643dc02 commit a8e9a88
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 16 deletions.
13 changes: 13 additions & 0 deletions docs/source/run/parameters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ General parameters
Setting this option to `1` is necessary to take advantage of GPU-Enabled MPI, however for this
additional environment variables need to be set depending on the system.

* ``hipace.comms_buffer_max_leading_slices`` (`int`) optional (default `inf`)
How many slices of beam particles can be received and stored in advance.

* ``hipace.comms_buffer_max_trailing_slices`` (`int`) optional (default `inf`)
How many slices of beam particles can be stored before being sent. Using
``comms_buffer_max_leading_slices`` and ``comms_buffer_max_trailing_slices`` will in principle
limit the degree of asynchrony in the parallel communication and may thus reduce performance.
However it may be necessary to set these parameters to avoid all slices accumulating on a single
rank that would run out of memory (out of CPU or GPU memory depending on ``hipace.comms_buffer_on_gpu``).
If there are more time steps than ranks, these parameters must be chosen such that between all
ranks there is enough capacity to store every slice to avoid a deadlock, i.e.
:math:`(\mathrm{comms\_buffer\_max\_leading\_slices} + \mathrm{comms\_buffer\_max\_trailing\_slices}) \cdot \mathrm{nranks} > \mathrm{nslices}`.

* ``hipace.do_tiling`` (`bool`) optional (default `true`)
Whether to use tiling, when running on CPU.
Currently, this option only affects plasma operations (gather, push and deposition).
Expand Down
4 changes: 4 additions & 0 deletions src/Hipace.H
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ public:
amrex::ParserExecutor<3> m_salame_target_func;
/** Whether MPI communication buffers should be allocated in device memory */
bool m_comms_buffer_on_gpu = false;
/** How many slices of beam particles can be received in advance */
int m_comms_buffer_max_leading_slices = std::numeric_limits<int>::max();
/** How many slices of beam particles can be stored before being sent */
int m_comms_buffer_max_trailing_slices = std::numeric_limits<int>::max();

private:

Expand Down
13 changes: 12 additions & 1 deletion src/Hipace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,18 @@ Hipace::Hipace () :

queryWithParser(pph, "background_density_SI", m_background_density_SI);
queryWithParser(pph, "comms_buffer_on_gpu", m_comms_buffer_on_gpu);
queryWithParser(pph, "comms_buffer_max_leading_slices", m_comms_buffer_max_leading_slices);
queryWithParser(pph, "comms_buffer_max_trailing_slices", m_comms_buffer_max_trailing_slices);

MakeGeometry();

AMREX_ALWAYS_ASSERT_WITH_MESSAGE(
(((double(m_comms_buffer_max_leading_slices) + m_comms_buffer_max_trailing_slices)
* amrex::ParallelDescriptor::NProcs()) > m_3D_geom[0].Domain().length(2))
|| (m_max_step <= amrex::ParallelDescriptor::NProcs()),
"comms_buffer_max_leading_slices and comms_buffer_max_trailing_slices must be large enough"
" to distribute all slices between all ranks if there are more timesteps than ranks");

m_use_laser = m_multi_laser.m_use_laser;

queryWithParser(pph, "collisions", m_collision_names);
Expand Down Expand Up @@ -203,7 +212,9 @@ Hipace::InitData ()
m_multi_beam.get_nbeams(),
!m_comms_buffer_on_gpu,
m_use_laser,
m_use_laser ? m_multi_laser.getSlices()[0].box() : amrex::Box{});
m_use_laser ? m_multi_laser.getSlices()[0].box() : amrex::Box{},
m_comms_buffer_max_leading_slices,
m_comms_buffer_max_trailing_slices);

amrex::ParmParse pph("hipace");
bool do_output_input = false;
Expand Down
6 changes: 4 additions & 2 deletions src/utils/MultiBuffer.H
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public:

// initialize MultiBuffer and open initial receive requests
void initialize (int nslices, int nbeams, bool buffer_on_host, bool use_laser,
amrex::Box laser_box);
amrex::Box laser_box, int max_leading_slices, int max_trailing_slices);

// receive data from previous rank and unpack it into MultiBeam and MultiLaser
void get_data (int slice, MultiBeam& beams, MultiLaser& laser, int beam_slice);
Expand Down Expand Up @@ -104,6 +104,8 @@ private:
bool m_use_laser = false;
int m_laser_ncomp = 4;
amrex::Box m_laser_slice_box {};
int m_max_leading_slices = std::numeric_limits<int>::max();
int m_max_trailing_slices = std::numeric_limits<int>::max();

// parameters to send physical time
amrex::Real m_time_send_buffer = 0.;
Expand All @@ -123,7 +125,7 @@ private:
void free_buffer (int slice);

// function containing main progress loop to deal with asynchronous MPI requests
void make_progress (int slice, bool is_blocking);
void make_progress (int slice, bool is_blocking, int current_slice);

// write MultiBeam sizes into the metadata array
void write_metadata (int slice, MultiBeam& beams, int beam_slice);
Expand Down
36 changes: 23 additions & 13 deletions src/utils/MultiBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ void MultiBuffer::free_buffer (int slice) {
}

void MultiBuffer::initialize (int nslices, int nbeams, bool buffer_on_host, bool use_laser,
amrex::Box laser_box) {
amrex::Box laser_box, int max_leading_slices,
int max_trailing_slices) {

m_comm = amrex::ParallelDescriptor::Communicator();
const int rank_id = amrex::ParallelDescriptor::MyProc();
Expand All @@ -72,6 +73,9 @@ void MultiBuffer::initialize (int nslices, int nbeams, bool buffer_on_host, bool
m_tag_buffer_start = 1;
m_tag_metadata_start = m_tag_buffer_start + m_nslices;

m_max_leading_slices = max_leading_slices;
m_max_trailing_slices = max_trailing_slices;

for (int p = 0; p < comm_progress::nprogress; ++p) {
m_async_metadata_slice[p] = m_nslices - 1;
m_async_data_slice[p] = m_nslices - 1;
Expand All @@ -96,7 +100,7 @@ void MultiBuffer::initialize (int nslices, int nbeams, bool buffer_on_host, bool

// open initial receives
for (int i = m_nslices-1; i >= 0; --i) {
make_progress(i, false);
make_progress(i, false, m_nslices-1);
}
}

Expand Down Expand Up @@ -168,7 +172,13 @@ MultiBuffer::~MultiBuffer() {
#endif
}

void MultiBuffer::make_progress (int slice, bool is_blocking) {
void MultiBuffer::make_progress (int slice, bool is_blocking, int current_slice) {

const bool is_blocking_send = is_blocking ||
(m_nslices + slice - current_slice) % m_nslices > m_max_trailing_slices;
const bool is_blocking_recv = is_blocking;
const bool skip_recv = !is_blocking_recv &&
(m_nslices - slice + current_slice) % m_nslices > m_max_leading_slices;

if (m_is_serial) {
if (is_blocking) {
Expand Down Expand Up @@ -213,7 +223,7 @@ void MultiBuffer::make_progress (int slice, bool is_blocking) {
}

if (m_datanodes[slice].m_metadata_progress == comm_progress::send_started) {
if (is_blocking) {
if (is_blocking_send) {
MPI_Wait(&(m_datanodes[slice].m_metadata_request), MPI_STATUS_IGNORE);
m_datanodes[slice].m_metadata_progress = comm_progress::sent;
} else {
Expand All @@ -225,7 +235,7 @@ void MultiBuffer::make_progress (int slice, bool is_blocking) {
}
}

if (m_datanodes[slice].m_metadata_progress == comm_progress::sent) {
if (m_datanodes[slice].m_metadata_progress == comm_progress::sent && !skip_recv) {
MPI_Irecv(
get_metadata_location(slice),
get_metadata_size(),
Expand All @@ -238,7 +248,7 @@ void MultiBuffer::make_progress (int slice, bool is_blocking) {
}

if (m_datanodes[slice].m_metadata_progress == comm_progress::receive_started) {
if (is_blocking) {
if (is_blocking_recv) {
MPI_Wait(&(m_datanodes[slice].m_metadata_request), MPI_STATUS_IGNORE);
m_datanodes[slice].m_metadata_progress = comm_progress::received;
} else {
Expand All @@ -251,7 +261,7 @@ void MultiBuffer::make_progress (int slice, bool is_blocking) {
}

if (m_datanodes[slice].m_progress == comm_progress::send_started) {
if (is_blocking) {
if (is_blocking_send) {
MPI_Wait(&(m_datanodes[slice].m_request), MPI_STATUS_IGNORE);
free_buffer(slice);
m_datanodes[slice].m_progress = comm_progress::sent;
Expand Down Expand Up @@ -290,7 +300,7 @@ void MultiBuffer::make_progress (int slice, bool is_blocking) {
}

if (m_datanodes[slice].m_progress == comm_progress::receive_started) {
if (is_blocking) {
if (is_blocking_recv) {
MPI_Wait(&(m_datanodes[slice].m_request), MPI_STATUS_IGNORE);
m_datanodes[slice].m_progress = comm_progress::received;
} else {
Expand All @@ -302,7 +312,7 @@ void MultiBuffer::make_progress (int slice, bool is_blocking) {
}
}

if (is_blocking) {
if (is_blocking_recv) {
AMREX_ALWAYS_ASSERT(m_datanodes[slice].m_metadata_progress == comm_progress::received);
AMREX_ALWAYS_ASSERT(m_datanodes[slice].m_progress == comm_progress::received);
}
Expand All @@ -324,7 +334,7 @@ void MultiBuffer::get_data (int slice, MultiBeam& beams, MultiLaser& laser, int
}
} else {
// receive and unpack buffer
make_progress(slice, true);
make_progress(slice, true, slice);
if (m_datanodes[slice].m_buffer_size != 0) {
unpack_data(slice, beams, laser, beam_slice);
free_buffer(slice);
Expand Down Expand Up @@ -352,7 +362,7 @@ void MultiBuffer::put_data (int slice, MultiBeam& beams, MultiLaser& laser, int
m_datanodes[slice].m_metadata_progress = comm_progress::ready_to_send;
}

make_progress(slice, false);
make_progress(slice, false, slice);

// make asynchronous progress for metadata
// only check slices that have a chance of making progress
Expand Down Expand Up @@ -382,7 +392,7 @@ void MultiBuffer::put_data (int slice, MultiBeam& beams, MultiLaser& laser, int
for (int i = m_async_metadata_slice[p]; i!=slice; (i==0) ? i=m_nslices-1 : --i) {
m_async_metadata_slice[p] = i;
if (m_datanodes[i].m_metadata_progress < p) {
make_progress(i, false);
make_progress(i, false, slice);
}
if (m_datanodes[i].m_metadata_progress < p) {
break;
Expand Down Expand Up @@ -418,7 +428,7 @@ void MultiBuffer::put_data (int slice, MultiBeam& beams, MultiLaser& laser, int
for (int i = m_async_data_slice[p]; i!=slice; (i==0) ? i=m_nslices-1 : --i) {
m_async_data_slice[p] = i;
if (m_datanodes[i].m_progress < p) {
make_progress(i, false);
make_progress(i, false, slice);
}
if (m_datanodes[i].m_progress < p) {
break;
Expand Down

0 comments on commit a8e9a88

Please sign in to comment.