Skip to content

Commit

Permalink
fix bug in pscore (PaddlePaddle#35698)
Browse files Browse the repository at this point in the history
* add trainer desc config to distributed strategy

* code style modified

* data_feed set lod

* fix bug

* code style

* fix bug
  • Loading branch information
esythan authored and AnnaTrainingG committed Sep 29, 2021
1 parent e2e895b commit e506a12
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
14 changes: 14 additions & 0 deletions paddle/fluid/framework/hogwild_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ void HogwildWorker::TrainFiles() {
// Open question: how should fetched values be accumulated here?
device_reader_->Start();
int cur_batch;
int batch_cnt = 0;
while ((cur_batch = device_reader_->Next()) > 0) {
for (auto &op : ops_) {
bool need_skip = false;
Expand All @@ -230,13 +231,26 @@ void HogwildWorker::TrainFiles() {
}
}

if (need_dump_field_) {
DumpField(*thread_scope_, dump_mode_, dump_interval_);
}
if (need_dump_param_ && thread_id_ == 0) {
DumpParam(*thread_scope_, batch_cnt);
}

total_ins_num += cur_batch;
++batch_cnt;
PrintFetchVars();
thread_scope_->DropKids();
}
timeline.Pause();
VLOG(3) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec()
<< " seconds, ins_num: " << total_ins_num;

if (need_dump_field_ || need_dump_param_) {
writer_.Flush();
}

#if defined PADDLE_WITH_PSCORE
if (thread_barrier_) {
paddle::distributed::Communicator::GetInstance()->BarrierTriggerDecrement();
Expand Down
8 changes: 6 additions & 2 deletions paddle/fluid/framework/multi_trainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,19 @@ void MultiTrainer::Finalize() {
if (need_dump_field_ || need_dump_param_) {
FinalizeDumpEnv();
}
#ifdef PADDLE_WITH_HETERPS

for (size_t i = 0; i < need_merge_var_names_.size(); i++) {
Variable* root_var = root_scope_->FindVar(need_merge_var_names_[i]);
if (root_var == nullptr) {
continue;
}
LoDTensor* root_tensor = root_var->GetMutable<LoDTensor>();

#ifdef PADDLE_WITH_HETERPS
for (size_t j = 0; j < places_.size(); j++) {
#else
for (int j = 1; j < thread_num_; j++) {
#endif
Scope* cur_thread_scope = workers_[j]->GetThreadScope();
Variable* thread_var =
cur_thread_scope->FindVar(need_merge_var_names_[i]);
Expand All @@ -246,8 +250,8 @@ void MultiTrainer::Finalize() {
_ForEachDataType_(MergeCallback);
}
}
#ifdef PADDLE_WITH_HETERPS
MergeDenseParam();

#endif
root_scope_->DropKids();
}
Expand Down
5 changes: 4 additions & 1 deletion python/paddle/fluid/device_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ def _gen_worker_desc(self, trainer_desc):
trainer_desc.device_worker_name = "HogwildWorker"
if self._infer:
# for an inference model, skip the feed op and the parameter push/send ops
trainer_desc.hogwild_param.skip_ops.extend(["feed"])
trainer_desc.hogwild_param.skip_ops.extend([
"feed", "push_sparse", "push_sparse_v2", "push_dense",
"distributed_push_sparse", "send"
])

dense_table_set = set()
program_id = str(id(self._program))
Expand Down

0 comments on commit e506a12

Please sign in to comment.