Skip to content

Commit

Permalink
colltrace: introduce hang progress mock for testing (NVIDIA#41)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: facebookresearch#41

Reviewed By: YulunW

Differential Revision: D54004780

fbshipit-source-id: 41627e6eafa14411c3d4dfdb83374622b08fca4b
  • Loading branch information
minsii authored and facebook-github-bot committed Mar 13, 2024
1 parent f53029e commit cb1c1f5
Show file tree
Hide file tree
Showing 10 changed files with 887 additions and 2 deletions.
16 changes: 16 additions & 0 deletions README.cvars
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,22 @@ Description:
Type: string
Default:

NCCL_PROXYMOCK_NET_SEND_FAILURE
Description:
Backdoor to mock a send failure in proxy thread following the format of
"<opCount>,<rank>,<remoteRank>,<step>,<num_match>,<delay_sec>".
Set any of <opCount>,<rank>,<remoteRank>,<step> to -1 to match any.
Set <delay_sec> to 0 to always skip this send as a network failure,
otherwise delay it in seconds to mock slow network.
Set <num_match> to control how many times to match the send failure.
Example 1: "10,1,0,5,1,30" - mock a slow send by delaying 30 seconds
for the send operation matching [opCount 10, rank 1, remote rank 0,
starting from step 5].
Example 2: "10,1,-1,-1,2,0" - mock 2 send failures matching [opCount 10,
rank 1, *any* remote rank, any step].
Type: stringlist
Default: None

NCCL_PROXYTRACE
Description:
Enable proxy operation trace collection on the proxy thread. Valid options
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ LIBSRCFILES += algorithms/AlgoDirector.cc \
algorithms/allreduce/AlgoManagerAllReduce.cc
LIBSRCFILES += collectives/all_to_allv.cc collectives/all_to_all.cc
LIBSRCFILES += colltrace/CollTrace.cc
LIBSRCFILES += colltrace/ProxyTrace.cc
LIBSRCFILES += colltrace/ProxyTrace.cc colltrace/ProxyMock.cc
LIBSRCFILES += window.cc
LIBSRCFILES += commHash.cc

Expand Down
173 changes: 173 additions & 0 deletions src/colltrace/ProxyMock.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.

#include "ProxyMock.h"
#include <exception>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "ExtUtils.h"
#include "TraceUtils.h"
#include "checks.h"
#include "comm.h"
#include "nccl.h"

/*
=== BEGIN_NCCL_CVAR_INFO_BLOCK ===
- name : NCCL_PROXYMOCK_NET_SEND_FAILURE
type : stringlist
default :
description : |-
Backdoor to mock a send failure in proxy thread following the format of
"<opCount>,<rank>,<remoteRank>,<step>,<num_match>,<delay_sec>".
Set any of <opCount>,<rank>,<remoteRank>,<step> to -1 to match any.
Set <delay_sec> to 0 to always skip this send as a network failure,
otherwise delay it in seconds to mock slow network.
Set <num_match> to control how many times to match the send failure.
Example 1: "10,1,0,5,1,30" - mock a slow send by delaying 30 seconds
for the send operation matching [opCount 10, rank 1, remote rank 0,
starting from step 5].
Example 2: "10,1,-1,-1,2,0" - mock 2 send failures matching [opCount 10,
rank 1, *any* remote rank, any step].
=== END_NCCL_CVAR_INFO_BLOCK ===
*/

// Ordered field names of one NCCL_PROXYMOCK_NET_SEND_FAILURE config entry.
// The order matches the positional parsing in
// ProxyMockNetSendFailure::initialize(), the element count is used there to
// validate the cvar, and the list is passed to serializeMap()/vecToStr() when
// printing the config.
static std::vector<std::string> netSendFailureKeys =
{"opCount", "rank", "remoteRank", "step", "numMatch", "delaySec"};

// Render the current mock configuration as a string.
//
// Every config field is converted with std::to_string and stored under its
// name from netSendFailureKeys; the actual formatting (including the effect of
// `quoted`) is delegated to serializeMap().
std::string ProxyMockNetSendFailure::serialize(bool quoted) {
  std::unordered_map<std::string, std::string> fields;
  fields.emplace("opCount", std::to_string(opCount_));
  fields.emplace("rank", std::to_string(rank_));
  fields.emplace("remoteRank", std::to_string(remoteRank_));
  fields.emplace("step", std::to_string(step_));
  fields.emplace("numMatch", std::to_string(numMatch_));
  fields.emplace("delaySec", std::to_string(delaySec_));

  return serializeMap(netSendFailureKeys, fields, quoted);
}

// Construct the mock and load its configuration right away by delegating to
// initialize(), which reads NCCL_PROXYMOCK_NET_SEND_FAILURE.
ProxyMockNetSendFailure::ProxyMockNetSendFailure() {
initialize();
}

void ProxyMockNetSendFailure::initialize() {
// no lock here since we assume direct call would be only in tests and only
// after communicator destroy

// Reset all fields if set
enabled_ = false;
mockStartMap_.clear();
numMatched_ = 0;

// Enable mock only when all required fields are set
if (NCCL_PROXYMOCK_NET_SEND_FAILURE.size() == netSendFailureKeys.size()) {
opCount_ = std::stoi(NCCL_PROXYMOCK_NET_SEND_FAILURE[0]);
rank_ = std::stoi(NCCL_PROXYMOCK_NET_SEND_FAILURE[1]);
remoteRank_ = std::stoi(NCCL_PROXYMOCK_NET_SEND_FAILURE[2]);
step_ = std::stoi(NCCL_PROXYMOCK_NET_SEND_FAILURE[3]);
numMatch_ = std::stoi(NCCL_PROXYMOCK_NET_SEND_FAILURE[4]);
delaySec_ = std::stoi(NCCL_PROXYMOCK_NET_SEND_FAILURE[5]);

enabled_ = true;
std::string configStr = serialize();
INFO(
NCCL_ENV | NCCL_INIT,
"PROXYMOCK: enabled NetSendFailure with config %s",
configStr.c_str());
} else if (!NCCL_PROXYMOCK_NET_SEND_FAILURE.empty()) {
WARN(
"PROXYMOCK: invalid value of NCCL_PROXYMOCK_NET_SEND_FAILURE. Valid format: %s",
vecToStr(netSendFailureKeys, ",").c_str());
}
}

// Decide whether the send described by `sub` at `currStep` must be mocked.
//
// A send matches when each of opCount/rank/remoteRank equals the configured
// value (or the configured value is PROXYMOCK_MATCH_ANY) and currStep is at or
// past the configured step (or the step is PROXYMOCK_MATCH_ANY).
//
// Returns true when the send is being mocked; in that case *request is set to
// nullptr. Returns false (leaving *request untouched) when the send does not
// match, the numMatch_ budget is used up, or a delayed mock has just expired.
bool ProxyMockNetSendFailure::mockImpl(
    struct ncclProxySubArgs* sub,
    int currStep,
    void** request) {
  std::lock_guard<std::mutex> lock(mutex_);

  const auto& trace = sub->traceArgs;
  const bool matchesConfig =
      (opCount_ == PROXYMOCK_MATCH_ANY ||
       trace.collInfo.opCount == opCount_) &&
      (rank_ == PROXYMOCK_MATCH_ANY || trace.rank == rank_) &&
      (remoteRank_ == PROXYMOCK_MATCH_ANY ||
       trace.remoteRank == remoteRank_) &&
      (step_ == PROXYMOCK_MATCH_ANY || currStep >= step_);
  if (!matchesConfig) {
    return false;
  }

  auto cond = NetSendFailureCond(
      trace.collInfo.commHash,
      trace.collInfo.opCount,
      trace.proxyOpId,
      trace.rank,
      trace.remoteRank,
      currStep);

  bool mocked = false;
  // Single lookup. The log strings are built only on the two paths that
  // actually log, because the proxy thread calls this repeatedly while a send
  // stays mocked.
  auto started = mockStartMap_.find(cond);
  if (started != mockStartMap_.end()) {
    // Already being mocked: stay hanging unless a delay is configured and has
    // elapsed.
    mocked = true;
    if (delaySec_ > 0) {
      const auto now = std::chrono::high_resolution_clock::now();
      const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
          now - started->second);
      if (elapsed.count() > delaySec_) {
        const std::string condStr = cond.toString();
        WARN(
            "PROXYMOCK: Mocked send failure at %s has been hanging for more than %d seconds, continue execution",
            condStr.c_str(),
            delaySec_);

        mockStartMap_.erase(started);
        mocked = false;
      }
    }
  } else if (numMatched_ < numMatch_) {
    // A new mock match: the SEND is not matched yet and we are within the
    // numMatch_ limit. Only warn this first time, because the proxy thread
    // will hang here and repeat the mock.
    const std::string condStr = cond.toString();
    const std::string mockConfigStr = this->serialize();
    WARN(
        "PROXYMOCK: Mocked send failure (matched %d times), skiped SEND at %s with config %s",
        numMatched_,
        condStr.c_str(),
        mockConfigStr.c_str());
    // Record hanging start time
    mockStartMap_[cond] = std::chrono::high_resolution_clock::now();
    mocked = true;

    // Use separate counter to track how many times it is mocked, since a mock
    // can be erased from mockStartMap_ if delaySec is set
    numMatched_++;
  }

  if (mocked) {
    // Set request to null so that net.cc would not check CQE associated with
    // it and assumes post_send is not yet called
    *request = nullptr;
  }

  return mocked;
}

// Return the process-wide mock instance. It is a function-local static, so it
// is constructed (and thereby initialized from the cvar) on first use.
ProxyMockNetSendFailure& ProxyMockNetSendFailure::getInstance() {
static ProxyMockNetSendFailure instance_;
return instance_;
}

bool ProxyMockNetSendFailure::mock(
struct ncclProxySubArgs* sub,
int step,
void** request) {
auto& instance = getInstance();
if (instance.enabled_) {
return instance.mockImpl(sub, step, request);
} else {
return false;
}
}
113 changes: 113 additions & 0 deletions src/colltrace/ProxyMock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.
#ifndef PROXY_MOCK_H
#define PROXY_MOCK_H

#include <ExtUtils.h>
#include <unistd.h>
#include <atomic>
#include <cstddef>
#include <cstdlib>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#define PROXYMOCK_MATCH_ANY (-1)

// Identifies one network send seen by the proxy thread: the communicator
// hash, collective opCount, proxy operation id, local and remote rank, and the
// step. Used as the key of ProxyMockNetSendFailure's map of mocked sends.
class NetSendFailureCond {
 public:
  uint64_t commHash{0};
  uint64_t opCount{0};
  int proxyOpId{0};
  int rank{0};
  int remoteRank{0};
  int step{0};

  // Default-constructed conditions have all fields zeroed.
  NetSendFailureCond() = default;
  NetSendFailureCond(
      uint64_t commHash,
      uint64_t opCount,
      int proxyOpId,
      int rank,
      int remoteRank,
      int step)
      : commHash(commHash),
        opCount(opCount),
        proxyOpId(proxyOpId),
        rank(rank),
        remoteRank(remoteRank),
        step(step) {}

  // Human-readable "key:value, ..." rendering of all fields, used in log
  // messages. commHash is formatted through hashToHexStr().
  std::string toString() const {
    std::stringstream ss;
    ss << "commHash:" << hashToHexStr(commHash) << ", opCount:" << opCount
       << ", proxyOpId:" << proxyOpId << ", rank:" << rank
       << ", remoteRank:" << remoteRank << ", step:" << step;
    return ss.str();
  }

  // Field-wise equality. `step` takes part in the comparison because
  // std::hash<NetSendFailureCond> mixes it into the hash; unordered containers
  // require keys that compare equal to also hash equal.
  bool operator==(const NetSendFailureCond& c) const {
    return c.commHash == this->commHash && c.opCount == this->opCount &&
        c.proxyOpId == this->proxyOpId && c.rank == this->rank &&
        c.remoteRank == this->remoteRank && c.step == this->step;
  }
};

// std::hash specialization so NetSendFailureCond can be used as an
// unordered-container key: the hash is the XOR of all six fields.
template <>
struct std::hash<NetSendFailureCond> {
  size_t operator()(const NetSendFailureCond& x) const {
    // Start from the 64-bit fields; the int fields are converted to 64-bit
    // unsigned as they are folded in.
    uint64_t folded = x.commHash;
    folded ^= x.opCount;
    folded ^= x.proxyOpId;
    folded ^= x.rank;
    folded ^= x.remoteRank;
    folded ^= x.step;
    return folded;
  }
};

// Allow proxy thread to mock a send failure if the current send operation
// matches user specified config (see NCCL_PROXYMOCK_NET_SEND_FAILURE).
// Singleton that lets the proxy thread mock a send failure (or a slow send)
// when the current send operation matches the user specified config (see
// NCCL_PROXYMOCK_NET_SEND_FAILURE).
class ProxyMockNetSendFailure {
 public:
  // Singleton: neither copyable nor assignable.
  ProxyMockNetSendFailure(const ProxyMockNetSendFailure&) = delete;
  ProxyMockNetSendFailure& operator=(const ProxyMockNetSendFailure&) = delete;

  // Return failure config serialized as json format string
  std::string serialize(bool quoted = false);

  // Return true if mocked, otherwise return false.
  static bool mock(struct ncclProxySubArgs* sub, int step, void** request);

  // Return singleton instance
  static ProxyMockNetSendFailure& getInstance();

  // Reset any existing state in the mock instance and reinitialize based on
  // NCCL_PROXYMOCK_NET_SEND_FAILURE. For testing only where we want to change
  // NCCL_PROXYMOCK_NET_SEND_FAILURE during a process lifetime. Note that direct
  // call to initialize is not thread safe. Test should call it only after
  // destroyed all existing communicators to ensure all proxy threads have been
  // terminated.
  void initialize();

 private:
  // Construction and destruction are private; the only instance is the one
  // handed out by getInstance().
  ProxyMockNetSendFailure();
  ~ProxyMockNetSendFailure() {}

  // Matching logic behind mock(); runs with mutex_ held.
  bool mockImpl(struct ncclProxySubArgs* sub, int step, void** request);

  // True once a complete config has been loaded by initialize().
  bool enabled_{false};

  // Configured match criteria. opCount_ is unsigned, so a configured -1 is
  // stored as the all-ones value, which is what PROXYMOCK_MATCH_ANY converts
  // to when compared against it.
  uint64_t opCount_{0};
  int rank_{0};
  int remoteRank_{0};
  int step_{0};
  // Maximum number of distinct sends to mock.
  int numMatch_{0};
  // Seconds a mocked send stays blocked; 0 means it is never released.
  int delaySec_{0};

  // Serializes mockImpl() calls from multiple threads. Note that initialize()
  // does not take this lock (see its comment above).
  std::mutex mutex_;
  // Start time of every send that is currently being mocked.
  std::unordered_map<
      NetSendFailureCond,
      std::chrono::time_point<std::chrono::high_resolution_clock>>
      mockStartMap_;
  // Number of sends mocked so far; compared against numMatch_.
  int numMatched_{0};
};

#endif
Loading

0 comments on commit cb1c1f5

Please sign in to comment.