Skip to content

Commit

Permalink
cherrypick for PaddlePaddle#32640 :add profile and fix dataset hang i…
Browse files Browse the repository at this point in the history
…n heterps;test=develop
  • Loading branch information
danleifeng committed May 18, 2021
1 parent 4639f5d commit 7166cd0
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 6 deletions.
3 changes: 3 additions & 0 deletions paddle/fluid/framework/device_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ class HogwildWorker : public CPUWorkerBase {
HogwildWorkerParameter param_;
std::vector<std::string> skip_ops_;
std::map<std::string, int> stat_var_name_map_;
#ifdef PADDLE_WITH_HETERPS
platform::DeviceContext* dev_ctx_ = nullptr;
#endif
};

class DownpourWorker : public HogwildWorker {
Expand Down
24 changes: 24 additions & 0 deletions paddle/fluid/framework/hogwild_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ void HogwildWorker::Initialize(const TrainerDesc &desc) {
for (int i = 0; i < param_.stat_var_names_size(); ++i) {
stat_var_name_map_[param_.stat_var_names(i)] = 1;
}
#ifdef PADDLE_WITH_HETERPS
dev_ctx_ = platform::DeviceContextPool::Instance().Get(place_);
#endif
}

void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) {
Expand Down Expand Up @@ -150,6 +153,9 @@ void HogwildWorker::TrainFilesWithProfiler() {
VLOG(3) << "Going to run op " << op_name[i];
if (!need_skip) {
ops_[i]->Run(*thread_scope_, place_);
#ifdef PADDLE_WITH_HETERPS
dev_ctx_->Wait();
#endif
}
VLOG(3) << "Op " << op_name[i] << " Finished";
timeline.Pause();
Expand All @@ -167,6 +173,16 @@ void HogwildWorker::TrainFilesWithProfiler() {
total_inst += cur_batch;
++batch_cnt;
PrintFetchVars();
#ifdef PADDLE_WITH_HETERPS
dev_ctx_->Wait();
VLOG(1) << "GpuPs worker " << thread_id_ << " train cost " << total_time
<< " seconds, ins_num: " << total_inst;
for (size_t i = 0; i < op_name.size(); ++i) {
VLOG(1) << "card:" << thread_id_ << ", op: " << op_name[i]
<< ", mean time: " << op_total_time[i] / total_inst
<< "s, totol time:" << op_total_time[i] << "sec";
}
#else
if (thread_id_ == 0) {
if (batch_cnt > 0 && batch_cnt % 100 == 0) {
for (size_t i = 0; i < ops_.size(); ++i) {
Expand All @@ -178,6 +194,7 @@ void HogwildWorker::TrainFilesWithProfiler() {
fprintf(stderr, "%6.2f instances/s\n", total_inst / total_time);
}
}
#endif
thread_scope_->DropKids();
timeline.Start();
}
Expand All @@ -195,7 +212,10 @@ void HogwildWorker::TrainFilesWithProfiler() {

void HogwildWorker::TrainFiles() {
platform::SetNumThreads(1);
platform::Timer timeline;
timeline.Start();

int total_ins_num = 0;
// how to accumulate fetched values here
device_reader_->Start();
int cur_batch;
Expand All @@ -213,9 +233,13 @@ void HogwildWorker::TrainFiles() {
}
}

total_ins_num += cur_batch;
PrintFetchVars();
thread_scope_->DropKids();
}
timeline.Pause();
VLOG(3) << "worker " << thread_id_ << " train cost " << timeline.ElapsedSec()
<< " seconds, ins_num: " << total_ins_num;
#if defined PADDLE_WITH_PSCORE
if (thread_barrier_) {
paddle::distributed::Communicator::GetInstance()->BarrierTriggerDecrement();
Expand Down
21 changes: 18 additions & 3 deletions python/paddle/distributed/fleet/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self):
self.dataset = core.Dataset("MultiSlotDataset")
self.thread_num = 1
self.filelist = []
self.use_ps_gpu = False

def init(self,
batch_size=1,
Expand Down Expand Up @@ -212,7 +213,15 @@ def _prepare_to_run(self):
self.dataset.set_data_feed_desc(self._desc())
self.dataset.create_readers()

def _finish_to_run(self):
def _set_use_ps_gpu(self, use_ps_gpu):
"""
set use_ps_gpu flag
Args:
use_ps_gpu: bool
"""
self.use_ps_gpu = use_ps_gpu

def _finish_to_run(self):
self.dataset.destroy_readers()

def _desc(self):
Expand Down Expand Up @@ -529,12 +538,18 @@ def _prepare_to_run(self):

def _dynamic_adjust_before_train(self, thread_num):
if not self.is_user_set_queue_num:
self.dataset.dynamic_adjust_channel_num(thread_num, False)
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(thread_num, False)
self.dataset.dynamic_adjust_readers_num(thread_num)

def _dynamic_adjust_after_train(self):
if not self.is_user_set_queue_num:
self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
self.dataset.dynamic_adjust_readers_num(self.thread_num)

def _set_queue_num(self, queue_num):
Expand Down
19 changes: 17 additions & 2 deletions python/paddle/fluid/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(self):
self.dataset = core.Dataset("MultiSlotDataset")
self.thread_num = 1
self.filelist = []
self.use_ps_gpu = False

def set_pipe_command(self, pipe_command):
"""
Expand Down Expand Up @@ -300,6 +301,14 @@ def _prepare_to_run(self):
self.dataset.set_data_feed_desc(self.desc())
self.dataset.create_readers()

def _set_use_ps_gpu(self, use_ps_gpu):
"""
set use_ps_gpu flag
Args:
use_ps_gpu: bool
"""
self.use_ps_gpu = use_ps_gpu

def _finish_to_run(self):
self.dataset.destroy_readers()

Expand Down Expand Up @@ -391,7 +400,10 @@ def _prepare_to_run(self):
)
def _dynamic_adjust_before_train(self, thread_num):
if not self.is_user_set_queue_num:
self.dataset.dynamic_adjust_channel_num(thread_num, False)
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(thread_num, False)
self.dataset.dynamic_adjust_readers_num(thread_num)

@deprecated(
Expand All @@ -400,7 +412,10 @@ def _dynamic_adjust_before_train(self, thread_num):
)
def _dynamic_adjust_after_train(self):
if not self.is_user_set_queue_num:
self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
if self.use_ps_gpu:
self.dataset.dynamic_adjust_channel_num(self.thread_num, True)
else:
self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
self.dataset.dynamic_adjust_readers_num(self.thread_num)

@deprecated(
Expand Down
5 changes: 4 additions & 1 deletion python/paddle/fluid/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,10 @@ def _run_from_dataset(self,
trainer._gen_trainer_desc()

self._dump_debug_info(program=program, trainer=trainer)
dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
# in case of calling _set_use_ps_gpu explicitly
if dataset.use_ps_gpu is False:
dataset._set_use_ps_gpu(trainer.proto_desc.use_ps_gpu)
dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)

trainer_instance = self._default_executor.init_for_dataset(
program.desc, trainer._desc(), scope, dataset.dataset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def test_communicator_ps_gpu(self):
dataset.init(
batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
dataset.set_filelist(["test_communicator_ps_gpu.txt"])
dataset._set_use_ps_gpu(1)
dataset.load_into_memory()

os.environ["TEST_MODE"] = "1"
Expand Down

0 comments on commit 7166cd0

Please sign in to comment.