Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/api/cpp/nixl.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,22 @@ class nixlAgent {
/**
* @brief Create a GPU transfer request from a transfer request.
*
* @param req_hndl [in] Transfer request obtained from makeXferReq/createXferReq
* @param gpu_req_hndl [out] GPU transfer request handle
* @return nixl_status_t Error code if call was not successful
*
* @param local_descs [in] Local descriptor list (empty for signal-only case)
* @param remote_descs [in] Remote descriptor list
* @param remote_agent [in] Remote agent name for accessing the remote data
* @param gpu_req_hndl [out] GPU transfer request handle
* @param req_hndl [out] Transfer request handle
* @param extra_params [in] Optional extra parameters
* @return nixl_status_t Error code if call was not successful
*/
nixl_status_t
createGpuXferReq(const nixlXferReqH &req_hndl, nixlGpuXferReqH &gpu_req_hndl) const;
createGpuXferReq(const nixl_xfer_dlist_t &local_descs,
const nixl_xfer_dlist_t &remote_descs,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's confusing to have local_descs and remote_descs with different lengths, which implies that the last len(remote_descs) - len(local_descs) remote descs are inline only.

Instead, I think we should enforce len(local_descs) == len(remote_descs) and introduce a third parameter, inline_descs, which describes remote memory that has no corresponding source buffer (and can therefore only be written with inline data).

const std::string &remote_agent,
nixlGpuXferReqH &gpu_req_hndl,
nixlXferReqH *&req_hndl,
const nixl_opt_args_t *extra_params = nullptr) const;

/**
* @brief Release transfer request from GPU memory
Expand Down
158 changes: 131 additions & 27 deletions src/core/nixl_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -879,43 +879,33 @@ nixlAgent::createXferReq(const nixl_xfer_op_t &operation,
if (!extra_params || extra_params->backends.size() == 0) {
// Finding backends that support the corresponding memories
// locally and remotely, and find the common ones.
backend_set_t* local_set =
data->memorySection->queryBackends(local_descs.getType());
backend_set_t* remote_set =
data->remoteSections[remote_agent]->queryBackends(
remote_descs.getType());
backend_set_t *local_set = data->memorySection->queryBackends(local_descs.getType());
backend_set_t *remote_set =
data->remoteSections[remote_agent]->queryBackends(remote_descs.getType());
if (!local_set || !remote_set) {
NIXL_ERROR_FUNC << "no backends found for local or remote for their "
"corresponding memory type";
return NIXL_ERR_NOT_FOUND;
}

for (auto & elm : *local_set)
if (remote_set->count(elm) != 0)
backend_set->insert(elm);
for (auto &elm : *local_set)
if (remote_set->count(elm) != 0) backend_set->insert(elm);

if (backend_set->empty()) {
NIXL_ERROR_FUNC << "no potential backend found to be able to do the transfer";
return NIXL_ERR_NOT_FOUND;
}
} else {
for (auto & elm : extra_params->backends)
for (auto &elm : extra_params->backends)
backend_set->insert(elm->engine);
}

// TODO: when central KV is supported, add a call to fetchRemoteMD
// TODO: merge descriptors back to back in memory (like makeXferReq).
// TODO [Perf]: Avoid heap allocation on the datapath, maybe use a mem pool

std::unique_ptr<nixlXferReqH> handle = std::make_unique<nixlXferReqH>();
handle->initiatorDescs = new nixl_meta_dlist_t(local_descs.getType());

handle->targetDescs = new nixl_meta_dlist_t(remote_descs.getType());

// Currently we loop through and find first local match. Can use a
// preference list or more exhaustive search.
for (auto & backend : *backend_set) {
// If populate fails, it clears the resp before return
ret1 = data->memorySection->populate(
local_descs, backend, *handle->initiatorDescs);
ret2 = data->remoteSections[remote_agent]->populate(
Expand Down Expand Up @@ -1217,25 +1207,139 @@ nixlAgent::releaseXferReq(nixlXferReqH *req_hndl) const {
}

/**
 * @brief Build a transfer request from descriptor lists and export it as a GPU
 *        transfer request.
 *
 * Selects a backend able to serve both descriptor lists, populates the backend
 * metadata descriptor lists, prepares the transfer with prepXfer and finally
 * asks the selected backend to create the GPU transfer request.
 *
 * @param local_descs  [in]  Local descriptor list; must have the same descriptor
 *                           count as remote_descs and matching per-index lengths
 * @param remote_descs [in]  Remote descriptor list
 * @param remote_agent [in]  Remote agent name; its metadata must already be known
 *                           to this agent
 * @param gpu_req_hndl [out] GPU transfer request handle filled in by the backend
 * @param req_hndl     [out] Transfer request handle; reset to nullptr on entry and
 *                           assigned only when the call succeeds
 * @param extra_params [in]  Optional extra parameters: a non-empty backends list
 *                           restricts the candidate backends; the notification
 *                           and customParam fields are forwarded to the backend
 * @return nixl_status_t NIXL_SUCCESS, or an error code if any step failed
 */
nixl_status_t
nixlAgent::createGpuXferReq(const nixl_xfer_dlist_t &local_descs,
                            const nixl_xfer_dlist_t &remote_descs,
                            const std::string &remote_agent,
                            nixlGpuXferReqH &gpu_req_hndl,
                            nixlXferReqH *&req_hndl,
                            const nixl_opt_args_t *extra_params) const {
    nixl_opt_b_args_t opt_args;

    // Callers must never observe a stale handle on any failure path.
    req_hndl = nullptr;

    NIXL_SHARED_LOCK_GUARD(data->lock);

    // Single lookup: find() cannot insert, unlike operator[], which matters
    // because only a shared lock is held here.
    const auto remote_it = data->remoteSections.find(remote_agent);
    if (remote_it == data->remoteSections.end()) {
        NIXL_ERROR_FUNC << "metadata for remote agent '" << remote_agent << "' not found";
        data->addErrorTelemetry(NIXL_ERR_NOT_FOUND);
        return NIXL_ERR_NOT_FOUND;
    }
    const auto &remote_section = remote_it->second;

    // Local and remote lists must pair up one-to-one with equal lengths.
    size_t total_bytes = 0;
    if (local_descs.descCount() != remote_descs.descCount()) {
        NIXL_ERROR_FUNC << "different descriptor list sizes (local=" << local_descs.descCount()
                        << ", remote=" << remote_descs.descCount() << ")";
        return NIXL_ERR_INVALID_PARAM;
    }
    for (int i = 0; i < local_descs.descCount(); ++i) {
        if (local_descs[i].len != remote_descs[i].len) {
            NIXL_ERROR_FUNC << "length mismatch at index " << i;
            return NIXL_ERR_INVALID_PARAM;
        }
        total_bytes += local_descs[i].len;
    }

    // Candidate backends: either the caller's explicit list, or every backend
    // that supports both the local and the remote memory type.
    backend_set_t backend_set;
    if (!extra_params || extra_params->backends.size() == 0) {
        backend_set_t *local_set = data->memorySection->queryBackends(local_descs.getType());
        backend_set_t *remote_set = remote_section->queryBackends(remote_descs.getType());
        if (!local_set || !remote_set) {
            NIXL_ERROR_FUNC << "no backends found for local or remote for their "
                               "corresponding memory type";
            return NIXL_ERR_NOT_FOUND;
        }

        for (auto &elm : *local_set)
            if (remote_set->count(elm) != 0) backend_set.insert(elm);

        if (backend_set.empty()) {
            NIXL_ERROR_FUNC << "no potential backend found to be able to do the transfer";
            return NIXL_ERR_NOT_FOUND;
        }
    } else {
        for (auto &elm : extra_params->backends)
            backend_set.insert(elm->engine);
    }

    // The handle stays owned by this unique_ptr until every step has succeeded,
    // so all error returns below dispose of it the same way.
    std::unique_ptr<nixlXferReqH> handle = std::make_unique<nixlXferReqH>();
    handle->initiatorDescs = new nixl_meta_dlist_t(local_descs.getType());
    handle->targetDescs = new nixl_meta_dlist_t(remote_descs.getType());

    // Take the first backend for which both sides can be populated.
    for (auto &backend : backend_set) {
        const nixl_status_t local_status =
            data->memorySection->populate(local_descs, backend, *handle->initiatorDescs);
        const nixl_status_t remote_status =
            remote_section->populate(remote_descs, backend, *handle->targetDescs);

        if ((local_status == NIXL_SUCCESS) && (remote_status == NIXL_SUCCESS)) {
            NIXL_INFO << "Selected backend: " << backend->getType();
            handle->engine = backend;
            break;
        }
    }

    if (!handle->engine) {
        NIXL_ERROR_FUNC << "no specified or potential backend had the required "
                           "registrations to be able to do the transfer";
        data->addErrorTelemetry(NIXL_ERR_NOT_FOUND);
        return NIXL_ERR_NOT_FOUND;
    }

    if (extra_params) {
        if (extra_params->hasNotif) {
            opt_args.notifMsg = extra_params->notifMsg;
            opt_args.hasNotif = true;
        }

        if (extra_params->customParam.length() > 0)
            opt_args.customParam = extra_params->customParam;
    }

    if (opt_args.hasNotif && (!handle->engine->supportsNotif())) {
        NIXL_ERROR_FUNC << "the selected backend '" << handle->engine->getType()
                        << "' does not support notifications";
        data->addErrorTelemetry(NIXL_ERR_BACKEND);
        return NIXL_ERR_BACKEND;
    }

    handle->remoteAgent = remote_agent;
    handle->status = NIXL_ERR_NOT_POSTED;
    handle->notifMsg = opt_args.notifMsg;
    handle->hasNotif = opt_args.hasNotif;

    if (data->telemetryEnabled) {
        handle->telemetry.totalBytes = total_bytes;
        handle->telemetry.descCount = handle->initiatorDescs->descCount();
    }

    // NOTE(review): handle->backendOp is never assigned in this function, so
    // prepXfer receives whatever nixlXferReqH's constructor left there -
    // confirm that default is the operation intended for GPU requests.
    const nixl_status_t prep_status = handle->engine->prepXfer(handle->backendOp,
                                                               *handle->initiatorDescs,
                                                               *handle->targetDescs,
                                                               handle->remoteAgent,
                                                               handle->backendHandle,
                                                               &opt_args);
    if (prep_status != NIXL_SUCCESS) {
        NIXL_ERROR_FUNC << "backend '" << handle->engine->getType()
                        << "' failed to prepare the transfer request with status "
                        << prep_status;
        data->addErrorTelemetry(prep_status);
        return prep_status;
    }

    // Guard the dereference below; a backend reporting success without
    // producing a handle is treated as a backend error.
    if (!handle->backendHandle) {
        NIXL_ERROR_FUNC << "backend '" << handle->engine->getType()
                        << "' prepared the transfer request without a backend handle";
        data->addErrorTelemetry(NIXL_ERR_BACKEND);
        return NIXL_ERR_BACKEND;
    }

    const nixl_status_t status = handle->engine->createGpuXferReq(
        *handle->backendHandle, *handle->initiatorDescs, *handle->targetDescs, gpu_req_hndl);
    if (status != NIXL_SUCCESS) {
        // Propagate the failure instead of reporting success with an unusable
        // GPU handle; req_hndl stays nullptr and the handle is disposed of here.
        NIXL_ERROR_FUNC << "backend '" << handle->engine->getType()
                        << "' failed to create the GPU transfer request with status " << status;
        data->addErrorTelemetry(status);
        return status;
    }

    // NOTE(review): gpuReqToEngine is modified while only the shared lock is
    // held - confirm this map is protected by other means.
    data->gpuReqToEngine.emplace(gpu_req_hndl, handle->engine);

    // Everything succeeded: hand ownership of the request to the caller.
    req_hndl = handle.release();
    return NIXL_SUCCESS;
}

void
Expand Down
Loading
Loading