From b6f6c249840c5c07d4b0ba24d32cccc26c3b22f7 Mon Sep 17 00:00:00 2001
From: danleifeng
Date: Wed, 28 Apr 2021 04:10:27 +0000
Subject: [PATCH 1/6] support cuda11 for heterps; add profiler in oneps;
 test=develop

---
 paddle/fluid/framework/device_worker.h        |  3 +++
 .../framework/fleet/heter_ps/CMakeLists.txt   |  6 ++++-
 paddle/fluid/framework/hogwild_worker.cc      | 24 +++++++++++++++++++
 .../distributed/fleet/dataset/dataset.py      |  4 ++--
 python/paddle/fluid/dataset.py                |  4 ++--
 5 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index a49e492e48028..d33809a0a2b7c 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -266,6 +266,9 @@ class HogwildWorker : public CPUWorkerBase {
   HogwildWorkerParameter param_;
   std::vector<std::string> skip_ops_;
   std::map<std::string, int> stat_var_name_map_;
+#ifdef PADDLE_WITH_HETERPS
+  platform::DeviceContext* dev_ctx_ = nullptr;
+#endif
 };
 
 class DownpourWorker : public HogwildWorker {
diff --git a/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt b/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt
index 6df2cd52bb401..ebb7775129e71 100644
--- a/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt
+++ b/paddle/fluid/framework/fleet/heter_ps/CMakeLists.txt
@@ -1,5 +1,9 @@
 IF(WITH_GPU)
-  nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context)
+  if (${CMAKE_CUDA_COMPILER_VERSION} LESS 11.0)
+    nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS cub device_context)
+  else()
+    nv_library(heter_comm SRCS heter_comm.h feature_value.h heter_resource.cc heter_resource.h hashtable.h DEPS device_context)
+  endif()
   nv_test(test_heter_comm SRCS test_heter_comm.cu feature_value.h DEPS heter_comm)
   nv_library(heter_ps SRCS heter_ps.cu DEPS heter_comm)
 ENDIF()
diff --git a/paddle/fluid/framework/hogwild_worker.cc b/paddle/fluid/framework/hogwild_worker.cc
index 89dc5c7d3ea93..8a9979613ea9c 100644
--- a/paddle/fluid/framework/hogwild_worker.cc
+++ b/paddle/fluid/framework/hogwild_worker.cc
@@ -39,6 +39,9 @@ void HogwildWorker::Initialize(const TrainerDesc &desc) {
   for (int i = 0; i < param_.stat_var_names_size(); ++i) {
     stat_var_name_map_[param_.stat_var_names(i)] = 1;
   }
+#ifdef PADDLE_WITH_HETERPS
+  dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
+#endif
 }
 
 void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) {
@@ -150,6 +153,9 @@ void HogwildWorker::TrainFilesWithProfiler() {
       VLOG(3) << "Going to run op " << op_name[i];
       if (!need_skip) {
         ops_[i]->Run(*thread_scope_, place_);
+#ifdef PADDLE_WITH_HETERPS
+        dev_ctx_->Wait();
+#endif
       }
       VLOG(3) << "Op " << op_name[i] << " Finished";
       timeline.Pause();
@@ -167,6 +173,16 @@
     total_inst += cur_batch;
     ++batch_cnt;
     PrintFetchVars();
+#ifdef PADDLE_WITH_HETERPS
+    dev_ctx_->Wait();
+    VLOG(1) << "GpuPs worker " << thread_id_ << " train cost " << total_time
+            << " seconds, ins_num: " << total_inst;
+    for (size_t i = 0; i < op_name.size(); ++i) {
+      VLOG(1) << "card:" << thread_id_ << ", op: " << op_name[i]
+              << ", mean time: " << op_total_time[i] / total_inst
+              << "s, total time:" << op_total_time[i] << "sec";
+    }
+#else
     if (thread_id_ == 0) {
       if (batch_cnt > 0 && batch_cnt % 100 == 0) {
         for (size_t i = 0; i < ops_.size(); ++i) {
@@ -178,6 +194,7 @@ void HogwildWorker::TrainFilesWithProfiler() {
         fprintf(stderr, "%6.2f instances/s\n",
                 total_inst / total_time);
       }
     }
+#endif
     thread_scope_->DropKids();
     timeline.Start();
   }
@@ -195,7 +212,10 @@ void HogwildWorker::TrainFilesWithProfiler() {
 
 void HogwildWorker::TrainFiles() {
   platform::SetNumThreads(1);
+  platform::Timer timeline;
+  timeline.Start();
+  int total_ins_num = 0;
 
   // how to accumulate fetched values here
   device_reader_->Start();
   int cur_batch;
@@ -213,9 +233,13 @@
       }
     }
 
+    total_ins_num += cur_batch;
     PrintFetchVars();
     thread_scope_->DropKids();
   }
+  timeline.Pause();
+  VLOG(1) << "worker " << thread_id_ << " train cost "
+          << timeline.ElapsedSec() << " seconds, ins_num: " << total_ins_num;
 #if defined PADDLE_WITH_PSCORE
   if (thread_barrier_) {
     paddle::distributed::Communicator::GetInstance()->BarrierTriggerDecrement();
diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py
index 10c27ea91d249..3de94425f65cc 100644
--- a/python/paddle/distributed/fleet/dataset/dataset.py
+++ b/python/paddle/distributed/fleet/dataset/dataset.py
@@ -529,12 +529,12 @@ def _prepare_to_run(self):
 
     def _dynamic_adjust_before_train(self, thread_num):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(thread_num, False)
+            self.dataset.dynamic_adjust_channel_num(thread_num, True)
         self.dataset.dynamic_adjust_readers_num(thread_num)
 
     def _dynamic_adjust_after_train(self):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
+            self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
         self.dataset.dynamic_adjust_readers_num(self.thread_num)
 
     def _set_queue_num(self, queue_num):
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index 86c63ababbbfd..0841b339d6655 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -391,7 +391,7 @@ def _prepare_to_run(self):
     )
     def _dynamic_adjust_before_train(self, thread_num):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(thread_num, False)
+            self.dataset.dynamic_adjust_channel_num(thread_num, True)
         self.dataset.dynamic_adjust_readers_num(thread_num)
 
     @deprecated(
@@ -400,7 +400,7 @@ def _dynamic_adjust_before_train(self, thread_num):
     )
     def _dynamic_adjust_after_train(self):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
+            self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
         self.dataset.dynamic_adjust_readers_num(self.thread_num)
 
     @deprecated(
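
A note on the profiling added in patch 1: GPU operators launch asynchronously, so the worker calls dev_ctx_->Wait() before stopping the timer; without that synchronization the measurement would cover only the kernel launch, not its execution. The report is a per-op mean over all processed instances. A minimal, self-contained Python sketch of the same accumulate-then-report pattern (op names and timings below are invented for illustration):

    import time

    def run_op(name):
        # Stand-in for ops_[i]->Run(...). On a real GPU the device must be
        # synchronized here before the timer stops, which is what the
        # dev_ctx_->Wait() call in the patch is for.
        time.sleep(0.001)

    op_names = ["lookup_table", "fc", "softmax"]
    op_total_time = {name: 0.0 for name in op_names}
    total_inst = 0

    for _ in range(10):              # mini-batches
        for name in op_names:
            start = time.time()
            run_op(name)
            op_total_time[name] += time.time() - start
        total_inst += 32             # instances per batch

    for name in op_names:
        print("op: %s, mean time: %fs, total time: %fs"
              % (name, op_total_time[name] / total_inst, op_total_time[name]))
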
From e7b9a51bd3b8148f42f1275f48222ad006934144 Mon Sep 17 00:00:00 2001
From: danleifeng
Date: Wed, 28 Apr 2021 05:21:51 +0000
Subject: [PATCH 2/6] support cuda11 for heterps; add profiler in oneps;
 test=develop

---
 paddle/fluid/framework/hogwild_worker.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/paddle/fluid/framework/hogwild_worker.cc b/paddle/fluid/framework/hogwild_worker.cc
index 8a9979613ea9c..56244f6264ab4 100644
--- a/paddle/fluid/framework/hogwild_worker.cc
+++ b/paddle/fluid/framework/hogwild_worker.cc
@@ -238,8 +238,8 @@ void HogwildWorker::TrainFiles() {
     thread_scope_->DropKids();
   }
   timeline.Pause();
-  VLOG(1) << "worker " << thread_id_ << " train cost "
-          << timeline.ElapsedSec() << " seconds, ins_num: " << total_ins_num;
+  VLOG(1) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec()
+          << " seconds, ins_num: " << total_ins_num;
 #if defined PADDLE_WITH_PSCORE
   if (thread_barrier_) {
     paddle::distributed::Communicator::GetInstance()->BarrierTriggerDecrement();
From 05f7f7117f281ab59172d8febb42c2fcdea5ae0a Mon Sep 17 00:00:00 2001
From: danleifeng
Date: Thu, 29 Apr 2021 03:45:10 +0000
Subject: [PATCH 3/6] add set_use_ps_gpu; test=develop

---
 .../paddle/distributed/fleet/dataset/dataset.py | 14 ++++++++++++--
 python/paddle/fluid/dataset.py                  | 17 +++++++++++++++--
 python/paddle/fluid/executor.py                 |  1 +
 3 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py
index 3de94425f65cc..707cdd29b3682 100644
--- a/python/paddle/distributed/fleet/dataset/dataset.py
+++ b/python/paddle/distributed/fleet/dataset/dataset.py
@@ -267,6 +267,7 @@ def __init__(self):
         self.enable_pv_merge = False
         self.merge_by_lineid = False
         self.fleet_send_sleep_seconds = None
+        self.use_ps_gpu = False
 
     def _init_distributed_settings(self, **kwargs):
         """
@@ -529,14 +530,23 @@ def _prepare_to_run(self):
 
     def _dynamic_adjust_before_train(self, thread_num):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(thread_num, True)
+            if self.use_ps_gpu:
+                self.dataset.dynamic_adjust_channel_num(thread_num, True)
+            else:
+                self.dataset.dynamic_adjust_channel_num(thread_num, False)
         self.dataset.dynamic_adjust_readers_num(thread_num)
 
     def _dynamic_adjust_after_train(self):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
+            if self.use_ps_gpu:
+                self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
+            else:
+                self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
         self.dataset.dynamic_adjust_readers_num(self.thread_num)
 
+    def _set_use_ps_gpu(self, use_ps_gpu):
+        self.use_ps_gpu = use_ps_gpu
+
     def _set_queue_num(self, queue_num):
         """
         Set Dataset output queue num, training threads get data from queues
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index 0841b339d6655..3d404ea864b92 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -352,6 +352,7 @@ def __init__(self):
         self.merge_by_lineid = False
         self.fleet_send_sleep_seconds = None
         self.trainer_num = -1
+        self.use_ps_gpu = False
 
     @deprecated(
         since="2.0.0",
@@ -391,7 +392,10 @@ def _prepare_to_run(self):
     )
     def _dynamic_adjust_before_train(self, thread_num):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(thread_num, True)
+            if self.use_ps_gpu:
+                self.dataset.dynamic_adjust_channel_num(thread_num, True)
+            else:
+                self.dataset.dynamic_adjust_channel_num(thread_num, False)
         self.dataset.dynamic_adjust_readers_num(thread_num)
 
     @deprecated(
@@ -400,9 +404,18 @@ def _dynamic_adjust_before_train(self, thread_num):
     )
     def _dynamic_adjust_after_train(self):
         if not self.is_user_set_queue_num:
-            self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
+            if self.use_ps_gpu:
+                self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
+            else:
+                self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
         self.dataset.dynamic_adjust_readers_num(self.thread_num)
 
+    @deprecated(
+        since="2.1.0",
+        update_to="paddle.distributed.InMemoryDataset._set_use_ps_gpu")
+    def _set_use_ps_gpu(self, use_ps_gpu):
+        self.use_ps_gpu = use_ps_gpu
+
     @deprecated(
         since="2.0.0",
         update_to="paddle.distributed.InMemoryDataset._set_queue_num")
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 62a9c42ee0a61..071bfce1e99cc 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -1507,6 +1507,7 @@ def _run_from_dataset(self,
         trainer._gen_trainer_desc()
         self._dump_debug_info(program=program, trainer=trainer)
 
+        dataset._set_use_ps_gpu(trainer.proto_desc.use_ps_gpu)
         dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
 
         trainer_instance = self._default_executor.init_for_dataset(
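
Patch 3 threads a use_ps_gpu flag from the trainer description into the dataset: the executor sets it via _set_use_ps_gpu, and _dynamic_adjust_before_train then passes True to dynamic_adjust_channel_num only on the PS-GPU path, keeping the CPU default at False. A condensed, runnable sketch of that control flow; the class and the adjust_channels helper are hypothetical stand-ins for the real dataset wrapper and its C++ backend:

    class DatasetSketch:
        def __init__(self):
            self.use_ps_gpu = False
            self.is_user_set_queue_num = False

        def _set_use_ps_gpu(self, use_ps_gpu):
            self.use_ps_gpu = use_ps_gpu

        def _dynamic_adjust_before_train(self, thread_num):
            if not self.is_user_set_queue_num:
                # The boolean is forwarded to the C++ Dataset: True on the
                # PS-GPU path, False on the default CPU path.
                self.adjust_channels(thread_num, self.use_ps_gpu)

        def adjust_channels(self, thread_num, flag):
            print("dynamic_adjust_channel_num(%d, %s)" % (thread_num, flag))

    dataset = DatasetSketch()
    dataset._set_use_ps_gpu(True)    # mirrors trainer.proto_desc.use_ps_gpu
    dataset._dynamic_adjust_before_train(thread_num=4)
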
From b666fe7ba0250242bd9b0cde0b0db120f35ea00d Mon Sep 17 00:00:00 2001
From: danleifeng
Date: Thu, 29 Apr 2021 07:25:43 +0000
Subject: [PATCH 4/6] edit glog verbosity for train log; test=develop

---
 paddle/fluid/framework/hogwild_worker.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paddle/fluid/framework/hogwild_worker.cc b/paddle/fluid/framework/hogwild_worker.cc
index 56244f6264ab4..b2d170888e28f 100644
--- a/paddle/fluid/framework/hogwild_worker.cc
+++ b/paddle/fluid/framework/hogwild_worker.cc
@@ -238,7 +238,7 @@ void HogwildWorker::TrainFiles() {
     thread_scope_->DropKids();
   }
   timeline.Pause();
-  VLOG(1) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec()
+  VLOG(3) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec()
           << " seconds, ins_num: " << total_ins_num;
 #if defined PADDLE_WITH_PSCORE
   if (thread_barrier_) {
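
Patch 4 only demotes the per-worker timing line from VLOG(1) to VLOG(3), so it stays out of the default log output. Paddle routes VLOG through glog, whose verbosity threshold is read from the GLOG_v environment variable, so the line can still be brought back at runtime. A sketch, assuming the standard glog behavior; the variable must be set before the framework initializes its logging:

    import os

    # Raise the VLOG threshold to 3 so the demoted timing line shows up.
    # This must happen before paddle is imported.
    os.environ["GLOG_v"] = "3"

    import paddle  # noqa: E402 (deliberately imported after the env setup)
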
From dc6d15dffba7e016019a9765d504bb3d4c70914e Mon Sep 17 00:00:00 2001
From: danleifeng
Date: Thu, 29 Apr 2021 09:16:32 +0000
Subject: [PATCH 5/6] fix queuedataset set_use_ps_gpu function; test=develop

---
 .../paddle/distributed/fleet/dataset/dataset.py | 14 ++++++++++----
 python/paddle/fluid/dataset.py                  | 17 ++++++++++-------
 2 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py
index 707cdd29b3682..af8b63924d1d2 100644
--- a/python/paddle/distributed/fleet/dataset/dataset.py
+++ b/python/paddle/distributed/fleet/dataset/dataset.py
@@ -31,6 +31,7 @@ def __init__(self):
         self.dataset = core.Dataset("MultiSlotDataset")
         self.thread_num = 1
         self.filelist = []
+        self.use_ps_gpu = False
 
     def init(self,
              batch_size=1,
@@ -212,6 +213,15 @@ def _prepare_to_run(self):
         self.dataset.set_data_feed_desc(self._desc())
         self.dataset.create_readers()
 
+    def _set_use_ps_gpu(self, use_ps_gpu):
+        """
+        set use_ps_gpu flag
+
+        Args:
+            use_ps_gpu: bool
+        """
+        self.use_ps_gpu = use_ps_gpu
+
     def _finish_to_run(self):
         self.dataset.destroy_readers()
 
@@ -267,7 +277,6 @@ def __init__(self):
         self.enable_pv_merge = False
         self.merge_by_lineid = False
         self.fleet_send_sleep_seconds = None
-        self.use_ps_gpu = False
 
     def _init_distributed_settings(self, **kwargs):
         """
@@ -544,9 +553,6 @@ def _dynamic_adjust_after_train(self):
                 self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
         self.dataset.dynamic_adjust_readers_num(self.thread_num)
 
-    def _set_use_ps_gpu(self, use_ps_gpu):
-        self.use_ps_gpu = use_ps_gpu
-
     def _set_queue_num(self, queue_num):
         """
         Set Dataset output queue num, training threads get data from queues
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index 3d404ea864b92..b4cd3326ddec5 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -74,6 +74,7 @@ def __init__(self):
         self.dataset = core.Dataset("MultiSlotDataset")
         self.thread_num = 1
         self.filelist = []
+        self.use_ps_gpu = False
 
     def set_pipe_command(self, pipe_command):
         """
@@ -300,6 +301,15 @@ def _prepare_to_run(self):
         self.dataset.set_data_feed_desc(self.desc())
         self.dataset.create_readers()
 
+    def _set_use_ps_gpu(self, use_ps_gpu):
+        """
+        set use_ps_gpu flag
+
+        Args:
+            use_ps_gpu: bool
+        """
+        self.use_ps_gpu = use_ps_gpu
+
     def _finish_to_run(self):
         self.dataset.destroy_readers()
 
@@ -352,7 +362,6 @@ def __init__(self):
         self.merge_by_lineid = False
         self.fleet_send_sleep_seconds = None
         self.trainer_num = -1
-        self.use_ps_gpu = False
 
     @deprecated(
         since="2.0.0",
@@ -410,12 +419,6 @@ def _dynamic_adjust_after_train(self):
                 self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
         self.dataset.dynamic_adjust_readers_num(self.thread_num)
 
-    @deprecated(
-        since="2.1.0",
-        update_to="paddle.distributed.InMemoryDataset._set_use_ps_gpu")
-    def _set_use_ps_gpu(self, use_ps_gpu):
-        self.use_ps_gpu = use_ps_gpu
-
     @deprecated(
         since="2.0.0",
         update_to="paddle.distributed.InMemoryDataset._set_queue_num")

From a8801cdc2580b97d0afb8958dc4581844f1b0dcb Mon Sep 17 00:00:00 2001
From: danleifeng
Date: Sat, 8 May 2021 04:04:06 +0000
Subject: [PATCH 6/6] fix queuedataset set_use_ps_gpu function; test=develop

---
 python/paddle/fluid/executor.py                              | 4 +++-
 .../paddle/fluid/tests/unittests/test_communicator_ps_gpu.py | 1 +
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 071bfce1e99cc..620729795bc20 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -1507,7 +1507,9 @@ def _run_from_dataset(self,
         trainer._gen_trainer_desc()
         self._dump_debug_info(program=program, trainer=trainer)
 
-        dataset._set_use_ps_gpu(trainer.proto_desc.use_ps_gpu)
+        # in case of calling _set_use_ps_gpu explicitly
+        if dataset.use_ps_gpu is False:
+            dataset._set_use_ps_gpu(trainer.proto_desc.use_ps_gpu)
         dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
 
         trainer_instance = self._default_executor.init_for_dataset(
diff --git a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
index 5de1ebf581372..0b956d5031fec 100644
--- a/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
+++ b/python/paddle/fluid/tests/unittests/test_communicator_ps_gpu.py
@@ -73,6 +73,7 @@ def test_communicator_ps_gpu(self):
         dataset.init(
             batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
         dataset.set_filelist(["test_communicator_ps_gpu.txt"])
+        dataset._set_use_ps_gpu(1)
         dataset.load_into_memory()
 
         os.environ["TEST_MODE"] = "1"
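
Taken together, patches 3, 5 and 6 leave two ways to enable the flag: implicitly, from the trainer descriptor inside _run_from_dataset, or explicitly on the dataset, where the "if dataset.use_ps_gpu is False" guard from patch 6 keeps the executor from overwriting an explicit call. A usage sketch modeled on the unit test above; the file name and the slot variable are placeholders, and the data file would have to exist before load_into_memory:

    import paddle
    import paddle.fluid as fluid

    paddle.enable_static()

    # Placeholder input variable standing in for the test's slots_vars.
    x = fluid.data(name="x", shape=[-1, 1], dtype="int64", lod_level=1)
    slots_vars = [x]

    dataset = paddle.distributed.InMemoryDataset()
    dataset.init(
        batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
    dataset.set_filelist(["demo_data.txt"])  # placeholder file list

    # Explicit opt-in; the executor guard will not override it later.
    dataset._set_use_ps_gpu(True)
    dataset.load_into_memory()
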