From b490ec9bf4044736b6f80f1aad18a38001ffc1a5 Mon Sep 17 00:00:00 2001 From: zhaocaibei123 Date: Mon, 7 Mar 2022 08:03:26 +0000 Subject: [PATCH 1/2] fix & add log --- paddle/fluid/distributed/ps/wrapper/fleet.cc | 21 +- paddle/fluid/distributed/ps/wrapper/fleet.h | 7 + paddle/fluid/framework/device_worker.h | 4 +- paddle/fluid/framework/dist_multi_trainer.cc | 12 +- .../fluid/framework/downpour_lite_worker.cc | 235 +++++++++--------- paddle/fluid/framework/pull_dense_worker.cc | 15 ++ 6 files changed, 175 insertions(+), 119 deletions(-) diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.cc b/paddle/fluid/distributed/ps/wrapper/fleet.cc index 2e99d4e8c42cf..7b599ea5089e6 100644 --- a/paddle/fluid/distributed/ps/wrapper/fleet.cc +++ b/paddle/fluid/distributed/ps/wrapper/fleet.cc @@ -33,6 +33,19 @@ bool FleetWrapper::is_initialized_ = false; std::shared_ptr FleetWrapper::pserver_ptr_ = NULL; std::shared_ptr FleetWrapper::worker_ptr_ = NULL; +int32_t FleetWrapper::CopyTable(const uint64_t src_table_id, + const uint64_t dest_table_id) { + VLOG(0) << "support later"; + return 0; +} + +int32_t FleetWrapper::CopyTableByFeasign( + const uint64_t src_table_id, const uint64_t dest_table_id, + const std::vector& feasign_list) { + VLOG(0) << "support later"; + return 0; +} + void FleetWrapper::SetClient2ClientConfig(int request_timeout_ms, int connect_timeout_ms, int max_retry) { @@ -374,6 +387,7 @@ void FleetWrapper::PullDenseVarsAsync( } auto status = worker_ptr_->pull_dense(regions.data(), regions.size(), tid); pull_dense_status->push_back(std::move(status)); + VLOG(0) << "debug zcb pscore fleet->PullDenseVarsAsync ret"; } void FleetWrapper::PullDenseVarsSync( @@ -741,8 +755,13 @@ void FleetWrapper::ShrinkDenseTable(int table_id, Scope* scope, } void FleetWrapper::ClientFlush() { - auto ret = pserver_ptr_->_worker_ptr->flush(); + VLOG(0) << "debug zcb begin client flush"; + auto ret = worker_ptr_->flush(); ret.wait(); + int32_t err_code = ret.get(); + if (err_code == -1) { + LOG(ERROR) << "Client Flush failed"; + } } int FleetWrapper::RegisterClientToClientMsgHandler(int msg_type, diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.h b/paddle/fluid/distributed/ps/wrapper/fleet.h index b87d235e471e4..20c1ff4a6157d 100644 --- a/paddle/fluid/distributed/ps/wrapper/fleet.h +++ b/paddle/fluid/distributed/ps/wrapper/fleet.h @@ -69,6 +69,13 @@ class FleetWrapper { client2client_max_retry_ = 3; } + // TODO(zhaocaibei123: later) + int32_t CopyTable(const uint64_t src_table_id, const uint64_t dest_table_id); + + int32_t CopyTableByFeasign(const uint64_t src_table_id, + const uint64_t dest_table_id, + const std::vector& feasign_list); + // set client to client communication config void SetClient2ClientConfig(int request_timeout_ms, int connect_timeout_ms, int max_retry); diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h index d313db2bb81fd..eeb1ead073b2b 100644 --- a/paddle/fluid/framework/device_worker.h +++ b/paddle/fluid/framework/device_worker.h @@ -27,6 +27,7 @@ limitations under the License. */ #include // NOLINT #include +#include "paddle/fluid/distributed/ps/wrapper/fleet.h" #include "paddle/fluid/framework/data_feed.h" #include "paddle/fluid/framework/executor_gc_helper.h" #include "paddle/fluid/framework/heter_util.h" @@ -108,6 +109,7 @@ class PullDenseWorker { private: std::shared_ptr fleet_ptr_; + std::shared_ptr new_fleet_ptr_; PullDenseWorkerParameter param_; DownpourWorkerParameter dwp_param_; Scope* root_scope_; @@ -350,7 +352,7 @@ class DownpourLiteWorker : public HogwildWorker { virtual void TrainFilesWithProfiler(); protected: - std::shared_ptr fleet_ptr_; + std::shared_ptr fleet_ptr_; std::shared_ptr pull_dense_worker_; void FillSparseValue(size_t table_id); void PushGradients(); diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc index 2be350ab28041..672198ddb1a3b 100644 --- a/paddle/fluid/framework/dist_multi_trainer.cc +++ b/paddle/fluid/framework/dist_multi_trainer.cc @@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include "paddle/fluid/distributed/ps/wrapper/fleet.h" #include "paddle/fluid/framework/convert_utils.h" #include "paddle/fluid/framework/device_worker_factory.h" #include "paddle/fluid/framework/trainer.h" @@ -54,10 +55,10 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc, workers_[i]->SetWorkerNum(thread_num_); } - VLOG(3) << "going to initialize pull dense worker"; + VLOG(0) << "going to initialize pull dense worker"; pull_dense_worker_ = PullDenseWorker::GetInstance(); pull_dense_worker_->Initialize(trainer_desc); - VLOG(3) << "initialize pull dense worker"; + VLOG(0) << "initialize pull dense worker"; SetDebug(trainer_desc.debug()); } @@ -176,8 +177,13 @@ void DistMultiTrainer::Finalize() { pull_dense_worker_->Stop(); root_scope_->DropKids(); - // flush local client push queue +// flush local client push queue +#ifdef PADDLE_WITH_PSLIB auto fleet_ptr_ = FleetWrapper::GetInstance(); +#else + auto fleet_ptr_ = paddle::distributed::FleetWrapper::GetInstance(); + VLOG(0) << "debug zcb dist multi trainer call client-> flush"; +#endif fleet_ptr_->ClientFlush(); } diff --git a/paddle/fluid/framework/downpour_lite_worker.cc b/paddle/fluid/framework/downpour_lite_worker.cc index 8be1b64a10526..a26502d2f982d 100644 --- a/paddle/fluid/framework/downpour_lite_worker.cc +++ b/paddle/fluid/framework/downpour_lite_worker.cc @@ -87,7 +87,7 @@ void DownpourLiteWorker::Initialize(const TrainerDesc& desc) { need_to_push_sparse_ = param_.push_sparse(); need_to_push_dense_ = param_.push_dense(); - fleet_ptr_ = FleetWrapper::GetInstance(); + fleet_ptr_ = paddle::distributed::FleetWrapper::GetInstance(); fetch_config_ = desc.fetch_config(); use_cvm_ = desc.use_cvm(); // for sparse value accessor, embedding only @@ -499,47 +499,49 @@ void DownpourLiteWorker::TrainFilesWithProfiler() { copy_table_time += timeline.ElapsedSec(); total_time += timeline.ElapsedSec(); - VLOG(3) << "program config size: " << param_.program_config_size(); - for (int i = 0; i < param_.program_config(0).pull_sparse_table_id_size(); - ++i) { - uint64_t tid = static_cast( - param_.program_config(0).pull_sparse_table_id(i)); - TableParameter table; - for (auto j : param_.sparse_table()) { - if (j.table_id() == tid) { - table = j; - break; - } - } - timeline.Start(); - fleet_ptr_->PullSparseVarsSync( - *thread_scope_, tid, sparse_key_names_[tid], &features_[tid], - &feature_values_[tid], table.fea_dim(), sparse_value_names_[tid]); - timeline.Pause(); - pull_sparse_time += timeline.ElapsedSec(); - total_time += timeline.ElapsedSec(); - timeline.Start(); - CollectLabelInfo(i); - timeline.Pause(); - collect_label_time += timeline.ElapsedSec(); - total_time += timeline.ElapsedSec(); - timeline.Start(); - FillSparseValue(i); - timeline.Pause(); - fill_sparse_time += timeline.ElapsedSec(); - total_time += timeline.ElapsedSec(); - timeline.Start(); - auto nid_iter = std::find(sparse_value_names_[tid].begin(), - sparse_value_names_[tid].end(), - adjust_ins_weight_config_.nid_slot()); - if (nid_iter != sparse_value_names_[tid].end()) { - AdjustInsWeight(); - } - timeline.Pause(); - adjust_ins_weight_time += timeline.ElapsedSec(); - total_time += timeline.ElapsedSec(); - } - VLOG(3) << "Fill sparse value for all sparse table done."; + // VLOG(3) << "program config size: " << param_.program_config_size(); + // for (int i = 0; i < + // param_.program_config(0).pull_sparse_table_id_size(); + // ++i) { + // uint64_t tid = static_cast( + // param_.program_config(0).pull_sparse_table_id(i)); + // TableParameter table; + // for (auto j : param_.sparse_table()) { + // if (j.table_id() == tid) { + // table = j; + // break; + // } + // } + // timeline.Start(); + // fleet_ptr_->PullSparseVarsSync( + // *thread_scope_, tid, sparse_key_names_[tid], &features_[tid], + // &feature_values_[tid], table.fea_dim(), + // sparse_value_names_[tid]); + // timeline.Pause(); + // pull_sparse_time += timeline.ElapsedSec(); + // total_time += timeline.ElapsedSec(); + // timeline.Start(); + // CollectLabelInfo(i); + // timeline.Pause(); + // collect_label_time += timeline.ElapsedSec(); + // total_time += timeline.ElapsedSec(); + // timeline.Start(); + // FillSparseValue(i); + // timeline.Pause(); + // fill_sparse_time += timeline.ElapsedSec(); + // total_time += timeline.ElapsedSec(); + // timeline.Start(); + // auto nid_iter = std::find(sparse_value_names_[tid].begin(), + // sparse_value_names_[tid].end(), + // adjust_ins_weight_config_.nid_slot()); + // if (nid_iter != sparse_value_names_[tid].end()) { + // AdjustInsWeight(); + // } + // timeline.Pause(); + // adjust_ins_weight_time += timeline.ElapsedSec(); + // total_time += timeline.ElapsedSec(); + // } + // VLOG(3) << "Fill sparse value for all sparse table done."; int run_op_idx = 0; for (auto& op : ops_) { @@ -579,30 +581,31 @@ void DownpourLiteWorker::TrainFilesWithProfiler() { "Tensor %s contains NAN.", var_name)); } - if (need_to_push_sparse_) { - for (int i = 0; i < param_.program_config(0).push_sparse_table_id_size(); - ++i) { - uint64_t tid = static_cast( - param_.program_config(0).push_sparse_table_id(i)); - TableParameter table; - for (auto i : param_.sparse_table()) { - if (i.table_id() == tid) { - table = i; - break; - } - } - timeline.Start(); - fleet_ptr_->PushSparseVarsWithLabelAsync( - *thread_scope_, tid, features_[tid], feature_labels_[tid], - sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(), - &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_, - dump_slot_, &sparse_push_keys_[tid], no_cvm_, - scale_sparse_gradient_with_batch_size_); - timeline.Pause(); - push_sparse_time += timeline.ElapsedSec(); - total_time += timeline.ElapsedSec(); - } - } +// if (need_to_push_sparse_) { +// for (int i = 0; i < +// param_.program_config(0).push_sparse_table_id_size(); +// ++i) { +// uint64_t tid = static_cast( +// param_.program_config(0).push_sparse_table_id(i)); +// TableParameter table; +// for (auto i : param_.sparse_table()) { +// if (i.table_id() == tid) { +// table = i; +// break; +// } +// } +// timeline.Start(); +// fleet_ptr_->PushSparseVarsWithLabelAsync( +// *thread_scope_, tid, features_[tid], feature_labels_[tid], +// sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(), +// &feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_, +// dump_slot_, &sparse_push_keys_[tid], no_cvm_, +// scale_sparse_gradient_with_batch_size_); +// timeline.Pause(); +// push_sparse_time += timeline.ElapsedSec(); +// total_time += timeline.ElapsedSec(); +// } +// } #if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_PSCORE) if (copy_table_config_.need_copy()) { @@ -616,54 +619,56 @@ void DownpourLiteWorker::TrainFilesWithProfiler() { } #endif - if (need_to_push_dense_) { - timeline.Start(); - for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); - ++i) { - uint64_t tid = static_cast( - param_.program_config(0).push_dense_table_id(i)); - fleet_ptr_->PushDenseVarsAsync( - *thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_, - scale_datanorm_, cur_batch); - } - timeline.Pause(); - push_dense_time += timeline.ElapsedSec(); - total_time += timeline.ElapsedSec(); - VLOG(3) << "push sparse and dense gradient done."; - int32_t tmp_push_dense_wait_times = -1; - static uint32_t push_dense_wait_times = - static_cast(tmp_push_dense_wait_times); - if (push_dense_status_.size() >= push_dense_wait_times) { - for (auto& t : push_dense_status_) { - t.wait(); - } - push_dense_status_.resize(0); - } - - if (tmp_push_dense_wait_times == -1) { - push_dense_status_.resize(0); - } - } - - if (need_to_push_sparse_) { - int32_t tmp_push_sparse_wait_times = -1; - static uint32_t push_sparse_wait_times = - static_cast(tmp_push_sparse_wait_times); - if (push_sparse_status_.size() >= push_sparse_wait_times) { - for (auto& t : push_sparse_status_) { - t.wait(); - } - push_sparse_status_.resize(0); - } - - if (tmp_push_sparse_wait_times == -1) { - push_sparse_status_.resize(0); - } - - VLOG(3) << "going to increase thread version"; - VLOG(3) << "push dense table id size: " - << param_.program_config(0).push_dense_table_id_size(); - } + // if (need_to_push_dense_) { + // timeline.Start(); + // for (int i = 0; i < + // param_.program_config(0).push_dense_table_id_size(); + // ++i) { + // uint64_t tid = static_cast( + // param_.program_config(0).push_dense_table_id(i)); + // fleet_ptr_->PushDenseVarsAsync( + // *thread_scope_, tid, dense_grad_names_[tid], + // &push_sparse_status_, + // scale_datanorm_, cur_batch); + // } + // timeline.Pause(); + // push_dense_time += timeline.ElapsedSec(); + // total_time += timeline.ElapsedSec(); + // VLOG(3) << "push sparse and dense gradient done."; + // int32_t tmp_push_dense_wait_times = -1; + // static uint32_t push_dense_wait_times = + // static_cast(tmp_push_dense_wait_times); + // if (push_dense_status_.size() >= push_dense_wait_times) { + // for (auto& t : push_dense_status_) { + // t.wait(); + // } + // push_dense_status_.resize(0); + // } + // + // if (tmp_push_dense_wait_times == -1) { + // push_dense_status_.resize(0); + // } + // } + // + // if (need_to_push_sparse_) { + // int32_t tmp_push_sparse_wait_times = -1; + // static uint32_t push_sparse_wait_times = + // static_cast(tmp_push_sparse_wait_times); + // if (push_sparse_status_.size() >= push_sparse_wait_times) { + // for (auto& t : push_sparse_status_) { + // t.wait(); + // } + // push_sparse_status_.resize(0); + // } + // + // if (tmp_push_sparse_wait_times == -1) { + // push_sparse_status_.resize(0); + // } + // + // VLOG(3) << "going to increase thread version"; + // VLOG(3) << "push dense table id size: " + // << param_.program_config(0).push_dense_table_id_size(); + // } if (need_to_push_dense_) { for (int i = 0; i < param_.program_config(0).push_dense_table_id_size(); @@ -765,6 +770,7 @@ void DownpourLiteWorker::TrainFiles() { int cur_batch; while ((cur_batch = device_reader_->Next()) > 0) { if (copy_table_config_.need_copy()) { + VLOG(3) << "Begin to copy table"; if (batch_cnt % copy_table_config_.batch_num() == 0) { CopySparseTable(); CopyDenseTable(); @@ -1000,6 +1006,7 @@ void DownpourLiteWorker::TrainFiles() { ++i) { uint64_t tid = static_cast( param_.program_config(0).push_dense_table_id(i)); + VLOG(0) << "debug zcb pull_dense_worker->IncreaseThreadVersion"; pull_dense_worker_->IncreaseThreadVersion(thread_id_, tid); } } diff --git a/paddle/fluid/framework/pull_dense_worker.cc b/paddle/fluid/framework/pull_dense_worker.cc index 7fb81a868d97f..94deb2319d9fa 100644 --- a/paddle/fluid/framework/pull_dense_worker.cc +++ b/paddle/fluid/framework/pull_dense_worker.cc @@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #include +#include "paddle/fluid/distributed/ps/wrapper/fleet.h" #include "paddle/fluid/framework/device_worker.h" namespace phi { @@ -61,7 +62,13 @@ void PullDenseWorker::Initialize(const TrainerDesc& param) { last_versions_[tid] = 0; current_version_[tid] = 0; } + +#ifdef PADDLE_WITH_PSLIB fleet_ptr_ = FleetWrapper::GetInstance(); +#else + new_fleet_ptr_ = paddle::distributed::FleetWrapper::GetInstance(); +#endif + #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) copy_streams_.clear(); #endif @@ -170,6 +177,11 @@ void PullDenseWorker::PullDense(bool force_update) { VLOG(3) << "pull dense " << force_update << " " << tid; fleet_ptr_->PullDenseVarsAsync(*root_scope_, tid, dense_value_names_[tid], &pull_dense_status_, false); +#elif defined(PADDLE_WITH_PSCORE) + VLOG(0) << "debug zcb pull dense worker begin pull dense"; + new_fleet_ptr_->PullDenseVarsAsync(*root_scope_, tid, + dense_value_names_[tid], + &pull_dense_status_, true); #else fleet_ptr_->PullDenseVarsAsync(*root_scope_, tid, dense_value_names_[tid], &pull_dense_status_, true); @@ -209,6 +221,9 @@ bool PullDenseWorker::CheckUpdateParam(uint64_t table_id) { auto& version = training_versions_[table_id]; current_version_[table_id] = *(std::min_element(version.begin(), version.end())); + VLOG(0) << "debug zcb PullDenseWorker::CheckUpdateParam " + << current_version_[table_id] << " " << last_versions_[table_id] + << " " << threshold_; if (current_version_[table_id] - last_versions_[table_id] < static_cast(threshold_)) { return false; From 4ce48806e7fc00f440b2ee808ff094cc5ccfb35f Mon Sep 17 00:00:00 2001 From: zhaocaibei123 Date: Mon, 7 Mar 2022 11:01:24 +0000 Subject: [PATCH 2/2] fix --- paddle/fluid/distributed/ps/wrapper/fleet.cc | 13 +++++++++---- paddle/fluid/distributed/ps/wrapper/fleet.h | 3 +++ paddle/fluid/framework/dist_multi_trainer.cc | 4 ++++ python/paddle/distributed/ps/the_one_ps.py | 2 +- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.cc b/paddle/fluid/distributed/ps/wrapper/fleet.cc index 7b599ea5089e6..040156afd5e44 100644 --- a/paddle/fluid/distributed/ps/wrapper/fleet.cc +++ b/paddle/fluid/distributed/ps/wrapper/fleet.cc @@ -33,16 +33,21 @@ bool FleetWrapper::is_initialized_ = false; std::shared_ptr FleetWrapper::pserver_ptr_ = NULL; std::shared_ptr FleetWrapper::worker_ptr_ = NULL; +int FleetWrapper::RegisterHeterCallback(HeterCallBackFunc handler) { + VLOG(0) << "RegisterHeterCallback support later"; + return 0; +} + int32_t FleetWrapper::CopyTable(const uint64_t src_table_id, const uint64_t dest_table_id) { - VLOG(0) << "support later"; + VLOG(0) << "CopyTable support later"; return 0; } int32_t FleetWrapper::CopyTableByFeasign( const uint64_t src_table_id, const uint64_t dest_table_id, const std::vector& feasign_list) { - VLOG(0) << "support later"; + VLOG(0) << "CopyTableByFeasign support later"; return 0; } @@ -179,7 +184,7 @@ void FleetWrapper::StopServer() { void FleetWrapper::FinalizeWorker() { VLOG(3) << "Going to finalize worker"; - pserver_ptr_->finalize_worker(); + worker_ptr_->finalize_worker(); } void FleetWrapper::BarrierWithTable(uint32_t barrier_type) { @@ -445,7 +450,7 @@ void FleetWrapper::PushDenseVarsAsync( float* g = tensor->mutable_data(place); paddle::distributed::Region reg(g, tensor->numel()); regions.emplace_back(std::move(reg)); - VLOG(3) << "FleetWrapper::PushDenseVarsAsync Var " << t << " talbe_id " + VLOG(0) << "FleetWrapper::PushDenseVarsAsync Var " << t << " talbe_id " << table_id << " Temp_data[0] " << g[0] << " Temp_data[-1] " << g[tensor->numel() - 1]; } diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.h b/paddle/fluid/distributed/ps/wrapper/fleet.h index 20c1ff4a6157d..121314c113235 100644 --- a/paddle/fluid/distributed/ps/wrapper/fleet.h +++ b/paddle/fluid/distributed/ps/wrapper/fleet.h @@ -76,6 +76,9 @@ class FleetWrapper { const uint64_t dest_table_id, const std::vector& feasign_list); + typedef std::function HeterCallBackFunc; + int RegisterHeterCallback(HeterCallBackFunc handler); + // set client to client communication config void SetClient2ClientConfig(int request_timeout_ms, int connect_timeout_ms, int max_retry); diff --git a/paddle/fluid/framework/dist_multi_trainer.cc b/paddle/fluid/framework/dist_multi_trainer.cc index 672198ddb1a3b..5bf331119e4c9 100644 --- a/paddle/fluid/framework/dist_multi_trainer.cc +++ b/paddle/fluid/framework/dist_multi_trainer.cc @@ -63,7 +63,11 @@ void DistMultiTrainer::Initialize(const TrainerDesc &trainer_desc, } void DistMultiTrainer::RegisterHeterCallback() { +#ifdef PADDLE_WITH_PSLIB auto fleet_ptr = FleetWrapper::GetInstance(); +#else + auto fleet_ptr = paddle::distributed::FleetWrapper::GetInstance(); +#endif fleet_ptr->RegisterHeterCallback( [this](int worker, int taskid) { workers_[worker]->Schedule(taskid); }); } diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py index f4070428a717c..988e24354e5ab 100755 --- a/python/paddle/distributed/ps/the_one_ps.py +++ b/python/paddle/distributed/ps/the_one_ps.py @@ -1055,7 +1055,7 @@ def _run_server(self): self._server.run_server(host, int(port)) def _stop_worker(self): - self._communicator.stop() + self._worker.stop_worker() if self.is_heter_ps_mode: assert self._heter_client != None, "heter client should not be None in heterps mode" self._heter_client.stop()