[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr #44046

Status: Closed. This pull request proposed to merge 40 commits.

Commits (40 total; the changes shown below are from 9 of them)
229d170  [c10d] switch ProcessGroup::Work to be managed by intrusive_ptr (wanchaol, Sep 2, 2020)
9f6506e  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Sep 10, 2020)
24cc652  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Sep 10, 2020)
0b77de1  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Sep 11, 2020)
0fabd3f  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Sep 24, 2020)
50df2c1  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 2, 2020)
b3b5479  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 5, 2020)
e5fdc7f  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 5, 2020)
1b75374  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 8, 2020)
4045f9d  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 8, 2020)
7a03e67  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 8, 2020)
5309485  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 8, 2020)
639c487  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 8, 2020)
ecd1cc6  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 8, 2020)
a72c82e  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 9, 2020)
a392a08  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 16, 2020)
7a6b5d9  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 17, 2020)
ccbcbcf  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 19, 2020)
afab677  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 19, 2020)
afa3532  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 20, 2020)
b2453ed  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 20, 2020)
ead03ba  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 20, 2020)
3975a34  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 21, 2020)
f58c771  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 21, 2020)
6e6f6f0  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 21, 2020)
c9f85df  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 22, 2020)
2029658  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 23, 2020)
716dd6f  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 23, 2020)
e7c5d38  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 23, 2020)
911807b  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 26, 2020)
9c3579a  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 27, 2020)
c3070d3  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 29, 2020)
ec58283  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Oct 31, 2020)
534296e  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Nov 2, 2020)
db53f39  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Nov 2, 2020)
b20a3de  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Nov 4, 2020)
b15a6d0  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Nov 4, 2020)
1cb14d9  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Nov 10, 2020)
e462a46  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Nov 10, 2020)
056232f  Update on "[c10d] switch ProcessGroup::Work to be managed by intrusive_ptr" (wanchaol, Nov 11, 2020)
torch/csrc/distributed/c10d/comm.cpp (2 additions, 1 deletion)

@@ -3,6 +3,7 @@
 #include <deque>
 
 #include <ATen/core/functional.h>
+#include <c10/util/intrusive_ptr.h>
 #include <torch/csrc/distributed/c10d/reducer.h>
 #include <torch/csrc/jit/python/pybind_utils.h>
 #include <torch/csrc/utils/tensor_flatten.h>
@@ -46,7 +47,7 @@ class BroadcastWork {
   std::vector<at::Tensor> flat_tensor_;
 
   // The broadcast work that is kicked off upon construction.
-  std::shared_ptr<c10d::ProcessGroup::Work> work_;
+  c10::intrusive_ptr<c10d::ProcessGroup::Work> work_;
 };
 
 } // namespace
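The heart of this PR is visible in the comm.cpp hunk above: a std::shared_ptr member becomes a c10::intrusive_ptr. For readers unfamiliar with the type, c10::intrusive_ptr keeps the reference count inside the pointee rather than in a separate control block, so any managed type must derive from c10::intrusive_ptr_target (torch::CustomClassHolder, which Work gains later in this diff, is such a subclass). A minimal sketch of the ownership model, using a hypothetical MyWork type rather than the real Work class:

```cpp
#include <c10/util/intrusive_ptr.h>

// Hypothetical stand-in for ProcessGroup::Work. The refcount lives inside
// the object, so the managed class derives from c10::intrusive_ptr_target.
struct MyWork : c10::intrusive_ptr_target {
  bool completed = false;
};

int main() {
  // c10::make_intrusive is the analogue of std::make_shared.
  c10::intrusive_ptr<MyWork> w = c10::make_intrusive<MyWork>();
  auto w2 = w;           // copy: bumps the embedded refcount
  w2->completed = true;  // both handles refer to the same object
  return 0;
}  // last handle goes out of scope; refcount hits zero and MyWork is freed
```

Because the count is embedded, the handle is a single pointer wide and can round-trip through a raw pointer (as TorchScript custom classes require), which a std::shared_ptr cannot do without losing ownership information.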
torch/csrc/distributed/c10d/init.cpp (5 additions, 1 deletion)

@@ -1,5 +1,6 @@
 #include <torch/csrc/python_headers.h>
 
+#include <c10/util/intrusive_ptr.h>
 #include <c10d/FileStore.hpp>
 #ifndef _WIN32
 #include <c10d/HashStore.hpp>
@@ -53,6 +54,9 @@ std::vector<std::string> split(char separator, const std::string& string) {
 template <typename T>
 using shared_ptr_class_ = py::class_<T, std::shared_ptr<T>>;
 
+template <typename T>
+using intrusive_ptr_class_ = py::class_<T, c10::intrusive_ptr<T>>;
+
 // PythonStore is a pybind11 trampoline class to allow a Python
 // class to inherit from c10d.Store and implement its interface.
 class PythonStore : public ::c10d::Store {
@@ -966,7 +970,7 @@ that adds a prefix to each key inserted to the store.
       py::call_guard<py::gil_scoped_release>());
 #endif
 
-  shared_ptr_class_<::c10d::ProcessGroup::Work>(module, "Work")
+  intrusive_ptr_class_<::c10d::ProcessGroup::Work>(module, "Work")
       .def("is_completed", &::c10d::ProcessGroup::Work::isCompleted)
      .def("is_success", &::c10d::ProcessGroup::Work::isSuccess)
      .def("exception", &::c10d::ProcessGroup::Work::exception)
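The intrusive_ptr_class_ alias added above works because pybind11 accepts user-defined smart-pointer holder types once they are declared to it. PyTorch makes that declaration once, centrally; the following self-contained sketch shows the pattern under assumed names (MyWork and example_module are hypothetical, not part of the real binding code):

```cpp
#include <pybind11/pybind11.h>
#include <c10/util/intrusive_ptr.h>

namespace py = pybind11;

struct MyWork : c10::intrusive_ptr_target {
  bool isCompleted() const { return true; }
};

// Register c10::intrusive_ptr as a pybind11 holder type. The trailing
// `true` tells pybind11 the holder can always be reconstructed from a raw
// pointer, which is safe for intrusive refcounts.
PYBIND11_DECLARE_HOLDER_TYPE(T, c10::intrusive_ptr<T>, true);

PYBIND11_MODULE(example_module, m) {
  // Mirrors the intrusive_ptr_class_ alias introduced in init.cpp.
  py::class_<MyWork, c10::intrusive_ptr<MyWork>>(m, "Work")
      .def("is_completed", &MyWork::isCompleted);
}
```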
torch/csrc/distributed/c10d/reducer.cpp (1 addition, 1 deletion)

@@ -455,7 +455,7 @@ std::vector<std::vector<at::Tensor>> Reducer::get_bucket_tensors() const {
 }
 
 void Reducer::set_forward_pass_work_handle(
-    std::shared_ptr<c10d::ProcessGroup::Work> forwardPassWorkHandle,
+    c10::intrusive_ptr<c10d::ProcessGroup::Work> forwardPassWorkHandle,
     bool useStaticWorldSize) {
   std::lock_guard<std::mutex> lock(mutex_);
   forwardPassWorkHandle_.workHandle = std::move(forwardPassWorkHandle);
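One detail worth noting in the hunk above: the handle is taken by value and then std::moved into the member. Moving a c10::intrusive_ptr transfers ownership without touching the atomic reference count, so a caller passing a temporary pays for no extra increment. A small sketch under hypothetical names (Handle, Holder):

```cpp
#include <utility>
#include <c10/util/intrusive_ptr.h>

struct Handle : c10::intrusive_ptr_target {};

struct Holder {
  void set(c10::intrusive_ptr<Handle> h) {
    // The move nulls out `h` and adopts its reference: no atomic ops here.
    handle_ = std::move(h);
  }
  c10::intrusive_ptr<Handle> handle_;
};

int main() {
  Holder holder;
  // The temporary is moved into place, never copied.
  holder.set(c10::make_intrusive<Handle>());
  return 0;
}
```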
torch/csrc/distributed/c10d/reducer.h (5 additions, 4 deletions)

@@ -7,6 +7,7 @@
 #include <unordered_map>
 #include <vector>
 
+#include <c10/util/intrusive_ptr.h>
 #include <c10d/ProcessGroup.hpp>
 #include <torch/csrc/autograd/function.h>
 #include <torch/csrc/autograd/variable.h>
@@ -89,7 +90,7 @@ class Reducer {
   // Creates and sets ForwardPassWorkHandle given a ProcessGroup::Work and the
   // corresponding tensor being reduced.
   void set_forward_pass_work_handle(
-      std::shared_ptr<c10d::ProcessGroup::Work> forwardPassWorkHandle,
+      c10::intrusive_ptr<c10d::ProcessGroup::Work> forwardPassWorkHandle,
       bool useStaticWorldSize);
 
   // Retrieve on-device tensors used to track locally unused parameters. For
@@ -150,7 +151,7 @@ class Reducer {
   bool local_used_maps_reduced_;
 
   // Work handle for allreduce on local_used_maps_
-  std::shared_ptr<c10d::ProcessGroup::Work> local_used_work_;
+  c10::intrusive_ptr<c10d::ProcessGroup::Work> local_used_work_;
 
   void verify_replicas_within_process();
 
@@ -267,7 +268,7 @@ class Reducer {
    size_t pending;
 
    // Keep work handle around when this set of buckets is being reduced.
-   std::shared_ptr<c10d::ProcessGroup::Work> work;
+   c10::intrusive_ptr<c10d::ProcessGroup::Work> work;
 
    // Keep future work handle around if DDP comm hook is registered.
    c10::intrusive_ptr<torch::jit::Future> future_work;
@@ -325,7 +326,7 @@ class Reducer {
   // A struct containing work handle and tensor for allreduce scheduled in
   // forward pass, if applicable.
   struct ForwardPassAllreduceWork {
-    std::shared_ptr<c10d::ProcessGroup::Work> workHandle;
+    c10::intrusive_ptr<c10d::ProcessGroup::Work> workHandle;
     at::Tensor resultTensor;
     // whether we should divide by the initial world_size or the no. of
     // remaining DDP ranks.
torch/csrc/distributed/rpc/process_group_agent.cpp (1 addition, 1 deletion)

@@ -396,7 +396,7 @@ void ProcessGroupAgent::handleSend(const SendWork& work) {
 
   // ProcessGroup is not thread-safe when sending with the same tag,
   // hence the lock
-  std::vector<std::shared_ptr<c10d::ProcessGroup::Work>> pendingSends;
+  std::vector<c10::intrusive_ptr<c10d::ProcessGroup::Work>> pendingSends;
   const auto dst = work.to_.id_;
 
   // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
torch/csrc/distributed/rpc/process_group_agent.h (2 additions, 2 deletions)

@@ -230,14 +230,14 @@ class TORCH_API ProcessGroupAgent : public RpcAgent {
   // Lock and shared ptr to currently pending work, set in listenloop() and
   // interruptible in shutdown().
   std::mutex recvWorkMutex_;
-  std::shared_ptr<c10d::ProcessGroup::Work> recvWork_;
+  c10::intrusive_ptr<c10d::ProcessGroup::Work> recvWork_;
   // Map of dst rank to current oustanding sends that we are waiting on. In the
   // case of a call to ::shutdown() while we are still waiting on these sends,
   // the pending sends contained in this map will be aborted, allowing the
   // waiting thread to be unblocked.
   std::unordered_map<
       worker_id_t,
-      std::set<std::shared_ptr<c10d::ProcessGroup::Work>>>
+      std::set<c10::intrusive_ptr<c10d::ProcessGroup::Work>>>
       currentPendingSends_;
   // Lock to serialize access to the above map.
   std::mutex pendingSendMutex_;
torch/lib/c10d/ProcessGroup.cpp (1 addition, 1 deletion)

@@ -88,7 +88,7 @@ ProcessGroup::~ProcessGroup() {}
 
 // This is introduced so that implementors of ProcessGroup would not need to
 // have this implmentation.
-std::shared_ptr<ProcessGroup::Work> ProcessGroup::allgather_coalesced(
+c10::intrusive_ptr<ProcessGroup::Work> ProcessGroup::allgather_coalesced(
     std::vector<std::vector<at::Tensor>>& /* usused */,
     std::vector<at::Tensor>& /* usused */,
     const AllgatherOptions& /* usused */) {
torch/lib/c10d/ProcessGroup.hpp (17 additions, 17 deletions)

@@ -37,7 +37,7 @@ namespace c10d {
 //
 class ProcessGroup {
  public:
-  class Work {
+  class Work : public torch::CustomClassHolder {
    public:
     virtual ~Work();
 
@@ -119,33 +119,33 @@ class ProcessGroup {
     return size_;
   }
 
-  virtual std::shared_ptr<ProcessGroup::Work> broadcast(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> broadcast(
Review discussion on the `broadcast` signature change:

Contributor: This change is probably going to break other third-party backends like https://github.com/intel/torch-ccl/blob/master/src/ProcessGroupCCL.hpp#L136 and https://github.com/openucx/torch-ucc/blob/master/include/torch_ucc.hpp#L77. I'm guessing this is necessary for TorchScript and there is no way around it, so should we ask the third-party libraries to make this change as well? (We can probably file issues on those repos.) cc @agolynski since this affects the c10d extension.

Contributor (author): Yeah, agreed that we should ask them to make the changes. Do you have a list of third-party backends, or are these two the only ones currently using the c10d extension?

Contributor: The ones I am aware of are Intel and UCX. See:

  1. https://github.com/openucx/torch-ucc
  2. add c10d dynamic loading mechanism and unit test #28068

Could you please check with @agolynski; he might know more context.

Contributor: Hello @chengjunlu @mshiryaev @Sergei-Lebedev @srinivas212, this PR will break the master -> master dependency in torch-ccl and torch-ucc. In the near future we'll be changing the ProcessGroup API, which will break these repos as well. Would it be okay if you depend on 1.6 (upgrading to 1.7 when released) rather than on master in the meantime? @Sergei-Lebedev @srinivas212: how does torch-ucc depend on PyTorch; do you require users to install PyTorch from the master branch?

Comment: Hi @agolynski @mrshenli @pritamdamania87, after discussions with both the Torch-UCC and Torch-CCL teams, the near-term plan is to fix the issue in the third-party repos once this change lands. In general, it is best we keep third-party plugins in sync with master.

Contributor (author): Going to land the stack soon; created openucx/torch-ucc#23 and intel/torch-ccl#11 asking UCC and CCL to do the API migration.

       std::vector<at::Tensor>& data,
       const BroadcastOptions& opts = BroadcastOptions()) = 0;
 
-  virtual std::shared_ptr<ProcessGroup::Work> allreduce(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> allreduce(
       std::vector<at::Tensor>& data,
       const AllreduceOptions& opts = AllreduceOptions()) = 0;
 
   // This will be moved out of ProcessGroup, do not add dependencies on this
   // function.
-  virtual std::shared_ptr<ProcessGroup::Work> allreduce_coalesced(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> allreduce_coalesced(
       std::vector<at::Tensor>& tensors,
       const AllreduceCoalescedOptions& opts = AllreduceCoalescedOptions()) = 0;
 
-  virtual std::shared_ptr<ProcessGroup::Work> reduce(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> reduce(
       std::vector<at::Tensor>& tensors,
       const ReduceOptions& opts = ReduceOptions()) = 0;
 
-  virtual std::shared_ptr<ProcessGroup::Work> allgather(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> allgather(
       std::vector<std::vector<at::Tensor>>& outputTensors,
       std::vector<at::Tensor>& inputTensors,
       const AllgatherOptions& opts = AllgatherOptions()) = 0;
 
   // Gathers a single tensor inputBuffer into a single buffer outputBuffer that
   // is interpreted as a contigious collection of size inputBuffer * WORLD_SIZE.
   // For implementers of ProcessGroup API and advanced users only.
-  virtual std::shared_ptr<ProcessGroup::Work> allgather_base(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> allgather_base(
       at::Tensor& outputBuffer,
       at::Tensor& inputBuffer,
       const AllgatherOptions& opts = AllgatherOptions()) = 0;
@@ -154,27 +154,27 @@ class ProcessGroup {
   // * do not add dependencies on this function,
   // * do not implement it in your ProcessGroup, implement allgather_base
   //   instead.
-  virtual std::shared_ptr<ProcessGroup::Work> allgather_coalesced(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> allgather_coalesced(
       std::vector<std::vector<at::Tensor>>& outputTensorLists,
       std::vector<at::Tensor>& inputTensors,
       const AllgatherOptions& opts = AllgatherOptions());
 
-  virtual std::shared_ptr<ProcessGroup::Work> gather(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> gather(
       std::vector<std::vector<at::Tensor>>& outputTensors,
       std::vector<at::Tensor>& inputTensors,
       const GatherOptions& opts = GatherOptions()) = 0;
 
-  virtual std::shared_ptr<ProcessGroup::Work> scatter(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> scatter(
       std::vector<at::Tensor>& outputTensors,
       std::vector<std::vector<at::Tensor>>& inputTensors,
       const ScatterOptions& opts = ScatterOptions()) = 0;
 
-  virtual std::shared_ptr<ProcessGroup::Work> reduce_scatter(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> reduce_scatter(
       std::vector<at::Tensor>& outputTensors,
       std::vector<std::vector<at::Tensor>>& inputTensors,
       const ReduceScatterOptions& opts = ReduceScatterOptions()) = 0;
 
-  virtual std::shared_ptr<ProcessGroup::Work> alltoall_base(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> alltoall_base(
       at::Tensor& outputTensor,
       at::Tensor& inputTensor,
       std::vector<int64_t>& outputSplitSizes,
@@ -183,28 +183,28 @@ class ProcessGroup {
     throw std::runtime_error("ProcessGroup does not support alltoall");
   }
 
-  virtual std::shared_ptr<ProcessGroup::Work> alltoall(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> alltoall(
       std::vector<at::Tensor>& outputTensors,
       std::vector<at::Tensor>& inputTensors,
       const AllToAllOptions& opts = AllToAllOptions()) {
     throw std::runtime_error("ProcessGroup does not support alltoall");
   }
 
-  virtual std::shared_ptr<ProcessGroup::Work> send(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> send(
       std::vector<at::Tensor>& tensors,
       int dstRank,
       int tag) = 0;
 
-  virtual std::shared_ptr<ProcessGroup::Work> recv(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> recv(
       std::vector<at::Tensor>& tensors,
       int srcRank,
       int tag) = 0;
 
-  virtual std::shared_ptr<ProcessGroup::Work> recvAnysource(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> recvAnysource(
       std::vector<at::Tensor>& tensors,
       int tag) = 0;
 
-  virtual std::shared_ptr<ProcessGroup::Work> barrier(
+  virtual c10::intrusive_ptr<ProcessGroup::Work> barrier(
       const BarrierOptions& opts = BarrierOptions()) = 0;
 
  protected:
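As the review thread above spells out, out-of-tree backends such as torch-ucc and torch-ccl have to update their overrides in lockstep with this PR. The reason is a language rule, not a library one: C++ permits covariant return types only for raw pointers and references, so a virtual returning a smart pointer must be overridden with exactly the same type. A toy sketch of the constraint (ProcessGroupBase, Backend, and launch are illustrative names, not the real c10d API):

```cpp
#include <c10/util/intrusive_ptr.h>

struct Work : c10::intrusive_ptr_target {};

struct ProcessGroupBase {
  virtual ~ProcessGroupBase() = default;
  virtual c10::intrusive_ptr<Work> launch() = 0;
};

// A backend that kept `std::shared_ptr<Work> launch() override` would fail
// to compile: smart-pointer return types are not covariant, so the
// signature must match the base declaration exactly.
struct Backend : ProcessGroupBase {
  c10::intrusive_ptr<Work> launch() override {
    return c10::make_intrusive<Work>();
  }
};

int main() {
  Backend b;
  auto w = b.launch();  // w is a c10::intrusive_ptr<Work>
  return 0;
}
```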