From 9366e0c865f99fd41bf61260d63b7d0c11d8df26 Mon Sep 17 00:00:00 2001 From: yumianjie_um_3090_2 <1041884221@qq.com> Date: Wed, 27 Mar 2024 12:57:16 +0000 Subject: [PATCH] support one ggml_tensor rdma --- rdma-example/src/rdma_client_LLM.cpp | 93 ++++++++---- rdma-example/src/rdma_common.c | 210 --------------------------- rdma-example/src/rdma_common.cpp | 13 ++ 3 files changed, 74 insertions(+), 242 deletions(-) delete mode 100755 rdma-example/src/rdma_common.c diff --git a/rdma-example/src/rdma_client_LLM.cpp b/rdma-example/src/rdma_client_LLM.cpp index 642849bc..6b8c0784 100755 --- a/rdma-example/src/rdma_client_LLM.cpp +++ b/rdma-example/src/rdma_client_LLM.cpp @@ -38,7 +38,10 @@ static struct rdma_buffer_attr_vec server_metadata_attrs; static struct rdma_buffer_attr_vec client_metadata_attrs; std::vector client_src_mrs; std::vector client_dst_mrs; -int total = 1; +std::vector client_send_sges; +std::vector server_recv_wrs; + +int total = 2; void printf_value(struct ggml_tensor * tensor ) { @@ -72,6 +75,21 @@ static int check_src_dst_LLM() printf_value(tensor_dst); return memcmp((void*) tensor_src->data, (void*) tensor_dst->data, ggml_nbytes(tensor_dst)); } +static int check_src_dst_LLM_vec() +{ + printf("%s\n",__func__); + for(int i=0;idata, (void*) tensor_dsts[i]->data, ggml_nbytes(tensor_dsts[i]))!=0) + { + return -1; + } + } + return 0; + +} /* This function prepares client side connection resources for an RDMA connection */ static int client_prepare_connection(struct sockaddr_in *s_addr) { @@ -529,7 +547,7 @@ static int client_xchange_metadata_with_server_LLM_vec() return ret; } debug("Server sent us its buffer location and credentials, showing \n"); - show_rdma_buffer_attr(&server_metadata_attr); + show_rdma_buffer_attrs(&server_metadata_attrs); return 0; } @@ -709,10 +727,11 @@ static int client_remote_memory_ops_LLM() } static int client_remote_memory_ops_LLM_vec() { - struct ibv_wc wc; + struct ibv_wc wc[total]; int ret = -1; + printf_value(tensor_srcs[0]); for(int i=0;idata, ggml_nbytes(tensor_srcs[i]), @@ -724,6 +743,7 @@ static int client_remote_memory_ops_LLM_vec() return -ENOMEM; } } + printf_value(tensor_srcs[0]); /* Step 1: is to copy the local buffer into the remote buffer. We will * reuse the previous variables. */ @@ -740,8 +760,8 @@ static int client_remote_memory_ops_LLM_vec() client_send_wr.opcode = IBV_WR_RDMA_WRITE; client_send_wr.send_flags = IBV_SEND_SIGNALED; /* we have to tell server side info for RDMA */ //设置远程RDMA操作的相关信息,包括远程键(rkey)和远程地址。 - client_send_wr.wr.rdma.rkey = server_metadata_attr.stag.remote_stag; - client_send_wr.wr.rdma.remote_addr = server_metadata_attr.address; + client_send_wr.wr.rdma.rkey = server_metadata_attrs.stags[i].remote_stag; + client_send_wr.wr.rdma.remote_addr = server_metadata_attrs.address[i]; /* Now we post it */ ret = ibv_post_send(client_qp, //调用ibv_post_send()函数将发送请求发送到RDMA队列中。 &client_send_wr, @@ -751,6 +771,8 @@ static int client_remote_memory_ops_LLM_vec() -errno); return -errno; } + sleep(5); + } /* now we link to the send work request */ //初始化client_send_wr结构体,并设置相关参数,如SGE列表、SGE数量、操作码(IBV_WR_RDMA_WRITE)和发送标志(IBV_SEND_SIGNALED)。 @@ -760,15 +782,16 @@ static int client_remote_memory_ops_LLM_vec() /* at this point we are expecting 1 work completion for the write */ printf("process_work_completion_events\n"); - ret = process_work_completion_events(io_completion_channel, //函数等待并处理工作完成事件。 - &wc, 1); - if(ret != 1) { - rdma_error("We failed to get 1 work completions , ret = %d \n", - ret); - return ret; - } + // ret = process_work_completion_events(io_completion_channel, //函数等待并处理工作完成事件。 + // wc, total); + // if(ret != total) { + // rdma_error("We failed to get 1 work completions , ret = %d \n", + // ret); + // return ret; + // } + debug("Client side WRITE is complete \n"); - for(int i=0;i<1;++i) + for(int i=0;iaddr; client_send_sge.length = (uint32_t) client_dst_mrs[i]->length; @@ -780,8 +803,8 @@ static int client_remote_memory_ops_LLM_vec() client_send_wr.opcode = IBV_WR_RDMA_READ; client_send_wr.send_flags = IBV_SEND_SIGNALED; /* we have to tell server side info for RDMA */ // 设置远程RDMA操作的相关信息,包括远程键和远程地址。 - client_send_wr.wr.rdma.rkey = server_metadata_attr.stag.remote_stag; - client_send_wr.wr.rdma.remote_addr = server_metadata_attr.address; + client_send_wr.wr.rdma.rkey = server_metadata_attrs.stags[0].remote_stag; + client_send_wr.wr.rdma.remote_addr = server_metadata_attrs.address[0]; /* Now we post it */ ret = ibv_post_send(client_qp, //函数将发送请求发送到RDMA队列中。 &client_send_wr, @@ -792,16 +815,21 @@ static int client_remote_memory_ops_LLM_vec() return -errno; } /* Now we prepare a READ using same variables but for destination */ //将目标缓冲区的地址、长度和本地键赋值给client_send_sge结构体,表示接收的数据 - + sleep(5); } + + /* at this point we are expecting 1 work completion for the write */ - ret = process_work_completion_events(io_completion_channel, - &wc, 1); - if(ret != 1) { - rdma_error("We failed to get 1 work completions , ret = %d \n", - ret); - return ret; - } + + // ret = process_work_completion_events(io_completion_channel, + // wc, total); + + + // if(ret != total) { + // printf("We failed to get 1 work completions , ret = %d \n", + // ret); + // return ret; + // } debug("Client side READ is complete \n"); return 0; } @@ -962,10 +990,10 @@ int main(int argc, char **argv) { tensor_srcs.resize(total); client_src_mrs.resize(total); client_dst_mrs.resize(total); - client_metadata_attrs.address.resize(total); - client_metadata_attrs.length.resize(total); - client_metadata_attrs.stags.resize(total); - client_metadata_attrs.size = total; + // client_metadata_attrs.address.resize(total); + // client_metadata_attrs.length.resize(total); + // client_metadata_attrs.stags.resize(total); + // client_metadata_attrs.size = total; for(int i=0;iverbs && id->verbs->device) - printf("dev_ctx: %p (device name: %s) \n", id->verbs, - id->verbs->device->name); - if(id->channel) - printf("cm event channel %p\n", id->channel); - printf("QP: %p, port_space %x, port_num %u \n", id->qp, - id->ps, - id->port_num); -} - -void show_rdma_buffer_attr(struct rdma_buffer_attr *attr){ - if(!attr){ - rdma_error("Passed attr is NULL\n"); - return; - } - printf("---------------------------------------------------------\n"); - printf("buffer attr, addr: %p , len: %u , stag : 0x%x \n", - (void*) attr->address, - (unsigned int) attr->length, - attr->stag.local_stag); - printf("---------------------------------------------------------\n"); -} - -struct ibv_mr* rdma_buffer_alloc(struct ibv_pd *pd, uint32_t size, - enum ibv_access_flags permission) -{ - struct ibv_mr *mr = NULL; - if (!pd) { - rdma_error("Protection domain is NULL \n"); - return NULL; - } - void *buf = calloc(1, size); - if (!buf) { - rdma_error("failed to allocate buffer, -ENOMEM\n"); - return NULL; - } - debug("Buffer allocated: %p , len: %u \n", buf, size); - mr = rdma_buffer_register(pd, buf, size, permission); - if(!mr){ - free(buf); - } - return mr; -} - -struct ibv_mr *rdma_buffer_register(struct ibv_pd *pd, - void *addr, uint32_t length, - enum ibv_access_flags permission) -{ - struct ibv_mr *mr = NULL; - if (!pd) { - rdma_error("Protection domain is NULL, ignoring \n"); - return NULL; - } - mr = ibv_reg_mr(pd, addr, length, permission); - if (!mr) { - rdma_error("Failed to create mr on buffer, errno: %d \n", -errno); - return NULL; - } - debug("Registered: %p , len: %u , stag: 0x%x \n", - mr->addr, - (unsigned int) mr->length, - mr->lkey); - return mr; -} - -void rdma_buffer_free(struct ibv_mr *mr) -{ - if (!mr) { - rdma_error("Passed memory region is NULL, ignoring\n"); - return ; - } - void *to_free = mr->addr; - rdma_buffer_deregister(mr); - debug("Buffer %p free'ed\n", to_free); - free(to_free); -} - -void rdma_buffer_deregister(struct ibv_mr *mr) -{ - if (!mr) { - rdma_error("Passed memory region is NULL, ignoring\n"); - return; - } - debug("Deregistered: %p , len: %u , stag : 0x%x \n", - mr->addr, - (unsigned int) mr->length, - mr->lkey); - ibv_dereg_mr(mr); -} - -int process_rdma_cm_event(struct rdma_event_channel *echannel, - enum rdma_cm_event_type expected_event, - struct rdma_cm_event **cm_event) -{ - int ret = 1; - ret = rdma_get_cm_event(echannel, cm_event); - if (ret) { - rdma_error("Failed to retrieve a cm event, errno: %d \n", - -errno); - return -errno; - } - /* lets see, if it was a good event */ - if(0 != (*cm_event)->status){ - rdma_error("CM event has non zero status: %d\n", (*cm_event)->status); - ret = -((*cm_event)->status); - /* important, we acknowledge the event */ - rdma_ack_cm_event(*cm_event); - return ret; - } - /* if it was a good event, was it of the expected type */ - if ((*cm_event)->event != expected_event) { - rdma_error("Unexpected event received: %s [ expecting: %s ]", - rdma_event_str((*cm_event)->event), - rdma_event_str(expected_event)); - /* important, we acknowledge the event */ - rdma_ack_cm_event(*cm_event); - return -1; // unexpected event :( - } - debug("A new %s type event is received \n", rdma_event_str((*cm_event)->event)); - /* The caller must acknowledge the event */ - return ret; -} - - -int process_work_completion_events (struct ibv_comp_channel *comp_channel, - struct ibv_wc *wc, int max_wc) -{ - struct ibv_cq *cq_ptr = NULL; - void *context = NULL; - int ret = -1, i, total_wc = 0; - /* We wait for the notification on the CQ channel */ - ret = ibv_get_cq_event(comp_channel, /* IO channel where we are expecting the notification */ - &cq_ptr, /* which CQ has an activity. This should be the same as CQ we created before */ - &context); /* Associated CQ user context, which we did set */ - if (ret) { - rdma_error("Failed to get next CQ event due to %d \n", -errno); - return -errno; - } - /* Request for more notifications. */ - ret = ibv_req_notify_cq(cq_ptr, 0); - if (ret){ - rdma_error("Failed to request further notifications %d \n", -errno); - return -errno; - } - /* We got notification. We reap the work completion (WC) element. It is - * unlikely but a good practice it write the CQ polling code that - * can handle zero WCs. ibv_poll_cq can return zero. Same logic as - * MUTEX conditional variables in pthread programming. - */ - total_wc = 0; - do { - ret = ibv_poll_cq(cq_ptr /* the CQ, we got notification for */, - max_wc - total_wc /* number of remaining WC elements*/, - wc + total_wc/* where to store */); - if (ret < 0) { - rdma_error("Failed to poll cq for wc due to %d \n", ret); - /* ret is errno here */ - return ret; - } - total_wc += ret; - } while (total_wc < max_wc); - debug("%d WC are completed \n", total_wc); - /* Now we check validity and status of I/O work completions */ - for( i = 0 ; i < total_wc ; i++) { - if (wc[i].status != IBV_WC_SUCCESS) { - rdma_error("Work completion (WC) has error status: %s at index %d", - ibv_wc_status_str(wc[i].status), i); - /* return negative value */ - return -(wc[i].status); - } - } - /* Similar to connection management events, we need to acknowledge CQ events */ - ibv_ack_cq_events(cq_ptr, - 1 /* we received one event notification. This is not - number of WC elements */); - return total_wc; -} - - -/* Code acknowledgment: rping.c from librdmacm/examples */ -int get_addr(char *dst, struct sockaddr *addr) -{ - struct addrinfo *res; - int ret = -1; - ret = getaddrinfo(dst, NULL, NULL, &res); - if (ret) { - rdma_error("getaddrinfo failed - invalid hostname or IP address\n"); - return ret; - } - memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in)); - freeaddrinfo(res); - return ret; -} - diff --git a/rdma-example/src/rdma_common.cpp b/rdma-example/src/rdma_common.cpp index 0478c29e..569f0b78 100755 --- a/rdma-example/src/rdma_common.cpp +++ b/rdma-example/src/rdma_common.cpp @@ -37,6 +37,19 @@ void show_rdma_buffer_attr(struct rdma_buffer_attr *attr){ printf("---------------------------------------------------------\n"); } +void show_rdma_buffer_attrs(struct rdma_buffer_attr_vec *attr){ + if(!attr){ + rdma_error("Passed attr is NULL\n"); + return; + } + printf("---------------------------------------------------------\n"); + printf("buffer attr, addr: %p , len: %u , stag : 0x%x \n", + (void*) attr->address[0], + (unsigned int) attr->length[0], + attr->stags[0].local_stag); + printf("---------------------------------------------------------\n"); +} + struct ibv_mr* rdma_buffer_alloc(struct ibv_pd *pd, uint32_t size, enum ibv_access_flags permission) {