Skip to content

Commit

Permalink
[BE] Update ProcessGroupWrapper to add deserializer and improve logs
Browse files Browse the repository at this point in the history
  • Loading branch information
H-Huang authored and pytorchmergebot committed Jun 20, 2022
1 parent ccccd0e commit 26b5129
Showing 1 changed file with 79 additions and 22 deletions.
101 changes: 79 additions & 22 deletions torch/csrc/distributed/c10d/ProcessGroupWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,38 @@ namespace {
struct CollectiveFingerPrint {
// Current collective's operation type.
OpType op_type_;
// Ref to input tensors, if given, of the collective. If given, shapes will be
// checked across processes to ensure valid input into the collective.
const std::vector<at::Tensor>& input_tensors_;
// Number of input tensors
std::size_t num_tensors_;
// input tensor data types
std::vector<int8_t> tensor_dtypes_;
// input tensor device types
std::vector<int8_t> tensor_device_types_;
// input tensor sizes
std::vector<c10::IntArrayRef> tensor_sizes_;

// Builds a fingerprint for a collective from its op type and input tensors.
// For every input tensor the dtype and device type are recorded as int8_t
// values, and the tensor's sizes() is appended to tensor_sizes_.
//
// NOTE(review): this listing appears to be a rendered diff without +/-
// markers, so the member-initializer list and the loop header each show up
// twice below (the input_tensors_-based form and the num_tensors_-based
// form). Confirm against the real file which pair is current.
explicit CollectiveFingerPrint(
OpType op_type,
const std::vector<at::Tensor>& input_tensors)
: op_type_(op_type), input_tensors_(input_tensors) {
tensor_dtypes_.reserve(input_tensors.size());
tensor_device_types_.reserve(input_tensors.size());
for (const at::Tensor& t : input_tensors_) {
: op_type_(op_type), num_tensors_(input_tensors.size()) {
// Reserve once up front; each vector receives exactly one entry per tensor.
tensor_dtypes_.reserve(num_tensors_);
tensor_device_types_.reserve(num_tensors_);
tensor_sizes_.reserve(num_tensors_);
for (const at::Tensor& t : input_tensors) {
// Scalar type and device type are narrowed to int8_t for storage.
tensor_dtypes_.push_back(static_cast<int8_t>(t.dtype().toScalarType()));
tensor_device_types_.push_back(static_cast<int8_t>(t.device().type()));
// NOTE(review): tensor_sizes_ holds c10::IntArrayRef elements, which are
// presumably non-owning views into the tensor's size data - confirm that
// the input tensors outlive this fingerprint.
tensor_sizes_.push_back(t.sizes());
}
}

// Constructor for the data received from a deserialized fingerprint.
//
// Only the op type, the per-tensor dtypes and the per-tensor device types are
// restored. num_tensors_ is derived from the number of dtype entries so that
// it is never left uninitialized (it is read when serializing and when
// printing a fingerprint). tensor_sizes_ is intentionally left empty because
// shapes are not passed in here; code that indexes tensor_sizes_ must not be
// used on a fingerprint built through this constructor.
CollectiveFingerPrint(
    OpType op_type,
    std::vector<int8_t> tensor_dtypes,
    std::vector<int8_t> tensor_device_types)
    : op_type_(op_type),
      // Members are initialized in declaration order, so the parameter (not
      // the member tensor_dtypes_) is used to compute the count.
      num_tensors_(tensor_dtypes.size()),
      tensor_dtypes_(tensor_dtypes),
      tensor_device_types_(tensor_device_types) {}

// Logs collective information in case of a failure.
friend std::ostream& operator<<(
std::ostream& output,
Expand All @@ -62,14 +75,51 @@ struct CollectiveFingerPrint {
verify_tensors(inp, pg);
}

// Takes a serialized fingerprint from
// CollectiveFingerPrint::serialize_fingerprint and deserializes it back to a
// CollectiveFingerPrint struct.
//
// Layout read here, one element per entry, in the order written by
// serialize_fingerprint: [op type, num tensors, <num tensors dtypes>,
// <num tensors device types>, ...]. Any trailing entries (the serialized
// shapes) are not read back.
//
// Raises (via TORCH_CHECK) if the tensor is empty or is too short to hold the
// number of dtype/device entries it declares.
CollectiveFingerPrint deserialize_fingerprint(at::Tensor serialized_tensor) {
  TORCH_CHECK(
      serialized_tensor.dim() >= 1 && serialized_tensor.size(0) > 0,
      "Cannot deserialize a collective fingerprint from an empty tensor.");
  const int64_t num_elements = serialized_tensor.size(0);
  int64_t index = 0;

  // 1. OpType
  const OpType optype =
      static_cast<OpType>(serialized_tensor[index].item<int>());
  ++index;

  std::vector<int8_t> dtypes;
  std::vector<int8_t> device_types;
  if (index < num_elements) {
    // 2. Num tensors
    const int num_tensors = serialized_tensor[index].item<int>();
    ++index;

    if (num_tensors > 0) {
      // Validate up front that both per-tensor sections are fully present,
      // instead of failing part-way through with an out-of-range index.
      const int64_t required = 2 * static_cast<int64_t>(num_tensors);
      TORCH_CHECK(
          num_elements - index >= required,
          "Serialized collective fingerprint is truncated: expected at least ",
          required,
          " dtype/device entries for ",
          num_tensors,
          " tensors but only ",
          num_elements - index,
          " elements remain.");
      dtypes.reserve(num_tensors);
      device_types.reserve(num_tensors);
    }

    // 3. Tensor dtypes
    for (int i = 0; i < num_tensors; ++i) {
      dtypes.push_back(serialized_tensor[index].item<int8_t>());
      ++index;
    }
    // 4. Device types
    for (int i = 0; i < num_tensors; ++i) {
      device_types.push_back(serialized_tensor[index].item<int8_t>());
      ++index;
    }
  }
  return CollectiveFingerPrint(optype, dtypes, device_types);
}

private:
void verify_tensors(
std::vector<at::Tensor>& tensors_to_verify,
c10::intrusive_ptr<ProcessGroup>& pg) {
// Create output tensor data structure to pass into allgather.
std::vector<std::vector<at::Tensor>> output_tensors;
// output tensors: [<tensor 0 outputs>, <tensor 1 outputs>, ..., <tensor n
// outputs>]
output_tensors.reserve(tensors_to_verify.size());
for (const auto& tensor_shape : tensors_to_verify) {
// Each rank has its own outputs shape, e.g.
// <tensor 0 outputs>: [<rank 0 tensor>, <rank 1 tensor>, ..., <rank n
// tensor>]
std::vector<at::Tensor> outputs;
outputs.reserve(pg->getSize());
for (const auto i : c10::irange(pg->getSize())) {
Expand All @@ -84,38 +134,45 @@ struct CollectiveFingerPrint {
for (const auto i : c10::irange(output_tensors.size())) {
const std::vector<at::Tensor> gathered_tensors = output_tensors[i];
const at::Tensor reference_tensor = tensors_to_verify[i];
for (const auto& rank_tensor : gathered_tensors) {
for (int rank = 0; rank < gathered_tensors.size(); rank++) {
const auto& rank_tensor = gathered_tensors[rank];
if (!rank_tensor.equal(reference_tensor)) {
CollectiveFingerPrint rank_fingerprint =
deserialize_fingerprint(rank_tensor);
std::stringstream ss;
ss << "Detected mismatch between collectives on ranks. Rank "
<< pg->getRank()
<< " is running inconsistent collective: " << *this;
<< pg->getRank() << " is running collective: " << *this
<< ", but Rank " << rank << " is running collective: "
<< opTypeToString(rank_fingerprint.op_type_) << ".";
TORCH_CHECK(false, ss.str());
}
}
}
}

// Serializes the information (op type, input shapes, data types, device
// types) about the collective fingerprint into a tensor
at::Tensor serialize_fingerprint() {
auto data = std::make_unique<std::vector<int64_t>>();
// std::vector<int64_t> data;
// OpType
// 1. OpType
data->push_back(static_cast<int64_t>(op_type_));
// Shapes
for (const auto& tensor : input_tensors_) {
auto sizes = tensor.sizes().vec();
for (const auto& s : sizes) {
data->push_back(s);
}
}
// tensor dtypes
// 2. Num tensors
data->push_back(static_cast<int64_t>(num_tensors_));
// 3. Tensor dtypes
for (const auto& type : tensor_dtypes_) {
data->push_back(type);
}
// device types
// 4. Device types
for (const auto& d : tensor_device_types_) {
data->push_back(d);
}
// 5. Shapes
for (const auto& sizes : tensor_sizes_) {
for (const auto& s : sizes) {
data->push_back(s);
}
}
// Serialize data into tensor
int64_t data_size = data->size();
// Need to release here and get the ptr due to C++ parameter evaluation
Expand All @@ -138,7 +195,7 @@ std::ostream& operator<<(
std::ostream& output,
const CollectiveFingerPrint& collective_fingerprint) {
std::string collectiveInfo;
if (!collective_fingerprint.input_tensors_.empty()) {
if (collective_fingerprint.num_tensors_ != 0) {
// Convert dtype and device type info to string.
std::vector<std::string> dtype_strs;
std::vector<std::string> device_type_strs;
Expand All @@ -157,7 +214,7 @@ std::ostream& operator<<(
"OpType=",
opTypeToString(collective_fingerprint.op_type_),
", TensorShape=",
(collective_fingerprint.input_tensors_)[0].sizes(),
(collective_fingerprint.tensor_sizes_)[0],
", TensorDtypes=",
(dtype_strs),
", TensorDeviceTypes=",
Expand Down

0 comments on commit 26b5129

Please sign in to comment.