diff --git a/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c b/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c index 8e94c9d26c3..6b9431890ef 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c +++ b/src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c @@ -61,13 +61,13 @@ int MPIDI_OFI_gpu_pipeline_send(MPIR_Request * sreq, const void *send_buf, } } - MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) = n_chunks; - MPIDI_OFI_REQUEST(sreq, pipeline_info.send.cq_data) = cq_data; - MPIDI_OFI_REQUEST(sreq, pipeline_info.send.remote_addr) = remote_addr; - MPIDI_OFI_REQUEST(sreq, pipeline_info.send.vci_local) = vci_local; - MPIDI_OFI_REQUEST(sreq, pipeline_info.send.ctx_idx) = ctx_idx; - MPIDI_OFI_REQUEST(sreq, pipeline_info.send.match_bits) = match_bits; - MPIDI_OFI_REQUEST(sreq, pipeline_info.send.pipeline_tag) = pipeline_tag; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.num_remain) = n_chunks; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.cq_data) = cq_data; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.remote_addr) = remote_addr; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.vci_local) = vci_local; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.ctx_idx) = ctx_idx; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.match_bits) = match_bits; + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.pipeline_tag) = pipeline_tag; struct pipeline_header hdr; hdr.n_chunks = n_chunks; @@ -193,7 +193,7 @@ static int send_copy_poll(MPIR_Async_thing * thing) static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint chunk_sz) { int mpi_errno = MPI_SUCCESS; - int vci_local = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.vci_local); + int vci_local = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.vci_local); struct chunk_req *chunk_req = MPL_malloc(sizeof(struct chunk_req), MPL_MEM_BUFFER); MPIR_Assertp(chunk_req); @@ -202,10 +202,10 @@ static void send_copy_complete(MPIR_Request * sreq, const void *buf, MPI_Aint ch chunk_req->event_id = MPIDI_OFI_EVENT_SEND_GPU_PIPELINE; chunk_req->buf = (void *) buf; - int ctx_idx = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.ctx_idx); - fi_addr_t remote_addr = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.remote_addr); - uint64_t cq_data = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.cq_data); - uint64_t match_bits = MPIDI_OFI_REQUEST(sreq, pipeline_info.send.pipeline_tag) | + int ctx_idx = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.ctx_idx); + fi_addr_t remote_addr = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.remote_addr); + uint64_t cq_data = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.cq_data); + uint64_t match_bits = MPIDI_OFI_REQUEST(sreq, u.pipeline_send.pipeline_tag) | MPIDI_OFI_GPU_PIPELINE_SEND; MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI(vci_local).lock); MPIDI_OFI_CALL_RETRY(fi_tsenddata(MPIDI_OFI_global.ctx[ctx_idx].tx, @@ -234,8 +234,8 @@ int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Reques MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, host_buf); - MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) -= 1; - if (MPIDI_OFI_REQUEST(sreq, pipeline_info.send.num_remain) == 0) { + MPIDI_OFI_REQUEST(sreq, u.pipeline_send.num_remain) -= 1; + if (MPIDI_OFI_REQUEST(sreq, u.pipeline_send.num_remain) == 0) { MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype)); MPIDI_Request_complete_fast(sreq); } @@ -263,20 +263,20 @@ int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq, int mpi_errno = MPI_SUCCESS; /* The 1st recv is an empty chunk for matching. We need initialize rreq. */ - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset) = 0; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) = 0; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = 0; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync) = false; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.remote_addr) = remote_addr; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.vci_local) = vci_local; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.match_bits) = match_bits; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.mask_bits) = mask_bits; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.ctx_idx) = ctx_idx; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset) = 0; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) = 0; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) = 0; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.is_sync) = false; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.remote_addr) = remote_addr; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.vci_local) = vci_local; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.match_bits) = match_bits; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.mask_bits) = mask_bits; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.ctx_idx) = ctx_idx; /* Save original buf, datatype and count */ - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.buf) = recv_buf; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.count) = count; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.datatype) = datatype; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.buf) = recv_buf; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.count) = count; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.datatype) = datatype; struct recv_alloc *p; p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER); @@ -313,7 +313,7 @@ static int recv_alloc_poll(MPIR_Async_thing * thing) MPIR_Request *rreq = p->rreq; /* arbitrary threshold */ - if (MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) > 1) { + if (MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) > 1) { return MPIR_ASYNC_THING_NOPROGRESS; } @@ -323,10 +323,10 @@ static int recv_alloc_poll(MPIR_Async_thing * thing) return MPIR_ASYNC_THING_NOPROGRESS; } - fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.remote_addr); - int ctx_idx = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.ctx_idx); + fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.remote_addr); + int ctx_idx = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.ctx_idx); int vci = MPIDI_Request_get_vci(rreq); - uint64_t mask_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.mask_bits); + uint64_t mask_bits = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.mask_bits); struct chunk_req *chunk_req; chunk_req = MPL_malloc(sizeof(*chunk_req), MPL_MEM_BUFFER); @@ -337,10 +337,10 @@ static int recv_alloc_poll(MPIR_Async_thing * thing) uint64_t match_bits; if (p->n_chunks == -1) { - match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.match_bits); + match_bits = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.match_bits); chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT; } else { - match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.pipeline_tag) | + match_bits = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.pipeline_tag) | MPIDI_OFI_GPU_PIPELINE_SEND; chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE; } @@ -350,7 +350,7 @@ static int recv_alloc_poll(MPIR_Async_thing * thing) match_bits, mask_bits, (void *) &chunk_req->context); MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci).lock); if (ret == 0) { - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) += 1; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) += 1; free(p); /* chunk_req and host_buf will be freed in recv_events */ return MPIR_ASYNC_THING_DONE; @@ -378,9 +378,9 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques MPL_free(chunk_req); - void *recv_buf = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.buf); - size_t recv_count = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.count); - MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.datatype); + void *recv_buf = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.buf); + size_t recv_count = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.count); + MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.datatype); if (event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) { rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc, true); @@ -388,20 +388,20 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques rreq->status.MPI_TAG = MPIDI_OFI_init_get_tag(wc->tag); if (unlikely(MPIDI_OFI_is_tag_sync(wc->tag))) { - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync) = true; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.is_sync) = true; } bool is_pipeline = (wc->data & MPIDI_OFI_IDATA_PIPELINE); if (!is_pipeline) { /* message from a normal send */ - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = 1; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) = 1; mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype); MPIR_ERR_CHECK(mpi_errno); } else { struct pipeline_header *p_hdr = host_buf; MPIR_Assert(p_hdr->n_chunks > 0); - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) = p_hdr->n_chunks; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.pipeline_tag) = p_hdr->pipeline_tag; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) = p_hdr->n_chunks; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.pipeline_tag) = p_hdr->pipeline_tag; /* There is no data in the init chunk, free the buffer */ MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf); /* Post recv for the remaining chunks. */ @@ -412,7 +412,7 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques } } else { MPIR_Assert(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE); - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) -= 1; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_inrecv) -= 1; mpi_errno = start_recv_copy(rreq, host_buf, wc->len, recv_buf, recv_count, datatype); MPIR_ERR_CHECK(mpi_errno); } @@ -443,7 +443,7 @@ static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz, { int mpi_errno = MPI_SUCCESS; - MPI_Aint offset = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset); + MPI_Aint offset = MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset); int engine_type = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_H2D_ENGINE_TYPE; /* FIXME: current design unpacks all bytes from host buffer, overflow check is missing. */ @@ -453,7 +453,7 @@ static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz, MPL_GPU_COPY_H2D, engine_type, 1, &async_req); MPIR_ERR_CHECK(mpi_errno); - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset) += chunk_sz; + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset) += chunk_sz; struct recv_copy *p; p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER); @@ -490,10 +490,10 @@ static int recv_copy_poll(MPIR_Async_thing * thing) static void recv_copy_complete(MPIR_Request * rreq, void *buf) { int mpi_errno = MPI_SUCCESS; - MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) -= 1; - if (MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_remain) == 0) { + MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) -= 1; + if (MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.num_remain) == 0) { /* all chunks arrived and copied */ - if (unlikely(MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.is_sync))) { + if (unlikely(MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.is_sync))) { MPIR_Comm *comm = rreq->comm; uint64_t ss_bits = MPIDI_OFI_init_sendtag(MPL_atomic_relaxed_load_int @@ -520,7 +520,7 @@ static void recv_copy_complete(MPIR_Request * rreq, void *buf) MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(rreq, datatype)); /* Set number of bytes in status. */ - MPIR_STATUS_SET_COUNT(rreq->status, MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.offset)); + MPIR_STATUS_SET_COUNT(rreq->status, MPIDI_OFI_REQUEST(rreq, u.pipeline_recv.offset)); MPIDI_Request_complete_fast(rreq); } diff --git a/src/mpid/ch4/netmod/ofi/ofi_pre.h b/src/mpid/ch4/netmod/ofi/ofi_pre.h index 09c72a22923..d97bc434323 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_pre.h +++ b/src/mpid/ch4/netmod/ofi/ofi_pre.h @@ -220,12 +220,6 @@ typedef struct { struct { struct iovec *iovs; } nopack_recv; - } u; - union { - struct iovec iov; - void *inject_buf; /* Internal buffer for inject emulation */ - } util; - union { struct { int vci_local; int ctx_idx; @@ -234,7 +228,7 @@ typedef struct { uint64_t match_bits; int pipeline_tag; int num_remain; - } send; + } pipeline_send; struct { int vci_local; int ctx_idx; @@ -249,8 +243,12 @@ typedef struct { void *buf; MPI_Aint count; MPI_Datatype datatype; - } recv; - } pipeline_info; /* GPU pipeline */ + } pipeline_recv; + } u; + union { + struct iovec iov; + void *inject_buf; /* Internal buffer for inject emulation */ + } util; } MPIDI_OFI_request_t; typedef struct {