diff --git a/src/plugins/ucx/meson.build b/src/plugins/ucx/meson.build index aebfa92f9..2bf1ff038 100644 --- a/src/plugins/ucx/meson.build +++ b/src/plugins/ucx/meson.build @@ -16,26 +16,20 @@ ucx_utils_dep = declare_dependency(link_with: ucx_utils_lib, include_directories: utils_inc_dirs ) asio_dep = [dependency('asio', required: true)] -compile_flags = [] -if cuda_dep.found() - compile_flags = [ '-DHAVE_CUDA' ] -endif - if 'UCX' in static_plugins ucx_backend_lib = static_library('UCX', 'ucx_backend.cpp', 'ucx_backend.h', 'ucx_plugin.cpp', - dependencies: [nixl_infra, ucx_utils_dep, serdes_interface, cuda_dep, ucx_dep, thread_dep, nixl_common_dep, asio_dep], + dependencies: [nixl_infra, ucx_utils_dep, serdes_interface, ucx_dep, thread_dep, nixl_common_dep, asio_dep], include_directories: nixl_inc_dirs, install: false, - cpp_args : compile_flags, name_prefix: 'libplugin_') # Custom prefix for plugin libraries else ucx_backend_lib = shared_library('UCX', 'ucx_backend.cpp', 'ucx_backend.h', 'ucx_plugin.cpp', - dependencies: [nixl_infra, ucx_utils_dep, serdes_interface, cuda_dep, ucx_dep, thread_dep, nixl_common_dep, asio_dep], + dependencies: [nixl_infra, ucx_utils_dep, serdes_interface, ucx_dep, thread_dep, nixl_common_dep, asio_dep], include_directories: nixl_inc_dirs, install: true, - cpp_args : compile_flags + ['-fPIC'], + cpp_args : ['-fPIC'], name_prefix: 'libplugin_', # Custom prefix for plugin libraries install_dir: plugin_install_dir, install_rpath: '$ORIGIN/..') diff --git a/src/plugins/ucx/ucx_backend.cpp b/src/plugins/ucx/ucx_backend.cpp index 9be55fb28..56af4f0b5 100644 --- a/src/plugins/ucx/ucx_backend.cpp +++ b/src/plugins/ucx/ucx_backend.cpp @@ -30,14 +30,6 @@ #include "absl/strings/numbers.h" #include "absl/strings/str_join.h" #include - -#ifdef HAVE_CUDA - -#include -#include - -#endif - namespace { void moveNotifList(notif_list_t &src, notif_list_t &tgt) { @@ -48,223 +40,6 @@ namespace { } } -/**************************************** - * CUDA related code - *****************************************/ - -class nixlUcxCudaCtx { -public: -#ifdef HAVE_CUDA - CUcontext pthrCudaCtx; - int myDevId; - - nixlUcxCudaCtx() { - pthrCudaCtx = NULL; - myDevId = -1; - } -#endif - void cudaResetCtxPtr(); - int cudaUpdateCtxPtr(void *address, int expected_dev, bool &was_updated); - int cudaSetCtx(); -}; - -class nixlUcxCudaDevicePrimaryCtx { -#ifndef HAVE_CUDA -public: - bool push() { return false; } - void pop() {}; -#else - static constexpr int defaultCudaDeviceOrdinal = 0; - int m_ordinal{defaultCudaDeviceOrdinal}; - CUdevice m_device{CU_DEVICE_INVALID}; - CUcontext m_context{nullptr}; -public: - - bool push() { - CUcontext context; - - const auto res = cuCtxGetCurrent(&context); - if (res != CUDA_SUCCESS || context != nullptr) { - return false; - } - - if (m_context == nullptr) { - CUresult res = cuDeviceGet(&m_device, m_ordinal); - if (res != CUDA_SUCCESS) { - return false; - } - - res = cuDevicePrimaryCtxRetain(&m_context, m_device); - if (res != CUDA_SUCCESS) { - m_context = nullptr; - return false; - } - } - - return cuCtxPushCurrent(m_context) == CUDA_SUCCESS; - } - - void pop() { - cuCtxPopCurrent(nullptr); - } - - ~nixlUcxCudaDevicePrimaryCtx() { - if (m_context != nullptr) { - cuDevicePrimaryCtxRelease(m_device); - } - } -#endif -}; - -class nixlUcxCudaCtxGuard { - nixlUcxCudaDevicePrimaryCtxPtr m_primary; -public: - nixlUcxCudaCtxGuard(nixl_mem_t nixl_mem, - nixlUcxCudaDevicePrimaryCtxPtr primary) { - if (nixl_mem == VRAM_SEG && primary && primary->push()) { - m_primary = primary; - } - } - ~nixlUcxCudaCtxGuard() { - if (m_primary) { - m_primary->pop(); - } - } -}; - -#ifdef HAVE_CUDA - -static int cudaQueryAddr(void *address, bool &is_dev, - CUdevice &dev, CUcontext &ctx) -{ - CUmemorytype mem_type = CU_MEMORYTYPE_HOST; - uint32_t is_managed = 0; -#define NUM_ATTRS 4 - CUpointer_attribute attr_type[NUM_ATTRS]; - void *attr_data[NUM_ATTRS]; - CUresult result; - - attr_type[0] = CU_POINTER_ATTRIBUTE_MEMORY_TYPE; - attr_data[0] = &mem_type; - attr_type[1] = CU_POINTER_ATTRIBUTE_IS_MANAGED; - attr_data[1] = &is_managed; - attr_type[2] = CU_POINTER_ATTRIBUTE_DEVICE_ORDINAL; - attr_data[2] = &dev; - attr_type[3] = CU_POINTER_ATTRIBUTE_CONTEXT; - attr_data[3] = &ctx; - - result = cuPointerGetAttributes(4, attr_type, attr_data, (CUdeviceptr)address); - - is_dev = (mem_type == CU_MEMORYTYPE_DEVICE); - - return (CUDA_SUCCESS != result); -} - -// This routine finds the CUDA context matching for the given input address. -int nixlUcxCudaCtx::cudaUpdateCtxPtr(void *address, int expected_dev, bool &was_updated) -{ - bool is_dev; - CUdevice dev; - CUcontext ctx; - int ret; - - was_updated = false; - - /* TODO: proper error codes and log outputs through this method */ - if (expected_dev == -1) { - return -1; - } - - ret = cudaQueryAddr(address, is_dev, dev, ctx); - if (ret) { - return ret; - } - - if (!is_dev) { - return 0; - } - - if (dev != expected_dev) { - // User provided address that does not match dev_id - return -1; - } - - pthrCudaCtx = ctx; - was_updated = true; - myDevId = expected_dev; - - return 0; -} - -int nixlUcxCudaCtx::cudaSetCtx() -{ - CUresult result; - if (NULL == pthrCudaCtx) { - return 0; - } - - result = cuCtxSetCurrent(pthrCudaCtx); - - return (CUDA_SUCCESS == result); -} - -#else - -int nixlUcxCudaCtx::cudaUpdateCtxPtr(void *address, int expected_dev, bool &was_updated) -{ - was_updated = false; - return 0; -} - -int nixlUcxCudaCtx::cudaSetCtx() { - return 0; -} - -#endif - - -void nixlUcxEngine::vramInitCtx() -{ - cudaCtx = std::make_unique(); -} - -int -nixlUcxEngine::vramUpdateCtx(void *address, uint64_t dev_id, bool &restart_reqd) { - int ret; - bool was_updated; - - restart_reqd = false; - - if(!cuda_addr_wa) { - // Nothing to do - return 0; - } - - ret = cudaCtx->cudaUpdateCtxPtr(address, dev_id, was_updated); - if (ret) { - return ret; - } - - restart_reqd = was_updated; - - return 0; -} - -int nixlUcxEngine::vramApplyCtx() -{ - if(!cuda_addr_wa) { - // Nothing to do - return 0; - } - - return cudaCtx->cudaSetCtx(); -} - -void nixlUcxEngine::vramFiniCtx() -{ - cudaCtx.reset(); -} - /**************************************** * UCX request management *****************************************/ @@ -429,9 +204,7 @@ class nixlUcxBackendH : public nixlBackendReqH { */ class nixlUcxThread { public: - nixlUcxThread(const nixlUcxEngine *engine, std::function init, size_t num_workers) - : engine_(engine), - init_(std::move(init)) { + nixlUcxThread(const nixlUcxEngine *engine, size_t num_workers) : engine_(engine) { workers_.reserve(num_workers); } @@ -477,7 +250,6 @@ class nixlUcxThread { void operator()() { tlsThread() = this; - init_(); threadActive_->set_value(); run(); } @@ -506,7 +278,6 @@ class nixlUcxThread { private: const nixlUcxEngine *engine_; - std::function init_; std::vector workers_; std::vector workerIds_; std::unique_ptr thread_; @@ -515,11 +286,8 @@ class nixlUcxThread { class nixlUcxSharedThread : public nixlUcxThread { public: - nixlUcxSharedThread(const nixlUcxEngine *engine, - std::function init, - size_t num_workers, - nixlTime::us_t delay) - : nixlUcxThread(engine, std::move(init), num_workers) { + nixlUcxSharedThread(const nixlUcxEngine *engine, size_t num_workers, nixlTime::us_t delay) + : nixlUcxThread(engine, num_workers) { if (pipe(controlPipe_) < 0) { throw std::runtime_error("Couldn't create progress thread control pipe"); } @@ -607,8 +375,7 @@ nixlUcxThreadEngine::nixlUcxThreadEngine(const nixlBackendInitParams &init_param } size_t num_workers = getWorkers().size(); - thread_ = std::make_unique( - this, [this]() { nixlUcxEngine::vramApplyCtx(); }, num_workers, init_params.pthrDelay); + thread_ = std::make_unique(this, num_workers, init_params.pthrDelay); for (size_t i = 0; i < num_workers; i++) { thread_->addWorker(getWorkers()[i].get(), i); } @@ -619,13 +386,6 @@ nixlUcxThreadEngine::~nixlUcxThreadEngine() { thread_->join(); } -int -nixlUcxThreadEngine::vramApplyCtx() { - thread_->join(); - thread_->start(); - return nixlUcxEngine::vramApplyCtx(); -} - void nixlUcxThreadEngine::appendNotif(std::string remote_name, std::string msg) { if (nixlUcxThread::isProgressThread(this)) { @@ -827,8 +587,8 @@ class nixlUcxCompositeBackendH : public nixlUcxBackendH { class nixlUcxDedicatedThread : public nixlUcxThread { public: - nixlUcxDedicatedThread(nixlUcxEngine *engine, std::function init, asio::io_context &io) - : nixlUcxThread(engine, std::move(init), 1), + nixlUcxDedicatedThread(nixlUcxEngine *engine, asio::io_context &io) + : nixlUcxThread(engine, 1), io_(io) {} static nixlUcxDedicatedThread * @@ -898,11 +658,9 @@ nixlUcxThreadPoolEngine::nixlUcxThreadPoolEngine(const nixlBackendInitParams &in splitBatchSize_ = nixl_b_params_get(init_params.customParams, "split_batch_size", 1024); - auto init = [this]() { nixlUcxEngine::vramApplyCtx(); }; - if (init_params.enableProgTh) { - sharedThread_ = std::make_unique( - this, init, numSharedWorkers_, init_params.pthrDelay); + sharedThread_ = + std::make_unique(this, numSharedWorkers_, init_params.pthrDelay); for (size_t i = 0; i < numSharedWorkers_; i++) { sharedThread_->addWorker(getWorkers()[i].get(), i); } @@ -914,8 +672,7 @@ nixlUcxThreadPoolEngine::nixlUcxThreadPoolEngine(const nixlBackendInitParams &in dedicatedThreads_.reserve(num_threads); for (size_t i = 0; i < num_threads; ++i) { size_t worker_id = numSharedWorkers_ + i; - dedicatedThreads_.emplace_back( - std::make_unique(this, init, *io_)); + dedicatedThreads_.emplace_back(std::make_unique(this, *io_)); dedicatedThreads_.back()->addWorker(getWorker(worker_id).get(), worker_id); dedicatedThreads_.back()->start(); } @@ -1014,25 +771,6 @@ nixlUcxThreadPoolEngine::sendXferRange(const nixl_xfer_op_t &operation, return status.load(); } -int -nixlUcxThreadPoolEngine::vramApplyCtx() { - if (sharedThread_) { - sharedThread_->join(); - sharedThread_->start(); - } - if (io_) { - io_->stop(); - for (auto &thread : dedicatedThreads_) { - thread->join(); - } - io_->restart(); - for (auto &thread : dedicatedThreads_) { - thread->start(); - } - } - return nixlUcxEngine::vramApplyCtx(); -} - void nixlUcxThreadPoolEngine::appendNotif(std::string remote_name, std::string msg) { if (nixlUcxThread::isProgressThread(this)) { @@ -1111,17 +849,6 @@ nixlUcxEngine::nixlUcxEngine(const nixlBackendInitParams &init_params) auto &uw = uws.front(); workerAddr = uw->epAddr(); uw->regAmCallback(NOTIF_STR, notifAmCb, this); - - // Temp fixup - if (getenv("NIXL_DISABLE_CUDA_ADDR_WA")) { - NIXL_INFO << "disabling CUDA address workaround"; - cuda_addr_wa = false; - } else { - cuda_addr_wa = true; - } - - m_cudaPrimaryCtx = std::make_shared(); - vramInitCtx(); } nixl_mem_list_t nixlUcxEngine::getSupportedMems () const { @@ -1139,7 +866,6 @@ tlsSharedWorkerMap() { // Through parent destructor the unregister will be called. nixlUcxEngine::~nixlUcxEngine() { - vramFiniCtx(); tlsSharedWorkerMap().erase(this); } @@ -1218,17 +944,6 @@ nixl_status_t nixlUcxEngine::registerMem (const nixlBlobDesc &mem, { auto priv = std::make_unique(); - if (nixl_mem == VRAM_SEG) { - bool need_restart; - if (vramUpdateCtx((void*)mem.addr, mem.devId, need_restart)) { - return NIXL_ERR_NOT_SUPPORTED; - //TODO Add to logging - } - if (need_restart) { - vramApplyCtx(); - } - } - // TODO: Add nixl_mem check? const int ret = uc->memReg((void*) mem.addr, mem.len, priv->mem, nixl_mem); if (ret) { @@ -1307,8 +1022,6 @@ nixl_status_t nixlUcxEngine::loadRemoteMD (const nixlBlobDesc &input, const std::string &remote_agent, nixlBackendMD* &output) { - // Set CUDA context of first device, UCX will anyways detect proper device when sending - nixlUcxCudaCtxGuard guard(nixl_mem, m_cudaPrimaryCtx); return internalMDHelper(input.metaInfo, remote_agent, output); } diff --git a/src/plugins/ucx/ucx_backend.h b/src/plugins/ucx/ucx_backend.h index bf9e6546d..907222ee9 100644 --- a/src/plugins/ucx/ucx_backend.h +++ b/src/plugins/ucx/ucx_backend.h @@ -93,16 +93,6 @@ class nixlUcxPublicMetadata : public nixlBackendMD { std::vector> rkeys_; }; -// Forward declaration of CUDA context -// It is only visible in ucx_backend.cpp to ensure that -// HAVE_CUDA works properly -// Once we will introduce static config (i.e. config.h) that -// will be part of NIXL installation - we can have -// HAVE_CUDA in h-files -class nixlUcxCudaCtx; -class nixlUcxCudaDevicePrimaryCtx; -using nixlUcxCudaDevicePrimaryCtxPtr = std::shared_ptr; - class nixlUcxEngine : public nixlBackendEngine { public: static std::unique_ptr @@ -247,9 +237,6 @@ class nixlUcxEngine : public nixlBackendEngine { void getNotifsImpl(notif_list_t ¬if_list); - virtual int - vramApplyCtx(); - virtual void appendNotif(std::string remote_name, std::string msg); @@ -265,13 +252,6 @@ class nixlUcxEngine : public nixlBackendEngine { nixlUcxEngine(const nixlBackendInitParams &init_params); private: - void - vramInitCtx(); - void - vramFiniCtx(); - int - vramUpdateCtx(void *address, uint64_t dev_id, bool &restart_reqd); - // Memory management helpers nixl_status_t internalMDHelper(const nixl_blob_t &blob, const std::string &agent, nixlBackendMD *&output); @@ -300,14 +280,8 @@ class nixlUcxEngine : public nixlBackendEngine { std::string workerAddr; mutable std::atomic sharedWorkerIndex_; - /* CUDA data*/ - std::unique_ptr cudaCtx; // Context matching specific device - bool cuda_addr_wa; mutable std::optional gpuSignalSize_; - // Context to use when current context is missing - nixlUcxCudaDevicePrimaryCtxPtr m_cudaPrimaryCtx; - /* Notifications */ notif_list_t notifMainList; @@ -330,9 +304,6 @@ class nixlUcxThreadEngine : public nixlUcxEngine { getNotifs(notif_list_t ¬if_list) override; protected: - int - vramApplyCtx() override; - void appendNotif(std::string remote_name, std::string msg) override; @@ -368,9 +339,6 @@ class nixlUcxThreadPoolEngine : public nixlUcxEngine { getNotifs(notif_list_t ¬if_list) override; protected: - int - vramApplyCtx() override; - void appendNotif(std::string remote_name, std::string msg) override; diff --git a/src/utils/ucx/ucx_utils.cpp b/src/utils/ucx/ucx_utils.cpp index d337e7247..5b1019430 100644 --- a/src/utils/ucx/ucx_utils.cpp +++ b/src/utils/ucx/ucx_utils.cpp @@ -456,6 +456,9 @@ nixlUcxContext::nixlUcxContext(std::vector devs, unsigned ucp_version = UCP_VERSION(major_version, minor_version); if (ucp_version >= UCP_VERSION(1, 19)) { config.modify ("MAX_COMPONENT_MDS", "32"); + } else { + NIXL_WARN << "UCX version is less than 1.19, CUDA support is limited, " + << "including the lack of support for multi-GPU within a single process."; } if (ucp_version >= UCP_VERSION(1, 20)) {