diff --git a/src/quo-vadis-pthread.cc b/src/quo-vadis-pthread.cc
index 932d813..e05ec8d 100644
--- a/src/quo-vadis-pthread.cc
+++ b/src/quo-vadis-pthread.cc
@@ -129,7 +129,7 @@ qv_pthread_scopes_free(
         return QV_ERR_INVLD_ARG;
     }
     try {
-        qvi_scope_thfree(&scopes, nscopes);
+        qvi_scope_thdelete(&scopes, nscopes);
         return QV_SUCCESS;
     }
     qvi_catch_and_return();
diff --git a/src/qvi-bbuff-rmi.h b/src/qvi-bbuff-rmi.h
index e5f9fcb..e27d682 100644
--- a/src/qvi-bbuff-rmi.h
+++ b/src/qvi-bbuff-rmi.h
@@ -30,7 +30,7 @@
 #define QVI_BBUFF_RMI_H
 
 #include "qvi-common.h"
-#include "qvi-bbuff.h"
+#include "qvi-bbuff.h" // IWYU pragma: keep
 #include "qvi-hwloc.h"
 #include "qvi-hwpool.h"
 
@@ -299,7 +299,7 @@ qvi_bbuff_rmi_pack_item(
     qvi_bbuff_t *buff,
     size_t data
 ) {
-    return qvi_bbuff_append(buff, &data, sizeof(data));
+    return buff->append(&data, sizeof(data));
 }
 
 /**
@@ -310,7 +310,7 @@ qvi_bbuff_rmi_pack_item(
     qvi_bbuff_t *buff,
     int data
 ) {
-    return qvi_bbuff_append(buff, &data, sizeof(data));
+    return buff->append(&data, sizeof(data));
 }
 
 /**
@@ -322,7 +322,7 @@ qvi_bbuff_rmi_pack_item(
     qv_scope_create_hints_t data
 ) {
     const int dai = (int)data;
-    return qvi_bbuff_append(buff, &dai, sizeof(dai));
+    return buff->append(&dai, sizeof(dai));
 }
 
 /**
@@ -334,7 +334,7 @@ qvi_bbuff_rmi_pack_item(
     qv_hw_obj_type_t data
 ) {
     const int dai = (int)data;
-    return qvi_bbuff_append(buff, &dai, sizeof(dai));
+    return buff->append(&dai, sizeof(dai));
 }
 
 /**
@@ -346,7 +346,7 @@ qvi_bbuff_rmi_pack_item(
     qv_device_id_type_t data
 ) {
     const int dai = (int)data;
-    return qvi_bbuff_append(buff, &dai, sizeof(dai));
+    return buff->append(&dai, sizeof(dai));
 }
 
 /**
@@ -358,7 +358,7 @@ qvi_bbuff_rmi_pack_item(
     qv_scope_intrinsic_t data
 ) {
     const int dai = (int)data;
-    return qvi_bbuff_append(buff, &dai, sizeof(dai));
+    return buff->append(&dai, sizeof(dai));
 }
 
 #if QVI_SIZEOF_INT != QVI_SIZEOF_PID_T
@@ -371,7 +371,7 @@ qvi_bbuff_rmi_pack_item(
     pid_t data
 ) {
     const int dai = (int)data;
-    return qvi_bbuff_append(buff, &dai, sizeof(dai));
+    return buff->append(&dai, sizeof(dai));
 }
 #endif
 
@@ -380,7 +380,7 @@ qvi_bbuff_rmi_pack_item_impl(
     qvi_bbuff_t *buff,
     cstr_t data
 ) {
-    return qvi_bbuff_append(buff, data, strlen(data) + 1);
+    return buff->append(data, strlen(data) + 1);
 }
 
 /**
@@ -438,9 +438,9 @@ qvi_bbuff_rmi_pack_item(
     // We store size then data so unpack has an easier time, but keep
     // the user interface order as data then size.
     size_t dsize = data.second;
-    const int rc = qvi_bbuff_append(buff, &dsize, sizeof(dsize));
-    if (rc != QV_SUCCESS) return rc;
-    return qvi_bbuff_append(buff, data.first, dsize);
+    const int rc = buff->append(&dsize, sizeof(dsize));
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
+    return buff->append(data.first, dsize);
 }
 
 /**
@@ -453,8 +453,8 @@ qvi_bbuff_rmi_pack_item_impl(
 ) {
     // Protect against null data.
     if (qvi_unlikely(!data)) {
-        return qvi_bbuff_append(
-            buff, QV_BUFF_RMI_NULL_CPUSET,
+        return buff->append(
+            QV_BUFF_RMI_NULL_CPUSET,
             strlen(QV_BUFF_RMI_NULL_CPUSET) + 1
         );
     }
@@ -463,7 +463,7 @@ qvi_bbuff_rmi_pack_item_impl(
     int rc = qvi_hwloc_bitmap_asprintf(data, &datas);
     if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
     // We are sending the string representation of the cpuset.
-    rc = qvi_bbuff_append(buff, datas, strlen(datas) + 1);
+    rc = buff->append(datas, strlen(datas) + 1);
     free(datas);
     return rc;
 }
@@ -511,7 +511,7 @@ qvi_bbuff_rmi_pack_item(
 ) {
     // Pack hints.
     const int rc = qvi_bbuff_rmi_pack_item(buff, data.hints);
-    if (rc != QV_SUCCESS) return rc;
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
     return qvi_bbuff_rmi_pack_item(buff, data.cpuset);
 }
 
@@ -524,25 +524,24 @@ qvi_bbuff_rmi_pack_item(
     qvi_bbuff_t *buff,
     qvi_hwpool_dev_s *data
 ) {
+    // TODO(skg) Move to device code.
     // Pack device hints.
     int rc = qvi_bbuff_rmi_pack_item(buff, data->hints);
-    if (rc != QV_SUCCESS) return rc;
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
     // Pack device affinity.
     rc = qvi_bbuff_rmi_pack_item(buff, data->affinity);
-    if (rc != QV_SUCCESS) return rc;
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
     // Pack device type.
     rc = qvi_bbuff_rmi_pack_item(buff, data->type);
-    if (rc != QV_SUCCESS) return rc;
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
     // Pack device ID.
     rc = qvi_bbuff_rmi_pack_item(buff, data->m_id);
-    if (rc != QV_SUCCESS) return rc;
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
     // Pack device PCI bus ID.
     rc = qvi_bbuff_rmi_pack_item(buff, data->pci_bus_id);
-    if (rc != QV_SUCCESS) return rc;
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
     // Pack device UUID.
-    rc = qvi_bbuff_rmi_pack_item(buff, data->uuid);
-    if (rc != QV_SUCCESS) return rc;
-    return rc;
+    return qvi_bbuff_rmi_pack_item(buff, data->uuid);
 }
 
 /**
@@ -583,7 +582,7 @@ qvi_bbuff_rmi_pack(
     Types&&... args
 ) {
     const int rc = qvi_bbuff_rmi_pack_item(buff, std::forward<T>(arg));
-    if (rc != QV_SUCCESS) return rc;
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
     return qvi_bbuff_rmi_pack(buff, std::forward<Types>(args)...);
 }
 
@@ -871,6 +870,7 @@ qvi_bbuff_rmi_unpack_item(
     byte_t *buffpos,
     size_t *bytes_written
 ) {
+    // TODO(skg) Move to dev code.
     size_t bw = 0, total_bw = 0;
 
     int rc = qvi_bbuff_rmi_unpack_item(
@@ -957,7 +957,7 @@ qvi_bbuff_rmi_unpack(
         (byte_t *)data, &bytes_written
     );
-    if (rc != QV_SUCCESS) return rc;
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
     pos += bytes_written;
     return qvi_bbuff_rmi_unpack(pos, std::forward<Types>(args)...);
 }
 
diff --git a/src/qvi-bbuff.cc b/src/qvi-bbuff.cc
index 8271407..bf5fff7 100644
--- a/src/qvi-bbuff.cc
+++ b/src/qvi-bbuff.cc
@@ -17,98 +17,84 @@
 #include "qvi-bbuff.h"
 #include "qvi-utils.h"
 
-struct qvi_bbuff_s {
-    /** Minimum growth in bytes for resizes, etc. */
-    static constexpr size_t min_growth = 256;
-    /** Current capacity of buffer. */
-    size_t capacity = 0;
-    /** Amount of data already stored. */
-    size_t size = 0;
-    /** Pointer to data backing store. */
-    void *data = nullptr;
-    /** Constructor. */
-    qvi_bbuff_s(void)
-    {
-        capacity = min_growth;
-        data = calloc(capacity, sizeof(byte_t));
-        if (qvi_unlikely(!data)) throw qvi_runtime_error();
-    }
-    /** Copy constructor. */
-    qvi_bbuff_s(
-        const qvi_bbuff_s &src
-    ) : qvi_bbuff_s()
-    {
-        const int rc = qvi_bbuff_append(this, src.data, src.size);
-        if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error();
-    }
-    /** Destructor. */
-    ~qvi_bbuff_s(void)
-    {
-        if (data) free(data);
-    }
-};
-
-int
-qvi_bbuff_new(
-    qvi_bbuff_t **buff
-) {
-    return qvi_new(buff);
+qvi_bbuff_s::qvi_bbuff_s(void)
+{
+    m_capacity = s_min_growth;
+    m_data = calloc(m_capacity, sizeof(byte_t));
+    if (qvi_unlikely(!m_data)) throw qvi_runtime_error();
 }
 
-int
-qvi_bbuff_dup(
-    const qvi_bbuff_t *const src,
-    qvi_bbuff_t **buff
-) {
-    return qvi_dup(*src, buff);
+qvi_bbuff_s::qvi_bbuff_s(
+    const qvi_bbuff_s &src
+) : qvi_bbuff_s()
+{
+    const int rc = append(src.m_data, src.m_size);
+    if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error();
 }
 
-void
-qvi_bbuff_delete(
-    qvi_bbuff_t **buff
-) {
-    qvi_delete(buff);
+qvi_bbuff_s::~qvi_bbuff_s(void)
+{
+    if (m_data) free(m_data);
 }
 
-void *
-qvi_bbuff_data(
-    qvi_bbuff_t *buff
-) {
-    return buff->data;
+size_t
+qvi_bbuff_s::size(void) const
+{
+    return m_size;
 }
 
-size_t
-qvi_bbuff_size(
-    const qvi_bbuff_t *buff
-) {
-    return buff->size;
+void *
+qvi_bbuff_s::data(void)
+{
+    return m_data;
 }
 
 int
-qvi_bbuff_append(
-    qvi_bbuff_t *buff,
+qvi_bbuff_s::append(
     const void *const data,
     size_t size
 ) {
-    const size_t req_capacity = size + buff->size;
-    if (req_capacity > buff->capacity) {
+    const size_t req_capacity = size + m_size;
+    if (req_capacity > m_capacity) {
         // New capacity.
-        const size_t new_capacity = req_capacity + buff->min_growth;
+        const size_t new_capacity = req_capacity + s_min_growth;
         void *new_data = calloc(new_capacity, sizeof(byte_t));
         if (qvi_unlikely(!new_data)) return QV_ERR_OOR;
         // Memory allocation successful.
-        memmove(new_data, buff->data, buff->size);
-        free(buff->data);
-        buff->capacity = new_capacity;
-        buff->data = new_data;
+        memmove(new_data, m_data, m_size);
+        free(m_data);
+        m_capacity = new_capacity;
+        m_data = new_data;
     }
-    byte_t *dest = (byte_t *)buff->data;
-    dest += buff->size;
+    byte_t *dest = (byte_t *)m_data;
+    dest += m_size;
     memmove(dest, data, size);
-    buff->size += size;
+    m_size += size;
     return QV_SUCCESS;
 }
 
+int
+qvi_bbuff_new(
+    qvi_bbuff_t **buff
+) {
+    return qvi_new(buff);
+}
+
+int
+qvi_bbuff_dup(
+    const qvi_bbuff_t &src,
+    qvi_bbuff_t **buff
+) {
+    return qvi_dup(src, buff);
+}
+
+void
+qvi_bbuff_delete(
+    qvi_bbuff_t **buff
+) {
+    qvi_delete(buff);
+}
+
 /*
  * vim: ft=cpp ts=4 sts=4 sw=4 expandtab
  */
diff --git a/src/qvi-bbuff.h b/src/qvi-bbuff.h
index 67627ad..c6e0814 100644
--- a/src/qvi-bbuff.h
+++ b/src/qvi-bbuff.h
@@ -13,7 +13,7 @@
 /**
  * @file qvi-bbuff.h
  *
- * Base byte buffer infrastructure.
+ * Byte buffer infrastructure.
  */
 
 #ifndef QVI_BBUFF_H
@@ -21,9 +21,42 @@
 
 #include "qvi-common.h"
 
-/**
- *
- */
+struct qvi_bbuff_s {
+private:
+    /** Minimum growth in bytes for resizes, etc. */
+    static constexpr size_t s_min_growth = 256;
+    /** Current capacity of buffer. */
+    size_t m_capacity = 0;
+    /** Amount of data already stored. */
+    size_t m_size = 0;
+    /** Pointer to data backing store. */
+    void *m_data = nullptr;
+public:
+    /** Constructor. */
+    qvi_bbuff_s(void);
+    /** Copy constructor. */
+    qvi_bbuff_s(
+        const qvi_bbuff_s &src
+    );
+    /** Destructor. */
+    ~qvi_bbuff_s(void);
+    /** Returns the size of the data stored in the byte buffer. */
+    size_t
+    size(void) const;
+    /** Appends data to the buffer. */
+    int
+    append(
+        const void *const data,
+        size_t size
+    );
+    /**
+     * Returns a raw pointer to the flat data buffer
+     * maintained internally by the byte buffer.
+     */
+    void *
+    data(void);
+};
+
 int
 qvi_bbuff_new(
     qvi_bbuff_t **buff
@@ -31,44 +64,15 @@ qvi_bbuff_new(
 
 int
 qvi_bbuff_dup(
-    const qvi_bbuff_t *const src,
+    const qvi_bbuff_t &src,
     qvi_bbuff_t **buff
 );
 
-/**
- *
- */
 void
 qvi_bbuff_delete(
     qvi_bbuff_t **buff
 );
 
-/**
- *
- */
-void *
-qvi_bbuff_data(
-    qvi_bbuff_t *buff
-);
-
-/**
- *
- */
-size_t
-qvi_bbuff_size(
-    const qvi_bbuff_t *buff
-);
-
-/**
- *
- */
-int
-qvi_bbuff_append(
-    qvi_bbuff_t *buff,
-    const void *const data,
-    size_t size
-);
-
 #endif
 
 /*
diff --git a/src/qvi-mpi.cc b/src/qvi-mpi.cc
index 89446cb..b20ff5d 100644
--- a/src/qvi-mpi.cc
+++ b/src/qvi-mpi.cc
@@ -366,7 +366,7 @@ qvi_mpi_group_gather_bbuffs(
     bool *shared_alloc,
     qvi_bbuff_t ***rxbuffs
 ) {
-    const int send_count = (int)qvi_bbuff_size(txbuff);
+    const int send_count = (int)txbuff->size();
     const int group_id = group->qvcomm.rank;
     const int group_size = group->qvcomm.size;
 
@@ -401,7 +401,7 @@ qvi_mpi_group_gather_bbuffs(
     }
 
     mpirc = MPI_Gatherv(
-        qvi_bbuff_data(txbuff), send_count, MPI_UINT8_T,
+        txbuff->data(), send_count, MPI_UINT8_T,
         allbytes.data(), rxcounts.data(), displs.data(), MPI_UINT8_T,
         root, group->qvcomm.mpi_comm
     );
@@ -412,13 +412,13 @@ qvi_mpi_group_gather_bbuffs(
     // Root creates new buffers from data gathered from each participant.
     if (group_id == root) {
         // Zero initialize array of pointers to nullptr.
-        bbuffs = new qvi_bbuff_t*[group_size]();
+        bbuffs = new qvi_bbuff_t *[group_size]();
         // TODO(skg) Use dup.
         byte_t *bytepos = allbytes.data();
         for (int i = 0; i < group_size; ++i) {
             rc = qvi_bbuff_new(&bbuffs[i]);
             if (rc != QV_SUCCESS) break;
-            rc = qvi_bbuff_append(bbuffs[i], bytepos, rxcounts[i]);
+            rc = bbuffs[i]->append(bytepos, rxcounts[i]);
             if (rc != QV_SUCCESS) break;
             bytepos += rxcounts[i];
         }
@@ -459,7 +459,7 @@ qvi_mpi_group_scatter_bbuffs(
     displs.resize(group_size);
 
     for (int i = 0; i < group_size; ++i) {
-        txcounts[i] = (int)qvi_bbuff_size(txbuffs[i]);
+        txcounts[i] = (int)txbuffs[i]->size();
         displs[i] = total_bytes;
         total_bytes += txcounts[i];
     }
@@ -468,7 +468,7 @@ qvi_mpi_group_scatter_bbuffs(
         // Copy buffer data into flattened buffer.
         byte_t *bytepos = txbytes.data();
         for (int i = 0; i < group_size; ++i) {
-            memmove(bytepos, qvi_bbuff_data(txbuffs[i]), txcounts[i]);
+            memmove(bytepos, txbuffs[i]->data(), txcounts[i]);
             bytepos += txcounts[i];
         }
     }
@@ -496,7 +496,7 @@ qvi_mpi_group_scatter_bbuffs(
     // Everyone creates new buffers from data received from root.
     rc = qvi_bbuff_new(&mybbuff);
     if (rc != QV_SUCCESS) goto out;
-    rc = qvi_bbuff_append(mybbuff, mybytes.data(), rxcount);
+    rc = mybbuff->append(mybytes.data(), rxcount);
 out:
     if (rc != QV_SUCCESS) {
         qvi_bbuff_delete(&mybbuff);
diff --git a/src/qvi-omp.cc b/src/qvi-omp.cc
index 8a52508..18d8b69 100644
--- a/src/qvi-omp.cc
+++ b/src/qvi-omp.cc
@@ -178,7 +178,7 @@ qvi_omp_group_gather_bbuffs(
     #pragma omp single copyprivate(bbuffs)
     bbuffs = new qvi_bbuff_t *[group_size]();
 
-    const int rc = qvi_bbuff_dup(txbuff, &bbuffs[group_rank]);
+    const int rc = qvi_bbuff_dup(*txbuff, &bbuffs[group_rank]);
     // Need to ensure that all threads have contributed to bbuffs.
     #pragma omp barrier
     if (rc != QV_SUCCESS) {
@@ -211,7 +211,7 @@ qvi_omp_group_scatter_bbuffs(
     #pragma omp barrier
     qvi_bbuff_t *inbuff = (*tmp)[group->rank];
     qvi_bbuff_t *mybbuff = nullptr;
-    const int rc = qvi_bbuff_dup(inbuff, &mybbuff);
+    const int rc = qvi_bbuff_dup(*inbuff, &mybbuff);
     #pragma omp barrier
     #pragma omp single
     delete tmp;
diff --git a/src/qvi-process.cc b/src/qvi-process.cc
index 87c8868..d898d50 100644
--- a/src/qvi-process.cc
+++ b/src/qvi-process.cc
@@ -75,7 +75,7 @@ qvi_process_group_gather_bbuffs(
     // Zero initialize array of pointers to nullptr.
     qvi_bbuff_t **bbuffs = new qvi_bbuff_t *[group_size]();
 
-    const int rc = qvi_bbuff_dup(txbuff, &bbuffs[0]);
+    const int rc = qvi_bbuff_dup(*txbuff, &bbuffs[0]);
     if (rc != QV_SUCCESS) {
         if (bbuffs) {
             qvi_bbuff_delete(&bbuffs[0]);
@@ -103,7 +103,7 @@ qvi_process_group_scatter_bbuffs(
     }
     // There should always be only one at the root (us).
     qvi_bbuff_t *mybbuff = nullptr;
-    const int rc = qvi_bbuff_dup(txbuffs[root], &mybbuff);
+    const int rc = qvi_bbuff_dup(*txbuffs[root], &mybbuff);
     if (rc != QV_SUCCESS) {
         qvi_bbuff_delete(&mybbuff);
     }
diff --git a/src/qvi-rmi.cc b/src/qvi-rmi.cc
index 821f3f8..81e3cf7 100644
--- a/src/qvi-rmi.cc
+++ b/src/qvi-rmi.cc
@@ -246,7 +246,7 @@ buffer_append_header(
 #else
     QVI_UNUSED(picture);
 #endif
-    return qvi_bbuff_append(buff, &hdr, sizeof(hdr));
+    return buff->append(&hdr, sizeof(hdr));
 }
 
 static inline void *
@@ -274,10 +274,10 @@ zmsg_init_from_bbuff(
     qvi_bbuff_t *bbuff,
     zmq_msg_t *zmsg
 ) {
-    const size_t buffer_size = qvi_bbuff_size(bbuff);
+    const size_t buffer_size = bbuff->size();
     const int zrc = zmq_msg_init_data(
         zmsg,
-        qvi_bbuff_data(bbuff),
+        bbuff->data(),
         buffer_size,
         msg_free_byte_buffer_cb,
         bbuff
@@ -409,7 +409,7 @@ rpc_req(
     // Cache buffer size here because our call to qvi_bbuff_size() after
     // zmsg_send() may be invalid because msg_free_byte_buffer_cb() may have
    // already been called.
-    buffer_size = (int)qvi_bbuff_size(buff);
+    buffer_size = (int)buff->size();
 
     int nbytes_sent;
     qvrc = zmsg_send(zsock, &msg, &nbytes_sent);
diff --git a/src/qvi-scope.cc b/src/qvi-scope.cc
index 97c5cf2..59524df 100644
--- a/src/qvi-scope.cc
+++ b/src/qvi-scope.cc
@@ -144,7 +144,7 @@ qvi_scope_delete(
 }
 
 void
-qvi_scope_thfree(
+qvi_scope_thdelete(
     qv_scope_t ***kscopes,
     uint_t k
 ) {
@@ -259,149 +259,6 @@ qvi_scope_bind_string(
     return rc;
 }
 
-/**
- * Split aggregation: a collection of data relevant to split operations
- * requiring aggregated (e.g., global) knowledge to perform a split.
- *
- * NOTE: since splitting and mapping operations are performed by a single
- * process, this structure does not support collective operations that require
- * coordination between cooperating tasks. The structure for that is
- * qvi_scope_coll_data_s. Typically, collective operations will fill in a
- * qvi_scope_split_agg_s, but that isn't a requirement.
- */
-struct qvi_scope_split_agg_s {
-    /**
-     * A pointer to my RMI.
-     */
-    qvi_rmi_client_t *rmi = nullptr;
-    /**
-     * The base hardware pool we are splitting.
-     */
-    qvi_hwpool_s *base_hwpool = nullptr;
-    /**
-     * The number of members that are part of the split.
-     */
-    uint_t group_size = 0;
-    /**
-     * The number of pieces in the split.
-     */
-    uint_t split_size = 0;
-    /**
-     * The potential hardware resource that we are splitting at. QV_HW_OBJ_LAST
-     * indicates that we are called from a split() context. Any other hardware
-     * resource type indicates that we are splitting at that type: called from a
-     * split_at() context.
-     */
-    qv_hw_obj_type_t split_at_type{};
-    /**
-     * Vector of task IDs, one for each member of the group. Note that the
-     * number of task IDs will always match the group size and that their array
-     * index corresponds to a task ID. It is handy to have the task IDs for
-     * splitting so we can query task characteristics during a splitting.
-     */
-    std::vector<pid_t> taskids{};
-    /**
-     * Vector of hardware pools, one for each member of the group. Note that the
-     * number of hardware pools will always match the group size and that their
-     * array index corresponds to a task ID: 0 ... group_size - 1.
-     */
-    std::vector<qvi_hwpool_s *> hwpools{};
-    /**
-     * Vector of colors, one for each member of the group. Note that the number
-     * of colors will always match the group size and that their array index
-     * corresponds to a task ID.
-     */
-    std::vector<int> colors{};
-    /**
-     * Vector of task affinities.
-     */
-    qvi_hwloc_cpusets_t affinities{};
-    /**
-     * Constructor.
-     */
-    qvi_scope_split_agg_s(void) = default;
-    /**
-     * Destructor
-     */
-    ~qvi_scope_split_agg_s(void)
-    {
-        for (auto &hwpool : hwpools) {
-            qvi_delete(&hwpool);
-        }
-    }
-    /**
-     * Minimally initializes instance.
-     */
-    int
-    init(
-        qvi_rmi_client_t *rmi_a,
-        qvi_hwpool_s *base_hwpool_a,
-        uint_t group_size_a,
-        uint_t split_size_a,
-        qv_hw_obj_type_t split_at_type_a
-    ) {
-        rmi = rmi_a;
-        base_hwpool = base_hwpool_a;
-        group_size = group_size_a;
-        split_size = split_size_a;
-        split_at_type = split_at_type_a;
-        // To save memory we don't eagerly resize our vectors to group_size
-        // since most processes will not use the storage. For example, in the
-        // collective case the root ID process will be the only one needing
-        // group_size elements in our vectors. We'll let the call paths enforce
-        // appropriate vector sizing.
-        return QV_SUCCESS;
-    }
-};
-
-/**
- * Collective split structure: a collection of data relevant to split operations
- * requiring aggregated resource knowledge AND coordination between tasks in the
- * parent scope to perform a split.
- */
-struct qvi_scope_split_coll_s {
-    /**
-     * The root task ID used for collective operations.
-     * Note: We use 0 as the root because 0 will always exist.
-     */
-    static constexpr int rootid = 0;
-    /**
-     * Points to the parent scope that we are splitting.
-     */
-    qv_scope_t *parent_scope = nullptr;
-    /**
-     * My color.
-     */
-    int mycolor = 0;
-    /**
-     * Stores group-global split information brought together by collective
-     * operations across the members in parent_scope.
-     */
-    qvi_scope_split_agg_s gsplit{};
-    /**
-     * Initializes instance.
-     */
-    int
-    init(
-        qv_scope_t *parent_scope_a,
-        uint_t split_size_a,
-        int mycolor_a,
-        qv_hw_obj_type_t split_at_type_a
-    ) {
-        const int myid = parent_scope_a->group->rank();
-        parent_scope = parent_scope_a;
-        mycolor = mycolor_a;
-        if (myid == qvi_scope_split_coll_s::rootid) {
-            const uint_t group_size = parent_scope->group->size();
-            gsplit.init(
-                parent_scope->group->task()->rmi(), parent_scope->hwpool,
-                group_size, split_size_a, split_at_type_a
-            );
-        }
-        return QV_SUCCESS;
-    }
-};
-
 template <typename TYPE>
 static int
 gather_values(
@@ -411,27 +268,28 @@ gather_values(
     std::vector<TYPE> &outvals
 ) {
     static_assert(std::is_trivially_copyable<TYPE>::value, "");
-
-    bool shared = false;
     const uint_t group_size = group->size();
-    qvi_bbuff_t *txbuff = nullptr, **bbuffs = nullptr;
+    qvi_bbuff_t *txbuff = nullptr;
 
     int rc = qvi_bbuff_new(&txbuff);
-    if (rc != QV_SUCCESS) goto out;
-
-    rc = qvi_bbuff_append(
-        txbuff, &invalue, sizeof(TYPE)
-    );
-    if (rc != QV_SUCCESS) goto out;
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
+    rc = txbuff->append(&invalue, sizeof(TYPE));
+    if (qvi_unlikely(rc != QV_SUCCESS)) {
+        qvi_bbuff_delete(&txbuff);
+        return rc;
+    }
+    // Gather the values to the root.
+    bool shared = false;
+    qvi_bbuff_t **bbuffs = nullptr;
 
     rc = group->gather(txbuff, root, &shared, &bbuffs);
-    if (rc != QV_SUCCESS) goto out;
-
+    if (qvi_unlikely(rc != QV_SUCCESS)) goto out;
+    // The root fills in the output.
     if (group->rank() == root) {
         outvals.resize(group_size);
         // Unpack the values.
         for (uint_t i = 0; i < group_size; ++i) {
-            outvals[i] = *(TYPE *)qvi_bbuff_data(bbuffs[i]);
+            outvals[i] = *(TYPE *)bbuffs[i]->data();
         }
     }
 out:
@@ -444,7 +302,7 @@ gather_values(
         }
     }
     qvi_bbuff_delete(&txbuff);
-    if (rc != QV_SUCCESS) {
+    if (qvi_unlikely(rc != QV_SUCCESS)) {
         // If something went wrong, just zero-initialize the values.
         outvals = {};
     }
@@ -458,17 +316,15 @@ gather_hwpools(
     qvi_hwpool_s *txpool,
     std::vector<qvi_hwpool_s *> &rxpools
 ) {
-    bool shared = false;
     const uint_t group_size = group->size();
-    qvi_bbuff_t *txbuff = nullptr, **bbuffs = nullptr;
-
-    int rc = qvi_bbuff_new(&txbuff);
-    if (rc != QV_SUCCESS) goto out;
-
-    rc = txpool->pack(txbuff);
-    if (rc != QV_SUCCESS) goto out;
-
-    rc = group->gather(txbuff, root, &shared, &bbuffs);
+    // Pack the hardware pool into a buffer.
+    qvi_bbuff_t txbuff;
+    int rc = txpool->pack(&txbuff);
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
+    // Gather the values to the root.
+    bool shared = false;
+    qvi_bbuff_t **bbuffs = nullptr;
+    rc = group->gather(&txbuff, root, &shared, &bbuffs);
     if (rc != QV_SUCCESS) goto out;
 
     if (group->rank() == root) {
@@ -476,9 +332,9 @@ gather_hwpools(
         // Unpack the hwpools.
         for (uint_t i = 0; i < group_size; ++i) {
             rc = qvi_bbuff_rmi_unpack(
-                qvi_bbuff_data(bbuffs[i]), &rxpools[i]
+                bbuffs[i]->data(), &rxpools[i]
             );
-            if (rc != QV_SUCCESS) break;
+            if (qvi_unlikely(rc != QV_SUCCESS)) break;
         }
     }
 out:
@@ -490,7 +346,6 @@ gather_hwpools(
             delete[] bbuffs;
         }
     }
-    qvi_bbuff_delete(&txbuff);
     if (rc != QV_SUCCESS) {
         // If something went wrong, just zero-initialize the pools.
         rxpools = {};
@@ -509,29 +364,27 @@ scatter_values(
     static_assert(std::is_trivially_copyable<TYPE>::value, "");
 
     int rc = QV_SUCCESS;
-    std::vector<qvi_bbuff_t *> txbuffs{};
     qvi_bbuff_t *rxbuff = nullptr;
+    std::vector<qvi_bbuff_t *> txbuffs(0);
 
     if (root == group->rank()) {
         const uint_t group_size = group->size();
         txbuffs.resize(group_size);
         // Pack the values.
         for (uint_t i = 0; i < group_size; ++i) {
             rc = qvi_bbuff_new(&txbuffs[i]);
-            if (rc != QV_SUCCESS) break;
+            if (qvi_unlikely(rc != QV_SUCCESS)) break;
 
-            rc = qvi_bbuff_append(
-                txbuffs[i], &values[i], sizeof(TYPE)
-            );
-            if (rc != QV_SUCCESS) break;
+            rc = txbuffs[i]->append(&values[i], sizeof(TYPE));
+            if (qvi_unlikely(rc != QV_SUCCESS)) break;
         }
-        if (rc != QV_SUCCESS) goto out;
+        if (qvi_unlikely(rc != QV_SUCCESS)) goto out;
     }
 
     rc = group->scatter(txbuffs.data(), root, &rxbuff);
-    if (rc != QV_SUCCESS) goto out;
+    if (qvi_unlikely(rc != QV_SUCCESS)) goto out;
 
-    *value = *(TYPE *)qvi_bbuff_data(rxbuff);
+    *value = *(TYPE *)rxbuff->data();
 out:
     for (auto &buff : txbuffs) {
         qvi_bbuff_delete(&buff);
@@ -552,7 +405,7 @@ scatter_hwpools(
     qvi_hwpool_s **pool
 ) {
     int rc = QV_SUCCESS;
-    std::vector<qvi_bbuff_t *> txbuffs{};
+    std::vector<qvi_bbuff_t *> txbuffs(0);
     qvi_bbuff_t *rxbuff = nullptr;
 
     if (root == group->rank()) {
@@ -572,7 +425,7 @@ scatter_hwpools(
     rc = group->scatter(txbuffs.data(), root, &rxbuff);
     if (rc != QV_SUCCESS) goto out;
 
-    rc = qvi_bbuff_rmi_unpack(qvi_bbuff_data(rxbuff), pool);
+    rc = qvi_bbuff_rmi_unpack(rxbuff->data(), pool);
 out:
     for (auto &buff : txbuffs) {
         qvi_bbuff_delete(&buff);
@@ -593,8 +446,7 @@ bcast_value(
 ) {
     static_assert(std::is_trivially_copyable<TYPE>::value, "");
 
-    std::vector<TYPE> values{};
-
+    std::vector<TYPE> values;
     if (root == group->rank()) {
         values.resize(group->size());
         std::fill(values.begin(), values.end(), *value);
@@ -602,6 +454,149 @@ bcast_value(
     return scatter_values(group, root, values, value);
 }
 
+/**
+ * Split aggregation: a collection of data relevant to split operations
+ * requiring aggregated (e.g., global) knowledge to perform a split.
+ *
+ * NOTE: since splitting and mapping operations are performed by a single
+ * process, this structure does not support collective operations that require
+ * coordination between cooperating tasks. The structure for that is
+ * qvi_scope_coll_data_s. Typically, collective operations will fill in a
+ * qvi_scope_split_agg_s, but that isn't a requirement.
+ */
+struct qvi_scope_split_agg_s {
+    /**
+     * A pointer to my RMI.
+     */
+    qvi_rmi_client_t *rmi = nullptr;
+    /**
+     * The base hardware pool we are splitting.
+     */
+    qvi_hwpool_s *base_hwpool = nullptr;
+    /**
+     * The number of members that are part of the split.
+     */
+    uint_t group_size = 0;
+    /**
+     * The number of pieces in the split.
+     */
+    uint_t split_size = 0;
+    /**
+     * The potential hardware resource that we are splitting at. QV_HW_OBJ_LAST
+     * indicates that we are called from a split() context. Any other hardware
+     * resource type indicates that we are splitting at that type: called from a
+     * split_at() context.
+     */
+    qv_hw_obj_type_t split_at_type{};
+    /**
+     * Vector of task IDs, one for each member of the group. Note that the
+     * number of task IDs will always match the group size and that their array
+     * index corresponds to a task ID. It is handy to have the task IDs for
+     * splitting so we can query task characteristics during a splitting.
+     */
+    std::vector<pid_t> taskids{};
+    /**
+     * Vector of hardware pools, one for each member of the group. Note that the
+     * number of hardware pools will always match the group size and that their
+     * array index corresponds to a task ID: 0 ... group_size - 1.
+     */
+    std::vector<qvi_hwpool_s *> hwpools{};
+    /**
+     * Vector of colors, one for each member of the group. Note that the number
+     * of colors will always match the group size and that their array index
+     * corresponds to a task ID.
+     */
+    std::vector<int> colors{};
+    /**
+     * Vector of task affinities.
+     */
+    qvi_hwloc_cpusets_t affinities{};
+    /**
+     * Constructor.
+     */
+    qvi_scope_split_agg_s(void) = default;
+    /**
+     * Destructor
+     */
+    ~qvi_scope_split_agg_s(void)
+    {
+        for (auto &hwpool : hwpools) {
+            qvi_delete(&hwpool);
+        }
+    }
+    /**
+     * Minimally initializes instance.
+     */
+    int
+    init(
+        qvi_rmi_client_t *rmi_a,
+        qvi_hwpool_s *base_hwpool_a,
+        uint_t group_size_a,
+        uint_t split_size_a,
+        qv_hw_obj_type_t split_at_type_a
+    ) {
+        rmi = rmi_a;
+        base_hwpool = base_hwpool_a;
+        group_size = group_size_a;
+        split_size = split_size_a;
+        split_at_type = split_at_type_a;
+        // To save memory we don't eagerly resize our vectors to group_size
+        // since most processes will not use the storage. For example, in the
+        // collective case the root ID process will be the only one needing
+        // group_size elements in our vectors. We'll let the call paths enforce
+        // appropriate vector sizing.
+        return QV_SUCCESS;
+    }
+};
+
+/**
+ * Collective split structure: a collection of data relevant to split operations
+ * requiring aggregated resource knowledge AND coordination between tasks in the
+ * parent scope to perform a split.
+ */
+struct qvi_scope_split_coll_s {
+    /**
+     * The root task ID used for collective operations.
+     * Note: We use 0 as the root because 0 will always exist.
+     */
+    static constexpr int rootid = 0;
+    /**
+     * Points to the parent scope that we are splitting.
+     */
+    qv_scope_t *parent_scope = nullptr;
+    /**
+     * My color.
+     */
+    int mycolor = 0;
+    /**
+     * Stores group-global split information brought together by collective
+     * operations across the members in parent_scope.
+     */
+    qvi_scope_split_agg_s gsplit{};
+    /**
+     * Initializes instance.
+     */
+    int
+    init(
+        qv_scope_t *parent_scope_a,
+        uint_t split_size_a,
+        int mycolor_a,
+        qv_hw_obj_type_t split_at_type_a
+    ) {
+        const int myid = parent_scope_a->group->rank();
+        parent_scope = parent_scope_a;
+        mycolor = mycolor_a;
+        if (myid == qvi_scope_split_coll_s::rootid) {
+            const uint_t group_size = parent_scope->group->size();
+            gsplit.init(
+                parent_scope->group->task()->rmi(), parent_scope->hwpool,
+                group_size, split_size_a, split_at_type_a
+            );
+        }
+        return QV_SUCCESS;
+    }
+};
+
 static int
 scope_split_coll_gather(
     qvi_scope_split_coll_s &splitcoll
@@ -665,7 +660,7 @@ scope_split_coll_scatter(
         splitcoll.gsplit.colors, colorp
     );
-    if (rc != QV_SUCCESS) return rc;
+    if (qvi_unlikely(rc != QV_SUCCESS)) return rc;
 
     return scatter_hwpools(
         splitcoll.parent_scope->group,
@@ -686,7 +681,7 @@ qvi_scope_split_agg_cpuset_dup(
     qvi_hwloc_bitmap_s &result
 ) {
     // This shouldn't happen.
-    if (splitagg.hwpools.size() == 0) qvi_abort();
+    assert(splitagg.hwpools.size() != 0);
 
     result = splitagg.hwpools[0]->cpuset();
     return QV_SUCCESS;
@@ -1264,7 +1259,7 @@ qvi_scope_thsplit(
         ithchildren[i] = child;
     }
     if (rc != QV_SUCCESS) {
-        qvi_scope_thfree(&ithchildren, k);
+        qvi_scope_thdelete(&ithchildren, k);
     }
     else {
         // Subtract one to account for the parent's
diff --git a/src/qvi-scope.h b/src/qvi-scope.h
index 1accd91..b0811c2 100644
--- a/src/qvi-scope.h
+++ b/src/qvi-scope.h
@@ -55,7 +55,7 @@ qvi_scope_delete(
  * Frees scope resources and container created by qvi_scope_thsplit*.
  */
 void
-qvi_scope_thfree(
+qvi_scope_thdelete(
     qv_scope_t ***kscopes,
     uint_t k
 );
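Note (not part of the patch): a minimal usage sketch of the refactored byte-buffer interface introduced above, assuming qvi_bbuff_t remains a typedef for qvi_bbuff_s as in qvi-common.h. The function name below is hypothetical; it only exercises calls visible in this diff (qvi_bbuff_new, append, size, data, qvi_bbuff_dup, qvi_bbuff_delete).

// Hypothetical example, not part of this change set.
#include "qvi-bbuff.h"

static int
bbuff_usage_sketch(void)
{
    qvi_bbuff_t *buff = nullptr;
    int rc = qvi_bbuff_new(&buff);
    if (rc != QV_SUCCESS) return rc;
    // Appends now go through the member function instead of qvi_bbuff_append().
    const int value = 42;
    rc = buff->append(&value, sizeof(value));
    if (rc != QV_SUCCESS) {
        qvi_bbuff_delete(&buff);
        return rc;
    }
    // qvi_bbuff_dup() now takes a reference to the source buffer.
    qvi_bbuff_t *copy = nullptr;
    rc = qvi_bbuff_dup(*buff, &copy);
    if (rc == QV_SUCCESS && copy->size() == sizeof(value)) {
        // Raw contents are reached through the data() accessor.
        const int unpacked = *(const int *)copy->data();
        (void)unpacked;
    }
    qvi_bbuff_delete(&copy);
    qvi_bbuff_delete(&buff);
    return rc;
}

The same pattern is what the callers in qvi-mpi.cc, qvi-omp.cc, qvi-process.cc, and qvi-rmi.cc were converted to in this change.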