diff --git a/.azure-pipelines/ut.yml b/.azure-pipelines/ut.yml new file mode 100644 index 000000000..4332379d3 --- /dev/null +++ b/.azure-pipelines/ut.yml @@ -0,0 +1,50 @@ +trigger: +- main + +pr: +- main + +jobs: +- job: UnitTest + timeoutInMinutes: 30 + pool: + name: mscclpp + container: + image: superbench/superbench:v0.8.0-cuda12.1 + options: --privileged --ipc=host --gpus=all --ulimit memlock=-1:-1 + + steps: + - task: Bash@3 + name: Build + displayName: Build + inputs: + targetType: 'inline' + script: | + curl -L -C- https://github.com/Kitware/CMake/releases/download/v3.26.4/cmake-3.26.4-linux-x86_64.tar.gz -o /tmp/cmake-3.26.4-linux-x86_64.tar.gz + tar xzf /tmp/cmake-3.26.4-linux-x86_64.tar.gz -C /tmp + export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/cuda-12.1/compat/lib.real + mkdir build && cd build + /tmp/cmake-3.26.4-linux-x86_64/bin/cmake .. + make -j + workingDirectory: '$(System.DefaultWorkingDirectory)' + + + - task: Bash@3 + name: UnitTests + displayName: Run mscclpp unit tests + inputs: + targetType: 'inline' + script: | + ./build/test/unit_tests + workingDirectory: '$(System.DefaultWorkingDirectory)' + + - task: Bash@3 + name: MpUnitTests + displayName: Run mscclpp multi-process unit tests + inputs: + targetType: 'inline' + script: | + mpirun -tag-output -np 2 ./build/test/mp_unit_tests + mpirun -tag-output -np 4 ./build/test/mp_unit_tests + mpirun -tag-output -np 8 ./build/test/mp_unit_tests + workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index e61c6dc12..2ae182d5e 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -23,7 +23,7 @@ jobs: - name: Run cpplint run: | - CPPSOURCES=$(find ./ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)' -not -path "./build/*" -not -path "./python/*" -not -path "./test/*") + CPPSOURCES=$(find ./ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)' -not -path "./build/*" -not -path "./python/*") PYTHONCPPSOURCES=$(find ./python/src/ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)') clang-format-12 -style=file --verbose --Werror --dry-run ${CPPSOURCES} clang-format-12 --dry-run ${PYTHONCPPSOURCES} @@ -40,10 +40,11 @@ jobs: - name: Check out Git repository uses: actions/checkout@v3 - - name: Install dependencies + - name: Download misspell run: | - curl -L https://git.io/misspell | sudo bash -s -- -b /bin + curl -L https://github.com/client9/misspell/releases/download/v0.3.4/misspell_0.3.4_linux_64bit.tar.gz -o /tmp/misspell_0.3.4_linux_64bit.tar.gz + tar -xzf /tmp/misspell_0.3.4_linux_64bit.tar.gz -C . - name: Check spelling run: | - misspell -error . + ./misspell -error . diff --git a/include/mscclpp/channel.hpp b/include/mscclpp/channel.hpp index 407e7f687..a012bf063 100644 --- a/include/mscclpp/channel.hpp +++ b/include/mscclpp/channel.hpp @@ -131,9 +131,7 @@ struct DeviceChannel { uint64_t curFifoHead = fifo_.push( ChannelTrigger(TriggerData | TriggerFlag | TriggerSync, dst, dstOffset, src, srcOffset, size, channelId_) .value); - while (*(volatile uint64_t*)&fifo_.triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 && - *(volatile uint64_t*)fifo_.tailReplica <= curFifoHead) - ; + fifo_.sync(curFifoHead); } __forceinline__ __device__ void putWithSignalAndFlush(MemoryId dst, MemoryId src, uint64_t offset, uint64_t size) { @@ -142,11 +140,7 @@ struct DeviceChannel { __forceinline__ __device__ void flush() { uint64_t curFifoHead = fifo_.push(ChannelTrigger(TriggerSync, 0, 0, 0, 0, 1, channelId_).value); - // we need to wait for two conditions to be met to ensure the CPU is done flushing. (1) wait for the tail - // to go pass by curFifoHead (this is safety net) and (2) wait for the work element value to change to 0. - while (*(volatile uint64_t*)&fifo_.triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 && - *(volatile uint64_t*)fifo_.tailReplica <= curFifoHead) - ; + fifo_.sync(curFifoHead); } __forceinline__ __device__ void wait() { epoch_.wait(); } diff --git a/include/mscclpp/epoch.hpp b/include/mscclpp/epoch.hpp index 28fad52e2..b08602605 100644 --- a/include/mscclpp/epoch.hpp +++ b/include/mscclpp/epoch.hpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace mscclpp { @@ -51,8 +52,7 @@ class DeviceEpoch : BaseEpoch { #ifdef __CUDACC__ __forceinline__ __device__ void wait() { (*expectedInboundEpochId) += 1; - while (*(volatile uint64_t*)&(epochIds->inboundReplica) < (*expectedInboundEpochId)) - ; + POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&(epochIds->inboundReplica) < (*expectedInboundEpochId), 1000000000); } __forceinline__ __device__ void epochIncrement() { *(volatile uint64_t*)&(epochIds->outbound) += 1; } diff --git a/include/mscclpp/errors.hpp b/include/mscclpp/errors.hpp index cdaf9ed80..ff84a1a24 100644 --- a/include/mscclpp/errors.hpp +++ b/include/mscclpp/errors.hpp @@ -12,6 +12,7 @@ enum class ErrorCode { SystemError, InternalError, InvalidUsage, + Timeout, }; std::string errorToString(enum ErrorCode error); diff --git a/include/mscclpp/fifo.hpp b/include/mscclpp/fifo.hpp index aff86f8f5..9e75c8b62 100644 --- a/include/mscclpp/fifo.hpp +++ b/include/mscclpp/fifo.hpp @@ -1,17 +1,14 @@ #ifndef MSCCLPP_FIFO_HPP_ #define MSCCLPP_FIFO_HPP_ -#include - +#include #include #include +#include -namespace mscclpp { - -// For every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER, a flush of the tail to device memory is triggered. -// As long as MSCCLPP_PROXY_FIFO_SIZE is large enough, having a stale tail is not a problem. #define MSCCLPP_PROXY_FIFO_SIZE 128 -#define MSCCLPP_PROXY_FIFO_FLUSH_COUNTER 4 + +namespace mscclpp { struct alignas(16) ProxyTrigger { uint64_t fst, snd; @@ -34,14 +31,23 @@ struct DeviceProxyFifo { #ifdef __CUDACC__ __forceinline__ __device__ uint64_t push(ProxyTrigger trigger) { uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->head, 1); - while (curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->tailReplica)) - ; - while (*(volatile uint64_t*)&this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0) - ; + + POLL_MAYBE_JAILBREAK(curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->tailReplica), 1000000000); + + POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0, 1000000000); + ProxyTrigger* triggerPtr = (ProxyTrigger*)&(this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE]); asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd)); return curFifoHead; } + + __forceinline__ __device__ void sync(uint64_t curFifoHead) { + // We need to wait for two conditions to be met to ensure the CPU is done flushing. (1) wait for the tail + // to go pass by curFifoHead (this is safety net) and (2) wait for the work element value to change to 0. + POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&(this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE]) != 0 && + *(volatile uint64_t*)(this->tailReplica) <= curFifoHead, + 1000000000); + } #endif // __CUDACC__ ProxyTrigger* triggers; // Allocate on host via cudaHostAlloc. This space is used for pushing the workelements diff --git a/include/mscclpp/poll.hpp b/include/mscclpp/poll.hpp new file mode 100644 index 000000000..2ffa9664c --- /dev/null +++ b/include/mscclpp/poll.hpp @@ -0,0 +1,43 @@ +#ifndef MSCCLPP_POLL_HPP_ +#define MSCCLPP_POLL_HPP_ + +#ifdef __CUDACC__ + +#ifndef NDEBUG +#include +#define POLL_PRINT_ON_STUCK(__cond) \ + do { \ + printf("mscclpp: spin is stuck. condition: " #__cond "\n"); \ + } while (0); +#else // NDEBUG +#define POLL_PRINT_ON_STUCK(__cond) +#endif // NDEBUG + +// If a spin is stuck, escape from it and set status to 1. +#define POLL_MAYBE_JAILBREAK_ESCAPE(__cond, __max_spin_cnt, __status) \ + do { \ + uint64_t __spin_cnt = 0; \ + __status = 0; \ + while (__cond) { \ + if (__spin_cnt++ == __max_spin_cnt) { \ + POLL_PRINT_ON_STUCK(__cond); \ + __status = 1; \ + break; \ + } \ + } \ + } while (0); + +// If a spin is stuck, print a warning and keep spinning. +#define POLL_MAYBE_JAILBREAK(__cond, __max_spin_cnt) \ + do { \ + uint64_t __spin_cnt = 0; \ + while (__cond) { \ + if (__spin_cnt++ == __max_spin_cnt) { \ + POLL_PRINT_ON_STUCK(__cond); \ + } \ + } \ + } while (0); + +#endif // __CUDACC__ + +#endif // MSCCLPP_POLL_HPP_ diff --git a/include/mscclpp/utils.hpp b/include/mscclpp/utils.hpp index 3614fa024..1ce5d07dd 100644 --- a/include/mscclpp/utils.hpp +++ b/include/mscclpp/utils.hpp @@ -1,55 +1,37 @@ #ifndef MSCCLPP_UTILS_HPP_ #define MSCCLPP_UTILS_HPP_ -#include - #include -#include -#include -#include #include namespace mscclpp { struct Timer { - std::chrono::steady_clock::time_point start; + std::chrono::steady_clock::time_point start_; + int timeout_; + + Timer(int timeout = -1); + + ~Timer(); - Timer() { start = std::chrono::steady_clock::now(); } + int64_t elapsed() const; - int64_t elapsed() { - auto end = std::chrono::steady_clock::now(); - return std::chrono::duration_cast(end - start).count(); - } + void set(int timeout); - void reset() { start = std::chrono::steady_clock::now(); } + void reset(); - void print(const char* name) { - auto end = std::chrono::steady_clock::now(); - auto elapsed = std::chrono::duration_cast(end - start).count(); - printf("%s: %ld us\n", name, elapsed); - } + void print(const std::string& name); }; -struct ScopedTimer { - Timer timer; - const char* name; +struct ScopedTimer : public Timer { + const std::string name_; - ScopedTimer(const char* name) : name(name) {} + ScopedTimer(const std::string& name); - ~ScopedTimer() { timer.print(name); } + ~ScopedTimer(); }; -inline std::string getHostName(int maxlen, const char delim) { - std::string hostname(maxlen + 1, '\0'); - if (gethostname(const_cast(hostname.data()), maxlen) != 0) { - std::strncpy(const_cast(hostname.data()), "unknown", maxlen); - throw Error("gethostname failed", ErrorCode::SystemError); - } - int i = 0; - while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen - 1)) i++; - hostname[i] = '\0'; - return hostname; -} +std::string getHostName(int maxlen, const char delim); } // namespace mscclpp diff --git a/src/errors.cc b/src/errors.cc index 50d7a2efe..66ab99674 100644 --- a/src/errors.cc +++ b/src/errors.cc @@ -13,6 +13,8 @@ std::string errorToString(enum ErrorCode error) { return "InternalError"; case ErrorCode::InvalidUsage: return "InvalidUsage"; + case ErrorCode::Timeout: + return "Timeout"; default: return "UnknownError"; } diff --git a/src/fifo.cc b/src/fifo.cc index d76769262..1cf435fe7 100644 --- a/src/fifo.cc +++ b/src/fifo.cc @@ -49,9 +49,8 @@ MSCCLPP_API_CPP void HostProxyFifo::pop() { } MSCCLPP_API_CPP void HostProxyFifo::flushTail(bool sync) { - // Flush the tail to device memory. This is either triggered every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER to make sure - // that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush - // request. + // Flush the tail to device memory. This is either triggered every ProxyFlushPeriod to make sure that the fifo can + // make progress even if there is no request mscclppSync. However, mscclppSync type is for flush request. MSCCLPP_CUDATHROW(cudaMemcpyAsync(pimpl->tailReplica.get(), &pimpl->hostTail, sizeof(uint64_t), cudaMemcpyHostToDevice, pimpl->stream)); if (sync) { diff --git a/src/include/utils_internal.hpp b/src/include/utils_internal.hpp index 16e6dfd64..2f0ad978f 100644 --- a/src/include/utils_internal.hpp +++ b/src/include/utils_internal.hpp @@ -35,18 +35,6 @@ using TimePoint = std::chrono::steady_clock::time_point; TimePoint getClock(); int64_t elapsedClock(TimePoint start, TimePoint end); -/* get any bytes of random data from /dev/urandom */ -inline void getRandomData(void* buffer, size_t bytes) { - if (bytes > 0) { - const size_t one = 1UL; - FILE* fp = fopen("/dev/urandom", "r"); - if (buffer == NULL || fp == NULL || fread(buffer, bytes, one, fp) != one) { - throw Error("Failed to read random data", ErrorCode::SystemError); - } - if (fp) fclose(fp); - } -} - } // namespace mscclpp #endif diff --git a/src/proxy.cc b/src/proxy.cc index f7c32dd46..0ff5ffcb5 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -10,6 +10,8 @@ namespace mscclpp { const int ProxyStopCheckPeriod = 1000; +// Unless explicitly requested, a flush of the tail to device memory is triggered for every ProxyFlushPeriod. +// As long as MSCCLPP_PROXY_FIFO_SIZE is large enough, having a stale tail is not a problem. const int ProxyFlushPeriod = 4; struct Proxy::Impl { diff --git a/src/utils.cc b/src/utils.cc new file mode 100644 index 000000000..d39ba2df4 --- /dev/null +++ b/src/utils.cc @@ -0,0 +1,66 @@ +#include +#include + +#include +#include +#include +#include +#include +#include + +// Throw upon SIGALRM. +static void sigalrmTimeoutHandler(int) { + signal(SIGALRM, SIG_IGN); + throw mscclpp::Error("Timer timed out", mscclpp::ErrorCode::Timeout); +} + +namespace mscclpp { + +Timer::Timer(int timeout) { set(timeout); } + +Timer::~Timer() { + if (timeout_ > 0) { + alarm(0); + signal(SIGALRM, SIG_DFL); + } +} + +int64_t Timer::elapsed() const { + auto end = std::chrono::steady_clock::now(); + return std::chrono::duration_cast(end - start_).count(); +} + +void Timer::set(int timeout) { + timeout_ = timeout; + if (timeout > 0) { + signal(SIGALRM, sigalrmTimeoutHandler); + alarm(timeout); + } + start_ = std::chrono::steady_clock::now(); +} + +void Timer::reset() { set(timeout_); } + +void Timer::print(const std::string& name) { + auto us = elapsed(); + std::stringstream ss; + ss << name << ": " << us << " us\n"; + std::cout << ss.str(); +} + +ScopedTimer::ScopedTimer(const std::string& name) : name_(name) {} + +ScopedTimer::~ScopedTimer() { print(name_); } + +std::string getHostName(int maxlen, const char delim) { + std::string hostname(maxlen + 1, '\0'); + if (gethostname(const_cast(hostname.data()), maxlen) != 0) { + throw Error("gethostname failed", ErrorCode::SystemError); + } + int i = 0; + while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen - 1)) i++; + hostname[i] = '\0'; + return hostname; +} + +} // namespace mscclpp diff --git a/src/utils_internal.cc b/src/utils_internal.cc index f5190a0a9..41c7ea386 100644 --- a/src/utils_internal.cc +++ b/src/utils_internal.cc @@ -171,4 +171,17 @@ TimePoint getClock() { return std::chrono::steady_clock::now(); } int64_t elapsedClock(TimePoint start, TimePoint end) { return std::chrono::duration_cast(end - start).count(); } + +/* get any bytes of random data from /dev/urandom */ +void getRandomData(void* buffer, size_t bytes) { + if (bytes > 0) { + const size_t one = 1UL; + FILE* fp = fopen("/dev/urandom", "r"); + if (buffer == NULL || fp == NULL || fread(buffer, bytes, one, fp) != one) { + throw Error("Failed to read random data", ErrorCode::SystemError); + } + if (fp) fclose(fp); + } +} + } // namespace mscclpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 54fe31b63..57bc9cc54 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -9,17 +9,19 @@ function(add_test_executable name sources) endif() endfunction() -add_test_executable(bootstrap_test_cpp bootstrap_test_cpp.cc) -add_test_executable(communicator_test_cpp communicator_test_cpp.cu) add_test_executable(allgather_test_cpp allgather_test_cpp.cu) add_test_executable(allgather_test_host_offloading allgather_test_host_offloading.cu) -add_test_executable(ib_test ib_test.cc) + +add_executable(mp_unit_tests mp_unit_tests.cu) +target_link_libraries(mp_unit_tests mscclpp CUDA::cudart CUDA::cuda_driver MPI::MPI_CXX GTest::gtest_main GTest::gmock_main) +target_include_directories(mp_unit_tests PRIVATE ${PROJECT_SOURCE_DIR}/src/include) configure_file(run_mpi_test.sh.in run_mpi_test.sh) # Unit tests add_executable(unit_tests) target_link_libraries(unit_tests GTest::gtest_main GTest::gmock_main mscclpp CUDA::cudart CUDA::cuda_driver) +target_include_directories(unit_tests PRIVATE ${PROJECT_SOURCE_DIR}/src/include) add_subdirectory(unit) # This adds the sources to the mscclpp target gtest_discover_tests(unit_tests DISCOVERY_MODE PRE_TEST) diff --git a/test/bootstrap_test_cpp.cc b/test/bootstrap_test_cpp.cc deleted file mode 100644 index 12a4d9a1f..000000000 --- a/test/bootstrap_test_cpp.cc +++ /dev/null @@ -1,122 +0,0 @@ -#include - -#include -#include -#include -#include - -void test_allgather(std::shared_ptr bootstrap) { - std::vector tmp(bootstrap->getNranks(), 0); - tmp[bootstrap->getRank()] = bootstrap->getRank() + 1; - bootstrap->allGather(tmp.data(), sizeof(int)); - for (int i = 0; i < bootstrap->getNranks(); i++) { - assert(tmp[i] == i + 1); - } - if (bootstrap->getRank() == 0) std::cout << "AllGather test passed!" << std::endl; -} - -void test_barrier(std::shared_ptr bootstrap) { - bootstrap->barrier(); - if (bootstrap->getRank() == 0) std::cout << "Barrier test passed!" << std::endl; -} - -void test_sendrecv(std::shared_ptr bootstrap) { - for (int i = 0; i < bootstrap->getNranks(); i++) { - if (bootstrap->getRank() == i) continue; - int msg1 = (bootstrap->getRank() + 1) * 3; - int msg2 = (bootstrap->getRank() + 1) * 3 + 1; - int msg3 = (bootstrap->getRank() + 1) * 3 + 2; - bootstrap->send(&msg1, sizeof(int), i, 0); - bootstrap->send(&msg2, sizeof(int), i, 1); - bootstrap->send(&msg3, sizeof(int), i, 2); - } - - for (int i = 0; i < bootstrap->getNranks(); i++) { - if (bootstrap->getRank() == i) continue; - int msg1 = 0; - int msg2 = 0; - int msg3 = 0; - // recv them in the opposite order to check correctness - bootstrap->recv(&msg2, sizeof(int), i, 1); - bootstrap->recv(&msg3, sizeof(int), i, 2); - bootstrap->recv(&msg1, sizeof(int), i, 0); - assert(msg1 == (i + 1) * 3); - assert(msg2 == (i + 1) * 3 + 1); - assert(msg3 == (i + 1) * 3 + 2); - } - if (bootstrap->getRank() == 0) std::cout << "Send/Recv test passed!" << std::endl; -} - -void test_all(std::shared_ptr bootstrap) { - test_allgather(bootstrap); - test_barrier(bootstrap); - test_sendrecv(bootstrap); -} - -void test_mscclpp_bootstrap_with_id(int rank, int worldSize) { - auto bootstrap = std::make_shared(rank, worldSize); - mscclpp::UniqueId id; - if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); - MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); - bootstrap->initialize(id); - - test_all(bootstrap); - if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Bootstrap test with unique id passed! ---" << std::endl; -} - -void test_mscclpp_bootstrap_with_ip_port_pair(int rank, int worldSize, char* ipPortPair) { - std::shared_ptr bootstrap(new mscclpp::Bootstrap(rank, worldSize)); - bootstrap->initialize(ipPortPair); - - test_all(bootstrap); - if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Bootstrap test with ip_port pair passed! ---" << std::endl; -} - -class MPIBootstrap : public mscclpp::BaseBootstrap { - public: - MPIBootstrap() : BaseBootstrap() {} - int getRank() override { - int rank; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - return rank; - } - int getNranks() override { - int worldSize; - MPI_Comm_size(MPI_COMM_WORLD, &worldSize); - return worldSize; - } - void allGather(void* sendbuf, int size) override { - MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sendbuf, size, MPI_BYTE, MPI_COMM_WORLD); - } - void barrier() override { MPI_Barrier(MPI_COMM_WORLD); } - void send(void* sendbuf, int size, int dest, int tag) override { - MPI_Send(sendbuf, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD); - } - void recv(void* recvbuf, int size, int source, int tag) override { - MPI_Recv(recvbuf, size, MPI_BYTE, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - } -}; - -void test_mpi_bootstrap() { - std::shared_ptr bootstrap(new MPIBootstrap()); - test_all(bootstrap); - if (bootstrap->getRank() == 0) std::cout << "--- MPI Bootstrap test passed! ---" << std::endl; -} - -int main(int argc, char** argv) { - int rank, worldSize; - MPI_Init(&argc, &argv); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &worldSize); - if (argc > 2) { - if (rank == 0) std::cout << "Usage: " << argv[0] << " [ip:port]" << std::endl; - MPI_Finalize(); - return 0; - } - test_mscclpp_bootstrap_with_id(rank, worldSize); - if (argc == 2) test_mscclpp_bootstrap_with_ip_port_pair(rank, worldSize, argv[1]); - test_mpi_bootstrap(); - - MPI_Finalize(); - return 0; -} diff --git a/test/communicator_test_cpp.cu b/test/communicator_test_cpp.cu deleted file mode 100644 index 38b9fe624..000000000 --- a/test/communicator_test_cpp.cu +++ /dev/null @@ -1,336 +0,0 @@ -#include -#include - -#include -#include -#include -#include -#include -#include - -#define CUDATHROW(cmd) \ - do { \ - cudaError_t err = cmd; \ - if (err != cudaSuccess) { \ - throw std::runtime_error(std::string("Cuda failure '") + cudaGetErrorString(err) + "'"); \ - } \ - } while (false) - -mscclpp::Transport findIb(int localRank) { - mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2, - mscclpp::Transport::IB3, mscclpp::Transport::IB4, mscclpp::Transport::IB5, - mscclpp::Transport::IB6, mscclpp::Transport::IB7}; - return IBs[localRank]; -} - -void register_all_memories(mscclpp::Communicator& communicator, int rank, int worldSize, void* devicePtr, - size_t deviceBufferSize, mscclpp::Transport myIbDevice, - mscclpp::RegisteredMemory& localMemory, - std::unordered_map& remoteMemory) { - localMemory = communicator.registerMemory(devicePtr, deviceBufferSize, mscclpp::Transport::CudaIpc | myIbDevice); - std::unordered_map> futureRemoteMemory; - for (int i = 0; i < worldSize; i++) { - if (i != rank) { - communicator.sendMemoryOnSetup(localMemory, i, 0); - futureRemoteMemory[i] = communicator.recvMemoryOnSetup(i, 0); - } - } - communicator.setup(); - for (int i = 0; i < worldSize; i++) { - if (i != rank) { - remoteMemory[i] = futureRemoteMemory[i].get(); - } - } -} - -void make_connections(mscclpp::Communicator& communicator, int rank, int worldSize, int nRanksPerNode, - mscclpp::Transport myIbDevice, - std::unordered_map>& connections) { - for (int i = 0; i < worldSize; i++) { - if (i != rank) { - if (i / nRanksPerNode == rank / nRanksPerNode) { - connections[i] = communicator.connectOnSetup(i, 0, mscclpp::Transport::CudaIpc); - } else { - connections[i] = communicator.connectOnSetup(i, 0, myIbDevice); - } - } - } - communicator.setup(); -} - -void write_remote(int rank, int worldSize, std::unordered_map>& connections, - std::unordered_map& remoteRegisteredMemories, - mscclpp::RegisteredMemory& registeredMemory, int dataCountPerRank) { - for (int i = 0; i < worldSize; i++) { - if (i != rank) { - auto& conn = connections.at(i); - auto& peerMemory = remoteRegisteredMemories.at(i); - conn->write(peerMemory, rank * dataCountPerRank * sizeof(int), registeredMemory, - rank * dataCountPerRank * sizeof(int), dataCountPerRank * sizeof(int)); - conn->flush(); - } - } -} - -void device_buffer_init(int rank, int worldSize, int dataCount, std::vector& devicePtr) { - for (int n = 0; n < (int)devicePtr.size(); n++) { - std::vector hostBuffer(dataCount, 0); - for (int i = 0; i < dataCount; i++) { - hostBuffer[i] = rank + n * worldSize; - } - CUDATHROW(cudaMemcpy(devicePtr[n], hostBuffer.data(), dataCount * sizeof(int), cudaMemcpyHostToDevice)); - } - CUDATHROW(cudaDeviceSynchronize()); -} - -bool test_device_buffer_write_correctness(int rank, int worldSize, int nRanksPerNode, int dataCount, - std::vector& devicePtr, bool skipLocal = false) { - for (int n = 0; n < (int)devicePtr.size(); n++) { - std::vector hostBuffer(dataCount, 0); - CUDATHROW(cudaMemcpy(hostBuffer.data(), devicePtr[n], dataCount * sizeof(int), cudaMemcpyDeviceToHost)); - for (int i = 0; i < worldSize; i++) { - if (i / nRanksPerNode == rank / nRanksPerNode && skipLocal) { - continue; - } - for (int j = i * dataCount / worldSize; j < (i + 1) * dataCount / worldSize; j++) { - if (hostBuffer[j] != i + n * worldSize) { - return false; - } - } - } - } - return true; -} - -void test_write(int rank, int worldSize, int nRanksPerNode, int deviceBufferSize, - std::shared_ptr bootstrap, - std::unordered_map>& connections, - std::vector>& remoteMemory, - std::vector& localMemory, std::vector& devicePtr, int numBuffers) { - assert((deviceBufferSize / sizeof(int)) % worldSize == 0); - size_t dataCount = deviceBufferSize / sizeof(int); - - device_buffer_init(rank, worldSize, dataCount, devicePtr); - bootstrap->barrier(); - if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl; - - for (int n = 0; n < numBuffers; n++) { - write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize); - } - bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "RDMA write for " << std::to_string(numBuffers) << " buffers passed" << std::endl; - - // polling until it becomes ready - bool ready = false; - int niter = 0; - do { - ready = test_device_buffer_write_correctness(rank, worldSize, nRanksPerNode, dataCount, devicePtr); - niter++; - if (niter == 10000) { - throw std::runtime_error("Polling is stuck."); - } - } while (!ready); - - bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "Polling for " << std::to_string(numBuffers) << " buffers passed" << std::endl; - - if (bootstrap->getRank() == 0) std::cout << "--- Testing vanialla writes passed ---" << std::endl; -} - -__global__ void increament_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) { - int tid = threadIdx.x; - if (tid != rank && tid < worldSize) { - deviceEpochs[tid].epochIncrement(); - } -} - -__global__ void wait_epochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) { - int tid = threadIdx.x; - if (tid != rank && tid < worldSize) { - deviceEpochs[tid].wait(); - } -} - -void test_write_with_device_epochs(int rank, int worldSize, int nRanksPerNode, int deviceBufferSize, - mscclpp::Communicator& communicator, - std::shared_ptr bootstrap, - std::unordered_map>& connections, - std::vector>& remoteMemory, - std::vector& localMemory, std::vector& devicePtr, - int numBuffers) { - std::unordered_map> epochs; - for (auto entry : connections) { - auto& conn = entry.second; - epochs.insert({entry.first, std::make_shared(communicator, conn)}); - } - communicator.setup(); - bootstrap->barrier(); - if (bootstrap->getRank() == 0) std::cout << "Epochs are created" << std::endl; - - assert((deviceBufferSize / sizeof(int)) % worldSize == 0); - size_t dataCount = deviceBufferSize / sizeof(int); - - device_buffer_init(rank, worldSize, dataCount, devicePtr); - bootstrap->barrier(); - if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl; - - mscclpp::DeviceEpoch::DeviceHandle* deviceEpochHandles; - CUDATHROW(cudaMalloc(&deviceEpochHandles, sizeof(mscclpp::DeviceEpoch::DeviceHandle) * worldSize)); - for (int i = 0; i < worldSize; i++) { - if (i != rank) { - mscclpp::DeviceEpoch::DeviceHandle deviceHandle = epochs[i]->deviceHandle(); - CUDATHROW(cudaMemcpy(&deviceEpochHandles[i], &deviceHandle, sizeof(mscclpp::DeviceEpoch::DeviceHandle), - cudaMemcpyHostToDevice)); - } - } - CUDATHROW(cudaDeviceSynchronize()); - - bootstrap->barrier(); - if (bootstrap->getRank() == 0) std::cout << "CUDA device epochs are created" << std::endl; - - for (int n = 0; n < numBuffers; n++) { - write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize); - } - - increament_epochs<<<1, worldSize>>>(deviceEpochHandles, rank, worldSize); - CUDATHROW(cudaDeviceSynchronize()); - - for (int i = 0; i < worldSize; i++) { - if (i != rank) { - epochs[i]->signal(); - } - } - - wait_epochs<<<1, worldSize>>>(deviceEpochHandles, rank, worldSize); - CUDATHROW(cudaDeviceSynchronize()); - - if (!test_device_buffer_write_correctness(rank, worldSize, nRanksPerNode, dataCount, devicePtr)) { - throw std::runtime_error("unexpected result."); - } - - bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "--- Testing writes with device epochs for " << std::to_string(numBuffers) << " buffers passed ---" - << std::endl; -} - -void test_write_with_host_epochs(int rank, int worldSize, int nRanksPerNode, int deviceBufferSize, - mscclpp::Communicator& communicator, std::shared_ptr bootstrap, - std::unordered_map>& connections, - std::vector>& remoteMemory, - std::vector& localMemory, std::vector& devicePtr, - int numBuffers) { - std::unordered_map> epochs; - for (auto entry : connections) { - auto& conn = entry.second; - if (conn->transport() == mscclpp::Transport::CudaIpc) continue; - epochs.insert({entry.first, std::make_shared(communicator, conn)}); - } - communicator.setup(); - bootstrap->barrier(); - if (bootstrap->getRank() == 0) std::cout << "Epochs are created" << std::endl; - - assert((deviceBufferSize / sizeof(int)) % worldSize == 0); - size_t dataCount = deviceBufferSize / sizeof(int); - - device_buffer_init(rank, worldSize, dataCount, devicePtr); - bootstrap->barrier(); - if (bootstrap->getRank() == 0) std::cout << "CUDA memory initialization passed" << std::endl; - - bootstrap->barrier(); - if (bootstrap->getRank() == 0) std::cout << "Host epochs are created" << std::endl; - - for (int n = 0; n < numBuffers; n++) { - write_remote(rank, worldSize, connections, remoteMemory[n], localMemory[n], dataCount / worldSize); - } - - for (int i = 0; i < worldSize; i++) { - if (i != rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) { - epochs[i]->incrementAndSignal(); - } - } - - for (int i = 0; i < worldSize; i++) { - if (i != rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) { - epochs[i]->wait(); - } - } - - if (!test_device_buffer_write_correctness(rank, worldSize, nRanksPerNode, dataCount, devicePtr, true)) { - throw std::runtime_error("unexpected result."); - } - - bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "--- Testing writes with host epochs for " << std::to_string(numBuffers) << " buffers passed ---" - << std::endl; -} - -void test_communicator(int rank, int worldSize, int nRanksPerNode) { - auto bootstrap = std::make_shared(rank, worldSize); - mscclpp::UniqueId id; - if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); - MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); - bootstrap->initialize(id); - - mscclpp::Communicator communicator(bootstrap); - if (bootstrap->getRank() == 0) std::cout << "Communicator initialization passed" << std::endl; - - std::unordered_map> connections; - auto myIbDevice = findIb(rank % nRanksPerNode); - - make_connections(communicator, rank, worldSize, nRanksPerNode, myIbDevice, connections); - if (bootstrap->getRank() == 0) std::cout << "Connection setup passed" << std::endl; - - int numBuffers = 10; - std::vector devicePtr(numBuffers); - int deviceBufferSize = 1024 * 1024; - - std::vector localMemory(numBuffers); - std::vector> remoteMemory(numBuffers); - - for (int n = 0; n < numBuffers; n++) { - if (n % 100 == 0) std::cout << "Registering memory for " << std::to_string(n) << " buffers" << std::endl; - CUDATHROW(cudaMalloc(&devicePtr[n], deviceBufferSize)); - register_all_memories(communicator, rank, worldSize, devicePtr[n], deviceBufferSize, myIbDevice, localMemory[n], - remoteMemory[n]); - } - bootstrap->barrier(); - if (bootstrap->getRank() == 0) - std::cout << "Memory registration for " << std::to_string(numBuffers) << " buffers passed" << std::endl; - - test_write(rank, worldSize, nRanksPerNode, deviceBufferSize, bootstrap, connections, remoteMemory, localMemory, - devicePtr, numBuffers); - - test_write_with_device_epochs(rank, worldSize, nRanksPerNode, deviceBufferSize, communicator, bootstrap, connections, - remoteMemory, localMemory, devicePtr, numBuffers); - - test_write_with_host_epochs(rank, worldSize, nRanksPerNode, deviceBufferSize, communicator, bootstrap, connections, - remoteMemory, localMemory, devicePtr, numBuffers); - - if (bootstrap->getRank() == 0) std::cout << "--- MSCCLPP::Communicator tests passed! ---" << std::endl; - - for (int n = 0; n < numBuffers; n++) { - CUDATHROW(cudaFree(devicePtr[n])); - } -} - -int main(int argc, char** argv) { - int rank, worldSize; - MPI_Init(&argc, &argv); - MPI_Comm_rank(MPI_COMM_WORLD, &rank); - MPI_Comm_size(MPI_COMM_WORLD, &worldSize); - MPI_Comm shmcomm; - MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); - int shmWorldSize; - MPI_Comm_size(shmcomm, &shmWorldSize); - int nRanksPerNode = shmWorldSize; - MPI_Comm_free(&shmcomm); - - test_communicator(rank, worldSize, nRanksPerNode); - - MPI_Finalize(); - return 0; -} diff --git a/test/ib_test.cc b/test/ib_test.cc deleted file mode 100644 index da2d0f3ab..000000000 --- a/test/ib_test.cc +++ /dev/null @@ -1,98 +0,0 @@ -#include "ib.hpp" - -#include -#include -#include -#include - -#include "checks_internal.hpp" -#include "infiniband/verbs.h" - -// Measure current time in second. -static double getTime(void) { - struct timespec tspec; - if (clock_gettime(CLOCK_MONOTONIC, &tspec) == -1) { - printf("clock_gettime failed\n"); - exit(EXIT_FAILURE); - } - return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec; -} - -// Example usage: -// Receiver: ./build/bin/tests/unittests/ib_test 127.0.0.1:50000 0 0 0 -// Sender: ./build/bin/tests/unittests/ib_test 127.0.0.1:50000 1 0 0 -int main(int argc, const char* argv[]) { - if (argc != 5) { - printf("Usage: %s <0(recv)/1(send)> \n", argv[0]); - return 1; - } - const char* ipPortPair = argv[1]; - int isSend = atoi(argv[2]); - int cudaDevId = atoi(argv[3]); - std::string ibDevName = "mlx5_ib" + std::string(argv[4]); - - MSCCLPP_CUDATHROW(cudaSetDevice(cudaDevId)); - - int nelem = 1; - auto data = mscclpp::allocUniqueCuda(nelem); - - std::shared_ptr bootstrap(new mscclpp::Bootstrap(isSend, 2)); - bootstrap->initialize(ipPortPair); - - mscclpp::IbCtx ctx(ibDevName); - mscclpp::IbQp* qp = ctx.createQp(); - const mscclpp::IbMr* mr = ctx.registerMr(data.get(), sizeof(int) * nelem); - - std::array qpInfo; - qpInfo[isSend] = qp->getInfo(); - - std::array mrInfo; - mrInfo[isSend] = mr->getInfo(); - - bootstrap->allGather(qpInfo.data(), sizeof(mscclpp::IbQpInfo)); - bootstrap->allGather(mrInfo.data(), sizeof(mscclpp::IbMrInfo)); - - for (int i = 0; i < bootstrap->getNranks(); ++i) { - if (i == isSend) continue; - qp->rtr(qpInfo[i]); - qp->rts(); - break; - } - - printf("connection succeed\n"); - - bootstrap->barrier(); - - if (isSend) { - int maxIter = 100000; - double start = getTime(); - for (int iter = 0; iter < maxIter; ++iter) { - qp->stageSend(mr, mrInfo[0], sizeof(int) * nelem, 0, 0, 0, true); - qp->postSend(); - bool waiting = true; - while (waiting) { - int wcNum = qp->pollCq(); - if (wcNum < 0) { - WARN("pollCq failed: errno %d", errno); - return 1; - } - for (int i = 0; i < wcNum; ++i) { - const ibv_wc* wc = qp->getWc(i); - if (wc->status != IBV_WC_SUCCESS) { - WARN("wc status %d", wc->status); - return 1; - } - waiting = false; - break; - } - } - } - // TODO(chhwang): print detailed stats such as avg, 99%p, etc. - printf("%f us/iter\n", (getTime() - start) / maxIter * 1e6); - } - - // A simple barrier - bootstrap->barrier(); - - return 0; -} diff --git a/test/mp_unit_tests.cu b/test/mp_unit_tests.cu new file mode 100644 index 000000000..377625a87 --- /dev/null +++ b/test/mp_unit_tests.cu @@ -0,0 +1,765 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "config.hpp" +#include "ib.hpp" +#include "infiniband/verbs.h" + +static const char gDefaultIpPort[] = "127.0.0.1:50053"; + +class MultiProcessTestEnv : public ::testing::Environment { + public: + MultiProcessTestEnv(int argc, const char** argv) : argc(argc), argv(argv) {} + + // Override this to define how to set up the environment. + void SetUp() { + MPI_Init(NULL, NULL); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &worldSize); + // get the local number of nodes with MPI + MPI_Comm shmcomm; + MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shmcomm); + int shmrank; + MPI_Comm_size(shmcomm, &shmrank); + nRanksPerNode = shmrank; + MPI_Comm_free(&shmcomm); + + // parse the command line arguments + args = parseArgs(argc, argv); + } + + // Override this to define how to tear down the environment. + void TearDown() { MPI_Finalize(); } + + static std::unordered_map parseArgs(int argc, const char* argv[]) { + auto printUsage = [](const char* prog) { + std::stringstream ss; + ss << "Usage: " << prog << " [-ip_port IP:PORT]\n"; + std::cout << ss.str(); + }; + + std::unordered_map options; + + // Default values + options["ip_port"] = gDefaultIpPort; + + // Parse the command line arguments + for (int i = 1; i < argc; i++) { + std::string arg = argv[i]; + if (arg == "-ip_port") { + if (i + 1 < argc) { + options["ip_port"] = argv[++i]; + } else { + throw std::invalid_argument("Error: -ip_port option requires an argument.\n"); + } + } else if (arg == "-help" || arg == "-h") { + printUsage(argv[0]); + exit(0); + } else { + throw std::invalid_argument("Error: Unknown option " + std::string(argv[i]) + "\n"); + } + } + return options; + } + + const int argc; + const char** argv; + int rank; + int worldSize; + int nRanksPerNode; + std::unordered_map args; +}; + +MultiProcessTestEnv* gEnv = nullptr; + +class MultiProcessTest : public ::testing::Test {}; + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + gEnv = new MultiProcessTestEnv(argc, (const char**)argv); + ::testing::AddGlobalTestEnvironment(gEnv); + return RUN_ALL_TESTS(); +} + +TEST_F(MultiProcessTest, Prelim) { + // Test to make sure the MPI environment is set up correctly + ASSERT_GE(gEnv->worldSize, 2); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Bootstrap tests +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +class BootstrapTest : public MultiProcessTest { + protected: + // Each test case should finish within 3 seconds. + mscclpp::Timer bootstrapTestTimer{3}; +}; + +void bootstrapTestAllGather(std::shared_ptr bootstrap) { + std::vector tmp(bootstrap->getNranks(), 0); + tmp[bootstrap->getRank()] = bootstrap->getRank() + 1; + bootstrap->allGather(tmp.data(), sizeof(int)); + for (int i = 0; i < bootstrap->getNranks(); ++i) { + EXPECT_EQ(tmp[i], i + 1); + } +} + +void bootstrapTestBarrier(std::shared_ptr bootstrap) { bootstrap->barrier(); } + +void bootstrapTestSendRecv(std::shared_ptr bootstrap) { + for (int i = 0; i < bootstrap->getNranks(); i++) { + if (bootstrap->getRank() == i) continue; + int msg1 = (bootstrap->getRank() + 1) * 3; + int msg2 = (bootstrap->getRank() + 1) * 3 + 1; + int msg3 = (bootstrap->getRank() + 1) * 3 + 2; + bootstrap->send(&msg1, sizeof(int), i, 0); + bootstrap->send(&msg2, sizeof(int), i, 1); + bootstrap->send(&msg3, sizeof(int), i, 2); + } + + for (int i = 0; i < bootstrap->getNranks(); i++) { + if (bootstrap->getRank() == i) continue; + int msg1 = 0; + int msg2 = 0; + int msg3 = 0; + // recv them in the opposite order to check correctness + bootstrap->recv(&msg2, sizeof(int), i, 1); + bootstrap->recv(&msg3, sizeof(int), i, 2); + bootstrap->recv(&msg1, sizeof(int), i, 0); + EXPECT_EQ(msg1, (i + 1) * 3); + EXPECT_EQ(msg2, (i + 1) * 3 + 1); + EXPECT_EQ(msg3, (i + 1) * 3 + 2); + } +} + +void bootstrapTestAll(std::shared_ptr bootstrap) { + bootstrapTestAllGather(bootstrap); + bootstrapTestBarrier(bootstrap); + bootstrapTestSendRecv(bootstrap); +} + +TEST_F(BootstrapTest, WithId) { + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + mscclpp::UniqueId id; + if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); + MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); + bootstrap->initialize(id); + bootstrapTestAll(bootstrap); +} + +TEST_F(BootstrapTest, WithIpPortPair) { + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + bootstrap->initialize(gEnv->args["ip_port"]); + bootstrapTestAll(bootstrap); +} + +TEST_F(BootstrapTest, ResumeWithId) { + for (int i = 0; i < 5; ++i) { + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + mscclpp::UniqueId id; + if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); + MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); + bootstrap->initialize(id); + } +} + +TEST_F(BootstrapTest, ResumeWithIpPortPair) { + // TODO: enable when the bug is fixed. bootstrap hangs and even timer doesn't work +#if 0 + for (int i = 0; i < 5; ++i) { + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + bootstrap->initialize(gEnv->args["ip_port"]); + } +#else + // TODO: remove when the bug is fixed. + FAIL(); +#endif +} + +TEST_F(BootstrapTest, ExitBeforeConnect) { + // TODO: enable when the bug is fixed. bootstrap rootThread_ does not exit gracefully +#if 0 + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + mscclpp::UniqueId id = bootstrap->createUniqueId(); +#else + // TODO: remove when the bug is fixed. + FAIL(); +#endif +} + +TEST_F(BootstrapTest, TimeoutWithId) { + // TODO: enable when BootstrapTest.ExitBeforeConnect passes. +#if 0 + // Set bootstrap timeout to 1 second + mscclpp::Config* cfg = mscclpp::Config::getInstance(); + cfg->setBootstrapConnectionTimeoutConfig(1); + + // All ranks initialize a bootstrap with their own id (will hang) + auto bootstrap = std::make_shared(gEnv->rank, gEnv->worldSize); + mscclpp::UniqueId id = bootstrap->createUniqueId(); + + ASSERT_THROW(bootstrap->initialize(id), mscclpp::Error); + + // Timeout should be less than 3 seconds + ASSERT_LT(timer.elapsed(), 3000000); +#else + // TODO: remove when BootstrapTest.ExitBeforeConnect passes. + FAIL(); +#endif +} + +class MPIBootstrap : public mscclpp::BaseBootstrap { + public: + MPIBootstrap() : BaseBootstrap() {} + int getRank() override { + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + return rank; + } + int getNranks() override { + int worldSize; + MPI_Comm_size(MPI_COMM_WORLD, &worldSize); + return worldSize; + } + void allGather(void* sendbuf, int size) override { + MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sendbuf, size, MPI_BYTE, MPI_COMM_WORLD); + } + void barrier() override { MPI_Barrier(MPI_COMM_WORLD); } + void send(void* sendbuf, int size, int dest, int tag) override { + MPI_Send(sendbuf, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD); + } + void recv(void* recvbuf, int size, int source, int tag) override { + MPI_Recv(recvbuf, size, MPI_BYTE, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + } +}; + +TEST_F(BootstrapTest, MPIBootstrap) { + auto bootstrap = std::make_shared(); + bootstrapTestAll(bootstrap); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// InfiniBand tests +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +static mscclpp::Transport ibIdToTransport(int id) { + mscclpp::Transport IBs[] = {mscclpp::Transport::IB0, mscclpp::Transport::IB1, mscclpp::Transport::IB2, + mscclpp::Transport::IB3, mscclpp::Transport::IB4, mscclpp::Transport::IB5, + mscclpp::Transport::IB6, mscclpp::Transport::IB7}; + return IBs[id]; +} + +class IbTest : public MultiProcessTest { + protected: + void SetUp() override { + MSCCLPP_CUDATHROW(cudaGetDeviceCount(&cudaDevNum)); + cudaDevId = (gEnv->rank % gEnv->nRanksPerNode) % cudaDevNum; + MSCCLPP_CUDATHROW(cudaSetDevice(cudaDevId)); + + int ibDevId = (gEnv->rank % gEnv->nRanksPerNode) / mscclpp::getIBDeviceCount(); + ibDevName = mscclpp::getIBDeviceName(ibIdToTransport(ibDevId)); + } + + int cudaDevNum; + int cudaDevId; + std::string ibDevName; +}; + +TEST_F(IbTest, SimpleSendRecv) { + if (gEnv->rank >= 2) { + // This test needs only two ranks + return; + } + + mscclpp::Timer timer(3); + + const int maxIter = 100000; + const int nelem = 1; + auto data = mscclpp::allocUniqueCuda(nelem); + + auto bootstrap = std::make_shared(gEnv->rank, 2); + mscclpp::UniqueId id; + if (bootstrap->getRank() == 0) id = bootstrap->createUniqueId(); + MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); + bootstrap->initialize(id); + + mscclpp::IbCtx ctx(ibDevName); + mscclpp::IbQp* qp = ctx.createQp(); + const mscclpp::IbMr* mr = ctx.registerMr(data.get(), sizeof(int) * nelem); + + std::array qpInfo; + qpInfo[gEnv->rank] = qp->getInfo(); + + std::array mrInfo; + mrInfo[gEnv->rank] = mr->getInfo(); + + bootstrap->allGather(qpInfo.data(), sizeof(mscclpp::IbQpInfo)); + bootstrap->allGather(mrInfo.data(), sizeof(mscclpp::IbMrInfo)); + + for (int i = 0; i < bootstrap->getNranks(); ++i) { + if (i == gEnv->rank) continue; + qp->rtr(qpInfo[i]); + qp->rts(); + break; + } + bootstrap->barrier(); + + if (gEnv->rank == 1) { + mscclpp::Timer timer; + for (int iter = 0; iter < maxIter; ++iter) { + qp->stageSend(mr, mrInfo[0], sizeof(int) * nelem, 0, 0, 0, true); + qp->postSend(); + bool waiting = true; + int spin = 0; + while (waiting) { + int wcNum = qp->pollCq(); + ASSERT_GE(wcNum, 0); + for (int i = 0; i < wcNum; ++i) { + const ibv_wc* wc = qp->getWc(i); + EXPECT_EQ(wc->status, IBV_WC_SUCCESS); + waiting = false; + break; + } + if (spin++ > 1000000) { + FAIL() << "Polling is stuck."; + } + } + } + float us = (float)timer.elapsed(); + std::cout << "IbTest.SimpleSendRecv: " << us / maxIter << " us/iter" << std::endl; + } + bootstrap->barrier(); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Communicator tests +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +class CommunicatorTestBase : public MultiProcessTest { + protected: + void SetUp() override { + MultiProcessTest::SetUp(); + + if (numRanksToUse == -1) { + numRanksToUse = gEnv->worldSize; + } + ASSERT_LE(numRanksToUse, gEnv->worldSize); + + std::shared_ptr bootstrap; + mscclpp::UniqueId id; + if (gEnv->rank < numRanksToUse) { + bootstrap = std::make_shared(gEnv->rank, numRanksToUse); + if (gEnv->rank == 0) id = bootstrap->createUniqueId(); + } + MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); + + if (gEnv->rank >= numRanksToUse) { + return; + } + bootstrap->initialize(id); + communicator = std::make_shared(bootstrap); + ibTransport = ibIdToTransport(rankToLocalRank(gEnv->rank)); + } + + void TearDown() override { + connections.clear(); + communicator.reset(); + MultiProcessTest::TearDown(); + } + + void setNumRanksToUse(int num) { numRanksToUse = num; } + + int rankToLocalRank(int rank) const { return rank % gEnv->nRanksPerNode; } + + int rankToNode(int rank) const { return rank / gEnv->nRanksPerNode; } + + void connectMesh(bool useIbOnly = false) { + for (int i = 0; i < numRanksToUse; i++) { + if (i != gEnv->rank) { + if ((rankToNode(i) == rankToNode(gEnv->rank)) && !useIbOnly) { + connections[i] = communicator->connectOnSetup(i, 0, mscclpp::Transport::CudaIpc); + } else { + connections[i] = communicator->connectOnSetup(i, 0, ibTransport); + } + } + } + communicator->setup(); + } + + // Register a local memory and receive corresponding remote memories + void registerMemoryPairs(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, + const std::vector& remoteRanks, mscclpp::RegisteredMemory& localMemory, + std::unordered_map& remoteMemories) { + localMemory = communicator->registerMemory(buff, buffSize, transport); + std::unordered_map> futureRemoteMemories; + for (int remoteRank : remoteRanks) { + if (remoteRank != communicator->bootstrapper()->getRank()) { + communicator->sendMemoryOnSetup(localMemory, remoteRank, tag); + futureRemoteMemories[remoteRank] = communicator->recvMemoryOnSetup(remoteRank, tag); + } + } + communicator->setup(); + for (int remoteRank : remoteRanks) { + if (remoteRank != communicator->bootstrapper()->getRank()) { + remoteMemories[remoteRank] = futureRemoteMemories[remoteRank].get(); + } + } + } + + // Register a local memory an receive one corresponding remote memory + void registerMemoryPair(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, int remoteRank, + mscclpp::RegisteredMemory& localMemory, mscclpp::RegisteredMemory& remoteMemory) { + std::vector remoteRanks = {remoteRank}; + std::unordered_map remoteMemories; + registerMemoryPairs(buff, buffSize, transport, tag, remoteRanks, localMemory, remoteMemories); + remoteMemory = remoteMemories[remoteRank]; + } + + int numRanksToUse = -1; + std::shared_ptr communicator; + mscclpp::Transport ibTransport; + std::unordered_map> connections; +}; + +class CommunicatorTest : public CommunicatorTestBase { + protected: + void SetUp() override { + CommunicatorTestBase::SetUp(); + + ASSERT_EQ((deviceBufferSize / sizeof(int)) % gEnv->worldSize, 0); + + connectMesh(); + + devicePtr.resize(numBuffers); + localMemory.resize(numBuffers); + remoteMemory.resize(numBuffers); + + std::vector remoteRanks; + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank) { + remoteRanks.push_back(i); + } + } + + for (int n = 0; n < numBuffers; n++) { + devicePtr[n] = mscclpp::allocSharedCuda(deviceBufferSize / sizeof(int)); + registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, + remoteRanks, localMemory[n], remoteMemory[n]); + } + } + + void TearDown() override { + remoteMemory.clear(); + localMemory.clear(); + devicePtr.clear(); + CommunicatorTestBase::TearDown(); + } + + void deviceBufferInit() { + size_t dataCount = deviceBufferSize / sizeof(int); + for (int n = 0; n < (int)devicePtr.size(); n++) { + std::vector hostBuffer(dataCount, 0); + for (int i = 0; i < dataCount; i++) { + hostBuffer[i] = gEnv->rank + n * gEnv->worldSize; + } + mscclpp::memcpyCuda(devicePtr[n].get(), hostBuffer.data(), dataCount, cudaMemcpyHostToDevice); + } + } + + void writeToRemote(int dataCountPerRank) { + for (int n = 0; n < numBuffers; n++) { + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank) { + auto& conn = connections.at(i); + auto& peerMemory = remoteMemory[n].at(i); + conn->write(peerMemory, gEnv->rank * dataCountPerRank * sizeof(int), localMemory[n], + gEnv->rank * dataCountPerRank * sizeof(int), dataCountPerRank * sizeof(int)); + conn->flush(); + } + } + } + } + + bool testWriteCorrectness(bool skipLocal = false) { + size_t dataCount = deviceBufferSize / sizeof(int); + for (int n = 0; n < (int)devicePtr.size(); n++) { + std::vector hostBuffer(dataCount, 0); + mscclpp::memcpyCuda(hostBuffer.data(), devicePtr[n].get(), dataCount, cudaMemcpyDeviceToHost); + for (int i = 0; i < gEnv->worldSize; i++) { + if (((i / gEnv->nRanksPerNode) == (gEnv->rank / gEnv->nRanksPerNode)) && skipLocal) { + continue; + } + for (int j = i * dataCount / gEnv->worldSize; j < (i + 1) * dataCount / gEnv->worldSize; j++) { + if (hostBuffer[j] != i + n * gEnv->worldSize) { + return false; + } + } + } + } + return true; + } + + const size_t numBuffers = 10; + const int deviceBufferSize = 1024 * 1024; + std::vector> devicePtr; + std::vector localMemory; + std::vector> remoteMemory; +}; + +TEST_F(CommunicatorTest, BasicWrite) { + if (gEnv->rank >= numRanksToUse) return; + + deviceBufferInit(); + communicator->bootstrapper()->barrier(); + + writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize); + communicator->bootstrapper()->barrier(); + + // polling until it becomes ready + bool ready = false; + int niter = 0; + do { + ready = testWriteCorrectness(); + niter++; + if (niter == 10000) { + FAIL() << "Polling is stuck."; + } + } while (!ready); + communicator->bootstrapper()->barrier(); +} + +__global__ void kernelIncEpochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) { + int tid = threadIdx.x; + if (tid != rank && tid < worldSize) { + deviceEpochs[tid].epochIncrement(); + } +} + +__global__ void kernelWaitEpochs(mscclpp::DeviceEpoch::DeviceHandle* deviceEpochs, int rank, int worldSize) { + int tid = threadIdx.x; + if (tid != rank && tid < worldSize) { + deviceEpochs[tid].wait(); + } +} + +TEST_F(CommunicatorTest, WriteWithDeviceEpochs) { + if (gEnv->rank >= numRanksToUse) return; + + std::unordered_map> epochs; + for (auto entry : connections) { + auto& conn = entry.second; + epochs.insert({entry.first, std::make_shared(*communicator.get(), conn)}); + } + communicator->setup(); + communicator->bootstrapper()->barrier(); + + deviceBufferInit(); + communicator->bootstrapper()->barrier(); + + auto deviceEpochHandles = mscclpp::allocSharedCuda(gEnv->worldSize); + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank) { + mscclpp::DeviceEpoch::DeviceHandle deviceHandle = epochs[i]->deviceHandle(); + mscclpp::memcpyCuda(deviceEpochHandles.get() + i, &deviceHandle, 1, + cudaMemcpyHostToDevice); + } + } + communicator->bootstrapper()->barrier(); + + writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize); + + kernelIncEpochs<<<1, gEnv->worldSize>>>(deviceEpochHandles.get(), gEnv->rank, gEnv->worldSize); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank) { + epochs[i]->signal(); + } + } + + kernelWaitEpochs<<<1, gEnv->worldSize>>>(deviceEpochHandles.get(), gEnv->rank, gEnv->worldSize); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + ASSERT_TRUE(testWriteCorrectness()); + communicator->bootstrapper()->barrier(); +} + +TEST_F(CommunicatorTest, WriteWithHostEpochs) { + if (gEnv->rank >= numRanksToUse) return; + + std::unordered_map> epochs; + for (auto entry : connections) { + auto& conn = entry.second; + // HostEpoch cannot be used with CudaIpc transport + if (conn->transport() == mscclpp::Transport::CudaIpc) continue; + epochs.insert({entry.first, std::make_shared(*communicator.get(), conn)}); + } + communicator->setup(); + communicator->bootstrapper()->barrier(); + + deviceBufferInit(); + communicator->bootstrapper()->barrier(); + + writeToRemote(deviceBufferSize / sizeof(int) / gEnv->worldSize); + + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) { + epochs[i]->incrementAndSignal(); + } + } + + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank && connections[i]->transport() != mscclpp::Transport::CudaIpc) { + epochs[i]->wait(); + } + } + + ASSERT_TRUE(testWriteCorrectness()); + communicator->bootstrapper()->barrier(); +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Channel tests +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +class ChannelOneToOneTest : public CommunicatorTestBase { + protected: + void SetUp() override { + // Use only two ranks + setNumRanksToUse(2); + CommunicatorTestBase::SetUp(); + channelService = std::make_shared(*communicator.get()); + } + + void TearDown() override { CommunicatorTestBase::TearDown(); } + + void setupMeshConnections(std::vector& devChannels, bool useIbOnly, + void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0) { + const int rank = communicator->bootstrapper()->getRank(); + const int worldSize = communicator->bootstrapper()->getNranks(); + const bool isInPlace = (recvBuff == nullptr); + mscclpp::TransportFlags transport = mscclpp::Transport::CudaIpc | ibTransport; + + connectMesh(useIbOnly); + + for (int r = 0; r < worldSize; r++) { + if (r == rank) { + continue; + } + mscclpp::RegisteredMemory sendMemory; + mscclpp::RegisteredMemory remoteMemory; + void* tmpBuff = nullptr; + + if (isInPlace) { + registerMemoryPair(sendBuff, sendBuffBytes, transport, 0, r, sendMemory, remoteMemory); + } else { + sendMemory = communicator->registerMemory(recvBuff, recvBuffBytes, transport); + mscclpp::RegisteredMemory recvMemory; + registerMemoryPair(recvBuff, recvBuffBytes, transport, 0, r, recvMemory, remoteMemory); + tmpBuff = recvMemory.data(); + } + + mscclpp::channel::ChannelId cid = channelService->addChannel(connections[r]); + communicator->setup(); + + // TODO: enable this when we support out-of-place + // devChannels.emplace_back(channelService->deviceChannel(cid), + // channelService->addMemory(remoteMemory), channelService->addMemory(sendMemory), + // remoteMemory.data(), sendMemory.data(), tmpBuff); + devChannels.emplace_back(channelService->deviceChannel(cid), channelService->addMemory(remoteMemory), + channelService->addMemory(sendMemory), remoteMemory.data(), sendMemory.data()); + } + } + + std::shared_ptr channelService; +}; + +__constant__ mscclpp::channel::SimpleDeviceChannel gChannelOneToOneTestConstDevChans; + +__global__ void kernelPingPong(int rank, int nElem) { + mscclpp::channel::SimpleDeviceChannel& devChan = gChannelOneToOneTestConstDevChans; + volatile int* sendBuff = (volatile int*)devChan.srcPtr_; + int nTries = 1000; + int flusher = 0; + int rank1Offset = 10000000; + for (int i = 0; i < nTries; i++) { + if (rank == 0) { + if (i > 0) { + if (threadIdx.x == 0) devChan.wait(); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + if (sendBuff[j] != rank1Offset + i - 1 + j) { + printf("rank 0 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], 100000 + i - 1 + j); + } + } + } + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + sendBuff[j] = i + j; + } + __syncthreads(); + // __threadfence_system(); // not necessary if we make sendBuff volatile + if (threadIdx.x == 0) devChan.putWithSignal(0, nElem * sizeof(int)); + } + if (rank == 1) { + if (threadIdx.x == 0) devChan.wait(); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + if (sendBuff[j] != i + j) { + printf("rank 1 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], i + j); + } + } + if (i < nTries - 1) { + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + sendBuff[j] = rank1Offset + i + j; + } + __syncthreads(); + // __threadfence_system(); // not necessary if we make sendBuff volatile + if (threadIdx.x == 0) devChan.putWithSignal(0, nElem * sizeof(int)); + } + } + flusher++; + if (flusher == 100) { + devChan.flush(); + flusher = 0; + } + } +} + +TEST_F(ChannelOneToOneTest, PingPongIb) { + if (gEnv->rank >= numRanksToUse) return; + + const int nElem = 4 * 1024 * 1024; + + std::vector devChannels; + std::shared_ptr buff = mscclpp::allocSharedCuda(nElem); + setupMeshConnections(devChannels, true, buff.get(), nElem * sizeof(int)); + + ASSERT_EQ(devChannels.size(), 1); + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstDevChans, devChannels.data(), + sizeof(mscclpp::channel::SimpleDeviceChannel))); + + channelService->startProxy(); + + kernelPingPong<<<1, 1024>>>(gEnv->rank, 1); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + kernelPingPong<<<1, 1024>>>(gEnv->rank, 1024); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + kernelPingPong<<<1, 1024>>>(gEnv->rank, 1024 * 1024); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + kernelPingPong<<<1, 1024>>>(gEnv->rank, 4 * 1024 * 1024); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + channelService->stopProxy(); +} diff --git a/test/mscclpp-test/alltoall_test.cu b/test/mscclpp-test/alltoall_test.cu index 8f55bc17d..b36370032 100644 --- a/test/mscclpp-test/alltoall_test.cu +++ b/test/mscclpp-test/alltoall_test.cu @@ -47,7 +47,8 @@ __device__ void alltoall1(int rank, int nRanksPerNode, size_t nElements) { __global__ void kernel(int rank, int worldSize, size_t nElements, int nRanksPerNode, int kernelNum) { if (kernelNum == 0) { alltoall0(rank, worldSize, nElements); - } if (kernelNum == 1) { + } + if (kernelNum == 1) { alltoall1(rank, nRanksPerNode, nElements); } } @@ -69,7 +70,7 @@ void AllToAllTestColl::runColl(const TestArgs& args, cudaStream_t stream) { const int kernelNum = args.kernelNum; const int nRanksPerNode = args.nRanksPerNode; CUDATHROW(cudaMemcpyAsync((int*)localRecvBuff + paramCount_ * rank, (int*)localSendBuff + paramCount_ * rank, - paramCount_ * sizeof(int), cudaMemcpyDeviceToDevice, stream)); + paramCount_ * sizeof(int), cudaMemcpyDeviceToDevice, stream)); kernel<<>>(rank, worldSize, paramCount_, nRanksPerNode, kernelNum); } diff --git a/test/mscclpp-test/common.cu b/test/mscclpp-test/common.cu index 1893937ca..0c37acd2e 100644 --- a/test/mscclpp-test/common.cu +++ b/test/mscclpp-test/common.cu @@ -72,8 +72,11 @@ double allreduceTime(int worldSize, double value, int average) { double accumulator = value; if (average != 0) { - MPI_Op op = - average == 1 ? MPI_SUM : average == 2 ? MPI_MIN : average == 3 ? MPI_MAX : average == 4 ? MPI_SUM : MPI_Op(); + MPI_Op op = average == 1 ? MPI_SUM + : average == 2 ? MPI_MIN + : average == 3 ? MPI_MAX + : average == 4 ? MPI_SUM + : MPI_Op(); MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator, 1, MPI_DOUBLE, op, MPI_COMM_WORLD); } diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 356625fb6..636cd8460 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -1,4 +1,8 @@ target_sources(unit_tests PRIVATE core_tests.cc - cuda_memory_tests.cc + cuda_utils_tests.cc + errors_tests.cc + fifo_tests.cu + numa_tests.cc + utils_tests.cc ) diff --git a/test/unit/core_tests.cc b/test/unit/core_tests.cc index 7adfb7ea5..f33cc5c31 100644 --- a/test/unit/core_tests.cc +++ b/test/unit/core_tests.cc @@ -37,14 +37,14 @@ TEST_F(LocalCommunicatorTest, RegisterMemory) { EXPECT_EQ(memory.transports(), mscclpp::NoTransports); } -TEST_F(LocalCommunicatorTest, SendMemoryToSelf) { - int dummy[42]; - auto memory = comm->registerMemory(&dummy, sizeof(dummy), mscclpp::NoTransports); - comm->sendMemoryOnSetup(memory, 0, 0); - auto memoryFuture = comm->recvMemoryOnSetup(0, 0); - comm->setup(); - auto sameMemory = memoryFuture.get(); - EXPECT_EQ(sameMemory.size(), memory.size()); - EXPECT_EQ(sameMemory.rank(), memory.rank()); - EXPECT_EQ(sameMemory.transports(), memory.transports()); -} +// TEST_F(LocalCommunicatorTest, SendMemoryToSelf) { +// int dummy[42]; +// auto memory = comm->registerMemory(&dummy, sizeof(dummy), mscclpp::NoTransports); +// comm->sendMemoryOnSetup(memory, 0, 0); +// auto memoryFuture = comm->recvMemoryOnSetup(0, 0); +// comm->setup(); +// auto sameMemory = memoryFuture.get(); +// EXPECT_EQ(sameMemory.size(), memory.size()); +// EXPECT_EQ(sameMemory.rank(), memory.rank()); +// EXPECT_EQ(sameMemory.transports(), memory.transports()); +// } diff --git a/test/unit/cuda_memory_tests.cc b/test/unit/cuda_memory_tests.cc deleted file mode 100644 index 7e670aeb7..000000000 --- a/test/unit/cuda_memory_tests.cc +++ /dev/null @@ -1,13 +0,0 @@ -#include - -#include - -TEST(CudaMemoryTest, Shared) { - auto p1 = mscclpp::allocSharedCuda(); - auto p2 = mscclpp::allocSharedCuda(5); -} - -TEST(CudaMemoryTest, Unique) { - auto p1 = mscclpp::allocUniqueCuda(); - auto p2 = mscclpp::allocUniqueCuda(5); -} diff --git a/test/unit/cuda_utils_tests.cc b/test/unit/cuda_utils_tests.cc new file mode 100644 index 000000000..2bc118599 --- /dev/null +++ b/test/unit/cuda_utils_tests.cc @@ -0,0 +1,39 @@ +#include + +#include + +TEST(CudaUtilsTest, AllocShared) { + auto p1 = mscclpp::allocSharedCuda(); + auto p2 = mscclpp::allocSharedCuda(5); +} + +TEST(CudaUtilsTest, AllocUnique) { + auto p1 = mscclpp::allocUniqueCuda(); + auto p2 = mscclpp::allocUniqueCuda(5); +} + +TEST(CudaUtilsTest, MakeSharedHost) { + auto p1 = mscclpp::makeSharedCudaHost(); + auto p2 = mscclpp::makeSharedCudaHost(5); +} + +TEST(CudaUtilsTest, MakeUniqueHost) { + auto p1 = mscclpp::makeUniqueCudaHost(); + auto p2 = mscclpp::makeUniqueCudaHost(5); +} + +TEST(CudaUtilsTest, Memcpy) { + const int nElem = 1024; + std::vector hostBuff(nElem); + for (int i = 0; i < nElem; ++i) { + hostBuff[i] = i + 1; + } + std::vector hostBuffTmp(nElem, 0); + auto devBuff = mscclpp::allocSharedCuda(nElem); + mscclpp::memcpyCuda(devBuff.get(), hostBuff.data(), nElem, cudaMemcpyHostToDevice); + mscclpp::memcpyCuda(hostBuffTmp.data(), devBuff.get(), nElem, cudaMemcpyDeviceToHost); + + for (int i = 0; i < nElem; ++i) { + EXPECT_EQ(hostBuff[i], hostBuffTmp[i]); + } +} diff --git a/test/unit/errors_tests.cc b/test/unit/errors_tests.cc new file mode 100644 index 000000000..04b056fce --- /dev/null +++ b/test/unit/errors_tests.cc @@ -0,0 +1,33 @@ +#include + +#include + +TEST(ErrorsTest, SystemError) { + mscclpp::Error error("test", mscclpp::ErrorCode::SystemError); + EXPECT_EQ(error.getErrorCode(), static_cast(mscclpp::ErrorCode::SystemError)); + EXPECT_EQ(error.what(), std::string("test (Mscclpp failure: SystemError)")); +} + +TEST(ErrorsTest, InternalError) { + mscclpp::Error error("test", mscclpp::ErrorCode::InternalError); + EXPECT_EQ(error.getErrorCode(), static_cast(mscclpp::ErrorCode::InternalError)); + EXPECT_EQ(error.what(), std::string("test (Mscclpp failure: InternalError)")); +} + +TEST(ErrorsTest, InvalidUsage) { + mscclpp::Error error("test", mscclpp::ErrorCode::InvalidUsage); + EXPECT_EQ(error.getErrorCode(), static_cast(mscclpp::ErrorCode::InvalidUsage)); + EXPECT_EQ(error.what(), std::string("test (Mscclpp failure: InvalidUsage)")); +} + +TEST(ErrorsTest, Timeout) { + mscclpp::Error error("test", mscclpp::ErrorCode::Timeout); + EXPECT_EQ(error.getErrorCode(), static_cast(mscclpp::ErrorCode::Timeout)); + EXPECT_EQ(error.what(), std::string("test (Mscclpp failure: Timeout)")); +} + +TEST(ErrorsTest, UnknownError) { + mscclpp::Error error("test", static_cast(-1)); + EXPECT_EQ(error.getErrorCode(), -1); + EXPECT_EQ(error.what(), std::string("test (Mscclpp failure: UnknownError)")); +} diff --git a/test/unit/fifo_tests.cu b/test/unit/fifo_tests.cu new file mode 100644 index 000000000..b531ccacd --- /dev/null +++ b/test/unit/fifo_tests.cu @@ -0,0 +1,67 @@ +#include + +#include +#include +#include + +#define FLUSH_PERIOD (MSCCLPP_PROXY_FIFO_SIZE) // should not exceed MSCCLPP_PROXY_FIFO_SIZE +#define ITER 10000 // should be larger than MSCCLPP_PROXY_FIFO_SIZE for proper testing + +__constant__ mscclpp::DeviceProxyFifo gFifoTestDeviceProxyFifo; +__global__ void kernelFifoTest() { + if (threadIdx.x + blockIdx.x * blockDim.x != 0) return; + + mscclpp::DeviceProxyFifo& fifo = gFifoTestDeviceProxyFifo; + mscclpp::ProxyTrigger trigger; + for (uint64_t i = 1; i < ITER + 1; ++i) { + trigger.fst = i; + trigger.snd = i; + uint64_t curFifoHead = fifo.push(trigger); + if (i % FLUSH_PERIOD == 0) { + fifo.sync(curFifoHead); + } + } +} + +TEST(FifoTest, HostProxyFifo) { + ASSERT_LE(FLUSH_PERIOD, MSCCLPP_PROXY_FIFO_SIZE); + + mscclpp::HostProxyFifo hostFifo; + mscclpp::DeviceProxyFifo devFifo = hostFifo.deviceFifo(); + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gFifoTestDeviceProxyFifo, &devFifo, sizeof(devFifo))); + + kernelFifoTest<<<1, 1>>>(); + MSCCLPP_CUDATHROW(cudaGetLastError()); + + mscclpp::ProxyTrigger trigger; + trigger.fst = 0; + trigger.snd = 0; + + uint64_t spin = 0; + uint64_t flushCnt = 0; + mscclpp::Timer timer(3); + for (uint64_t i = 0; i < ITER; ++i) { + while (trigger.fst == 0) { + hostFifo.poll(&trigger); + + if (spin++ > 1000000) { + FAIL() << "Polling is stuck."; + } + } + ASSERT_TRUE(trigger.fst == (i + 1)); + ASSERT_TRUE(trigger.snd == (i + 1)); + hostFifo.pop(); + if ((++flushCnt % FLUSH_PERIOD) == 0) { + hostFifo.flushTail(); + } + trigger.fst = 0; + spin = 0; + } + hostFifo.flushTail(true); + + std::stringstream ss; + ss << "FifoTest.HostProxyFifo: " << (float)timer.elapsed() / ITER << " us/iter\n"; + std::cout << ss.str(); + + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); +} diff --git a/test/unit/numa_tests.cc b/test/unit/numa_tests.cc new file mode 100644 index 000000000..5fbd5c2e6 --- /dev/null +++ b/test/unit/numa_tests.cc @@ -0,0 +1,18 @@ +#include + +#include + +#include "numa.hpp" + +TEST(NumaTest, Basic) { + int num; + MSCCLPP_CUDATHROW(cudaGetDeviceCount(&num)); + if (num == 0) { + return; + } + for (int i = 0; i < num; i++) { + int numaNode = mscclpp::getDeviceNumaNode(i); + EXPECT_GE(numaNode, 0); + mscclpp::numaBind(numaNode); + } +} diff --git a/test/unit/utils_tests.cc b/test/unit/utils_tests.cc new file mode 100644 index 000000000..d4f6dadd1 --- /dev/null +++ b/test/unit/utils_tests.cc @@ -0,0 +1,49 @@ +#include + +#include +#include + +TEST(UtilsTest, Timer) { + mscclpp::Timer timer; + sleep(1); + int64_t elapsed = timer.elapsed(); + EXPECT_GE(elapsed, 1000000); + + timer.reset(); + sleep(1); + elapsed = timer.elapsed(); + EXPECT_GE(elapsed, 1000000); + EXPECT_LT(elapsed, 1100000); +} + +TEST(UtilsTest, TimerTimeout) { + mscclpp::Timer timer(1); + ASSERT_THROW(sleep(2), mscclpp::Error); +} + +TEST(UtilsTest, TimerTimeoutReset) { + mscclpp::Timer timer(3); + sleep(2); + // Resetting the timer should prevent the timeout. + timer.reset(); + ASSERT_NO_THROW(sleep(2)); + + // Elapsed time should be slightly larger than 2 seconds. + EXPECT_GT(timer.elapsed(), 2000000); + EXPECT_LT(timer.elapsed(), 2100000); +} + +TEST(UtilsTest, ScopedTimer) { + mscclpp::ScopedTimer timerA("UtilsTest.ScopedTimer.A"); + mscclpp::ScopedTimer timerB("UtilsTest.ScopedTimer.B"); + sleep(1); + int64_t elapsedA = timerA.elapsed(); + int64_t elapsedB = timerB.elapsed(); + EXPECT_GE(elapsedA, 1000000); + EXPECT_GE(elapsedB, 1000000); +} + +TEST(UtilsTest, getHostName) { + std::string hostname = mscclpp::getHostName(1024, '.'); + EXPECT_FALSE(hostname.empty()); +}