From 35df747f98ef48f64cd91b323f5889c65b63d088 Mon Sep 17 00:00:00 2001
From: liuyuang
Date: Wed, 7 Jul 2021 11:37:21 +0800
Subject: [PATCH 1/2] program cache startup, test=allcase

---
 paddle/fluid/framework/device_worker.h     |  3 +++
 paddle/fluid/framework/pipeline_trainer.cc | 10 +++++-----
 paddle/fluid/framework/section_worker.cc   |  6 +++++-
 python/paddle/fluid/executor.py            | 14 ++++++--------
 4 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index b40099542cfd5..c44bda490bb6f 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -581,6 +581,7 @@ class SectionWorker : public DeviceWorker {
   void RunUpdate(
       std::unique_ptr<GarbageCollector>&,
       std::unordered_map<const OperatorBase*, std::vector<std::string>>&);
+  void PrepareUnusedVar();
 
  protected:
   int section_id_;
@@ -595,6 +596,8 @@
 
   std::vector<std::unique_ptr<OperatorBase>> ops_;
   std::shared_ptr<framework::ProgramDesc> program_;
+  std::unordered_map<const OperatorBase*, std::vector<std::string>>
+      unused_vars_;
   static uint64_t batch_id_;
 
   platform::DeviceContext* dev_ctx_ = nullptr;
diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc
index 3bd50229b94de..9775e43ddb30f 100644
--- a/paddle/fluid/framework/pipeline_trainer.cc
+++ b/paddle/fluid/framework/pipeline_trainer.cc
@@ -113,19 +113,19 @@ void PipelineTrainer::InitTrainerEnv(const ProgramDesc& main_program,
   this_worker->SetRootScope(root_scope_);
   this_worker->SetMinibatchScope(minibatch_scope_);
   this_worker->SetMicrobatchScopes(microbatch_scopes_);
+  this_worker->PrepareUnusedVar();
 }
 
 void PipelineTrainer::Run() {
   VLOG(5) << "Going to run PipelineTrainer::Run()";
-  section_thread_ = std::async(&DeviceWorker::TrainFiles, worker_.get());
-}
-
-void PipelineTrainer::Finalize() {
   try {
-    section_thread_.get();
+    worker_->TrainFiles();
   } catch (platform::EOFException& e) {
     std::rethrow_exception(std::current_exception());
   }
+}
+
+void PipelineTrainer::Finalize() {
   if (need_dump_field_) {
     FinalizeDumpEnv();
   }
diff --git a/paddle/fluid/framework/section_worker.cc b/paddle/fluid/framework/section_worker.cc
index 993b9ac52c5b5..a7e84b34b2436 100644
--- a/paddle/fluid/framework/section_worker.cc
+++ b/paddle/fluid/framework/section_worker.cc
@@ -96,12 +96,16 @@ void SectionWorker::RunUpdate(
   }
 }
 
+void SectionWorker::PrepareUnusedVar() {
+  VLOG(5) << "begin prepare the unused vars";
+  unused_vars_ = GetUnusedVars(program_->Block(0), ops_, skip_vars_);
+}
+
 void SectionWorker::TrainFiles() {
   VLOG(5) << "begin section_worker TrainFiles";
 
   int64_t max_memory_size = GetEagerDeletionThreshold();
   std::unique_ptr<GarbageCollector> gc;
-  auto unused_vars_ = GetUnusedVars(program_->Block(0), ops_, skip_vars_);
   if (max_memory_size >= 0) {
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
     if (platform::is_gpu_place(place_)) {
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 81f4ae32397b4..e1bfb683bb230 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -1638,8 +1638,12 @@ def _get_real_program_fetch_list():
         dataset._dynamic_adjust_before_train(trainer.proto_desc.thread_num)
 
         trainer_desc = trainer._desc()  # slow, cache
-        ctx = [trainer_desc, dataset, scope, real_fetch_list]
+        trainer_instance = self._default_executor.init_for_dataset(
+            program.desc, trainer_desc, scope, dataset.dataset)
+
+        ctx = [trainer_desc, dataset, scope, real_fetch_list, trainer_instance]
         if use_program_cache: self._add_ctx_cache(cache_key, ctx)
+
         return ctx
 
     def _run_pipeline(self,
@@ -1654,20 +1658,14 @@ def _run_pipeline(self,
                       print_period=100,
                       fetch_handler=None,
                       use_program_cache=False):
-        trainer_desc, dataset, scope, real_fetch_list = \
+        trainer_desc, dataset, scope, real_fetch_list, trainer_instance = \
             self._prepare_pipeline_ctx(program, dataset, scope, thread,
                                        is_infer, debug, fetch_list, fetch_info,
                                        print_period, fetch_handler,
                                        use_program_cache)
 
-        trainer_instance = self._default_executor.init_for_dataset(
-            program.desc, trainer_desc, scope, dataset.dataset)
-
         self._default_executor.run_from_dataset(trainer_instance)
-        self._default_executor.release_trainer(trainer_instance)
-        dataset._dynamic_adjust_after_train()
-        dataset._finish_to_run()
 
         if real_fetch_list:
             arr = scope.find_var('fetch').get_fetch_list()
             tensors = arr._move_to_list()

From f3b63c675995dfc2e840a03635b4287d7ca64d6c Mon Sep 17 00:00:00 2001
From: liuyuang
Date: Fri, 9 Jul 2021 11:27:43 +0800
Subject: [PATCH 2/2] fix bug when no cache, test=allcase

---
 paddle/fluid/framework/pipeline_trainer.cc | 9 +++++++++
 python/paddle/fluid/executor.py            | 7 +++++--
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/paddle/fluid/framework/pipeline_trainer.cc b/paddle/fluid/framework/pipeline_trainer.cc
index 9775e43ddb30f..42577972e9b79 100644
--- a/paddle/fluid/framework/pipeline_trainer.cc
+++ b/paddle/fluid/framework/pipeline_trainer.cc
@@ -123,6 +123,15 @@ void PipelineTrainer::Run() {
   } catch (platform::EOFException& e) {
     std::rethrow_exception(std::current_exception());
   }
+  for (auto* micro_scop : microbatch_scopes_) {
+    // By default, we should delete all kid scopes after run executor because
+    // some operators may create local scope when running, such as while_op.
+    // But when while_op also creates a local executor to run its sub block,
+    // the sub scopes it created should not be dropped immediately, because
+    // while_grad_op will use some variables created during while_op run, so
+    // we need to keep the kids and wait for the outer executor to drop them.
+    micro_scop->DropKids();
+  }
 }
 
 void PipelineTrainer::Finalize() {
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index e1bfb683bb230..4f56666a64ba3 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -1641,7 +1641,7 @@ def _get_real_program_fetch_list():
         trainer_instance = self._default_executor.init_for_dataset(
             program.desc, trainer_desc, scope, dataset.dataset)
 
-        ctx = [trainer_desc, dataset, scope, real_fetch_list, trainer_instance]
+        ctx = [scope, real_fetch_list, trainer_instance]
         if use_program_cache: self._add_ctx_cache(cache_key, ctx)
 
         return ctx
@@ -1658,7 +1658,7 @@ def _run_pipeline(self,
                       print_period=100,
                       fetch_handler=None,
                       use_program_cache=False):
-        trainer_desc, dataset, scope, real_fetch_list, trainer_instance = \
+        scope, real_fetch_list, trainer_instance = \
             self._prepare_pipeline_ctx(program, dataset, scope, thread,
                                        is_infer, debug, fetch_list, fetch_info,
                                        print_period, fetch_handler,
@@ -1666,6 +1666,9 @@ def _run_pipeline(self,
 
         self._default_executor.run_from_dataset(trainer_instance)
 
+        if not use_program_cache:
+            self._default_executor.release_trainer(trainer_instance)
+
         if real_fetch_list:
             arr = scope.find_var('fetch').get_fetch_list()
             tensors = arr._move_to_list()