Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update unit tests #81

Merged
merged 28 commits into from
Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions .azure-pipelines/ut.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
trigger:
- main

pr:
- main

jobs:
- job: UnitTest
timeoutInMinutes: 30
pool:
name: mscclpp
container:
image: superbench/superbench:v0.8.0-cuda12.1
options: --privileged --ipc=host --gpus=all --ulimit memlock=-1:-1

steps:
- task: Bash@3
name: Build
displayName: Build
inputs:
targetType: 'inline'
script: |
curl -L -C- https://github.com/Kitware/CMake/releases/download/v3.26.4/cmake-3.26.4-linux-x86_64.tar.gz -o /tmp/cmake-3.26.4-linux-x86_64.tar.gz
tar xzf /tmp/cmake-3.26.4-linux-x86_64.tar.gz -C /tmp
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/cuda-12.1/compat/lib.real
mkdir build && cd build
/tmp/cmake-3.26.4-linux-x86_64/bin/cmake ..
make -j
workingDirectory: '$(System.DefaultWorkingDirectory)'


- task: Bash@3
name: UnitTests
displayName: Run mscclpp unit tests
inputs:
targetType: 'inline'
script: |
./build/test/unit_tests
workingDirectory: '$(System.DefaultWorkingDirectory)'

- task: Bash@3
name: MpUnitTests
displayName: Run mscclpp multi-process unit tests
inputs:
targetType: 'inline'
script: |
mpirun -tag-output -np 2 ./build/test/mp_unit_tests
mpirun -tag-output -np 4 ./build/test/mp_unit_tests
mpirun -tag-output -np 8 ./build/test/mp_unit_tests
workingDirectory: '$(System.DefaultWorkingDirectory)'
9 changes: 5 additions & 4 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:

- name: Run cpplint
run: |
CPPSOURCES=$(find ./ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)' -not -path "./build/*" -not -path "./python/*" -not -path "./test/*")
CPPSOURCES=$(find ./ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)' -not -path "./build/*" -not -path "./python/*")
PYTHONCPPSOURCES=$(find ./python/src/ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)')
clang-format-12 -style=file --verbose --Werror --dry-run ${CPPSOURCES}
clang-format-12 --dry-run ${PYTHONCPPSOURCES}
Expand All @@ -40,10 +40,11 @@ jobs:
- name: Check out Git repository
uses: actions/checkout@v3

- name: Install dependencies
- name: Download misspell
run: |
curl -L https://git.io/misspell | sudo bash -s -- -b /bin
curl -L https://github.com/client9/misspell/releases/download/v0.3.4/misspell_0.3.4_linux_64bit.tar.gz -o /tmp/misspell_0.3.4_linux_64bit.tar.gz
tar -xzf /tmp/misspell_0.3.4_linux_64bit.tar.gz -C .

- name: Check spelling
run: |
misspell -error .
./misspell -error .
10 changes: 2 additions & 8 deletions include/mscclpp/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ struct DeviceChannel {
uint64_t curFifoHead = fifo_.push(
ChannelTrigger(TriggerData | TriggerFlag | TriggerSync, dst, dstOffset, src, srcOffset, size, channelId_)
.value);
while (*(volatile uint64_t*)&fifo_.triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 &&
*(volatile uint64_t*)fifo_.tailReplica <= curFifoHead)
;
fifo_.sync(curFifoHead);
}

__forceinline__ __device__ void putWithSignalAndFlush(MemoryId dst, MemoryId src, uint64_t offset, uint64_t size) {
Expand All @@ -142,11 +140,7 @@ struct DeviceChannel {

__forceinline__ __device__ void flush() {
uint64_t curFifoHead = fifo_.push(ChannelTrigger(TriggerSync, 0, 0, 0, 0, 1, channelId_).value);
// we need to wait for two conditions to be met to ensure the CPU is done flushing. (1) wait for the tail
// to go pass by curFifoHead (this is safety net) and (2) wait for the work element value to change to 0.
while (*(volatile uint64_t*)&fifo_.triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0 &&
*(volatile uint64_t*)fifo_.tailReplica <= curFifoHead)
;
fifo_.sync(curFifoHead);
}

__forceinline__ __device__ void wait() { epoch_.wait(); }
Expand Down
4 changes: 2 additions & 2 deletions include/mscclpp/epoch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <memory>
#include <mscclpp/core.hpp>
#include <mscclpp/cuda_utils.hpp>
#include <mscclpp/poll.hpp>

namespace mscclpp {

Expand Down Expand Up @@ -51,8 +52,7 @@ class DeviceEpoch : BaseEpoch<CudaDeleter> {
#ifdef __CUDACC__
__forceinline__ __device__ void wait() {
(*expectedInboundEpochId) += 1;
while (*(volatile uint64_t*)&(epochIds->inboundReplica) < (*expectedInboundEpochId))
;
POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&(epochIds->inboundReplica) < (*expectedInboundEpochId), 1000000000);
}

__forceinline__ __device__ void epochIncrement() { *(volatile uint64_t*)&(epochIds->outbound) += 1; }
Expand Down
1 change: 1 addition & 0 deletions include/mscclpp/errors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum class ErrorCode {
SystemError,
InternalError,
InvalidUsage,
Timeout,
};

std::string errorToString(enum ErrorCode error);
Expand Down
28 changes: 17 additions & 11 deletions include/mscclpp/fifo.hpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
#ifndef MSCCLPP_FIFO_HPP_
#define MSCCLPP_FIFO_HPP_

#include <stdint.h>

#include <cstdint>
#include <functional>
#include <memory>
#include <mscclpp/poll.hpp>

namespace mscclpp {

// For every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER, a flush of the tail to device memory is triggered.
// As long as MSCCLPP_PROXY_FIFO_SIZE is large enough, having a stale tail is not a problem.
#define MSCCLPP_PROXY_FIFO_SIZE 128
#define MSCCLPP_PROXY_FIFO_FLUSH_COUNTER 4

namespace mscclpp {

struct alignas(16) ProxyTrigger {
uint64_t fst, snd;
Expand All @@ -34,14 +31,23 @@ struct DeviceProxyFifo {
#ifdef __CUDACC__
__forceinline__ __device__ uint64_t push(ProxyTrigger trigger) {
uint64_t curFifoHead = atomicAdd((unsigned long long int*)this->head, 1);
while (curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->tailReplica))
;
while (*(volatile uint64_t*)&this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0)
;

POLL_MAYBE_JAILBREAK(curFifoHead >= MSCCLPP_PROXY_FIFO_SIZE + *((volatile uint64_t*)this->tailReplica), 1000000000);

POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE] != 0, 1000000000);

ProxyTrigger* triggerPtr = (ProxyTrigger*)&(this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE]);
asm volatile("st.volatile.global.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd));
return curFifoHead;
}

__forceinline__ __device__ void sync(uint64_t curFifoHead) {
// We need to wait for two conditions to be met to ensure the CPU is done flushing. (1) wait for the tail
// to go pass by curFifoHead (this is safety net) and (2) wait for the work element value to change to 0.
POLL_MAYBE_JAILBREAK(*(volatile uint64_t*)&(this->triggers[curFifoHead % MSCCLPP_PROXY_FIFO_SIZE]) != 0 &&
*(volatile uint64_t*)(this->tailReplica) <= curFifoHead,
1000000000);
}
#endif // __CUDACC__

ProxyTrigger* triggers; // Allocate on host via cudaHostAlloc. This space is used for pushing the workelements
Expand Down
43 changes: 43 additions & 0 deletions include/mscclpp/poll.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef MSCCLPP_POLL_HPP_
#define MSCCLPP_POLL_HPP_

#ifdef __CUDACC__

#ifndef NDEBUG
#include <stdio.h>
#define POLL_PRINT_ON_STUCK(__cond) \
do { \
printf("mscclpp: spin is stuck. condition: " #__cond "\n"); \
} while (0);
#else // NDEBUG
#define POLL_PRINT_ON_STUCK(__cond)
#endif // NDEBUG

// If a spin is stuck, escape from it and set status to 1.
#define POLL_MAYBE_JAILBREAK_ESCAPE(__cond, __max_spin_cnt, __status) \
do { \
uint64_t __spin_cnt = 0; \
__status = 0; \
while (__cond) { \
if (__spin_cnt++ == __max_spin_cnt) { \
POLL_PRINT_ON_STUCK(__cond); \
__status = 1; \
break; \
} \
} \
} while (0);

// If a spin is stuck, print a warning and keep spinning.
#define POLL_MAYBE_JAILBREAK(__cond, __max_spin_cnt) \
do { \
uint64_t __spin_cnt = 0; \
while (__cond) { \
if (__spin_cnt++ == __max_spin_cnt) { \
POLL_PRINT_ON_STUCK(__cond); \
} \
} \
} while (0);

#endif // __CUDACC__

#endif // MSCCLPP_POLL_HPP_
48 changes: 15 additions & 33 deletions include/mscclpp/utils.hpp
Original file line number Diff line number Diff line change
@@ -1,55 +1,37 @@
#ifndef MSCCLPP_UTILS_HPP_
#define MSCCLPP_UTILS_HPP_

#include <unistd.h>

#include <chrono>
#include <cstdio>
#include <cstring>
#include <mscclpp/errors.hpp>
#include <string>

namespace mscclpp {

struct Timer {
std::chrono::steady_clock::time_point start;
std::chrono::steady_clock::time_point start_;
int timeout_;

Timer(int timeout = -1);

~Timer();

Timer() { start = std::chrono::steady_clock::now(); }
int64_t elapsed() const;

int64_t elapsed() {
auto end = std::chrono::steady_clock::now();
return std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
}
void set(int timeout);

void reset() { start = std::chrono::steady_clock::now(); }
void reset();

void print(const char* name) {
auto end = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
printf("%s: %ld us\n", name, elapsed);
}
void print(const std::string& name);
};

struct ScopedTimer {
Timer timer;
const char* name;
struct ScopedTimer : public Timer {
const std::string name_;

ScopedTimer(const char* name) : name(name) {}
ScopedTimer(const std::string& name);

~ScopedTimer() { timer.print(name); }
~ScopedTimer();
};

inline std::string getHostName(int maxlen, const char delim) {
std::string hostname(maxlen + 1, '\0');
if (gethostname(const_cast<char*>(hostname.data()), maxlen) != 0) {
std::strncpy(const_cast<char*>(hostname.data()), "unknown", maxlen);
throw Error("gethostname failed", ErrorCode::SystemError);
}
int i = 0;
while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen - 1)) i++;
hostname[i] = '\0';
return hostname;
}
std::string getHostName(int maxlen, const char delim);

} // namespace mscclpp

Expand Down
2 changes: 2 additions & 0 deletions src/errors.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ std::string errorToString(enum ErrorCode error) {
return "InternalError";
case ErrorCode::InvalidUsage:
return "InvalidUsage";
case ErrorCode::Timeout:
return "Timeout";
default:
return "UnknownError";
}
Expand Down
5 changes: 2 additions & 3 deletions src/fifo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ MSCCLPP_API_CPP void HostProxyFifo::pop() {
}

MSCCLPP_API_CPP void HostProxyFifo::flushTail(bool sync) {
// Flush the tail to device memory. This is either triggered every MSCCLPP_PROXY_FIFO_FLUSH_COUNTER to make sure
// that the fifo can make progress even if there is no request mscclppSync. However, mscclppSync type is for flush
// request.
// Flush the tail to device memory. This is either triggered every ProxyFlushPeriod to make sure that the fifo can
// make progress even if there is no request mscclppSync. However, mscclppSync type is for flush request.
MSCCLPP_CUDATHROW(cudaMemcpyAsync(pimpl->tailReplica.get(), &pimpl->hostTail, sizeof(uint64_t),
cudaMemcpyHostToDevice, pimpl->stream));
if (sync) {
Expand Down
12 changes: 0 additions & 12 deletions src/include/utils_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,6 @@ using TimePoint = std::chrono::steady_clock::time_point;
TimePoint getClock();
int64_t elapsedClock(TimePoint start, TimePoint end);

/* get any bytes of random data from /dev/urandom */
inline void getRandomData(void* buffer, size_t bytes) {
if (bytes > 0) {
const size_t one = 1UL;
FILE* fp = fopen("/dev/urandom", "r");
if (buffer == NULL || fp == NULL || fread(buffer, bytes, one, fp) != one) {
throw Error("Failed to read random data", ErrorCode::SystemError);
}
if (fp) fclose(fp);
}
}

} // namespace mscclpp

#endif
2 changes: 2 additions & 0 deletions src/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ namespace mscclpp {

const int ProxyStopCheckPeriod = 1000;

// Unless explicitly requested, a flush of the tail to device memory is triggered for every ProxyFlushPeriod.
// As long as MSCCLPP_PROXY_FIFO_SIZE is large enough, having a stale tail is not a problem.
const int ProxyFlushPeriod = 4;

struct Proxy::Impl {
Expand Down
Loading