From caf225fa60bbaee4973a1ffcd8d66fb6c37710a8 Mon Sep 17 00:00:00 2001 From: lixinqi Date: Thu, 9 Dec 2021 16:55:16 +0800 Subject: [PATCH 1/2] remove StreamDesc::num_machines --- oneflow/core/eager/critical_section_stream_type.cpp | 1 - oneflow/core/eager/lazy_job_stream_type.cpp | 1 - oneflow/core/vm/async_cuda_stream_type.cpp | 1 - oneflow/core/vm/control_stream_type.cpp | 1 - oneflow/core/vm/cpu_stream_type.cpp | 1 - oneflow/core/vm/cuda_copy_d2h_stream_type.cpp | 1 - oneflow/core/vm/cuda_copy_h2d_stream_type.cpp | 1 - oneflow/core/vm/cuda_stream_type.cpp | 1 - oneflow/core/vm/device_helper_stream_type.cpp | 1 - oneflow/core/vm/host_stream_type.cpp | 1 - oneflow/core/vm/stream_desc.cpp | 7 +++---- oneflow/core/vm/stream_desc.h | 10 +++------- oneflow/core/vm/test_util.cpp | 2 +- oneflow/core/vm/transport_stream_type.cpp | 1 - 14 files changed, 7 insertions(+), 23 deletions(-) diff --git a/oneflow/core/eager/critical_section_stream_type.cpp b/oneflow/core/eager/critical_section_stream_type.cpp index 42101671331..e9e6884e9c1 100644 --- a/oneflow/core/eager/critical_section_stream_type.cpp +++ b/oneflow/core/eager/critical_section_stream_type.cpp @@ -58,7 +58,6 @@ intrusive::shared_ptr CriticalSectionStreamType::MakeStreamDesc( const Resource& resource, int64_t this_machine_id) const { auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); - ret->set_num_machines(1); ret->set_num_streams_per_machine(1); ret->set_num_streams_per_thread(1); return ret; diff --git a/oneflow/core/eager/lazy_job_stream_type.cpp b/oneflow/core/eager/lazy_job_stream_type.cpp index da2d3bf7f45..2b03e5286a6 100644 --- a/oneflow/core/eager/lazy_job_stream_type.cpp +++ b/oneflow/core/eager/lazy_job_stream_type.cpp @@ -59,7 +59,6 @@ intrusive::shared_ptr LazyJobStreamType::MakeStreamDesc(const Resour int64_t this_machine_id) const { auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); - ret->set_num_machines(1); ret->set_num_streams_per_machine(1); ret->set_num_streams_per_thread(1); return ret; diff --git a/oneflow/core/vm/async_cuda_stream_type.cpp b/oneflow/core/vm/async_cuda_stream_type.cpp index 0361e603ddb..e95aee59469 100644 --- a/oneflow/core/vm/async_cuda_stream_type.cpp +++ b/oneflow/core/vm/async_cuda_stream_type.cpp @@ -74,7 +74,6 @@ intrusive::shared_ptr AsyncCudaStreamType::MakeStreamDesc( std::size_t device_num = resource.gpu_device_num(); auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); - ret->set_num_machines(1); ret->set_num_streams_per_machine(device_num); ret->set_num_streams_per_thread(1); return ret; diff --git a/oneflow/core/vm/control_stream_type.cpp b/oneflow/core/vm/control_stream_type.cpp index d89971dcd15..d040f1fda23 100644 --- a/oneflow/core/vm/control_stream_type.cpp +++ b/oneflow/core/vm/control_stream_type.cpp @@ -120,7 +120,6 @@ intrusive::shared_ptr ControlStreamType::MakeStreamDesc(const Resour int64_t this_machine_id) const { auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); - ret->set_num_machines(1); ret->set_num_streams_per_machine(1); ret->set_num_streams_per_thread(1); return ret; diff --git a/oneflow/core/vm/cpu_stream_type.cpp b/oneflow/core/vm/cpu_stream_type.cpp index 3ad184088ea..48bf3c93743 100644 --- a/oneflow/core/vm/cpu_stream_type.cpp +++ b/oneflow/core/vm/cpu_stream_type.cpp @@ -67,7 +67,6 @@ intrusive::shared_ptr CpuStreamType::MakeStreamDesc(const Resource& std::size_t device_num = resource.cpu_device_num(); auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); - ret->set_num_machines(1); ret->set_num_streams_per_machine(device_num); ret->set_num_streams_per_thread(1); return ret; diff --git a/oneflow/core/vm/cuda_copy_d2h_stream_type.cpp b/oneflow/core/vm/cuda_copy_d2h_stream_type.cpp index c7a14b9df6c..f4a0de91f28 100644 --- a/oneflow/core/vm/cuda_copy_d2h_stream_type.cpp +++ b/oneflow/core/vm/cuda_copy_d2h_stream_type.cpp @@ -72,7 +72,6 @@ intrusive::shared_ptr CudaCopyD2HStreamType::MakeStreamDesc( std::size_t device_num = resource.gpu_device_num(); auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); - ret->set_num_machines(1); ret->set_num_streams_per_machine(device_num); ret->set_num_streams_per_thread(1); return ret; diff --git a/oneflow/core/vm/cuda_copy_h2d_stream_type.cpp b/oneflow/core/vm/cuda_copy_h2d_stream_type.cpp index 8cfd355a4b7..4f4ba2f73c1 100644 --- a/oneflow/core/vm/cuda_copy_h2d_stream_type.cpp +++ b/oneflow/core/vm/cuda_copy_h2d_stream_type.cpp @@ -65,7 +65,6 @@ intrusive::shared_ptr CudaCopyH2DStreamType::MakeStreamDesc( std::size_t device_num = resource.gpu_device_num(); auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); - ret->set_num_machines(1); ret->set_num_streams_per_machine(device_num); ret->set_num_streams_per_thread(1); return ret; diff --git a/oneflow/core/vm/cuda_stream_type.cpp b/oneflow/core/vm/cuda_stream_type.cpp index ad53e87f943..d183b308a9e 100644 --- a/oneflow/core/vm/cuda_stream_type.cpp +++ b/oneflow/core/vm/cuda_stream_type.cpp @@ -74,7 +74,6 @@ intrusive::shared_ptr CudaStreamType::MakeStreamDesc(const Resource& std::size_t device_num = resource.gpu_device_num(); auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); - ret->set_num_machines(1); ret->set_num_streams_per_machine(device_num); ret->set_num_streams_per_thread(1); return ret; diff --git a/oneflow/core/vm/device_helper_stream_type.cpp b/oneflow/core/vm/device_helper_stream_type.cpp index 0c603a0a226..27515ec7d20 100644 --- a/oneflow/core/vm/device_helper_stream_type.cpp +++ b/oneflow/core/vm/device_helper_stream_type.cpp @@ -65,7 +65,6 @@ intrusive::shared_ptr DeviceHelperStreamType::MakeStreamDesc( CHECK_GT(device_num, 0); auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); - ret->set_num_machines(1); ret->set_num_streams_per_machine(device_num); ret->set_num_streams_per_thread(1); return ret; diff --git a/oneflow/core/vm/host_stream_type.cpp b/oneflow/core/vm/host_stream_type.cpp index b5c024f843b..596d6e734ff 100644 --- a/oneflow/core/vm/host_stream_type.cpp +++ b/oneflow/core/vm/host_stream_type.cpp @@ -57,7 +57,6 @@ intrusive::shared_ptr HostStreamType::MakeStreamDesc(const Resource& int64_t this_machine_id) const { auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); - ret->set_num_machines(1); ret->set_num_streams_per_machine(1); ret->set_num_streams_per_thread(1); return ret; diff --git a/oneflow/core/vm/stream_desc.cpp b/oneflow/core/vm/stream_desc.cpp index fc2d7e0960d..5e8ae478ce0 100644 --- a/oneflow/core/vm/stream_desc.cpp +++ b/oneflow/core/vm/stream_desc.cpp @@ -18,16 +18,15 @@ limitations under the License. namespace oneflow { namespace vm { -void StreamDesc::__Init__(const StreamTypeId& stream_type_id, int32_t num_machines, - int32_t num_streams_per_machine, int32_t num_streams_per_thread) { +void StreamDesc::__Init__(const StreamTypeId& stream_type_id, int32_t num_streams_per_machine, + int32_t num_streams_per_thread) { mut_stream_type_id()->CopyFrom(stream_type_id); - set_num_machines(num_machines); set_num_streams_per_machine(num_streams_per_machine); set_num_streams_per_thread(num_streams_per_thread); } int32_t StreamDesc::num_threads() const { - int32_t num_devices = num_machines() * num_streams_per_machine(); + int32_t num_devices = num_streams_per_machine(); CHECK_EQ(num_devices % num_streams_per_thread(), 0); return num_devices / num_streams_per_thread(); } diff --git a/oneflow/core/vm/stream_desc.h b/oneflow/core/vm/stream_desc.h index 948e2df27ff..e254bba576a 100644 --- a/oneflow/core/vm/stream_desc.h +++ b/oneflow/core/vm/stream_desc.h @@ -59,22 +59,20 @@ class StreamId final { class StreamDesc final : public intrusive::Base { public: // Getters - int32_t num_machines() const { return num_machines_; } int32_t num_streams_per_machine() const { return num_streams_per_machine_; } int32_t num_streams_per_thread() const { return num_streams_per_thread_; } const StreamTypeId& stream_type_id() const { return stream_type_id_.key().Get(); } // Setters - void set_num_machines(int32_t val) { num_machines_ = val; } void set_num_streams_per_machine(int32_t val) { num_streams_per_machine_ = val; } void set_num_streams_per_thread(int32_t val) { num_streams_per_thread_ = val; } StreamTypeId* mut_stream_type_id() { return stream_type_id_.mut_key()->Mutable(); } // methods void __Init__() {} - void __Init__(const StreamTypeId& stream_type_id, int32_t num_machines, - int32_t num_streams_per_machine, int32_t num_streams_per_thread); + void __Init__(const StreamTypeId& stream_type_id, int32_t num_streams_per_machine, + int32_t num_streams_per_thread); int32_t num_threads() const; - int32_t parallel_num() const { return num_machines() * num_streams_per_machine(); } + int32_t parallel_num() const { return num_streams_per_machine(); } private: friend class intrusive::Ref; @@ -82,13 +80,11 @@ class StreamDesc final : public intrusive::Base { StreamDesc() : intrusive_ref_(), - num_machines_(), num_streams_per_machine_(), num_streams_per_thread_(), stream_type_id_() {} intrusive::Ref intrusive_ref_; // fields - int32_t num_machines_; int32_t num_streams_per_machine_; int32_t num_streams_per_thread_; diff --git a/oneflow/core/vm/test_util.cpp b/oneflow/core/vm/test_util.cpp index eb115990bcc..1e399f39d1c 100644 --- a/oneflow/core/vm/test_util.cpp +++ b/oneflow/core/vm/test_util.cpp @@ -114,7 +114,7 @@ void TestUtil::AddStreamDescByInstrNames(VmDesc* vm_desc, int64_t parallel_num, const std::vector& instr_names) { auto Insert = [&](const std::string& instr_name) { const auto& stream_type_id = LookupInstrTypeId(instr_name).stream_type_id(); - auto stream_desc = intrusive::make_shared(stream_type_id, 1, parallel_num, 1); + auto stream_desc = intrusive::make_shared(stream_type_id, parallel_num, 1); vm_desc->mut_stream_type_id2desc()->Insert(stream_desc.Mutable()); }; for (const auto& instr_name : instr_names) { diff --git a/oneflow/core/vm/transport_stream_type.cpp b/oneflow/core/vm/transport_stream_type.cpp index cd64e9832f7..dd0d39f0e84 100644 --- a/oneflow/core/vm/transport_stream_type.cpp +++ b/oneflow/core/vm/transport_stream_type.cpp @@ -60,7 +60,6 @@ intrusive::shared_ptr TransportStreamType::MakeTransportStreamDesc( auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); // TODO(lixinqi): remove this ugly field - ret->set_num_machines(1); ret->set_num_streams_per_machine(device_num); // TODO(lixinqi): refactor to a num_threads_per_machine field ret->set_num_streams_per_thread(1); From 41b450772265b271e5a70401925a4cb490c9e7e0 Mon Sep 17 00:00:00 2001 From: lixinqi Date: Thu, 9 Dec 2021 17:13:40 +0800 Subject: [PATCH 2/2] Prepare one thread for one stream_type --- oneflow/core/vm/async_cuda_stream_type.cpp | 2 +- oneflow/core/vm/cpu_stream_type.cpp | 2 +- oneflow/core/vm/cuda_copy_d2h_stream_type.cpp | 2 +- oneflow/core/vm/cuda_copy_h2d_stream_type.cpp | 2 +- oneflow/core/vm/cuda_stream_type.cpp | 2 +- oneflow/core/vm/device_helper_stream_type.cpp | 2 +- oneflow/core/vm/stream_desc.cpp | 1 + oneflow/core/vm/transport_stream_type.cpp | 15 ++++++++++----- 8 files changed, 17 insertions(+), 11 deletions(-) diff --git a/oneflow/core/vm/async_cuda_stream_type.cpp b/oneflow/core/vm/async_cuda_stream_type.cpp index e95aee59469..c0d8519f882 100644 --- a/oneflow/core/vm/async_cuda_stream_type.cpp +++ b/oneflow/core/vm/async_cuda_stream_type.cpp @@ -75,7 +75,7 @@ intrusive::shared_ptr AsyncCudaStreamType::MakeStreamDesc( auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); ret->set_num_streams_per_machine(device_num); - ret->set_num_streams_per_thread(1); + ret->set_num_streams_per_thread(device_num); return ret; } diff --git a/oneflow/core/vm/cpu_stream_type.cpp b/oneflow/core/vm/cpu_stream_type.cpp index 48bf3c93743..7629d678947 100644 --- a/oneflow/core/vm/cpu_stream_type.cpp +++ b/oneflow/core/vm/cpu_stream_type.cpp @@ -68,7 +68,7 @@ intrusive::shared_ptr CpuStreamType::MakeStreamDesc(const Resource& auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); ret->set_num_streams_per_machine(device_num); - ret->set_num_streams_per_thread(1); + ret->set_num_streams_per_thread(device_num); return ret; } diff --git a/oneflow/core/vm/cuda_copy_d2h_stream_type.cpp b/oneflow/core/vm/cuda_copy_d2h_stream_type.cpp index f4a0de91f28..472f2a74995 100644 --- a/oneflow/core/vm/cuda_copy_d2h_stream_type.cpp +++ b/oneflow/core/vm/cuda_copy_d2h_stream_type.cpp @@ -73,7 +73,7 @@ intrusive::shared_ptr CudaCopyD2HStreamType::MakeStreamDesc( auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); ret->set_num_streams_per_machine(device_num); - ret->set_num_streams_per_thread(1); + ret->set_num_streams_per_thread(device_num); return ret; } diff --git a/oneflow/core/vm/cuda_copy_h2d_stream_type.cpp b/oneflow/core/vm/cuda_copy_h2d_stream_type.cpp index 4f4ba2f73c1..7332eed2345 100644 --- a/oneflow/core/vm/cuda_copy_h2d_stream_type.cpp +++ b/oneflow/core/vm/cuda_copy_h2d_stream_type.cpp @@ -66,7 +66,7 @@ intrusive::shared_ptr CudaCopyH2DStreamType::MakeStreamDesc( auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); ret->set_num_streams_per_machine(device_num); - ret->set_num_streams_per_thread(1); + ret->set_num_streams_per_thread(device_num); return ret; } diff --git a/oneflow/core/vm/cuda_stream_type.cpp b/oneflow/core/vm/cuda_stream_type.cpp index d183b308a9e..cef73d7f36f 100644 --- a/oneflow/core/vm/cuda_stream_type.cpp +++ b/oneflow/core/vm/cuda_stream_type.cpp @@ -75,7 +75,7 @@ intrusive::shared_ptr CudaStreamType::MakeStreamDesc(const Resource& auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); ret->set_num_streams_per_machine(device_num); - ret->set_num_streams_per_thread(1); + ret->set_num_streams_per_thread(device_num); return ret; } diff --git a/oneflow/core/vm/device_helper_stream_type.cpp b/oneflow/core/vm/device_helper_stream_type.cpp index 27515ec7d20..3a30518a4fc 100644 --- a/oneflow/core/vm/device_helper_stream_type.cpp +++ b/oneflow/core/vm/device_helper_stream_type.cpp @@ -66,7 +66,7 @@ intrusive::shared_ptr DeviceHelperStreamType::MakeStreamDesc( auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex()); ret->set_num_streams_per_machine(device_num); - ret->set_num_streams_per_thread(1); + ret->set_num_streams_per_thread(device_num); return ret; } diff --git a/oneflow/core/vm/stream_desc.cpp b/oneflow/core/vm/stream_desc.cpp index 5e8ae478ce0..6a39ba9b42a 100644 --- a/oneflow/core/vm/stream_desc.cpp +++ b/oneflow/core/vm/stream_desc.cpp @@ -27,6 +27,7 @@ void StreamDesc::__Init__(const StreamTypeId& stream_type_id, int32_t num_stream int32_t StreamDesc::num_threads() const { int32_t num_devices = num_streams_per_machine(); + if (num_devices == 0) { return 0; } CHECK_EQ(num_devices % num_streams_per_thread(), 0); return num_devices / num_streams_per_thread(); } diff --git a/oneflow/core/vm/transport_stream_type.cpp b/oneflow/core/vm/transport_stream_type.cpp index dd0d39f0e84..33d806eab8f 100644 --- a/oneflow/core/vm/transport_stream_type.cpp +++ b/oneflow/core/vm/transport_stream_type.cpp @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "oneflow/core/vm/transport_stream_type.h" +#include "oneflow/core/common/multi_client.h" namespace oneflow { namespace vm { @@ -51,11 +52,15 @@ template intrusive::shared_ptr TransportStreamType::MakeTransportStreamDesc( const Resource& resource, int64_t this_machine_id) const { std::size_t device_num = 0; - if (resource.has_cpu_device_num()) { - device_num = std::max(device_num, resource.cpu_device_num()); - } - if (resource.has_gpu_device_num()) { - device_num = std::max(device_num, resource.gpu_device_num()); + if (!CHECK_JUST(IsMultiClient())) { + if (resource.has_cpu_device_num()) { + device_num = std::max(device_num, resource.cpu_device_num()); + } + if (resource.has_gpu_device_num()) { + device_num = std::max(device_num, resource.gpu_device_num()); + } + } else { + // Keep device_num = 0. TransportStreamType is not used in multi-client mode. } auto ret = intrusive::make_shared(); ret->mut_stream_type_id()->__Init__(LookupStreamType4TypeIndex());