diff --git a/CMakeExt/Threading.cmake b/CMakeExt/Threading.cmake index 70fb22f47..73836e50c 100644 --- a/CMakeExt/Threading.cmake +++ b/CMakeExt/Threading.cmake @@ -14,9 +14,9 @@ if (ENABLE_THREADSUPPORT) if (DART_SYNC_BUILTINS) MESSAGE(STATUS "Found builtin __sync_add_and_fetch") set(CMAKE_C_FLAGS - "${CMAKE_C_FLAGS_RELEASE} -DDART_HAVE_SYNC_BUILTINS") + "${CMAKE_C_FLAGS} -DDART_HAVE_SYNC_BUILTINS") set(CMAKE_CXX_FLAGS - "${CMAKE_CXX_FLAGS_RELEASE} -DDART_HAVE_SYNC_BUILTINS") + "${CMAKE_CXX_FLAGS} -DDART_HAVE_SYNC_BUILTINS") else() # error out for now MESSAGE(STATUS "Compiling builtin __sync_add_and_fetch failed with error " diff --git a/CMakeLists.txt b/CMakeLists.txt index 3081deaaf..2fe91e67b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -99,6 +99,8 @@ option(ENABLE_HDF5 "Specify whether HDF5 features are enabled" on) option(ENABLE_NASTYMPI "Specify whether the NastyMPI proxy should be enabled" off) +option(ENABLE_DART_PROGRESS + "Specify whether DART explicitely triggers progress" off) if (BUILD_COVERAGE_TESTS) set(BUILD_TESTS TRUE CACHE BOOLEAN @@ -264,6 +266,8 @@ message(INFO "Unified RMA memory model: (ENABLE_UNIFIED_MEMORY_MODEL) " ${ENABLE_UNIFIED_MEMORY_MODEL}) message(INFO "MPI shared windows: (ENABLE_SHARED_WINDOWS) " ${ENABLE_SHARED_WINDOWS}) +message(INFO "Explicit DART progress: (ENABLE_DART_PROGRESS) " + ${ENABLE_DART_PROGRESS}) message(INFO "Enable multithreading: (ENABLE_THREADSUPPORT) " ${ENABLE_THREADSUPPORT}) message(INFO "Default index type long: (ENABLE_DEFAULT_INDEX_TYPE_LONG) " diff --git a/build.debug.sh b/build.debug.sh index 6a2cd2345..141d7303e 100755 --- a/build.debug.sh +++ b/build.debug.sh @@ -67,6 +67,7 @@ rm -Rf $BUILD_DIR/* -DENABLE_ASSERTIONS=ON \ \ -DENABLE_SHARED_WINDOWS=ON \ + -DENABLE_DART_PROGRESS=ON \ -DENABLE_UNIFIED_MEMORY_MODEL=ON \ -DENABLE_DEFAULT_INDEX_TYPE_LONG=ON \ \ diff --git a/build.dev.sh b/build.dev.sh index 8637f59f6..dfd89c236 100755 --- a/build.dev.sh +++ b/build.dev.sh @@ -67,6 +67,7 @@ rm -Rf 
$BUILD_DIR/* -DENABLE_ASSERTIONS=ON \ \ -DENABLE_SHARED_WINDOWS=ON \ + -DENABLE_DART_PROGRESS=ON \ -DENABLE_UNIFIED_MEMORY_MODEL=ON \ -DENABLE_DEFAULT_INDEX_TYPE_LONG=ON \ \ diff --git a/build.minimal.sh b/build.minimal.sh index 4f7be9a5c..aaaf085d5 100755 --- a/build.minimal.sh +++ b/build.minimal.sh @@ -59,6 +59,7 @@ rm -Rf $BUILD_DIR/* -DENABLE_ASSERTIONS=ON \ \ -DENABLE_SHARED_WINDOWS=OFF \ + -DENABLE_DART_PROGRESS=OFF \ -DENABLE_UNIFIED_MEMORY_MODEL=ON \ -DENABLE_DEFAULT_INDEX_TYPE_LONG=OFF \ \ diff --git a/dart-if/include/dash/dart/if/dart_communication.h b/dart-if/include/dash/dart/if/dart_communication.h index 26004704b..4d52e3e2d 100644 --- a/dart-if/include/dash/dart/if/dart_communication.h +++ b/dart-if/include/dash/dart/if/dart_communication.h @@ -26,6 +26,18 @@ extern "C" { #define DART_INTERFACE_ON /** \endcond */ +/* + * Set of flags to be passed to \ref dart_get, \ref dart_get_blocking, and + * \ref dart_get_handle to control their behavior. + */ +typedef enum { + /// Empty flag + DART_FLAG_NONE = 0, + /// Enforce ordering of communication operations + DART_FLAG_ORDERED = 1 +} dart_communication_flags_t; + + /** * \name Collective operations * Collective operations involving all units of a given team. @@ -170,8 +182,10 @@ dart_ret_t dart_allgatherv( * * \param sendbuf The buffer containing the data to be sent by each unit. * \param recvbuf The buffer to hold the received data. - * \param nelem Number of elements sent by each process and received from each unit. - * \param dtype The data type of values in \c sendbuf and \c recvbuf to use in \c op. + * \param nelem Number of elements sent by each process and received from + * each unit. + * \param dtype The data type of values in \c sendbuf and \c recvbuf to use + * in \c op. * \param op The reduction operation to perform. * \param team The team to participate in the allreduce. * @@ -192,8 +206,10 @@ dart_ret_t dart_allreduce( * DART Equivalent to MPI_Reduce. 
* * \param sendbuf Buffer containing \c nelem elements to reduce using \c op. - * \param recvbuf Buffer of size \c nelem to store the result of the element-wise operation \c op in. - * \param nelem The number of elements of type \c dtype in \c sendbuf and \c recvbuf. + * \param recvbuf Buffer of size \c nelem to store the result of the + * element-wise operation \c op in. + * \param nelem The number of elements of type \c dtype in \c sendbuf + * and \c recvbuf. * \param dtype The data type of values stored in \c sendbuf and \c recvbuf. * \param op The reduce operation to perform. * \param root The unit receiving the reduced values. @@ -333,6 +349,8 @@ dart_ret_t dart_compare_and_swap( * \param gptr A global pointer determining the source of the get operation. * \param nelem The number of elements of type \c dtype to transfer. * \param dtype The data type of the values in buffer \c dest. + * \param flags Flags controlling the behavior of \c dart_get, + * see \ref dart_communication_flags_t. * * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. * @@ -343,7 +361,8 @@ dart_ret_t dart_get( void * dest, dart_gptr_t gptr, size_t nelem, - dart_datatype_t dtype) DART_NOTHROW; + dart_datatype_t dtype, + int flags) DART_NOTHROW; /** * 'REGULAR' variant of dart_put. @@ -352,6 +371,9 @@ dart_ret_t dart_get( * is guaranteed. A later flush operation is needed to guarantee * local and remote completion. * + * However, a \ref dart_get or \ref dart_get_blocking following this put will + * be guaranteed to read the updated value. + * * \param gptr A global pointer determining the target of the put operation. * \param src The local source buffer to load the data from. * \param nelem The number of elements of type \c dtype to transfer. 
@@ -370,14 +392,16 @@ dart_ret_t dart_put( /** - * Guarantee completion of all outstanding operations involving a segment on a certain unit + * Guarantee completion of all outstanding operations involving a segment on + * a certain unit. * * Guarantees local and remote completion of all pending puts and * gets on a certain memory allocation / window / segment for the * target unit specified in gptr. * Similar to \c MPI_Win_flush(). * - * \param gptr Global pointer identifying the segment and unit to complete outstanding operations for. + * \param gptr Global pointer identifying the segment and unit to complete + * outstanding operations for. * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. * * \threadsafe @@ -463,7 +487,10 @@ typedef struct dart_handle_struct * dart_handle_t; * \param gptr Global pointer being the source of the data transfer. * \param nelem The number of elements of \c dtype in buffer \c dest. * \param dtype The data type of the values in buffer \c dest. - * \param[out] handle Pointer to DART handle to instantiate for later use with \c dart_wait, \c dart_wait_all etc. + * \param flags Flags controlling the behavior of \c dart_get_handle, + * see \ref dart_communication_flags_t. + * \param[out] handle Pointer to DART handle to instantiate for later use + * with \c dart_wait, \c dart_wait_all etc. * * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. * @@ -475,7 +502,8 @@ dart_ret_t dart_get_handle( dart_gptr_t gptr, size_t nelem, dart_datatype_t dtype, - dart_handle_t * handle) DART_NOTHROW; + dart_handle_t * handle, + int flags) DART_NOTHROW; /** * 'HANDLE' variant of dart_put. @@ -483,11 +511,15 @@ dart_ret_t dart_get_handle( * dart_wait*() call or a fence/flush operation is needed to guarantee * completion. * + * The update will not be visible in a following call to \ref dart_get variant + * unless the operation has been completed. + * * \param gptr Global pointer being the target of the data transfer. 
* \param src Local source memory to transfer data from. * \param nelem The number of elements of type \c dtype to transfer. * \param dtype The data type of the values in buffer \c dest. - * \param[out] handle Pointer to DART handle to instantiate for later use with \c dart_wait, \c dart_wait_all etc. + * \param[out] handle Pointer to DART handle to instantiate for later use with + * \c dart_wait, \c dart_wait_all etc. * * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. * @@ -606,6 +638,8 @@ dart_ret_t dart_testall_local( * \param gptr Global pointer being the source of the data transfer. * \param nelem The number of elements of type \c dtype to transfer. * \param dtype The data type of the values in buffer \c dest. + * \param flags Flags controlling the behavior of \c dart_get_blocking, + * see \ref dart_communication_flags_t. * * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. * @@ -616,7 +650,8 @@ dart_ret_t dart_get_blocking( void * dest, dart_gptr_t gptr, size_t nelem, - dart_datatype_t dtype) DART_NOTHROW; + dart_datatype_t dtype, + int flags) DART_NOTHROW; /** * 'BLOCKING' variant of dart_put. diff --git a/dart-if/include/dash/dart/if/dart_globmem.h b/dart-if/include/dash/dart/if/dart_globmem.h index 5969b042d..20ef7ec15 100644 --- a/dart-if/include/dash/dart/if/dart_globmem.h +++ b/dart-if/include/dash/dart/if/dart_globmem.h @@ -322,6 +322,13 @@ dart_ret_t dart_team_memalloc_aligned( dart_datatype_t dtype, dart_gptr_t * gptr) DART_NOTHROW; + +dart_ret_t dart_team_memalloc_aligned_full( + dart_team_t teamid, + size_t nelem, + dart_datatype_t dtype, + dart_gptr_t * gptr) DART_NOTHROW; + /** * Collective function to free global memory previously allocated * using \ref dart_team_memalloc_aligned. 
diff --git a/dart-impl/mpi/CMakeLists.txt b/dart-impl/mpi/CMakeLists.txt index bc9119560..79514590e 100644 --- a/dart-impl/mpi/CMakeLists.txt +++ b/dart-impl/mpi/CMakeLists.txt @@ -39,6 +39,8 @@ set(ENABLE_PAPI ${ENABLE_PAPI} PARENT_SCOPE) set(ENABLE_HDF5 ${ENABLE_HDF5} PARENT_SCOPE) +set(ENABLE_DART_PROGRESS ${ENABLE_DART_PROGRESS} + PARENT_SCOPE) message(STATUS "MPI include path: " ${MPI_INCLUDE_PATH}) message(STATUS "MPI libraries:") @@ -143,6 +145,11 @@ if (NOT ENABLE_SHARED_WINDOWS) ${ADDITIONAL_COMPILE_FLAGS} -DDART_MPI_DISABLE_SHARED_WINDOWS) endif() +if (ENABLE_DART_PROGRESS) + set(ADDITIONAL_COMPILE_FLAGS + ${ADDITIONAL_COMPILE_FLAGS} -DDART_ENABLE_PROGRESS) +endif() + if(MPI_COMPILE_FLAGS) set (ADDITIONAL_COMPILE_FLAGS ${ADDITIONAL_COMPILE_FLAGS} ${MPI_COMPILE_FLAGS}) diff --git a/dart-impl/mpi/include/dash/dart/mpi/dart_globmem_priv.h b/dart-impl/mpi/include/dash/dart/mpi/dart_globmem_priv.h index 14b5efb13..ab8e1f4fe 100644 --- a/dart-impl/mpi/include/dash/dart/mpi/dart_globmem_priv.h +++ b/dart-impl/mpi/include/dash/dart/mpi/dart_globmem_priv.h @@ -6,8 +6,5 @@ /* Global object for one-sided communication on memory region allocated with 'local allocation'. 
*/ extern MPI_Win dart_win_local_alloc DART_INTERNAL; -#if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) -extern MPI_Win dart_sharedmem_win_local_alloc DART_INTERNAL; -#endif #endif /* DART__MPI__DART_GLOBMEM_PRIV_H__ */ diff --git a/dart-impl/mpi/include/dash/dart/mpi/dart_segment.h b/dart-impl/mpi/include/dash/dart/mpi/dart_segment.h index ec99e9704..ae46b3b27 100644 --- a/dart-impl/mpi/include/dash/dart/mpi/dart_segment.h +++ b/dart-impl/mpi/include/dash/dart/mpi/dart_segment.h @@ -8,6 +8,8 @@ #ifndef DART_SEGMENT_H_ #define DART_SEGMENT_H_ #include +#include +#include #include #include @@ -18,13 +20,16 @@ typedef int16_t dart_segid_t; typedef struct { - dart_segid_t segid; /* seg_id determines a global pointer uniquely */ size_t size; - MPI_Aint * disp; /* address set of memory location of all units in certain team. */ - char ** baseptr; - char * selfbaseptr; - MPI_Win win; - uint16_t flags; + MPI_Aint * disp; /* offsets at all units in the team */ + char ** baseptr; /* baseptr of all units in the sharedmem group */ + char * selfbaseptr; /* baseptr of the current unit */ + MPI_Win shmwin; /* sharedmem window */ + MPI_Win win; /* window used to access this segment */ + uint16_t flags; /* 16 bit flags */ + dart_segid_t segid; /* ID of the segment, globally unique in a team */ + bool dirty; /* whether the segment has pending writes */ + bool isshm; /* whether this is a shared memory segment */ } dart_segment_info_t; // forward declaration to make the compiler happy @@ -50,10 +55,60 @@ typedef struct { } dart_segmentdata_t; typedef enum { + DART_SEGMENT_LOCAL_ALLOC, DART_SEGMENT_ALLOC, DART_SEGMENT_REGISTER } dart_segment_type; +struct dart_seghash_elem { + dart_seghash_elem_t *next; + dart_segment_info_t data; +}; + +static inline int hash_segid(dart_segid_t segid) +{ + /* Simply use the lower bits of the segment ID. 
+ * Since segment IDs are allocated continuously, this is likely to cause + * collisions starting at (segment number == DART_SEGMENT_HASH_SIZE) + * TODO: come up with a random distribution to account for random free'd + * segments? + * */ + return (abs(segid) % DART_SEGMENT_HASH_SIZE); +} + +static inline +MPI_Aint +dart_segment_disp(dart_segment_info_t *seginfo, dart_team_unit_t team_unit_id) +{ + return (seginfo->disp != NULL) ? seginfo->disp[team_unit_id.id] : 0; +} + +/** + * Returns the segment info for the segment with ID \c segid. + */ +static inline dart_segment_info_t * dart_segment_get_info( + dart_segmentdata_t *segdata, + dart_segid_t segid) +{ + int slot = hash_segid(segid); + dart_seghash_elem_t *elem = segdata->hashtab[slot]; + + while (elem != NULL) { + if (elem->data.segid == segid) { + break; + } + elem = elem->next; + } + +// if (elem == NULL) { +// DART_LOG_ERROR("dart_segment__get_segment : " +// "Invalid segment ID %i on team %i", +// segid, segdata->team_id); +// return NULL; +// } + + return &(elem->data); +} /** * Initialize the segment data hash table. @@ -80,6 +135,13 @@ dart_segment_register( dart_segmentdata_t *segdata, dart_segment_info_t *seg) DART_INTERNAL; +/** + * Returns the segment info for the segment with ID \c segid. + */ +//dart_segment_info_t * dart_segment_get_info( +// dart_segmentdata_t *segdata, +// dart_segid_t segid) DART_INTERNAL; + #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) @@ -92,7 +154,7 @@ dart_segment_register( * \retval non-negative integer Search successfully. * \retval negative integer Failure. 
*/ -dart_ret_t dart_segment_get_win( +dart_ret_t dart_segment_get_shmwin( dart_segmentdata_t * segdata, int16_t seg_id, MPI_Win * win) DART_INTERNAL; @@ -104,6 +166,16 @@ dart_ret_t dart_segment_get_baseptr( char ** baseptr_s) DART_INTERNAL; #endif +dart_ret_t dart_segment_get_dirty( + dart_segmentdata_t * segdata, + int16_t segid, + bool * dirty) DART_INTERNAL; + +dart_ret_t dart_segment_set_dirty( + dart_segmentdata_t * segdata, + int16_t segid, + bool dirty) DART_INTERNAL; + dart_ret_t dart_segment_get_selfbaseptr( dart_segmentdata_t * segdata, int16_t seg_id, diff --git a/dart-impl/mpi/include/dash/dart/mpi/dart_team_private.h b/dart-impl/mpi/include/dash/dart/mpi/dart_team_private.h index 19a3bc036..082cd6b33 100644 --- a/dart-impl/mpi/include/dash/dart/mpi/dart_team_private.h +++ b/dart-impl/mpi/include/dash/dart/mpi/dart_team_private.h @@ -194,11 +194,6 @@ typedef struct dart_team_data { } dart_team_data_t; - -#if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) - -extern char* *dart_sharedmem_local_baseptr_set DART_INTERNAL; -#endif /* @brief Initiate the free-team-list and allocated-team-list. * * This call will be invoked within dart_init(), and the free teamlist consist of diff --git a/dart-impl/mpi/src/dart_communication.c b/dart-impl/mpi/src/dart_communication.c index c2525fd5f..27a33cd18 100644 --- a/dart-impl/mpi/src/dart_communication.c +++ b/dart-impl/mpi/src/dart_communication.c @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -29,6 +30,14 @@ #include #include +#define CHECK_MPI_RET(__call, __name) \ + do { \ + if (dart__unlikely(__call != MPI_SUCCESS)) { \ + DART_LOG_ERROR("%s ! 
%s failed!", __func__, __name); \ + return DART_ERR_OTHER; \ + } \ + } while (0) + int dart__mpi__datatype_sizes[DART_TYPE_COUNT]; dart_ret_t @@ -46,80 +55,116 @@ dart__mpi__datatype_init() return DART_OK; } + #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) static dart_ret_t get_shared_mem( - dart_team_data_t * team_data, - void * dest, - dart_gptr_t gptr, - size_t nelem, - dart_datatype_t dtype) + const dart_team_data_t * team_data, + const dart_segment_info_t * seginfo, + void * dest, + uint64_t offset, + dart_team_unit_t unitid, + size_t nelem, + dart_datatype_t dtype) { - int16_t seg_id = gptr.segid; - uint64_t offset = gptr.addr_or_offs.offset; - DART_LOG_DEBUG("dart_get: shared windows enabled"); - dart_team_unit_t luid = team_data->sharedmem_tab[gptr.unitid]; - char * baseptr; - /* - * Use memcpy if the target is in the same node as the calling unit: - */ - DART_LOG_DEBUG("dart_get: shared memory segment, seg_id:%d", - seg_id); - if (seg_id) { - if (dart_segment_get_baseptr( - &team_data->segdata, seg_id, luid, &baseptr) != DART_OK) { - DART_LOG_ERROR("dart_get ! 
" - "dart_adapt_transtable_get_baseptr failed"); - return DART_ERR_INVAL; - } - } else { - baseptr = dart_sharedmem_local_baseptr_set[luid.id]; - } + DART_LOG_DEBUG("dart_get: using shared memory window in segment %d enabled", + seginfo->segid); + dart_team_unit_t luid = team_data->sharedmem_tab[unitid.id]; + char * baseptr = seginfo->baseptr[luid.id]; + baseptr += offset; DART_LOG_DEBUG( "dart_get: memcpy %zu bytes", nelem * dart__mpi__datatype_sizeof(dtype)); - memcpy((char*)dest, baseptr, nelem * dart__mpi__datatype_sizeof(dtype)); + memcpy(dest, baseptr, nelem * dart__mpi__datatype_sizeof(dtype)); + return DART_OK; +} + +static dart_ret_t put_shared_mem( + const dart_team_data_t * team_data, + const dart_segment_info_t * seginfo, + const void * src, + uint64_t offset, + dart_team_unit_t unitid, + size_t nelem, + dart_datatype_t dtype) +{ + DART_LOG_DEBUG("dart_get: using shared memory window in segment %d enabled", + seginfo->segid); + dart_team_unit_t luid = team_data->sharedmem_tab[unitid.id]; + char * baseptr = seginfo->baseptr[luid.id]; + + baseptr += offset; + DART_LOG_DEBUG( + "dart_get: memcpy %zu bytes", nelem * dart__mpi__datatype_sizeof(dtype)); + memcpy(baseptr, src, nelem * dart__mpi__datatype_sizeof(dtype)); return DART_OK; } #endif // !defined(DART_MPI_DISABLE_SHARED_WINDOWS) +#define CLEAN_SEGMENT(__seginfo) \ + do { \ + if (seginfo->dirty) { \ + CHECK_MPI_RET( \ + MPI_Win_flush_all(__seginfo->win), \ + "MPI_Win_flush_all"); \ + seginfo->dirty = false; \ + } \ + } while (0) + dart_ret_t dart_get( void * dest, dart_gptr_t gptr, size_t nelem, - dart_datatype_t dtype) + dart_datatype_t dtype, + int flags) { MPI_Win win; MPI_Datatype mpi_dtype = dart__mpi__datatype(dtype); uint64_t offset = gptr.addr_or_offs.offset; int16_t seg_id = gptr.segid; dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); + dart_team_t teamid = gptr.teamid; - if (gptr.unitid < 0) { - DART_LOG_ERROR("dart_get ! 
failed: gptr.unitid < 0"); - return DART_ERR_INVAL; - } + // shortcut + if (nelem == 0) return DART_OK; + + DART_ASSERT_MSG(team_unit_id.id >= 0, "dart_put ! failed: gptr.unitid < 0"); /* * MPI uses offset type int, do not copy more than INT_MAX elements: */ - if (nelem > INT_MAX) { - DART_LOG_ERROR("dart_get ! failed: nelem > INT_MAX"); - return DART_ERR_INVAL; - } + DART_ASSERT_MSG(nelem < INT_MAX, "dart_put ! failed: nelem > INT_MAX"); - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_get ! failed: Unknown team %i!", gptr.teamid); - return DART_ERR_INVAL; - } + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + DART_ASSERT_MSG(team_data != NULL, "dart_put ! failed: Unknown team %i!"); + + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + + DART_ASSERT_MSG(seginfo != NULL, "dart_get_blocking ! " + "Unknown segment %i on team %i"); DART_LOG_DEBUG("dart_get() uid:%d o:%"PRIu64" s:%d t:%d nelem:%zu", - team_unit_id.id, offset, seg_id, gptr.teamid, nelem); + team_unit_id.id, offset, seg_id, teamid, nelem); + + if (flags & DART_FLAG_ORDERED) { + CLEAN_SEGMENT(seginfo); + } + + if (team_data->unitid == team_unit_id.id) { + // use direct memcpy if we are on the same unit + memcpy(dest, seginfo->selfbaseptr + offset, + nelem * dart__mpi__datatype_sizeof(dtype)); + DART_LOG_DEBUG("dart_get_blocking: memcpy nelem:%zu " + "source (coll.): offset:%lu -> dest: %p", + nelem, offset, dest); + return DART_OK; + } #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) DART_LOG_DEBUG("dart_get: shared windows enabled"); - if (seg_id >= 0 && team_data->sharedmem_tab[gptr.unitid].id >= 0) { - return get_shared_mem(team_data, dest, gptr, nelem, dtype); + if (seginfo->isshm && team_data->sharedmem_tab[team_unit_id.id].id >= 0) { + return get_shared_mem(team_data, seginfo, dest, offset, + team_unit_id, nelem, dtype); } #else DART_LOG_DEBUG("dart_get: shared windows 
disabled"); @@ -129,66 +174,21 @@ dart_ret_t dart_get( * MPI shared windows disabled or target and calling unit are on different * nodes, use MPI_Get: */ - if (seg_id) { - MPI_Aint disp_s; - if (dart_segment_get_disp( - &team_data->segdata, - seg_id, - team_unit_id, - &disp_s) != DART_OK) { - return DART_ERR_INVAL; - } - - if (team_data->unitid == team_unit_id.id) { - // use direct memcpy if we are on the same unit - memcpy(dest, ((void*)disp_s) + offset, - nelem * dart__mpi__datatype_sizeof(dtype)); - DART_LOG_TRACE("dart_get: memcpy nelem:%zu " - "source (coll.): disp:%"PRId64" -> dest:%p", - nelem, offset, dest); - return DART_OK; - } - - offset += disp_s; - win = team_data->window; - DART_LOG_TRACE("dart_get: nelem:%zu " - "source (coll.): win:%"PRIu64" unit:%d disp:%"PRId64" " - "-> dest:%p", - nelem, (unsigned long)win, team_unit_id.id, offset, dest); - - } else { - - if (team_data->unitid == team_unit_id.id) { - // use direct memcpy if we are on the same unit - memcpy(dest, dart_mempool_localalloc + offset, - nelem * dart__mpi__datatype_sizeof(dtype)); - DART_LOG_TRACE("dart_get: memcpy nelem:%zu " - "source (local): disp:%"PRId64" -> dest:%p", - nelem, offset, dest); - return DART_OK; - } - - win = dart_win_local_alloc; - DART_LOG_TRACE("dart_get: nelem:%zu " - "source (local): win:%"PRIu64" unit:%d disp:%"PRId64" " - "-> dest:%p", - nelem, (unsigned long)win, team_unit_id.id, offset, dest); - } + win = seginfo->win; + offset += dart_segment_disp(seginfo, team_unit_id); DART_LOG_TRACE("dart_get: MPI_Get"); - if (MPI_Get(dest, - nelem, - mpi_dtype, - team_unit_id.id, - offset, - nelem, - mpi_dtype, - win) - != MPI_SUCCESS) { - DART_LOG_ERROR("dart_get ! 
MPI_Rget failed"); - return DART_ERR_INVAL; - } + CHECK_MPI_RET( + MPI_Get(dest, + nelem, + mpi_dtype, + team_unit_id.id, + offset, + nelem, + mpi_dtype, + win), + "MPI_GET"); DART_LOG_DEBUG("dart_get > finished"); return DART_OK; @@ -200,78 +200,67 @@ dart_ret_t dart_put( size_t nelem, dart_datatype_t dtype) { - MPI_Win win; MPI_Datatype mpi_dtype = dart__mpi__datatype(dtype); uint64_t offset = gptr.addr_or_offs.offset; int16_t seg_id = gptr.segid; dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); + dart_team_t teamid = gptr.teamid; - if (gptr.unitid < 0) { - DART_LOG_ERROR("dart_put ! failed: gptr.unitid < 0"); - return DART_ERR_INVAL; - } + // shortcut + if (nelem == 0) return DART_OK; + + DART_ASSERT_MSG(team_unit_id.id >= 0, "dart_put ! failed: gptr.unitid < 0"); /* * MPI uses offset type int, do not copy more than INT_MAX elements: */ - if (nelem > INT_MAX) { - DART_LOG_ERROR("dart_put ! failed: nelem > INT_MAX"); - return DART_ERR_INVAL; - } - - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_put ! failed: Unknown team %i!", gptr.teamid); - return DART_ERR_INVAL; - } + DART_ASSERT_MSG(nelem < INT_MAX, "dart_put ! failed: nelem > INT_MAX"); - if (seg_id) { - - MPI_Aint disp_s; - if (dart_segment_get_disp( - &team_data->segdata, - seg_id, - team_unit_id, - &disp_s) != DART_OK) { - return DART_ERR_INVAL; - } - - /* copy data directly if we are on the same unit */ - if (team_unit_id.id == team_data->unitid) { - memcpy(((void*)disp_s) + offset, src, - nelem * dart__mpi__datatype_sizeof(dtype)); - DART_LOG_DEBUG("dart_put: memcpy nelem:%zu (from global allocation)" - "offset: %"PRIu64"", nelem, offset); - return DART_OK; - } + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + DART_ASSERT_MSG(team_data != NULL, "dart_put ! 
failed: Unknown team %i!"); - win = team_data->window; - offset += disp_s; + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); - } else { + DART_ASSERT_MSG(seginfo != NULL, "dart_get_blocking ! " + "Unknown segment %i on team %i"); - /* copy data directly if we are on the same unit */ - if (team_unit_id.id == team_data->unitid) { - memcpy(dart_mempool_localalloc + offset, src, - nelem * dart__mpi__datatype_sizeof(dtype)); - DART_LOG_DEBUG("dart_put: memcpy nelem:%zu (from local allocation)" - "offset: %"PRIu64"", nelem, offset); - return DART_OK; - } + /* copy data directly if we are on the same unit */ + if (team_unit_id.id == team_data->unitid) { + memcpy(seginfo->selfbaseptr + offset, src, + nelem * dart__mpi__datatype_sizeof(dtype)); + DART_LOG_DEBUG("dart_put: memcpy nelem:%zu (from global allocation)" + "offset: %"PRIu64"", nelem, offset); + return DART_OK; + } - win = dart_win_local_alloc; +#if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) + DART_LOG_DEBUG("dart_put: shared windows enabled"); + if (seginfo->isshm && team_data->sharedmem_tab[team_unit_id.id].id >= 0) { + return put_shared_mem(team_data, seginfo, src, offset, + team_unit_id, nelem, dtype); } +#else + DART_LOG_DEBUG("dart_get_blocking: shared windows disabled"); +#endif /* !defined(DART_MPI_DISABLE_SHARED_WINDOWS) */ + // source on another node or shared memory windows disabled + MPI_Win win = seginfo->win; + offset += dart_segment_disp(seginfo, team_unit_id); - MPI_Put( - src, - nelem, - mpi_dtype, - team_unit_id.id, - offset, - nelem, - mpi_dtype, - win); + CHECK_MPI_RET( + MPI_Put( + src, + nelem, + mpi_dtype, + team_unit_id.id, + offset, + nelem, + mpi_dtype, + win), + "MPI_Put"); + + seginfo->dirty = true; return DART_OK; } @@ -283,16 +272,16 @@ dart_ret_t dart_accumulate( dart_datatype_t dtype, dart_operation_t op) { - MPI_Win win; MPI_Datatype mpi_dtype; MPI_Op mpi_op; dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); - uint64_t offset = 
gptr.addr_or_offs.offset; - int16_t seg_id = gptr.segid; - mpi_dtype = dart__mpi__datatype(dtype); - mpi_op = dart__mpi__op(op); + uint64_t offset = gptr.addr_or_offs.offset; + int16_t seg_id = gptr.segid; + dart_team_t teamid = gptr.teamid; + mpi_dtype = dart__mpi__datatype(dtype); + mpi_op = dart__mpi__op(op); - if (gptr.unitid < 0) { + if (dart__unlikely(team_unit_id.id < 0)) { DART_LOG_ERROR("dart_accumulate ! failed: gptr.unitid < 0"); return DART_ERR_INVAL; } @@ -303,51 +292,45 @@ dart_ret_t dart_accumulate( /* * MPI uses offset type int, do not copy more than INT_MAX elements: */ - if (nelem > INT_MAX) { + if (dart__unlikely(nelem > INT_MAX)) { DART_LOG_ERROR("dart_accumulate ! failed: nelem > INT_MAX"); return DART_ERR_INVAL; } - if (seg_id) { - MPI_Aint disp_s; - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_accumulate ! failed: Unknown team %i!", gptr.teamid); - return DART_ERR_INVAL; - } + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_put ! failed: Unknown team %i!", teamid); + return DART_ERR_INVAL; + } - win = team_data->window; - if (dart_segment_get_disp( - &team_data->segdata, - seg_id, - team_unit_id, - &disp_s) != DART_OK) { - DART_LOG_ERROR("dart_accumulate ! 
" - "dart_adapt_transtable_get_disp failed"); - return DART_ERR_INVAL; - } - offset += disp_s; - DART_LOG_TRACE("dart_accumulate: nelem:%zu (from collective allocation) " - "target unit: %d offset: %"PRIu64"", - nelem, team_unit_id.id, offset); - } else { - win = dart_win_local_alloc; - DART_LOG_TRACE("dart_accumulate: nelem:%zu (from local allocation) " - "target unit: %d offset: %"PRIu64"", - nelem, team_unit_id.id, offset); - } - - MPI_Accumulate( - values, // Origin address - nelem, // Number of entries in buffer - mpi_dtype, // Data type of each buffer entry - team_unit_id.id, // Rank of target - offset, // Displacement from start of window to beginning - // of target buffer - nelem, // Number of entries in target buffer - mpi_dtype, // Data type of each entry in target buffer - mpi_op, // Reduce operation - win); + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + if (dart__unlikely(seginfo == NULL)) { + DART_LOG_ERROR("dart_get_blocking ! " + "Unknown segment %i on team %i", seg_id, teamid); + return DART_ERR_INVAL; + } + + CLEAN_SEGMENT(seginfo); + + MPI_Win win = seginfo->win; + offset += dart_segment_disp(seginfo, team_unit_id); + + CHECK_MPI_RET( + MPI_Accumulate( + values, // Origin address + nelem, // Number of entries in buffer + mpi_dtype, // Data type of each buffer entry + team_unit_id.id, // Rank of target + offset, // Displacement from start of window to beginning + // of target buffer + nelem, // Number of entries in target buffer + mpi_dtype, // Data type of each entry in target buffer + mpi_op, // Reduce operation + win), + "MPI_Accumulate"); + + seginfo->dirty = true; DART_LOG_DEBUG("dart_accumulate > finished"); return DART_OK; @@ -360,16 +343,16 @@ dart_ret_t dart_fetch_and_op( dart_datatype_t dtype, dart_operation_t op) { - MPI_Win win; MPI_Datatype mpi_dtype; MPI_Op mpi_op; dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); - uint64_t offset = gptr.addr_or_offs.offset; - int16_t seg_id 
= gptr.segid; - mpi_dtype = dart__mpi__datatype(dtype); - mpi_op = dart__mpi__op(op); + uint64_t offset = gptr.addr_or_offs.offset; + int16_t seg_id = gptr.segid; + dart_team_t teamid = gptr.teamid; + mpi_dtype = dart__mpi__datatype(dtype); + mpi_op = dart__mpi__op(op); - if (gptr.unitid < 0) { + if (dart__unlikely(team_unit_id.id < 0)) { DART_LOG_ERROR("dart_fetch_and_op ! failed: gptr.unitid < 0"); return DART_ERR_INVAL; } @@ -377,46 +360,41 @@ dart_ret_t dart_fetch_and_op( DART_LOG_DEBUG("dart_fetch_and_op() dtype:%d op:%d unit:%d " "offset:%"PRIu64" segid:%d", dtype, op, team_unit_id.id, - gptr.addr_or_offs.offset, gptr.segid); - if (seg_id) { + gptr.addr_or_offs.offset, seg_id); - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_fetch_and_op ! failed: Unknown team %i!", - gptr.teamid); - return DART_ERR_INVAL; - } + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_put ! failed: Unknown team %i!", teamid); + return DART_ERR_INVAL; + } + + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + if (dart__unlikely(seginfo == NULL)) { + DART_LOG_ERROR("dart_get_blocking ! " + "Unknown segment %i on team %i", seg_id, teamid); + return DART_ERR_INVAL; + } + + CLEAN_SEGMENT(seginfo); + + MPI_Win win = seginfo->win; + offset += dart_segment_disp(seginfo, team_unit_id); + + CHECK_MPI_RET( + MPI_Fetch_and_op( + value, // Origin address + result, // Result address + mpi_dtype, // Data type of each buffer entry + team_unit_id.id, // Rank of target + offset, // Displacement from start of window to beginning + // of target buffer + mpi_op, // Reduce operation + win), + "MPI_Fetch_and_op"); + + seginfo->dirty = true; - MPI_Aint disp_s; - if (dart_segment_get_disp( - &team_data->segdata, - seg_id, - team_unit_id, - &disp_s) != DART_OK) { - DART_LOG_ERROR("dart_fetch_and_op ! 
" - "dart_adapt_transtable_get_disp failed"); - return DART_ERR_INVAL; - } - offset += disp_s; - win = team_data->window; - DART_LOG_TRACE("dart_fetch_and_op: (from coll. allocation) " - "target unit: %d offset: %"PRIu64, - team_unit_id.id, offset); - } else { - win = dart_win_local_alloc; - DART_LOG_TRACE("dart_fetch_and_op: (from local allocation) " - "target unit: %d offset: %"PRIu64, - team_unit_id.id, offset); - } - MPI_Fetch_and_op( - value, // Origin address - result, // Result address - mpi_dtype, // Data type of each buffer entry - team_unit_id.id, // Rank of target - offset, // Displacement from start of window to beginning - // of target buffer - mpi_op, // Reduce operation - win); DART_LOG_DEBUG("dart_fetch_and_op > finished"); return DART_OK; } @@ -428,13 +406,13 @@ dart_ret_t dart_compare_and_swap( void * result, dart_datatype_t dtype) { - MPI_Win win; dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); - uint64_t offset = gptr.addr_or_offs.offset; - int16_t seg_id = gptr.segid; + uint64_t offset = gptr.addr_or_offs.offset; + int16_t seg_id = gptr.segid; + dart_team_t teamid = gptr.teamid; MPI_Datatype mpi_dtype = dart__mpi__datatype(dtype); - if (gptr.unitid < 0) { + if (dart__unlikely(team_unit_id.id < 0)) { DART_LOG_ERROR("dart_compare_and_swap ! failed: gptr.unitid < 0"); return DART_ERR_INVAL; } @@ -448,41 +426,39 @@ dart_ret_t dart_compare_and_swap( DART_LOG_TRACE("dart_compare_and_swap() dtype:%d unit:%d offset:%"PRIu64, dtype, team_unit_id.id, gptr.addr_or_offs.offset); - if (seg_id) { - MPI_Aint disp_s; + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_put ! failed: Unknown team %i!", teamid); + return DART_ERR_INVAL; + } - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_get_handle ! 
failed: Unknown team %i!", gptr.teamid); - return DART_ERR_INVAL; - } + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + if (dart__unlikely(seginfo == NULL)) { + DART_LOG_ERROR("dart_get_blocking ! " + "Unknown segment %i on team %i", seg_id, teamid); + return DART_ERR_INVAL; + } - win = team_data->window; - if (dart_segment_get_disp( - &team_data->segdata, - seg_id, - team_unit_id, - &disp_s) != DART_OK) { - DART_LOG_ERROR("dart_accumulate ! " - "dart_adapt_transtable_get_disp failed"); - return DART_ERR_INVAL; - } - offset += disp_s; - DART_LOG_TRACE("dart_compare_and_swap: target unit: %d offset: %"PRIu64"", - team_unit_id.id, offset); - } else { - win = dart_win_local_alloc; - DART_LOG_TRACE("dart_compare_and_swap: target unit: %d offset: %"PRIu64"", - team_unit_id.id, offset); - } - MPI_Compare_and_swap( + CLEAN_SEGMENT(seginfo); + + MPI_Win win = seginfo->win; + offset += dart_segment_disp(seginfo, team_unit_id); + + + CHECK_MPI_RET( + MPI_Compare_and_swap( value, compare, result, mpi_dtype, team_unit_id.id, offset, - win); + win), + "MPI_Compare_and_swap"); + + seginfo->dirty = true; + DART_LOG_DEBUG("dart_compare_and_swap > finished"); return DART_OK; } @@ -494,57 +470,61 @@ dart_ret_t dart_get_handle( dart_gptr_t gptr, size_t nelem, dart_datatype_t dtype, - dart_handle_t * handle) + dart_handle_t * handle, + int flags) { - MPI_Datatype mpi_type = dart__mpi__datatype(dtype); - MPI_Win win; - dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); - uint64_t offset = gptr.addr_or_offs.offset; - int16_t seg_id = gptr.segid; + MPI_Datatype mpi_type = dart__mpi__datatype(dtype); + MPI_Win win; + dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); + uint64_t offset = gptr.addr_or_offs.offset; + int16_t seg_id = gptr.segid; + dart_team_t teamid = gptr.teamid; *handle = NULL; - if (gptr.unitid < 0) { - DART_LOG_ERROR("dart_get_handle ! 
failed: gptr.unitid < 0"); - return DART_ERR_INVAL; - } + // shortcut + if (nelem == 0) return DART_OK; + + DART_ASSERT_MSG(team_unit_id.id >= 0, "dart_put ! failed: gptr.unitid < 0"); /* * MPI uses offset type int, do not copy more than INT_MAX elements: */ - if (nelem > INT_MAX) { - DART_LOG_ERROR("dart_get_handle ! failed: nelem > INT_MAX"); - return DART_ERR_INVAL; - } + DART_ASSERT_MSG(nelem < INT_MAX, "dart_put ! failed: nelem > INT_MAX"); - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_get_handle ! failed: Unknown team %i!", gptr.teamid); - return DART_ERR_INVAL; + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + DART_ASSERT_MSG(team_data != NULL, "dart_put ! failed: Unknown team %i!"); + + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + + DART_ASSERT_MSG(seginfo != NULL, "dart_get_blocking ! " + "Unknown segment %i on team %i"); + + if (flags & DART_FLAG_ORDERED) { + CLEAN_SEGMENT(seginfo); } + win = seginfo->win; + DART_LOG_DEBUG("dart_get_handle() uid:%d o:%"PRIu64" s:%d t:%d, nelem:%zu", - team_unit_id.id, offset, seg_id, gptr.teamid, nelem); + team_unit_id.id, offset, seg_id, teamid, nelem); DART_LOG_TRACE("dart_get_handle: allocated handle:%p", (void *)(*handle)); #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) DART_LOG_DEBUG("dart_get_handle: shared windows enabled"); - if (seg_id >= 0 && team_data->sharedmem_tab[gptr.unitid].id >= 0) { - dart_ret_t ret = get_shared_mem(team_data, dest, gptr, nelem, dtype); + if (seginfo->isshm && team_data->sharedmem_tab[team_unit_id.id].id >= 0) { + dart_ret_t ret = get_shared_mem(team_data, seginfo, dest, offset, + team_unit_id, nelem, dtype); /* * Mark request as completed: */ *handle = malloc(sizeof(struct dart_handle_struct)); (*handle)->request = MPI_REQUEST_NULL; - if (seg_id != 0) { - (*handle)->dest = team_unit_id.id; - (*handle)->win = team_data->window; - } else { - 
(*handle)->dest = team_unit_id.id; - (*handle)->win = dart_win_local_alloc; - } + (*handle)->dest = team_unit_id.id; + (*handle)->win = win; return ret; } #else @@ -554,53 +534,24 @@ dart_ret_t dart_get_handle( * MPI shared windows disabled or target and calling unit are on different * nodes, use MPI_RGet: */ - if (seg_id) { - /* - * The memory accessed is allocated with collective allocation. - */ - win = team_data->window; - - MPI_Aint disp_s; - if (dart_segment_get_disp( - &team_data->segdata, - seg_id, - team_unit_id, - &disp_s) != DART_OK) { - DART_LOG_ERROR( - "dart_get_handle ! dart_adapt_transtable_get_disp failed"); - return DART_ERR_INVAL; - } - offset += disp_s; + offset += dart_segment_disp(seginfo, team_unit_id); - DART_LOG_DEBUG("dart_get_handle: -- %zu elements (collective allocation) " - "from %d at offset %"PRIu64"", - nelem, team_unit_id.id, offset); - } else { - /* - * The memory accessed is allocated with local allocation. - */ - DART_LOG_DEBUG("dart_get_handle: -- %zu elements (local allocation) " - "from %d at offset %"PRIu64"", - nelem, team_unit_id.id, offset); - win = dart_win_local_alloc; - } DART_LOG_DEBUG("dart_get_handle: -- MPI_Rget"); MPI_Request mpi_req; - int mpi_ret = MPI_Rget( - dest, // origin address - nelem, // origin count - mpi_type, // origin data type - team_unit_id.id, // target rank - offset, // target disp in window - nelem, // target count - mpi_type, // target data type - win, // window - &mpi_req); - if (mpi_ret != MPI_SUCCESS) { - DART_LOG_ERROR("dart_get_handle ! 
MPI_Rget failed"); - return DART_ERR_INVAL; - } + CHECK_MPI_RET( + MPI_Rget( + dest, // origin address + nelem, // origin count + mpi_type, // origin data type + team_unit_id.id, // target rank + offset, // target disp in window + nelem, // target count + mpi_type, // target data type + win, // window + &mpi_req), + "MPI_Rget"); + *handle = malloc(sizeof(struct dart_handle_struct)); (*handle)->dest = team_unit_id.id; (*handle)->request = mpi_req; @@ -623,75 +574,52 @@ dart_ret_t dart_put_handle( dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); uint64_t offset = gptr.addr_or_offs.offset; int16_t seg_id = gptr.segid; - MPI_Win win; + dart_team_t teamid = gptr.teamid; *handle = NULL; - if (gptr.unitid < 0) { - DART_LOG_ERROR("dart_put_handle ! failed: gptr.unitid < 0"); - return DART_ERR_INVAL; - } + // shortcut + if (nelem == 0) return DART_OK; + + DART_ASSERT_MSG(team_unit_id.id >= 0, "dart_put ! failed: gptr.unitid < 0"); /* * MPI uses offset type int, do not copy more than INT_MAX elements: */ - if (nelem > INT_MAX) { - DART_LOG_ERROR("dart_put_handle ! failed: nelem > INT_MAX"); - return DART_ERR_INVAL; - } + DART_ASSERT_MSG(nelem < INT_MAX, "dart_put ! failed: nelem > INT_MAX"); + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + DART_ASSERT_MSG(team_data != NULL, "dart_put ! failed: Unknown team %i!"); - if (seg_id != 0) { - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_put_handle ! failed: Unknown team %i!", gptr.teamid); - return DART_ERR_INVAL; - } - - win = team_data->window; + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); - MPI_Aint disp_s; - if (dart_segment_get_disp( - &team_data->segdata, - seg_id, - team_unit_id, - &disp_s) != DART_OK) { - return DART_ERR_INVAL; - } - offset += disp_s; + DART_ASSERT_MSG(seginfo != NULL, + "dart_get_blocking ! 
Unknown segment %i on team %i"); - DART_LOG_DEBUG("dart_put_handle: nelem:%zu dtype:%d" - "(from collective allocation) " - "target_unit:%d offset:%"PRIu64"", - nelem, dtype, team_unit_id.id, offset); - } else { - win = dart_win_local_alloc; - DART_LOG_DEBUG("dart_put_handle: nlem:%zu dtype:%d" - "(from local allocation) " - "target_unit:%d offset:%"PRIu64"", - nelem, dtype, team_unit_id.id, offset); - } + MPI_Win win = seginfo->win; + offset += dart_segment_disp(seginfo, team_unit_id); DART_LOG_DEBUG("dart_put_handle: MPI_RPut"); - int ret = MPI_Rput( - src, - nelem, - mpi_type, - team_unit_id.id, - offset, - nelem, - mpi_type, - win, - &mpi_req); + CHECK_MPI_RET( + MPI_Rput( + src, + nelem, + mpi_type, + team_unit_id.id, + offset, + nelem, + mpi_type, + win, + &mpi_req), + "MPI_Rput"); + + seginfo->dirty = true; - if (ret != MPI_SUCCESS) { - DART_LOG_ERROR("dart_put_handle ! MPI_Rput failed"); - return DART_ERR_INVAL; - } *handle = malloc(sizeof(struct dart_handle_struct)); - (*handle) -> dest = team_unit_id.id; - (*handle) -> request = mpi_req; - (*handle) -> win = win; + (*handle)->dest = team_unit_id.id; + (*handle)->request = mpi_req; + (*handle)->win = win; return DART_OK; } @@ -711,139 +639,71 @@ dart_ret_t dart_put_blocking( dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); uint64_t offset = gptr.addr_or_offs.offset; int16_t seg_id = gptr.segid; + dart_team_t teamid = gptr.teamid; - if (gptr.unitid < 0) { - DART_LOG_ERROR("dart_put_blocking ! failed: gptr.unitid < 0"); - return DART_ERR_INVAL; - } + // shortcut + if (nelem == 0) return DART_OK; + + DART_ASSERT_MSG(team_unit_id.id >= 0, "dart_put ! failed: gptr.unitid < 0"); /* * MPI uses offset type int, do not copy more than INT_MAX elements: */ - if (nelem > INT_MAX) { - DART_LOG_ERROR("dart_put_blocking ! failed: nelem > INT_MAX"); - return DART_ERR_INVAL; - } + DART_ASSERT_MSG(nelem < INT_MAX, "dart_put ! 
failed: nelem > INT_MAX"); - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_put_blocking ! failed: Unknown team %i!", gptr.teamid); - return DART_ERR_INVAL; - } + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + DART_ASSERT_MSG(team_data != NULL, "dart_put ! failed: Unknown team %i!"); - DART_LOG_DEBUG("dart_put_blocking() uid:%d o:%"PRIu64" s:%d t:%d, nelem:%zu", - team_unit_id.id, offset, seg_id, gptr.teamid, nelem); + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + + DART_ASSERT_MSG(seginfo != NULL, "dart_get_blocking ! " + "Unknown segment %i on team %i"); + + + /* copy data directly if we are on the same unit */ + if (team_unit_id.id == team_data->unitid) { + memcpy(seginfo->selfbaseptr + offset, src, + nelem * dart__mpi__datatype_sizeof(dtype)); + DART_LOG_DEBUG("dart_put: memcpy nelem:%zu (from global allocation)" + "offset: %"PRIu64"", nelem, offset); + return DART_OK; + } #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) DART_LOG_DEBUG("dart_put_blocking: shared windows enabled"); - if (seg_id >= 0) { - /* - * Use memcpy if the target is in the same node as the calling unit: - * The value of i will be the target's relative ID in teamid. - */ - dart_team_unit_t luid = team_data->sharedmem_tab[gptr.unitid]; - if (luid.id >= 0) { - char * baseptr; - DART_LOG_DEBUG("dart_put_blocking: shared memory segment, seg_id:%d", - seg_id); - if (seg_id) { - if (dart_segment_get_baseptr( - &team_data->segdata, - seg_id, - luid, - &baseptr) != DART_OK) { - DART_LOG_ERROR("dart_put_blocking ! 
" - "dart_adapt_transtable_get_baseptr failed"); - return DART_ERR_INVAL; - } - } else { - baseptr = dart_sharedmem_local_baseptr_set[luid.id]; - } - baseptr += offset; - DART_LOG_DEBUG("dart_put_blocking: memcpy %zu bytes", - nelem * dart__mpi__datatype_sizeof(dtype)); - memcpy(baseptr, src, nelem * dart__mpi__datatype_sizeof(dtype)); - return DART_OK; - } + if (seginfo->isshm && team_data->sharedmem_tab[team_unit_id.id].id >= 0) { + return put_shared_mem(team_data, seginfo, src, offset, + team_unit_id, nelem, dtype); } #else - DART_LOG_DEBUG("dart_put_blocking: shared windows disabled"); + DART_LOG_DEBUG("dart_get_blocking: shared windows disabled"); #endif /* !defined(DART_MPI_DISABLE_SHARED_WINDOWS) */ - /* - * MPI shared windows disabled or target and calling unit are on different - * nodes, use MPI_Rput: - */ - if (seg_id) { - MPI_Aint disp_s; - if (dart_segment_get_disp( - &team_data->segdata, - seg_id, - team_unit_id, - &disp_s) != DART_OK) { - DART_LOG_ERROR("dart_put_blocking ! 
" - "dart_adapt_transtable_get_disp failed"); - return DART_ERR_INVAL; - } - /* copy data directly if we are on the same unit */ - if (team_unit_id.id == team_data->unitid) { - memcpy(((void*)disp_s) + offset, src, - nelem*dart__mpi__datatype_sizeof(dtype)); - DART_LOG_DEBUG("dart_put: memcpy nelem:%zu " - "target unit: %d offset: %"PRIu64"", - nelem, team_unit_id.id, offset); - return DART_OK; - } + win = seginfo->win; + offset += dart_segment_disp(seginfo, team_unit_id); - win = team_data->window; - offset += disp_s; - DART_LOG_DEBUG("dart_put_blocking: nelem:%zu " - "target (coll.): win:%p unit:%d offset:%lu " - "<- source: %p", - nelem, win, team_unit_id.id, - offset, src); - - } else { - - /* copy data directly if we are on the same unit */ - if (team_unit_id.id == team_data->unitid) { - memcpy(dart_mempool_localalloc + offset, src, - nelem * dart__mpi__datatype_sizeof(dtype)); - DART_LOG_DEBUG("dart_put: memcpy nelem:%zu offset: %"PRIu64"", - nelem, offset); - return DART_OK; - } - win = dart_win_local_alloc; - DART_LOG_DEBUG("dart_put_blocking: nelem:%zu " - "target (local): win:%p unit:%d offset:%lu " - "<- source: %p", - nelem, win, team_unit_id.id, - offset, src); - } + DART_LOG_DEBUG("dart_put_blocking() uid:%d o:%"PRIu64" s:%d t:%d, nelem:%zu", + team_unit_id.id, offset, seg_id, teamid, nelem); /* * Using MPI_Put as MPI_Win_flush is required to ensure remote completion. */ DART_LOG_DEBUG("dart_put_blocking: MPI_Put"); - if (MPI_Put(src, - nelem, - mpi_dtype, - team_unit_id.id, - offset, - nelem, - mpi_dtype, - win) - != MPI_SUCCESS) { - DART_LOG_ERROR("dart_put_blocking ! MPI_Put failed"); - return DART_ERR_INVAL; - } + CHECK_MPI_RET( + MPI_Put(src, + nelem, + mpi_dtype, + team_unit_id.id, + offset, + nelem, + mpi_dtype, + win), + "MPI_Put"); DART_LOG_DEBUG("dart_put_blocking: MPI_Win_flush"); - if (MPI_Win_flush(team_unit_id.id, win) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_put_blocking ! 
MPI_Win_flush failed"); - return DART_ERR_INVAL; - } + CHECK_MPI_RET(MPI_Win_flush(team_unit_id.id, win), "MPI_Win_flush"); DART_LOG_DEBUG("dart_put_blocking > finished"); return DART_OK; @@ -856,123 +716,91 @@ dart_ret_t dart_get_blocking( void * dest, dart_gptr_t gptr, size_t nelem, - dart_datatype_t dtype) + dart_datatype_t dtype, + int flags) { MPI_Win win; MPI_Datatype mpi_dtype = dart__mpi__datatype(dtype); dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); uint64_t offset = gptr.addr_or_offs.offset; int16_t seg_id = gptr.segid; + dart_team_t teamid = gptr.teamid; - if (gptr.unitid < 0) { - DART_LOG_ERROR("dart_get_blocking ! failed: gptr.unitid < 0"); - return DART_ERR_INVAL; - } + // shortcut + if (nelem == 0) return DART_OK; + + DART_ASSERT_MSG(team_unit_id.id >= 0, "dart_put ! failed: gptr.unitid < 0"); /* * MPI uses offset type int, do not copy more than INT_MAX elements: */ - if (nelem > INT_MAX) { - DART_LOG_ERROR("dart_get_blocking ! failed: nelem > INT_MAX"); - return DART_ERR_INVAL; - } + DART_ASSERT_MSG(nelem < INT_MAX, "dart_put ! failed: nelem > INT_MAX"); - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_get_blocking ! failed: Unknown team %i!", gptr.teamid); - return DART_ERR_INVAL; - } + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + DART_ASSERT_MSG(team_data != NULL, "dart_put ! failed: Unknown team %i!"); + + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + + DART_ASSERT_MSG(seginfo != NULL, "dart_get_blocking ! 
" + "Unknown segment %i on team %i"); DART_LOG_DEBUG("dart_get_blocking() uid:%d " "o:%"PRIu64" s:%d t:%u, nelem:%zu", team_unit_id.id, - offset, seg_id, gptr.teamid, nelem); + offset, seg_id, teamid, nelem); + + if (flags & DART_FLAG_ORDERED) { + CLEAN_SEGMENT(seginfo); + } + + if (team_data->unitid == team_unit_id.id) { + // use direct memcpy if we are on the same unit + memcpy(dest, seginfo->selfbaseptr + offset, + nelem * dart__mpi__datatype_sizeof(dtype)); + DART_LOG_DEBUG("dart_get_blocking: memcpy nelem:%zu " + "source (coll.): offset:%lu -> dest: %p", + nelem, offset, dest); + return DART_OK; + } #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) DART_LOG_DEBUG("dart_get_blocking: shared windows enabled"); - if (seg_id >= 0 && team_data->sharedmem_tab[gptr.unitid].id >= 0) { - return get_shared_mem(team_data, dest, gptr, nelem, dtype); + if (seginfo->isshm && team_data->sharedmem_tab[team_unit_id.id].id >= 0) { + return get_shared_mem(team_data, seginfo, dest, offset, + team_unit_id, nelem, dtype); } #else DART_LOG_DEBUG("dart_get_blocking: shared windows disabled"); #endif /* !defined(DART_MPI_DISABLE_SHARED_WINDOWS) */ + /* * MPI shared windows disabled or target and calling unit are on different * nodes, use MPI_Rget: */ - if (seg_id) { - MPI_Aint disp_s; - if (dart_segment_get_disp( - &team_data->segdata, - seg_id, - team_unit_id, - &disp_s) != DART_OK) { - DART_LOG_ERROR("dart_get_blocking ! 
" - "dart_adapt_transtable_get_disp failed"); - return DART_ERR_INVAL; - } - if (team_data->unitid == team_unit_id.id) { - // use direct memcpy if we are on the same unit - memcpy(dest, ((void*)disp_s) + offset, - nelem * dart__mpi__datatype_sizeof(dtype)); - DART_LOG_DEBUG("dart_get_blocking: memcpy nelem:%zu " - "source (coll.): offset:%lu -> dest: %p", - nelem, offset, dest); - return DART_OK; - } - - win = team_data->window; - offset += disp_s; - DART_LOG_DEBUG("dart_get_blocking: nelem:%zu " - "source (coll.): win:%p unit:%d offset:%lu " - "-> dest: %p", - nelem, win, team_unit_id.id, - offset, dest); - - } else { - - if (team_data->unitid == team_unit_id.id) { - /* use direct memcpy if we are on the same unit */ - memcpy(dest, dart_mempool_localalloc + offset, - nelem * dart__mpi__datatype_sizeof(dtype)); - DART_LOG_DEBUG("dart_get_blocking: memcpy nelem:%zu " - "source (coll.): offset:%lu -> dest: %p", - nelem, offset, dest); - return DART_OK; - } - - win = dart_win_local_alloc; - DART_LOG_DEBUG("dart_get_blocking: nelem:%zu " - "source (local): win:%p unit:%d offset:%lu " - "-> dest: %p", - nelem, win, team_unit_id.id, - offset, dest); - } + win = seginfo->win; + offset += dart_segment_disp(seginfo, team_unit_id); /* * Using MPI_Get as MPI_Win_flush is required to ensure remote completion. */ DART_LOG_DEBUG("dart_get_blocking: MPI_Rget"); MPI_Request req; - if (MPI_Rget(dest, - nelem, - mpi_dtype, - team_unit_id.id, - offset, - nelem, - mpi_dtype, - win, - &req) - != MPI_SUCCESS) { - DART_LOG_ERROR("dart_get_blocking ! MPI_Rget failed"); - return DART_ERR_INVAL; - } + CHECK_MPI_RET( + MPI_Rget(dest, + nelem, + mpi_dtype, + team_unit_id.id, + offset, + nelem, + mpi_dtype, + win, + &req), "MPI_Rget"); + DART_LOG_DEBUG("dart_get_blocking: MPI_Wait"); - if (MPI_Wait(&req, MPI_STATUS_IGNORE) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_get_blocking ! 
MPI_Wait failed"); - return DART_ERR_INVAL; - } + CHECK_MPI_RET( + MPI_Wait(&req, MPI_STATUS_IGNORE), "MPI_Wait"); DART_LOG_DEBUG("dart_get_blocking > finished"); return DART_OK; @@ -983,46 +811,56 @@ dart_ret_t dart_get_blocking( dart_ret_t dart_flush( dart_gptr_t gptr) { - MPI_Win win; - MPI_Comm comm = DART_COMM_WORLD; +// MPI_Win win; +// MPI_Comm comm = DART_COMM_WORLD; dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); int16_t seg_id = gptr.segid; + dart_team_t teamid = gptr.teamid; DART_LOG_DEBUG("dart_flush() gptr: " "unitid:%d offset:%"PRIu64" segid:%d teamid:%d", gptr.unitid, gptr.addr_or_offs.offset, gptr.segid, gptr.teamid); - if (gptr.unitid < 0) { + if (dart__unlikely(team_unit_id.id < 0)) { DART_LOG_ERROR("dart_flush ! failed: gptr.unitid < 0"); return DART_ERR_INVAL; } - if (seg_id) { - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_flush ! failed: Unknown team %i!", gptr.teamid); - return DART_ERR_INVAL; - } - win = team_data->window; - comm = team_data->comm; - } else { - win = dart_win_local_alloc; + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_flush ! failed: Unknown team %i!", teamid); + return DART_ERR_INVAL; } - DART_LOG_TRACE("dart_flush: MPI_Win_flush"); - if (MPI_Win_flush(team_unit_id.id, win) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_flush ! MPI_Win_flush failed!"); - return DART_ERR_OTHER; - } - DART_LOG_TRACE("dart_flush: MPI_Win_sync"); - if (MPI_Win_sync(win) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_flush ! MPI_Win_sync failed!"); - return DART_ERR_OTHER; + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + if (dart__unlikely(seginfo == NULL)) { + DART_LOG_ERROR("dart_get_blocking ! 
" + "Unknown segment %i on team %i", seg_id, teamid); + return DART_ERR_INVAL; } +#ifdef DART_ENABLE_PROGRESS // trigger progress int flag; - MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, MPI_STATUS_IGNORE); + MPI_Comm comm = team_data->comm; + CHECK_MPI_RET( + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, MPI_STATUS_IGNORE), + "MPI_Iprobe"); +#endif + + if (!seginfo->dirty) return DART_OK; + + MPI_Win win = seginfo->win; + + DART_LOG_TRACE("dart_flush: MPI_Win_flush"); + CHECK_MPI_RET( + MPI_Win_flush(team_unit_id.id, win), "MPI_Win_flush"); + DART_LOG_TRACE("dart_flush: MPI_Win_sync"); + CHECK_MPI_RET( + MPI_Win_sync(win), "MPI_Win_sync"); + + seginfo->dirty = false; DART_LOG_DEBUG("dart_flush > finished"); return DART_OK; @@ -1031,44 +869,50 @@ dart_ret_t dart_flush( dart_ret_t dart_flush_all( dart_gptr_t gptr) { - MPI_Win win; - MPI_Comm comm = DART_COMM_WORLD; - int16_t seg_id = gptr.segid; + int16_t seg_id = gptr.segid; + dart_team_t teamid = gptr.teamid; + DART_LOG_DEBUG("dart_flush_all() gptr: " "unitid:%d offset:%"PRIu64" segid:%d teamid:%d", gptr.unitid, gptr.addr_or_offs.offset, gptr.segid, gptr.teamid); - if (gptr.unitid < 0) { - DART_LOG_ERROR("dart_flush_all ! failed: gptr.unitid < 0"); + + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_flush ! failed: Unknown team %i!", teamid); return DART_ERR_INVAL; } - if (seg_id) { - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_flush_all ! failed: Unknown team %i!", gptr.teamid); - return DART_ERR_INVAL; - } - - win = team_data->window; - comm = team_data->comm; - } else { - win = dart_win_local_alloc; - } - DART_LOG_TRACE("dart_flush_all: MPI_Win_flush_all"); - if (MPI_Win_flush_all(win) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_flush_all ! 
MPI_Win_flush_all failed!"); - return DART_ERR_OTHER; - } - DART_LOG_TRACE("dart_flush_all: MPI_Win_sync"); - if (MPI_Win_sync(win) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_flush_all ! MPI_Win_sync failed!"); - return DART_ERR_OTHER; + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + if (dart__unlikely(seginfo == NULL)) { + DART_LOG_ERROR("dart_get_blocking ! " + "Unknown segment %i on team %i", seg_id, teamid); + return DART_ERR_INVAL; } +#ifdef DART_ENABLE_PROGRESS // trigger progress int flag; - MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, MPI_STATUS_IGNORE); + MPI_Comm comm = team_data->comm; + CHECK_MPI_RET( + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, MPI_STATUS_IGNORE), + "MPI_Iprobe"); +#endif + + if (!seginfo->dirty) return DART_OK; + + MPI_Win win = seginfo->win; + + DART_LOG_TRACE("dart_flush_all: MPI_Win_flush_all"); + CHECK_MPI_RET( + MPI_Win_flush_all(win), "MPI_Win_flush"); + DART_LOG_TRACE("dart_flush_all: MPI_Win_sync"); + CHECK_MPI_RET( + MPI_Win_sync(win), "MPI_Win_sync"); + + + seginfo->dirty = false; DART_LOG_DEBUG("dart_flush_all > finished"); return DART_OK; @@ -1077,9 +921,8 @@ dart_ret_t dart_flush_all( dart_ret_t dart_flush_local( dart_gptr_t gptr) { - MPI_Win win; - MPI_Comm comm = DART_COMM_WORLD; - int16_t seg_id = gptr.segid; + int16_t seg_id = gptr.segid; + dart_team_t teamid = gptr.teamid; dart_team_unit_t team_unit_id = DART_TEAM_UNIT_ID(gptr.unitid); DART_LOG_DEBUG("dart_flush_local() gptr: " @@ -1087,36 +930,45 @@ dart_ret_t dart_flush_local( gptr.unitid, gptr.addr_or_offs.offset, gptr.segid, gptr.teamid); - if (gptr.unitid < 0) { + if (dart__unlikely(team_unit_id.id < 0)) { DART_LOG_ERROR("dart_flush_local ! failed: gptr.unitid < 0"); return DART_ERR_INVAL; } - if (seg_id) { - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_flush_local ! 
failed: Unknown team %i!", gptr.segid); - return DART_ERR_INVAL; - } - - win = team_data->window; - comm = team_data->comm; - DART_LOG_DEBUG("dart_flush_local() win:%"PRIu64" seg:%d unit:%d", - (unsigned long)win, seg_id, team_unit_id.id); - } else { - win = dart_win_local_alloc; - DART_LOG_DEBUG("dart_flush_local() lwin:%"PRIu64" seg:%d unit:%d", - (unsigned long)win, seg_id, team_unit_id.id); + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_flush ! failed: Unknown team %i!", teamid); + return DART_ERR_INVAL; } - DART_LOG_TRACE("dart_flush_local: MPI_Win_flush_local"); - if (MPI_Win_flush_local(team_unit_id.id, win) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_flush_all ! MPI_Win_flush_local failed!"); - return DART_ERR_OTHER; + + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + if (dart__unlikely(seginfo == NULL)) { + DART_LOG_ERROR("dart_get_blocking ! " + "Unknown segment %i on team %i", seg_id, teamid); + return DART_ERR_INVAL; } + +#ifdef DART_ENABLE_PROGRESS // trigger progress int flag; - MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, MPI_STATUS_IGNORE); + MPI_Comm comm = team_data->comm; + CHECK_MPI_RET( + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, MPI_STATUS_IGNORE), + "MPI_Iprobe"); +#endif + + if (!seginfo->dirty) return DART_OK; + + MPI_Win win = seginfo->win; + DART_LOG_TRACE("dart_flush_local: MPI_Win_flush_local"); + CHECK_MPI_RET( + MPI_Win_flush_local(team_unit_id.id, win), + "MPI_Win_flush_local"); + + seginfo->dirty = false; + DART_LOG_DEBUG("dart_flush_local > finished"); return DART_OK; @@ -1125,40 +977,46 @@ dart_ret_t dart_flush_local( dart_ret_t dart_flush_local_all( dart_gptr_t gptr) { - MPI_Win win; - MPI_Comm comm = DART_COMM_WORLD; - int16_t seg_id = gptr.segid; + int16_t seg_id = gptr.segid; + dart_team_t teamid = gptr.teamid; DART_LOG_DEBUG("dart_flush_local_all() gptr: " "unitid:%d offset:%"PRIu64" 
segid:%d teamid:%d", gptr.unitid, gptr.addr_or_offs.offset, gptr.segid, gptr.teamid); - if (gptr.unitid < 0) { - DART_LOG_ERROR("dart_flush_local_all ! failed: gptr.unitid < 0"); + + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_flush ! failed: Unknown team %i!", teamid); return DART_ERR_INVAL; } - - if (seg_id) { - dart_team_data_t *team_data = dart_adapt_teamlist_get(gptr.teamid); - if (team_data == NULL) { - DART_LOG_ERROR("dart_flush_local_all ! failed: Unknown team %i!", - gptr.teamid); - return DART_ERR_INVAL; - } - win = team_data->window; - comm = team_data->comm; - } else { - win = dart_win_local_alloc; - } - if (MPI_Win_flush_local_all(win) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_flush_all ! MPI_Win_flush_local_all failed!"); - return DART_ERR_OTHER; + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), seg_id); + if (dart__unlikely(seginfo == NULL)) { + DART_LOG_ERROR("dart_get_blocking ! " + "Unknown segment %i on team %i", seg_id, teamid); + return DART_ERR_INVAL; } +#ifdef DART_ENABLE_PROGRESS // trigger progress int flag; - MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, MPI_STATUS_IGNORE); + MPI_Comm comm = team_data->comm; + CHECK_MPI_RET( + MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, MPI_STATUS_IGNORE), + "MPI_Iprobe"); +#endif + + if (!seginfo->dirty) return DART_OK; + + + MPI_Win win = seginfo->win; + CHECK_MPI_RET( + MPI_Win_flush_local_all(win), + "MPI_Win_flush_local_all"); + + seginfo->dirty = false; DART_LOG_DEBUG("dart_flush_local_all > finished"); return DART_OK; @@ -1226,11 +1084,8 @@ dart_ret_t dart_wait( return DART_ERR_INVAL; } DART_LOG_DEBUG("dart_wait: -- MPI_Win_flush"); - mpi_ret = MPI_Win_flush(handle->dest, handle->win); - if (mpi_ret != MPI_SUCCESS) { - DART_LOG_DEBUG("dart_wait ! 
MPI_Win_flush failed"); - return DART_ERR_INVAL; - } + CHECK_MPI_RET( + MPI_Win_flush(handle->dest, handle->win), "MPI_Win_flush"); } else { DART_LOG_TRACE("dart_wait: handle->request: MPI_REQUEST_NULL"); } @@ -1545,11 +1400,9 @@ static int _dart_barrier_count = 0; dart_ret_t dart_barrier( dart_team_t teamid) { - MPI_Comm comm; - DART_LOG_DEBUG("dart_barrier() barrier count: %d", _dart_barrier_count); - if (teamid == DART_UNDEFINED_TEAM_ID) { + if (dart__unlikely(teamid == DART_UNDEFINED_TEAM_ID)) { DART_LOG_ERROR("dart_barrier ! failed: team may not be DART_UNDEFINED_TEAM_ID"); return DART_ERR_INVAL; } @@ -1557,17 +1410,17 @@ dart_ret_t dart_barrier( _dart_barrier_count++; dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); - if (team_data == NULL) { + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_barrier ! failed: Unknown team: %d", teamid); return DART_ERR_INVAL; } + /* Fetch proper communicator from teams. */ - comm = team_data->comm; - if (MPI_Barrier(comm) == MPI_SUCCESS) { - DART_LOG_DEBUG("dart_barrier > finished"); - return DART_OK; - } - DART_LOG_DEBUG("dart_barrier ! MPI_Barrier failed"); - return DART_ERR_INVAL; + CHECK_MPI_RET( + MPI_Barrier(team_data->comm), "MPI_Barrier"); + + DART_LOG_DEBUG("dart_barrier > MPI_Barrier finished"); + return DART_OK; } dart_ret_t dart_bcast( @@ -1583,12 +1436,12 @@ dart_ret_t dart_bcast( DART_LOG_TRACE("dart_bcast() root:%d team:%d nelem:%"PRIu64"", root.id, teamid, nelem); - if (root.id < 0) { + if (dart__unlikely(root.id < 0)) { DART_LOG_ERROR("dart_bcast ! failed: root < 0"); return DART_ERR_INVAL; } - if (teamid == DART_UNDEFINED_TEAM_ID) { + if (dart__unlikely(teamid == DART_UNDEFINED_TEAM_ID)) { DART_LOG_ERROR("dart_bcast ! 
failed: team may not be DART_UNDEFINED_TEAM_ID"); return DART_ERR_INVAL; } @@ -1596,23 +1449,21 @@ dart_ret_t dart_bcast( /* * MPI uses offset type int, do not copy more than INT_MAX elements: */ - if (nelem > INT_MAX) { + if (dart__unlikely(nelem > INT_MAX)) { DART_LOG_ERROR("dart_bcast ! failed: nelem > INT_MAX"); return DART_ERR_INVAL; } dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); - if (team_data == NULL) { + if (dart__unlikely(team_data == NULL)) { DART_LOG_ERROR("dart_bcast ! root:%d -> team:%d " "dart_adapt_teamlist_convert failed", root.id, teamid); return DART_ERR_INVAL; } comm = team_data->comm; - if (MPI_Bcast(buf, nelem, mpi_dtype, root.id, comm) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_bcast ! root:%d -> team:%d " - "MPI_Bcast failed", root.id, teamid); - return DART_ERR_INVAL; - } + CHECK_MPI_RET( + MPI_Bcast(buf, nelem, mpi_dtype, root.id, comm), "MPI_Bcast"); + DART_LOG_TRACE("dart_bcast > root:%d team:%d nelem:%zu finished", root.id, teamid, nelem); return DART_OK; @@ -1629,12 +1480,12 @@ dart_ret_t dart_scatter( MPI_Datatype mpi_dtype = dart__mpi__datatype(dtype); MPI_Comm comm; - if (root.id < 0) { + if (dart__unlikely(root.id < 0)) { DART_LOG_ERROR("dart_scatter ! failed: root < 0"); return DART_ERR_INVAL; } - if (teamid == DART_UNDEFINED_TEAM_ID) { + if (dart__unlikely(teamid == DART_UNDEFINED_TEAM_ID)) { DART_LOG_ERROR("dart_scatter ! failed: team may not be DART_UNDEFINED_TEAM_ID"); return DART_ERR_INVAL; } @@ -1642,27 +1493,28 @@ dart_ret_t dart_scatter( /* * MPI uses offset type int, do not copy more than INT_MAX elements: */ - if (nelem > INT_MAX) { + if (dart__unlikely(nelem > INT_MAX)) { DART_LOG_ERROR("dart_scatter ! failed: nelem > INT_MAX"); return DART_ERR_INVAL; } dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); - if (team_data == NULL) { + if (dart__unlikely(team_data == NULL)) { + DART_LOG_ERROR("dart_scatter ! 
failed: unknown team %d", teamid); return DART_ERR_INVAL; } comm = team_data->comm; - if (MPI_Scatter( - sendbuf, - nelem, - mpi_dtype, - recvbuf, - nelem, - mpi_dtype, - root.id, - comm) != MPI_SUCCESS) { - return DART_ERR_INVAL; - } + CHECK_MPI_RET( + MPI_Scatter( + sendbuf, + nelem, + mpi_dtype, + recvbuf, + nelem, + mpi_dtype, + root.id, + comm), + "MPI_Scatter"); return DART_OK; } diff --git a/dart-impl/mpi/src/dart_globmem.c b/dart-impl/mpi/src/dart_globmem.c index 4dc9657f1..185fa66f9 100644 --- a/dart-impl/mpi/src/dart_globmem.c +++ b/dart-impl/mpi/src/dart_globmem.c @@ -32,10 +32,6 @@ * TODO: add this window to the team_data for DART_TEAM_ALL as segment 0. */ MPI_Win dart_win_local_alloc; -#if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) -MPI_Win dart_sharedmem_win_local_alloc; -char** dart_sharedmem_local_baseptr_set; -#endif dart_ret_t dart_gptr_getaddr(const dart_gptr_t gptr, void **addr) { @@ -231,6 +227,8 @@ dart_team_memalloc_aligned( * !!! (because the shared array has not been allocated correctly)." * !!! * !!! Reproduced on SuperMUC and mpich3.1 on projekt03. + * Related support ticket of MPICH: + * http://trac.mpich.org/projects/mpich/ticket/2178 * * !!! BUG IN OPENMPI 1.10.5 and 2.0.2 * !!! @@ -240,8 +238,6 @@ dart_team_memalloc_aligned( * !!! The issue has been reported. * !!! 
* - * Related support ticket of MPICH: - * http://trac.mpich.org/projects/mpich/ticket/2178 */ MPI_Comm sharedmem_comm = team_data->sharedmem_comm; @@ -346,8 +342,11 @@ dart_team_memalloc_aligned( } segment->size = nbytes; segment->flags = 0; - segment->win = sharedmem_win; + segment->shmwin = sharedmem_win; + segment->win = team_data->window; segment->selfbaseptr = sub_mem; + segment->dirty = false; + segment->isshm = true; /* -- Updating infos on gptr -- */ @@ -368,6 +367,75 @@ dart_team_memalloc_aligned( return DART_OK; } + +dart_ret_t dart_team_memalloc_aligned_full( + dart_team_t teamid, + size_t nelem, + dart_datatype_t dtype, + dart_gptr_t * gptr) +{ + char *baseptr; + MPI_Win win; + dart_unit_t gptr_unitid = 0; // the team-local ID 0 has the beginning + int dtype_size = dart__mpi__datatype_sizeof(dtype); + MPI_Aint nbytes = nelem * dtype_size; + size_t team_size; + *gptr = DART_GPTR_NULL; + + DART_LOG_TRACE("dart_team_memalloc_aligned : dts:%i nelem:%zu nbytes:%zu", + dtype_size, nelem, nbytes); + + dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); + if (team_data == NULL) { + DART_LOG_ERROR("dart_team_memalloc_aligned ! 
Unknown team %i", teamid); + return DART_ERR_INVAL; + } + + MPI_Comm comm = team_data->comm; + + dart_segment_info_t *segment = dart_segment_alloc( + &team_data->segdata, DART_SEGMENT_ALLOC); + + if (MPI_Win_allocate( + nbytes, 1, MPI_INFO_NULL, + team_data->comm, &baseptr, &win) != MPI_SUCCESS) { + DART_LOG_ERROR("dart_team_memfree: MPI_Win_allocate failed"); + return DART_ERR_OTHER; + } + + if (MPI_Win_lock_all(0, win) != MPI_SUCCESS) { + DART_LOG_ERROR("dart_team_memfree: MPI_Win_lock_all failed"); + return DART_ERR_OTHER; + } + + if (segment->baseptr != NULL) { + free(segment->baseptr); + segment->baseptr = NULL; + } + + if (segment->disp != NULL) { + free(segment->disp); + segment->disp = NULL; + } + + segment->flags = 0; + segment->selfbaseptr = baseptr; + segment->size = nbytes; + segment->shmwin = MPI_WIN_NULL; + segment->win = win; + segment->dirty = false; + segment->isshm = false; + + + gptr->segid = segment->segid; + gptr->unitid = gptr_unitid; + gptr->teamid = teamid; + gptr->flags = 0; + gptr->addr_or_offs.offset = 0; + + return DART_OK; +} + dart_ret_t dart_team_memfree( dart_gptr_t gptr) { @@ -386,34 +454,47 @@ dart_ret_t dart_team_memfree( return DART_ERR_INVAL; } - MPI_Win win = team_data->window; - - if (dart_segment_get_selfbaseptr(&team_data->segdata, segid, &sub_mem) != DART_OK) { - DART_LOG_ERROR("dart_team_memfree ! Unknown segment %i", segid); + dart_segment_info_t *seginfo = dart_segment_get_info( + &(team_data->segdata), segid); + if (seginfo == NULL) { + DART_LOG_ERROR("dart_team_memfree ! 
" + "Unknown segment %i on team %i", segid, teamid); return DART_ERR_INVAL; } - /* Detach the window associated with sub-memory to be freed */ - if (sub_mem != NULL) { - MPI_Win_detach(win, sub_mem); - } + + MPI_Win win = seginfo->win; + sub_mem = seginfo->selfbaseptr; /* Free the window's associated sub-memory */ + if (seginfo->isshm) { + /* Detach the window associated with sub-memory to be freed */ + if (sub_mem != NULL) { + MPI_Win_detach(win, sub_mem); + } #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) - MPI_Win sharedmem_win; - if (dart_segment_get_win(&team_data->segdata, segid, &sharedmem_win) != DART_OK) { - return DART_ERR_OTHER; - } - if (MPI_Win_free(&sharedmem_win) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_team_memfree: MPI_Win_free failed"); - return DART_ERR_OTHER; - } + MPI_Win sharedmem_win = seginfo->shmwin; + if (MPI_Win_free(&sharedmem_win) != MPI_SUCCESS) { + DART_LOG_ERROR("dart_team_memfree: MPI_Win_free failed"); + return DART_ERR_OTHER; + } #else - if (MPI_Free_mem(sub_mem) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_team_memfree: MPI_Free_mem failed"); - return DART_ERR_OTHER; - } + if (MPI_Free_mem(sub_mem) != MPI_SUCCESS) { + DART_LOG_ERROR("dart_team_memfree: MPI_Free_mem failed"); + return DART_ERR_OTHER; + } #endif + } else { + // full allocation + if (MPI_Win_unlock_all(seginfo->win) != MPI_SUCCESS) { + DART_LOG_ERROR("dart_team_memfree: MPI_Win_unlock_all failed"); + return DART_ERR_OTHER; + } + if (MPI_Win_free(&seginfo->win) != MPI_SUCCESS) { + DART_LOG_ERROR("dart_team_memfree: MPI_Win_free failed"); + return DART_ERR_OTHER; + } + } #if defined(DART_ENABLE_LOGGING) dart_team_unit_t unitid; @@ -450,7 +531,8 @@ dart_team_memregister_aligned( dart_team_data_t *team_data = dart_adapt_teamlist_get(teamid); if (team_data == NULL) { - DART_LOG_ERROR("dart_team_memregister_aligned ! failed: Unknown team %i!", teamid); + DART_LOG_ERROR("dart_team_memregister_aligned ! 
failed: Unknown team %i!", + teamid); return DART_ERR_INVAL; } @@ -475,7 +557,8 @@ dart_team_memregister_aligned( MPI_Allgather(&disp, 1, MPI_AINT, disp_set, 1, MPI_AINT, comm); segment->size = nbytes; - segment->win = MPI_WIN_NULL; + segment->shmwin = MPI_WIN_NULL; + segment->win = team_data->window; segment->selfbaseptr = (char *)addr; segment->flags = 0; @@ -538,16 +621,19 @@ dart_team_memregister( segment->disp = malloc(size * sizeof(MPI_Aint)); } MPI_Aint * disp_set = segment->disp; - MPI_Comm comm = team_data->comm; - MPI_Win win = team_data->window; + MPI_Comm comm = team_data->comm; + MPI_Win win = team_data->window; MPI_Win_attach(win, addr, nbytes); MPI_Get_address(addr, &disp); MPI_Allgather(&disp, 1, MPI_AINT, disp_set, 1, MPI_AINT, comm); - segment->size = nbytes; - segment->win = MPI_WIN_NULL; + segment->size = nbytes; + segment->shmwin = MPI_WIN_NULL; + segment->win = team_data->window; segment->selfbaseptr = (char *)addr; segment->flags = 0; + segment->dirty = false; + segment->isshm = false; gptr->unitid = gptr_unitid; @@ -589,7 +675,8 @@ dart_team_memderegister( win = team_data->window; - if (dart_segment_get_selfbaseptr(&team_data->segdata, segid, &sub_mem) != DART_OK) { + if (dart_segment_get_selfbaseptr( + &team_data->segdata, segid, &sub_mem) != DART_OK) { DART_LOG_ERROR("dart_team_memderegister ! Unknown segment %i", segid); return DART_ERR_INVAL; } diff --git a/dart-impl/mpi/src/dart_initialization.c b/dart-impl/mpi/src/dart_initialization.c index 0456df9f4..08c469597 100644 --- a/dart-impl/mpi/src/dart_initialization.c +++ b/dart-impl/mpi/src/dart_initialization.c @@ -27,38 +27,11 @@ static int _init_by_dart = 0; static int _dart_initialized = 0; static -dart_ret_t do_init() +dart_ret_t create_local_alloc(dart_team_data_t *team_data) { - /* Initialize the teamlist. 
*/ - dart_adapt_teamlist_init(); - - dart_next_availteamid = DART_TEAM_ALL; - - if (MPI_Comm_dup(MPI_COMM_WORLD, &dart_comm_world) != MPI_SUCCESS) { - DART_LOG_ERROR("Failed to duplicate MPI_COMM_WORLD"); - return DART_ERR_OTHER; - } - - dart_ret_t ret = dart_adapt_teamlist_alloc(DART_TEAM_ALL); - if (ret != DART_OK) { - DART_LOG_ERROR("dart_adapt_teamlist_alloc failed"); - return DART_ERR_OTHER; - } - - if (dart__mpi__datatype_init() != DART_OK) { - return DART_ERR_OTHER; - } - - dart_team_data_t *team_data = dart_adapt_teamlist_get(DART_TEAM_ALL); - - dart_next_availteamid++; - - team_data->comm = DART_COMM_WORLD; - - MPI_Comm_rank(team_data->comm, &team_data->unitid); - MPI_Comm_size(team_data->comm, &team_data->size); - dart_localpool = dart_buddy_new(DART_LOCAL_ALLOC_SIZE); + MPI_Win dart_sharedmem_win_local_alloc; + char* *dart_sharedmem_local_baseptr_set = NULL; #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) @@ -123,6 +96,7 @@ dart_ret_t do_init() MPI_INFO_NULL, &dart_mempool_localalloc); #endif + /* Create a single global win object for dart local * allocation based on the above allocated shared memory. * @@ -135,6 +109,68 @@ dart_ret_t do_init() DART_COMM_WORLD, &dart_win_local_alloc); + /* Start an access epoch on dart_win_local_alloc, and later + * on all the units can access the memory region allocated + * by the local allocation function through + * dart_win_local_alloc. 
*/ + MPI_Win_lock_all(0, dart_win_local_alloc); + + + /* put the localalloc in the segment table */ + dart_segment_info_t *segment = dart_segment_alloc( + &team_data->segdata, DART_SEGMENT_LOCAL_ALLOC); + segment->dirty = false; + segment->flags = 1; + segment->segid = 0; + segment->size = DART_LOCAL_ALLOC_SIZE; + segment->baseptr = dart_sharedmem_local_baseptr_set; + segment->win = dart_win_local_alloc; + segment->shmwin = dart_sharedmem_win_local_alloc; + segment->selfbaseptr = dart_mempool_localalloc; + segment->isshm = true; + // addressing in this window is relative, no need to store displacements + segment->disp = NULL; + + return DART_OK; +} + +static +dart_ret_t do_init() +{ + /* Initialize the teamlist. */ + dart_adapt_teamlist_init(); + + dart_next_availteamid = DART_TEAM_ALL; + + if (MPI_Comm_dup(MPI_COMM_WORLD, &dart_comm_world) != MPI_SUCCESS) { + DART_LOG_ERROR("Failed to duplicate MPI_COMM_WORLD"); + return DART_ERR_OTHER; + } + + dart_ret_t ret = dart_adapt_teamlist_alloc(DART_TEAM_ALL); + if (ret != DART_OK) { + DART_LOG_ERROR("dart_adapt_teamlist_alloc failed"); + return DART_ERR_OTHER; + } + + if (dart__mpi__datatype_init() != DART_OK) { + return DART_ERR_OTHER; + } + + dart_team_data_t *team_data = dart_adapt_teamlist_get(DART_TEAM_ALL); + + dart_next_availteamid++; + + team_data->comm = DART_COMM_WORLD; + + MPI_Comm_rank(team_data->comm, &team_data->unitid); + MPI_Comm_size(team_data->comm, &team_data->size); + + ret = create_local_alloc(team_data); + if (ret != DART_OK) { + return ret; + } + /* Create a dynamic win object for all the dart collective * allocation based on MPI_COMM_WORLD. Return in win. */ MPI_Win win; @@ -142,12 +178,6 @@ dart_ret_t do_init() MPI_INFO_NULL, DART_COMM_WORLD, &win); team_data->window = win; - /* Start an access epoch on dart_win_local_alloc, and later - * on all the units can access the memory region allocated - * by the local allocation function through - * dart_win_local_alloc. 
*/ - MPI_Win_lock_all(0, dart_win_local_alloc); - /* Start an access epoch on win, and later on all the units * can access the attached memory region allocated by the * collective allocation function through win. */ @@ -256,23 +286,23 @@ dart_ret_t dart_exit() return DART_ERR_OTHER; } - dart_segment_fini(&team_data->segdata); + dart_segment_info_t *seginfo = dart_segment_get_info(&team_data->segdata, 0); if (MPI_Win_unlock_all(team_data->window) != MPI_SUCCESS) { DART_LOG_ERROR("%2d: dart_exit: MPI_Win_unlock_all failed", unitid.id); return DART_ERR_OTHER; } /* End the shared access epoch in dart_win_local_alloc. */ - if (MPI_Win_unlock_all(dart_win_local_alloc) != MPI_SUCCESS) { + if (MPI_Win_unlock_all(seginfo->win) != MPI_SUCCESS) { DART_LOG_ERROR("%2d: dart_exit: MPI_Win_unlock_all failed", unitid.id); return DART_ERR_OTHER; } /* -- Free up all the resources for dart programme -- */ - MPI_Win_free(&dart_win_local_alloc); + MPI_Win_free(&seginfo->win); #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) /* Has MPI shared windows: */ - MPI_Win_free(&dart_sharedmem_win_local_alloc); + MPI_Win_free(&seginfo->shmwin); MPI_Comm_free(&(team_data->sharedmem_comm)); #else /* No MPI shared windows: */ @@ -282,10 +312,11 @@ dart_ret_t dart_exit() #endif MPI_Win_free(&team_data->window); + dart_segment_fini(&team_data->segdata); dart_buddy_delete(dart_localpool); #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) - free(team_data->sharedmem_tab); - free(dart_sharedmem_local_baseptr_set); +// free(team_data->sharedmem_tab); +// free(dart_sharedmem_local_baseptr_set); #endif dart_adapt_teamlist_destroy(); diff --git a/dart-impl/mpi/src/dart_segment.c b/dart-impl/mpi/src/dart_segment.c index 3af557e4b..dc62cd0b3 100644 --- a/dart-impl/mpi/src/dart_segment.c +++ b/dart-impl/mpi/src/dart_segment.c @@ -5,27 +5,22 @@ #include #include #include +#include #include +#include -#define DART_SEGMENT_INVALID (INT32_MAX) -struct dart_seghash_elem { - dart_seghash_elem_t *next; - dart_segment_info_t 
data; -}; - - -static inline int hash_segid(dart_segid_t segid) -{ - /* Simply use the lower bits of the segment ID. - * Since segment IDs are allocated continuously, this is likely to cause - * collisions starting at (segment number == DART_SEGMENT_HASH_SIZE) - * TODO: come up with a random distribution to account for random free'd - * segments? - * */ - return (abs(segid) % DART_SEGMENT_HASH_SIZE); -} +//static inline int hash_segid(dart_segid_t segid) +//{ +// /* Simply use the lower bits of the segment ID. +// * Since segment IDs are allocated continuously, this is likely to cause +// * collisions starting at (segment number == DART_SEGMENT_HASH_SIZE) +// * TODO: come up with a random distribution to account for random free'd +// * segments? +// * */ +// return (abs(segid) % DART_SEGMENT_HASH_SIZE); +//} static inline void register_segment(dart_segmentdata_t *segdata, dart_seghash_elem_t *elem) @@ -35,28 +30,6 @@ register_segment(dart_segmentdata_t *segdata, dart_seghash_elem_t *elem) segdata->hashtab[slot] = elem; } -/** - * Initialize the segment data hash table. 
- */ -dart_ret_t dart_segment_init(dart_segmentdata_t *segdata, dart_team_t teamid) -{ - memset(segdata->hashtab, 0, - sizeof(dart_seghash_elem_t*) * DART_SEGMENT_HASH_SIZE); - - segdata->team_id = teamid; - segdata->mem_freelist = NULL; - segdata->reg_freelist = NULL; - segdata->memid = 1; - segdata->registermemid = -1; - - // register the segment for non-global allocations on DART_TEAM_ALL - if (teamid == DART_TEAM_ALL) { - dart_seghash_elem_t *elem = calloc(1, sizeof(dart_seghash_elem_t)); - register_segment(segdata, elem); - } - return DART_OK; -} - static dart_segment_info_t * get_segment( dart_segmentdata_t *segdata, dart_segid_t segid) @@ -81,6 +54,30 @@ static dart_segment_info_t * get_segment( return &(elem->data); } +//dart_segment_info_t * dart_segment_get_info( +// dart_segmentdata_t *segdata, +// dart_segid_t segid) +//{ +// return get_segment(segdata, segid); +//} + +/** + * Initialize the segment data hash table. + */ +dart_ret_t dart_segment_init(dart_segmentdata_t *segdata, dart_team_t teamid) +{ + memset(segdata->hashtab, 0, + sizeof(dart_seghash_elem_t*) * DART_SEGMENT_HASH_SIZE); + + segdata->team_id = teamid; + segdata->mem_freelist = NULL; + segdata->reg_freelist = NULL; + segdata->memid = 1; + segdata->registermemid = -1; + + return DART_OK; +} + /** * Allocates a new segment data struct. May be served from a freelist. 
* @@ -94,7 +91,12 @@ dart_segment_alloc(dart_segmentdata_t *segdata, dart_segment_type type) int16_t segid; dart_seghash_elem_t *elem = NULL; - if (type == DART_SEGMENT_ALLOC) { + if (type == DART_SEGMENT_LOCAL_ALLOC) { + // no need to check for overflow + segid = DART_SEGMENT_LOCAL; + elem = calloc(1, sizeof(dart_seghash_elem_t)); + elem->data.segid = segid; + } else if (type == DART_SEGMENT_ALLOC) { if (segdata->mem_freelist != NULL) { elem = segdata->mem_freelist; segid = elem->data.segid; @@ -140,7 +142,7 @@ dart_segment_alloc(dart_segmentdata_t *segdata, dart_segment_type type) } #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS) -dart_ret_t dart_segment_get_win( +dart_ret_t dart_segment_get_shmwin( dart_segmentdata_t * segdata, int16_t segid, MPI_Win * win) @@ -151,7 +153,7 @@ dart_ret_t dart_segment_get_win( return DART_ERR_INVAL; } - *win = segment->win; + *win = segment->shmwin; return DART_OK; } #endif @@ -161,7 +163,6 @@ dart_ret_t dart_segment_get_disp(dart_segmentdata_t * segdata, dart_team_unit_t rel_unitid, MPI_Aint * disp_s) { - MPI_Aint trans_disp = 0; *disp_s = 0; DART_LOG_TRACE("dart_segment_get_disp() " @@ -175,10 +176,39 @@ dart_ret_t dart_segment_get_disp(dart_segmentdata_t * segdata, return DART_ERR_INVAL; } - trans_disp = segment->disp[rel_unitid.id]; - *disp_s = trans_disp; + *disp_s = segment->disp[rel_unitid.id]; DART_LOG_TRACE("dart_segment_get_disp > disp:%"PRIu64"", - (unsigned long)trans_disp); + (unsigned long)*disp_s); + return DART_OK; +} + +dart_ret_t dart_segment_get_dirty( + dart_segmentdata_t * segdata, + int16_t segid, + bool * dirty) +{ + dart_segment_info_t *segment = get_segment(segdata, segid); + if (segment == NULL) { + DART_LOG_ERROR("dart_segment_get_dirty ! 
Invalid segment ID %i on team %i", + segid, segdata->team_id); + return DART_ERR_INVAL; + } + *dirty = segment->dirty; + return DART_OK; +} + +dart_ret_t dart_segment_set_dirty( + dart_segmentdata_t * segdata, + int16_t segid, + bool dirty) +{ + dart_segment_info_t *segment = get_segment(segdata, segid); + if (segment == NULL) { + DART_LOG_ERROR("dart_segment_get_dirty ! Invalid segment ID %i on team %i", + segid, segdata->team_id); + return DART_ERR_INVAL; + } + segment->dirty = dirty; return DART_OK; } @@ -278,7 +308,7 @@ static inline void free_segment_info(dart_segment_info_t *seg_info){ seg_info->baseptr = NULL; } #endif - memset(seg_info, 0, sizeof(dart_segment_info_t)); +// memset(seg_info, 0, sizeof(dart_segment_info_t)); } /** @@ -338,7 +368,10 @@ static void clear_segdata_list(dart_seghash_elem_t *listhead) dart_seghash_elem_t *tmp = elem; elem = tmp->next; tmp->next = NULL; - free_segment_info(&tmp->data); + // segment info should have been cleared in dart_segment_fini + if (tmp->data.segid != DART_SEGMENT_LOCAL) { + free_segment_info(&tmp->data); + } free(tmp); } } @@ -349,7 +382,15 @@ static void clear_segdata_list(dart_seghash_elem_t *listhead) dart_ret_t dart_segment_fini( dart_segmentdata_t * segdata) { - // clear the hash table + // only clear up the local allocation segment in DART_TEAM_ALL + if (segdata->team_id == DART_TEAM_ALL) { + dart_segment_info_t *seg = get_segment( + &(dart_adapt_teamlist_get(DART_TEAM_ALL)->segdata), + DART_SEGMENT_LOCAL); + free_segment_info(seg); + } + + // clear the remaining hash table for (int i = 0; i < DART_SEGMENT_HASH_SIZE; i++) { clear_segdata_list(segdata->hashtab[i]); segdata->hashtab[i] = NULL; @@ -359,5 +400,6 @@ dart_ret_t dart_segment_fini( clear_segdata_list(segdata->reg_freelist); segdata->reg_freelist = NULL; + return DART_OK; } diff --git a/dart-impl/mpi/src/dart_team_private.c b/dart-impl/mpi/src/dart_team_private.c index ff970a809..0f12e091b 100644 --- a/dart-impl/mpi/src/dart_team_private.c +++ 
b/dart-impl/mpi/src/dart_team_private.c @@ -277,9 +277,6 @@ dart_ret_t dart_allocate_shared_comm(dart_team_data_t *team_data) sharedmem_comm, &(team_data->sharedmem_nodesize)); - // dart_unit_mapping[index] = (int*)malloc ( - // dart_sharedmem_size[index] * sizeof (int)); - MPI_Comm_group(sharedmem_comm, &sharedmem_group); MPI_Comm_group(team_data->comm, &group_all); @@ -293,8 +290,6 @@ dart_ret_t dart_allocate_shared_comm(dart_team_data_t *team_data) sharedmem_ranks[i] = i; } - // MPI_Group_translate_ranks (sharedmem_group, dart_sharedmem_size[index], - // sharedmem_ranks, group_all, dart_unit_mapping[index]); MPI_Group_translate_ranks( sharedmem_group, team_data->sharedmem_nodesize, diff --git a/dash/include/dash/Atomic.h b/dash/include/dash/Atomic.h index 4d0a6f3c0..d2c99321e 100644 --- a/dash/include/dash/Atomic.h +++ b/dash/include/dash/Atomic.h @@ -129,6 +129,7 @@ std::ostream & operator<<( } // namespace dash #include +#include #include #endif // DASH__ATOMIC_H__INCLUDED diff --git a/dash/include/dash/GlobAsyncRef.h b/dash/include/dash/GlobAsyncRef.h index cb40f8095..0561b718c 100644 --- a/dash/include/dash/GlobAsyncRef.h +++ b/dash/include/dash/GlobAsyncRef.h @@ -217,7 +217,8 @@ class GlobAsyncRef dart_storage_t ds = dash::dart_storage(1); DASH_ASSERT_RETURNS( dart_get_blocking( - static_cast(&_value), _gptr, ds.nelem, ds.dtype), + static_cast(&_value), _gptr, + ds.nelem, ds.dtype, DART_FLAG_ORDERED), DART_OK ); } @@ -298,7 +299,9 @@ class GlobAsyncRef *tptr = *_lptr; } else { dart_storage_t ds = dash::dart_storage(1); - dart_get(static_cast(tptr), _gptr, ds.nelem, ds.dtype); + dart_get( + static_cast(tptr), _gptr, + ds.nelem, ds.dtype, DART_FLAG_ORDERED); } } diff --git a/dash/include/dash/GlobRef.h b/dash/include/dash/GlobRef.h index dc31591f3..d54c85d69 100644 --- a/dash/include/dash/GlobRef.h +++ b/dash/include/dash/GlobRef.h @@ -170,7 +170,8 @@ class GlobRef nonconst_value_type t; dart_storage_t ds = dash::dart_storage(1); DASH_ASSERT_RETURNS( - 
dart_get_blocking(static_cast(&t), _gptr, ds.nelem, ds.dtype), + dart_get_blocking( + static_cast(&t), _gptr, ds.nelem, ds.dtype, DART_FLAG_ORDERED), DART_OK ); DASH_LOG_TRACE_VAR("GlobRef.T >", _gptr); @@ -225,7 +226,8 @@ class GlobRef nonconst_value_type t; dart_storage_t ds = dash::dart_storage(1); DASH_ASSERT_RETURNS( - dart_get_blocking(static_cast(&t), _gptr, ds.nelem, ds.dtype), + dart_get_blocking( + static_cast(&t), _gptr, ds.nelem, ds.dtype, DART_FLAG_ORDERED), DART_OK ); return t; @@ -236,7 +238,9 @@ class GlobRef DASH_LOG_TRACE_VAR("GlobRef.T()", _gptr); dart_storage_t ds = dash::dart_storage(1); DASH_ASSERT_RETURNS( - dart_get_blocking(static_cast(tptr), _gptr, ds.nelem, ds.dtype), + dart_get_blocking( + static_cast(tptr), _gptr, + ds.nelem, ds.dtype, DART_FLAG_ORDERED), DART_OK ); } @@ -246,7 +250,9 @@ class GlobRef DASH_LOG_TRACE_VAR("GlobRef.T()", _gptr); dart_storage_t ds = dash::dart_storage(1); DASH_ASSERT_RETURNS( - dart_get_blocking(static_cast(&tref), _gptr, ds.nelem, ds.dtype), + dart_get_blocking( + static_cast(&tref), _gptr, + ds.nelem, ds.dtype, DART_FLAG_ORDERED), DART_OK ); } diff --git a/dash/include/dash/GlobSharedRef.h b/dash/include/dash/GlobSharedRef.h index 132b65ea0..0c49fc64a 100644 --- a/dash/include/dash/GlobSharedRef.h +++ b/dash/include/dash/GlobSharedRef.h @@ -177,7 +177,8 @@ class GlobSharedRef DASH_LOG_TRACE_VAR("GlobSharedRef.T()", _gptr); T t; dart_storage_t ds = dash::dart_storage(1); - dart_get_blocking(static_cast(&t), _gptr, ds.nelem, ds.dtype); + dart_get_blocking( + static_cast(&t), _gptr, ds.nelem, ds.dtype, DART_FLAG_ORDERED); return t; } DASH_THROW( @@ -202,7 +203,8 @@ class GlobSharedRef } else if (!DART_GPTR_ISNULL(_gptr)) { DASH_LOG_TRACE_VAR("GlobSharedRef.T()", _gptr); dart_storage_t ds = dash::dart_storage(1); - dart_get_blocking(static_cast(&t), _gptr, ds.nelem, ds.dtype); + dart_get_blocking( + static_cast(&t), _gptr, ds.nelem, ds.dtype, DART_FLAG_ORDERED); } return t; } diff --git 
a/dash/include/dash/Onesided.h b/dash/include/dash/Onesided.h index 59e0fa4cf..ec250eb72 100644 --- a/dash/include/dash/Onesided.h +++ b/dash/include/dash/Onesided.h @@ -74,7 +74,8 @@ void get_value_async( dart_get(ptr, gptr.dart_gptr(), ds.nelem, - ds.dtype), + ds.dtype, + DART_FLAG_ORDERED), DART_OK); } @@ -117,7 +118,8 @@ void get_value( dart_get_blocking(ptr, gptr.dart_gptr(), ds.nelem, - ds.dtype), + ds.dtype, + DART_FLAG_ORDERED), DART_OK); } diff --git a/dash/include/dash/algorithm/Copy.h b/dash/include/dash/algorithm/Copy.h index 368d58ed9..7e8860466 100644 --- a/dash/include/dash/algorithm/Copy.h +++ b/dash/include/dash/algorithm/Copy.h @@ -180,7 +180,8 @@ ValueType * copy_impl( cur_out_first, cur_in_first.dart_gptr(), ds.nelem, - ds.dtype), + ds.dtype, + DART_FLAG_NONE), DART_OK); num_elem_copied += num_copy_elem; } @@ -233,7 +234,8 @@ ValueType * copy_impl( dest_ptr, src_gptr, ds.nelem, - ds.dtype) + ds.dtype, + DART_FLAG_NONE) != DART_OK) { DASH_LOG_ERROR("dash::copy_impl", "dart_get failed"); DASH_THROW( @@ -333,7 +335,8 @@ dash::Future copy_async_impl( cur_out_first, cur_in_first.dart_gptr(), ds.nelem, - ds.dtype), + ds.dtype, + DART_FLAG_NONE), DART_OK); req_handles.push_back(in_first.dart_gptr()); #else @@ -345,7 +348,8 @@ dash::Future copy_async_impl( cur_in_first.dart_gptr(), ds.nelem, ds.dtype, - &get_handle), + &get_handle, + DART_FLAG_NONE), DART_OK); if (get_handle != NULL) { req_handles.push_back(get_handle); @@ -403,7 +407,8 @@ dash::Future copy_async_impl( dest_ptr, src_gptr, ds.nelem, - ds.dtype) + ds.dtype, + DART_FLAG_NONE) != DART_OK) { DASH_LOG_ERROR("dash::copy_async_impl", "dart_get failed"); DASH_THROW( @@ -419,7 +424,8 @@ dash::Future copy_async_impl( src_gptr, ds.nelem, ds.dtype, - &get_handle), + &get_handle, + DART_FLAG_NONE), DART_OK); if (get_handle != NULL) { req_handles.push_back(get_handle); diff --git a/dash/include/dash/allocator/SymmetricAllocator.h b/dash/include/dash/allocator/SymmetricAllocator.h index 
3ae31ead8..c7d2c4f01 100644 --- a/dash/include/dash/allocator/SymmetricAllocator.h +++ b/dash/include/dash/allocator/SymmetricAllocator.h @@ -190,7 +190,11 @@ class SymmetricAllocator "number of local values:", num_local_elem); pointer gptr = DART_GPTR_NULL; dart_storage_t ds = dart_storage(num_local_elem); +#ifdef DART_FULL_ALLOC + if (dart_team_memalloc_aligned_full(_team_id, ds.nelem, ds.dtype, &gptr) +#else if (dart_team_memalloc_aligned(_team_id, ds.nelem, ds.dtype, &gptr) +#endif == DART_OK) { _allocated.push_back(gptr); } else { diff --git a/dash/include/dash/atomic/GlobAsyncAtomicRef.h b/dash/include/dash/atomic/GlobAsyncAtomicRef.h new file mode 100644 index 000000000..67f5c4c81 --- /dev/null +++ b/dash/include/dash/atomic/GlobAsyncAtomicRef.h @@ -0,0 +1,315 @@ +#ifndef DASH__ASYNC_ATOMIC_GLOBREF_H_ +#define DASH__ASYNC_ATOMIC_GLOBREF_H_ + +#include +#include +#include +#include + + +namespace dash { + +// forward decls +template +class Atomic; + +template +class Shared; + +/** + * Specialization for atomic values. All atomic operations are + * \c const as the \c GlobRef does not own the atomic values. + */ +template +class GlobAsyncRef> +{ + /* Notes on type compatibility: + * + * - The general support of atomic operations on values of type T is + * checked in `dash::Atomic` and is not verified here. + * - Whether arithmetic operations (like `fetch_add`) are supported + * for values of type T is implicitly tested in the DASH operation + * types (like `dash::plus`) and is not verified here. + * + */ + + template + friend std::ostream & operator<<( + std::ostream & os, + const GlobAsyncRef & gref); + +public: + typedef T + value_type; + typedef GlobAsyncRef> + const_type; + +private: + typedef dash::Atomic atomic_t; + typedef GlobAsyncRef self_t; + +private: + dart_gptr_t _gptr; + +public: + /** + * Default constructor, creates an GlobRef object referencing an element in + * global memory. 
+ */ + GlobAsyncRef() + : _gptr(DART_GPTR_NULL) { + } + + /** + * Constructor, creates an GlobRef object referencing an element in global + * memory. + */ + template + explicit GlobAsyncRef( + /// Pointer to referenced object in global memory + GlobPtr & gptr) + : GlobAsyncRef(gptr.dart_gptr()) + { } + + /** + * Constructor, creates an GlobRef object referencing an element in global + * memory. + */ + template + GlobAsyncRef( + /// Pointer to referenced object in global memory + const GlobPtr & gptr) + : GlobAsyncRef(gptr.dart_gptr()) + { } + + /** + * Constructor, creates an GlobRef object referencing an element in global + * memory. + */ + explicit GlobAsyncRef(dart_gptr_t dart_gptr) + : _gptr(dart_gptr) + { + DASH_LOG_TRACE_VAR("GlobRef(dart_gptr_t)", dart_gptr); + } + + /** + * Copy constructor. + */ + GlobAsyncRef( + /// GlobRef instance to copy. + const GlobRef & other) + : _gptr(other._gptr) + { } + + self_t & operator=(const self_t & other) = delete; + + inline bool operator==(const self_t & other) const noexcept + { + return _gptr == other._gptr; + } + + inline bool operator!=(const self_t & other) const noexcept + { + return !(*this == other); + } + + inline bool operator==(const T & value) const = delete; + inline bool operator!=(const T & value) const = delete; + + operator T() const { + return load(); + } + + operator GlobPtr() const { + DASH_LOG_TRACE("GlobRef.GlobPtr()", "conversion operator"); + DASH_LOG_TRACE_VAR("GlobRef.T()", _gptr); + return GlobPtr(_gptr); + } + + dart_gptr_t dart_gptr() const { + return _gptr; + } + + /** + * Checks whether the globally referenced element is in + * the calling unit's local memory. + */ + bool is_local() const { + return GlobPtr(_gptr).is_local(); + } + + void flush() { + dart_flush(_gptr); + } + + void flush_local() { + dart_flush_local(_gptr); + } + + /** + * Set the value of the shared atomic variable. 
+ */ + void set(const T * value) const + { + DASH_LOG_DEBUG_VAR("GlobAsyncRef.store()", *value); + DASH_LOG_TRACE_VAR("GlobAsyncRef.store", _gptr); + dart_ret_t ret = dart_accumulate( + _gptr, + reinterpret_cast(value), + 1, + dash::dart_punned_datatype::value, + DART_OP_REPLACE); + DASH_ASSERT_EQ(DART_OK, ret, "dart_accumulate failed"); + DASH_LOG_DEBUG("GlobAsyncRef.store >"); + } + + /// atomically fetches value + T get() const + { + DASH_LOG_DEBUG("GlobAsyncRef.load()"); + DASH_LOG_TRACE_VAR("GlobAsyncRef.load", _gptr); + value_type nothing; + value_type result; + dart_ret_t ret = dart_fetch_and_op( + _gptr, + reinterpret_cast(¬hing), + reinterpret_cast(&result), + dash::dart_punned_datatype::value, + DART_OP_NO_OP); + flush_local(); + DASH_ASSERT_EQ(DART_OK, ret, "dart_accumulate failed"); + DASH_LOG_DEBUG_VAR("GlobAsyncRef.get >", result); + return result; + } + + /// atomically fetches value + void get(T * result) const + { + DASH_LOG_DEBUG("GlobAsyncRef.load()"); + DASH_LOG_TRACE_VAR("GlobAsyncRef.load", _gptr); + value_type nothing; + dart_ret_t ret = dart_fetch_and_op( + _gptr, + reinterpret_cast(¬hing), + reinterpret_cast(result), + dash::dart_punned_datatype::value, + DART_OP_NO_OP); + DASH_ASSERT_EQ(DART_OK, ret, "dart_accumulate failed"); + } + + /** + * Get the value of the shared atomic variable. + */ + inline T load() const { + return get(); + } + + /** + * Atomically executes specified operation on the referenced shared value. + */ + template + void op( + BinaryOp binary_op, + /// Value to be added to global atomic variable. 
+ const T * value) const + { + DASH_LOG_DEBUG_VAR("GlobAsyncRef.op()", *value); + DASH_LOG_TRACE_VAR("GlobAsyncRef.op", _gptr); + DASH_LOG_TRACE("GlobAsyncRef.op", "dart_accumulate"); + dart_ret_t ret = dart_accumulate( + _gptr, + reinterpret_cast(value), + 1, + dash::dart_punned_datatype::value, + binary_op.dart_operation()); + DASH_ASSERT_EQ(DART_OK, ret, "dart_accumulate failed"); + DASH_LOG_DEBUG_VAR("GlobAsyncRef.op >", *value); + } + + /** + * Atomic fetch-and-op operation on the referenced shared value. + * + * \return The value of the referenced shared variable before the + * operation. + */ + template + void fetch_op( + BinaryOp binary_op, + /// Value to be added to global atomic variable. + const T * value, + T * result) const + { + DASH_LOG_DEBUG_VAR("GlobAsyncRef.fetch_op()", *value); + DASH_LOG_TRACE_VAR("GlobAsyncRef.fetch_op", _gptr); + DASH_LOG_TRACE_VAR("GlobAsyncRef.fetch_op", typeid(*value).name()); + dart_ret_t ret = dart_fetch_and_op( + _gptr, + reinterpret_cast(value), + reinterpret_cast(result), + dash::dart_punned_datatype::value, + binary_op.dart_operation()); + DASH_ASSERT_EQ(DART_OK, ret, "dart_fetch_op failed"); + } + + /** + * Atomically exchanges value + */ + void exchange(const T * value, T * result) const { + fetch_op(dash::second(), value, result); + } + + /** + * Atomically compares the value with the value of expected and if thosei + * are bitwise-equal, replaces the former with desired. 
+ * + * \return True if value is exchanged + * + * \see \c dash::atomic::compare_exchange + */ + void compare_exchange( + const T * expected, + const T * desired, + T * result) const { + DASH_LOG_DEBUG_VAR("GlobAsyncRef.compare_exchange()", *desired); + DASH_LOG_TRACE_VAR("GlobAsyncRef.compare_exchange", _gptr); + DASH_LOG_TRACE_VAR("GlobAsyncRef.compare_exchange", *expected); + DASH_LOG_TRACE_VAR( + "GlobAsyncRef.compare_exchange", typeid(*desired).name()); + dart_ret_t ret = dart_compare_and_swap( + _gptr, + reinterpret_cast(desired), + reinterpret_cast(expected), + reinterpret_cast(result), + dash::dart_punned_datatype::value); + DASH_ASSERT_EQ(DART_OK, ret, "dart_compare_and_swap failed"); + } + + /** + * DASH specific variant which is faster than \c fetch_add + * but does not return value + */ + void add(const T * value) const + { + op(dash::plus(), value); + } + + /** + * Atomic fetch-and-add operation on the referenced shared value. + * + * \return The value of the referenced shared variable before the + * operation. + */ + void fetch_add( + /// Value to be added to global atomic variable. 
+ const T * value, + T * result) const + { + fetch_op(dash::plus(), value, result); + } + +}; + +} // namespace dash + +#endif // DASH__ASYNC_ATOMIC_GLOBREF_H_ + diff --git a/dash/include/dash/experimental/HaloMatrix.h b/dash/include/dash/experimental/HaloMatrix.h index 9ed752cb3..5bd09d2a4 100644 --- a/dash/include/dash/experimental/HaloMatrix.h +++ b/dash/include/dash/experimental/HaloMatrix.h @@ -231,7 +231,7 @@ class HaloMatrix auto it = data.blockview.begin(); for(auto i = 0; i < data.num_handles; ++i, it += data.cont_elems){ dart_storage_t ds = dash::dart_storage(data.cont_elems); - dart_get_handle (off + ds.nelem * i, it.dart_gptr(), ds.nelem, ds.dtype, &(data.handle[i])); + dart_get_handle (off + ds.nelem * i, it.dart_gptr(), ds.nelem, ds.dtype, &(data.handle[i]), DART_FLAG_NONE); } if(!async) dart_waitall(data.handle, data.num_handles); diff --git a/dash/include/dash/memory/GlobHeapMem.h b/dash/include/dash/memory/GlobHeapMem.h index 99292a9ba..6fce6d896 100644 --- a/dash/include/dash/memory/GlobHeapMem.h +++ b/dash/include/dash/memory/GlobHeapMem.h @@ -1184,7 +1184,9 @@ class GlobHeapMem // global source: u_attach_buckets_sizes_gptr, // request bytes (~= number of sizes) from unit u: - ds.nelem, ds.dtype), + ds.nelem, ds.dtype, + // no previous local put to that location + DART_FLAG_NONE), DART_OK); // Update local snapshot of cumulative bucket sizes at unit u: for (int bi = 0; bi < u_num_attach_buckets; ++bi) { diff --git a/dash/test/dart/DARTMemAllocTest.cc b/dash/test/dart/DARTMemAllocTest.cc index 9d8f5972a..2605a5a9b 100644 --- a/dash/test/dart/DARTMemAllocTest.cc +++ b/dash/test/dart/DARTMemAllocTest.cc @@ -74,23 +74,49 @@ TEST_F(DARTMemAllocTest, LocalAlloc) arr.local[0] = gptr; arr.barrier(); + // read from the neighbor value_t neighbor_val; - size_t neighbor_id = (dash::myid().id + 1) % dash::size(); + size_t rneighbor_id = (dash::myid().id + 1) % dash::size(); + size_t lneighbor_id = (dash::myid().id + dash::size() - 1) % dash::size(); 
dart_storage_t ds = dash::dart_storage<value_t>(1);
  ASSERT_EQ_U(
    DART_OK,
    dart_get_blocking(
      &neighbor_val,
-      arr[neighbor_id],
+      arr[rneighbor_id],
      ds.nelem,
-      ds.dtype));
+      ds.dtype,
+      DART_FLAG_NONE));
  ASSERT_EQ_U(
-    neighbor_id,
+    rneighbor_id,
    neighbor_val);

  arr.barrier();

+  // write to the neighbor val
+
+
+  int val = dash::myid().id;
+  ASSERT_EQ_U(
+    DART_OK,
+    dart_put_blocking(
+      arr[rneighbor_id],
+      &val,
+      ds.nelem,
+      ds.dtype));
+
+  arr.barrier();
+
+  // read local value
+  ASSERT_EQ_U(
+    DART_OK,
+    dart_get_blocking(&val, gptr, ds.nelem, ds.dtype, DART_FLAG_NONE));
+
+  ASSERT_EQ_U(
+    lneighbor_id,
+    val);
+
  ASSERT_EQ_U(
    DART_OK,
    dart_memfree(gptr));
diff --git a/dash/test/dart/DARTOnesidedTest.cc b/dash/test/dart/DARTOnesidedTest.cc
index d483fe5b1..18672b119 100644
--- a/dash/test/dart/DARTOnesidedTest.cc
+++ b/dash/test/dart/DARTOnesidedTest.cc
@@ -29,7 +29,8 @@ TEST_F(DARTOnesidedTest, GetBlockingSingleBlock)
     local_array,                                // lptr dest
     (array.begin() + g_src_index).dart_gptr(),  // gptr start
     ds.nelem,
-    ds.dtype
+    ds.dtype,
+    DART_FLAG_NONE
   );
   for (size_t l = 0; l < block_size; ++l) {
     value_t expected = array[g_src_index + l];
@@ -70,7 +71,8 @@ TEST_F(DARTOnesidedTest, GetBlockingSingleBlockTeam)
     local_array,                                // lptr dest
     (array.begin() + g_src_index).dart_gptr(),  // gptr start
     ds.nelem,
-    ds.dtype
+    ds.dtype,
+    DART_FLAG_NONE
   );
   for (size_t l = 0; l < block_size; ++l) {
     value_t expected = array[g_src_index + l];
@@ -84,10 +86,10 @@ TEST_F(DARTOnesidedTest, GetBlockingTwoBlocks)
   const size_t block_size    = 10;
   const size_t num_elem_copy = 2 * block_size;
   size_t num_elem_total      = dash::size() * block_size;
-  dash::Array<value_t> array(num_elem_total, dash::BLOCKED);
   if (dash::size() < 2) {
-    return;
+    SKIP_TEST_MSG("requires at least 2 units");
   }
+  dash::Array<value_t> array(num_elem_total, dash::BLOCKED);
   // Array to store local copy:
   int local_array[num_elem_copy];
   // Assign initial values: [ 1000, 1001, 1002, ... 2000, 2001, ...
]
@@ -102,7 +104,8 @@ TEST_F(DARTOnesidedTest, GetBlockingTwoBlocks)
     local_array,                // lptr dest
     array.begin().dart_gptr(),  // gptr start
     ds.nelem,                   // number of elements
-    ds.dtype                    // data type
+    ds.dtype,                   // data type
+    DART_FLAG_NONE              // no need to synchronize previous put
   );
   // Fails for elements in second block, i.e. for l < num_elem_copy:
   for (size_t l = 0; l < block_size; ++l) {
@@ -117,10 +120,10 @@ TEST_F(DARTOnesidedTest, GetHandleAllRemote)
   const size_t block_size = 5000;
   size_t num_elem_copy    = (dash::size() - 1) * block_size;
   size_t num_elem_total   = dash::size() * block_size;
-  dash::Array<value_t> array(num_elem_total, dash::BLOCKED);
   if (dash::size() < 2) {
-    return;
+    SKIP_TEST_MSG("requires at least 2 units");
  }
+  dash::Array<value_t> array(num_elem_total, dash::BLOCKED);
   // Array to store local copy:
   int * local_array = new int[num_elem_copy];
   // Array of handles, one for each dart_get_handle:
@@ -148,7 +151,8 @@ TEST_F(DARTOnesidedTest, GetHandleAllRemote)
       (array.begin() + (u * block_size)).dart_gptr(),
       ds.nelem,
       ds.dtype,
-      &handle)
+      &handle,
+      DART_FLAG_NONE)
   );
   LOG_MESSAGE("dart_get_handle returned handle %p",
               static_cast<void *>(handle));
@@ -176,3 +180,35 @@ TEST_F(DARTOnesidedTest, GetHandleAllRemote)
   delete[] local_array;
   ASSERT_EQ_U(num_elem_copy, l);
 }
+
+TEST_F(DARTOnesidedTest, ConsistentAsyncGet)
+{
+  typedef int value_t;
+  if (dash::size() < 2) {
+    SKIP_TEST_MSG("requires at least 2 units");
+  }
+
+  dash::Array<value_t> array(dash::size(), dash::BLOCKED);
+  array.local[0] = dash::myid();
+  dash::barrier();
+
+  int lneighbor = (dash::myid() + dash::size() - 1) % dash::size();
+  int rneighbor = (dash::myid() + 1) % dash::size();
+
+  dart_gptr_t gptr = array[lneighbor].dart_gptr();
+
+  dart_storage_t ds = dash::dart_storage<value_t>(1);
+  value_t pval = dash::myid() * 100;
+  // async update
+  dart_put(gptr, &pval, ds.nelem, ds.dtype);
+  value_t gval;
+  // retrieve the value again
+  dart_get_blocking(&gval, gptr, ds.nelem, ds.dtype, DART_FLAG_NONE);
+
+  ASSERT_EQ_U(gval, dash::myid()
 * 100);
+
+  array.barrier();
+
+  ASSERT_EQ_U(array.local[0], rneighbor * 100);
+
+}
diff --git a/dash/test/dart/ThreadsafetyTest.cc b/dash/test/dart/ThreadsafetyTest.cc
index a81d4977b..e5d72f8f6 100644
--- a/dash/test/dart/ThreadsafetyTest.cc
+++ b/dash/test/dart/ThreadsafetyTest.cc
@@ -203,7 +203,7 @@ TEST_F(ThreadsafetyTest, ConcurrentAttach) {
     gptr_r.unitid = (team->myid() + 1) % team->size();
     dart_storage_t ds = dash::dart_storage<int>(elem_per_thread);
     ASSERT_EQ_U(
-      dart_get_blocking(check, gptr_r, ds.nelem, ds.dtype),
+      dart_get_blocking(check, gptr_r, ds.nelem, ds.dtype, DART_FLAG_NONE),
      DART_OK);

     team->barrier();
diff --git a/dash/test/memory/GlobHeapMemTest.cc b/dash/test/memory/GlobHeapMemTest.cc
index 7ed369551..9bdf6cb01 100644
--- a/dash/test/memory/GlobHeapMemTest.cc
+++ b/dash/test/memory/GlobHeapMemTest.cc
@@ -224,7 +224,8 @@ TEST_F(GlobHeapMemTest, UnbalancedRealloc)
   // request value via DART global pointer:
   value_t dart_gptr_value;
   dart_storage_t ds = dash::dart_storage<value_t>(1);
-  dart_get_blocking(&dart_gptr_value, gptr, ds.nelem, ds.dtype);
+  dart_get_blocking(
+    &dart_gptr_value, gptr, ds.nelem, ds.dtype, DART_FLAG_NONE);
   DASH_LOG_TRACE_VAR("GlobHeapMemTest.UnbalancedRealloc",
                      dart_gptr_value);
diff --git a/dash/test/types/AtomicTest.cc b/dash/test/types/AtomicTest.cc
index f8db77b37..414288778 100644
--- a/dash/test/types/AtomicTest.cc
+++ b/dash/test/types/AtomicTest.cc
@@ -507,3 +507,53 @@ TEST_F(AtomicTest, AtomicSignal){
     ASSERT_GT_U(count, 0);
   }
 }
+
+
+TEST_F(AtomicTest, AsyncAtomic){
+  using value_t = int;
+  using atom_t  = dash::Atomic<value_t>;
+  using array_t = dash::Array<atom_t>;
+  const value_t zero  = 0;
+  const value_t one   = 1;
+  const value_t three = 3;
+  value_t res;
+
+  array_t array(dash::size());
+  // async set
+  array.async[dash::myid()].set(&zero);
+  array.flush();
+
+  ASSERT_EQ_U(zero, array[dash::myid()]);
+
+  dash::barrier();
+
+  int neighbor = (dash::myid() + 1) % dash::size();
+  auto neighbor_ref = array.async[neighbor];
+
+  // async get
+  
neighbor_ref.get(&res); + neighbor_ref.flush(); + ASSERT_EQ_U(zero, res); + + // async op and fetch_op + neighbor_ref.add(&one); + neighbor_ref.add(&one); + neighbor_ref.fetch_add(&one, &res); + neighbor_ref.flush(); + ASSERT_EQ_U(2, res); + + // compare_exchange + neighbor_ref.compare_exchange(&three, &one, &res); + neighbor_ref.flush(); + ASSERT_EQ_U(three, res); + + neighbor_ref.exchange(&zero, &res); + neighbor_ref.flush(); + ASSERT_EQ_U(one, res); + + + dash::barrier(); + + ASSERT_EQ_U(zero, array[dash::myid()]); + +}