[JitLayer] Polish PEFunction to speed up JitLayer and fix memory leak (#44738)

* Polish PEFunction to speed up JitLayer

* Polish PEFunction code

* Fix comments
0x45f authored Aug 1, 2022
1 parent 212f015 commit 7512231
Showing 2 changed files with 88 additions and 49 deletions.
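
What the change amounts to, in brief: the old PEFunction::operator() serialized the ProgramDesc, hashed the resulting string into a program_id, and went through framework::GetExecutorInfoFromCache on every call; the new code builds the Graph and ParallelExecutor once in the constructor (CreateGraphAndPE), reuses them for every run, and drops child scopes after outputs are fetched. A minimal, hypothetical C++ sketch of that construct-once / run-many idea follows; Plan, CachedFunction, and the doubling loop are illustrative stand-ins, not Paddle APIs.

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for the expensive, build-once state (graph + executor).
struct Plan {
  explicit Plan(const std::string &program) : compiled("compiled:" + program) {}
  std::string compiled;
};

class CachedFunction {
 public:
  explicit CachedFunction(const std::string &program)
      : plan_(std::make_shared<Plan>(program)) {}  // expensive work happens once, here

  std::vector<float> operator()(const std::vector<float> &inputs) const {
    // Reuse plan_ on every call; no serialization, hashing, or cache lookup.
    std::vector<float> outputs(inputs);
    for (float &v : outputs) v *= 2.0f;  // stand-in for running the plan
    return outputs;
  }

 private:
  std::shared_ptr<Plan> plan_;
};

int main() {
  CachedFunction fn("toy_program");
  for (int i = 0; i < 3; ++i) {
    std::cout << fn({1.0f, 2.0f})[0] << "\n";
  }
}

The only point of the sketch is the placement of the expensive work: it lives in the constructor, so the call operator stays on the fast path.
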
14 changes: 10 additions & 4 deletions paddle/fluid/jit/executor_function.h
@@ -22,6 +22,7 @@
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/framework/variable.h"
+#include "paddle/phi/core/enforce.h"
 
 #include "paddle/fluid/jit/base_function.h"
 #include "paddle/fluid/jit/function_schema.h"
@@ -36,9 +37,14 @@ class ExecutorFunction : public BaseFunction {
                    const Name2VariableMap &params_dict,
                    const phi::Place &place)
       : info_(info), place_(place), inner_exe_(place_) {
+    info_->RemoveDescFeedFetch();
+    PADDLE_ENFORCE_GT(
+        static_cast<int64_t>(info_->ProgramDesc().Block(0).OpSize()),
+        0,
+        platform::errors::PreconditionNotMet(
+            "There is no operator in ProgramDesc."));
     utils::ShareParamsIntoScope(info_->ParamNames(), params_dict, &scope_);
     VLOG(6) << framework::GenScopeTreeDebugInfo(&scope_);
-    info_->RemoveDescFeedFetch();
   }
 
   ~ExecutorFunction() noexcept {}
@@ -56,9 +62,9 @@ class ExecutorFunction : public BaseFunction {
                    false,
                    true,
                    info_->OutputArgNames());
-    std::vector<DenseTensor> res;
-    utils::FetchOuts(info_->OutputArgNames(), scope_, &res);
-    return res;
+    std::vector<DenseTensor> outputs;
+    utils::FetchOuts(info_->OutputArgNames(), scope_, &outputs);
+    return outputs;
   }
 
   const std::shared_ptr<FunctionInfo> &Info() const { return info_; }
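
The reordered constructor above also adds a fail-fast precondition: RemoveDescFeedFetch() now runs first, and PADDLE_ENFORCE_GT then rejects a ProgramDesc whose block 0 contains no operators, so an empty program is caught at construction time rather than at the first call. A hedged sketch of the same guard in plain C++ follows; BlockDesc, ProgramDesc, and CheckProgramNotEmpty are simplified stand-ins, with std::invalid_argument standing in for platform::errors::PreconditionNotMet.

#include <cstdint>
#include <stdexcept>
#include <vector>

// Simplified stand-ins for the checked structures.
struct BlockDesc {
  std::vector<int> ops;
  std::size_t OpSize() const { return ops.size(); }
};
struct ProgramDesc {
  BlockDesc block0;
  const BlockDesc &Block(std::size_t) const { return block0; }
};

// Fail-fast precondition: reject an empty program before any execution
// state (scopes, executors) is built around it.
void CheckProgramNotEmpty(const ProgramDesc &program) {
  if (static_cast<int64_t>(program.Block(0).OpSize()) <= 0) {
    throw std::invalid_argument("There is no operator in ProgramDesc.");
  }
}

int main() {
  ProgramDesc empty_program;
  try {
    CheckProgramNotEmpty(empty_program);
  } catch (const std::invalid_argument &err) {
    // Expected for an empty block 0.
  }
}
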
123 changes: 78 additions & 45 deletions paddle/fluid/jit/pe_function.h
@@ -19,10 +19,14 @@
 #include <vector>
 
 #include "paddle/fluid/framework/block_desc.h"
+#include "paddle/fluid/framework/details/build_strategy.h"
+#include "paddle/fluid/framework/details/execution_strategy.h"
 #include "paddle/fluid/framework/executor_cache.h"
+#include "paddle/fluid/framework/ir/graph.h"
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/framework/variable.h"
+#include "paddle/phi/core/enforce.h"
 
 #include "paddle/fluid/jit/base_function.h"
 #include "paddle/fluid/jit/function_schema.h"
@@ -31,72 +35,99 @@
 namespace paddle {
 namespace jit {
 
+using ExecutionStrategy = framework::details::ExecutionStrategy;
+using ParallelExecutor = framework::ParallelExecutor;
+using Graph = framework::ir::Graph;
+
 class PEFunction : public BaseFunction {
  public:
   PEFunction(const std::shared_ptr<FunctionInfo> &info,
              const Name2VariableMap &params_dict,
              const phi::Place &place)
       : info_(info), place_(place) {
+    info_->RemoveDescFeedFetch();
+    PADDLE_ENFORCE_GT(
+        static_cast<int64_t>(info_->ProgramDesc().Block(0).OpSize()),
+        0,
+        platform::errors::PreconditionNotMet(
+            "There is no operator in ProgramDesc."));
     utils::ShareParamsIntoScope(info_->ParamNames(), params_dict, &scope_);
     VLOG(6) << framework::GenScopeTreeDebugInfo(&scope_);
-    info_->RemoveDescFeedFetch();
+    CreateGraphAndPE();
   }
 
   ~PEFunction() noexcept {}
 
-  std::vector<Tensor> operator()(const std::vector<Tensor> &inputs) {
-    auto dense_tensors = utils::ToDenseTensors(inputs);
-    return utils::ToTensors(this->operator()(dense_tensors));
+  static ExecutionStrategy GetExecutionStrategy(const platform::Place &place) {
+    ExecutionStrategy execution_strategy;
+
+    auto device_type = platform::Place2DeviceType(place);
+    switch (device_type) {
+      case platform::DeviceType::CPU: {
+        execution_strategy.num_threads_ = 2;
+        break;
+      }
+      case platform::DeviceType::CUDA: {
+        // NOTE: According experiments, one thread is faster in
+        // most model training.
+        execution_strategy.num_threads_ = 1;
+        break;
+      }
+      case platform::DeviceType::XPU: {
+        execution_strategy.num_threads_ = 1;
+        break;
+      }
+      case platform::DeviceType::IPU: {
+        execution_strategy.num_threads_ = 1;
+        break;
+      }
+      default:
+        PADDLE_THROW(platform::errors::Unavailable(
+            "Unsupported Device type %d.", device_type));
+    }
+    execution_strategy.use_device_ = device_type;
+
+    return execution_strategy;
   }
 
-  std::vector<DenseTensor> operator()(const std::vector<DenseTensor> &inputs) {
-    std::string prog_string;
-    std::hash<std::string> string_hash;
+  void CreateGraphAndPE() {
+    framework::details::BuildStrategy build_strategy;
+    auto execution_strategy = GetExecutionStrategy(place_);
 
     auto &program_desc = info_->ProgramDesc();
-    // TODO(dev): Serialize is very slow.
-    const_cast<framework::ProgramDesc *>(&program_desc)
-        ->Proto()
-        ->SerializePartialToString(&prog_string);
-    int64_t program_id = static_cast<int64_t>(string_hash(prog_string));
-
     const framework::BlockDesc &global_block = program_desc.Block(0);
     int64_t start_op_index = 0;
     int64_t end_op_index = static_cast<int64_t>(global_block.OpSize());
 
+    graph_ =
+        std::make_shared<Graph>(program_desc, start_op_index, end_op_index);
+    inner_pe_ = std::make_shared<ParallelExecutor>(
+        place_, &scope_, execution_strategy, build_strategy, graph_.get());
+    inner_pe_->PrepareVariables(&scope_);
+    inner_pe_->SkipMemoryReuse(/*scope_idx=*/0, info_->InputArgNames());
+  }
+
+  std::vector<Tensor> operator()(const std::vector<Tensor> &inputs) {
+    auto dense_tensors = utils::ToDenseTensors(inputs);
+    return utils::ToTensors(this->operator()(dense_tensors));
+  }
+
+  std::vector<DenseTensor> operator()(const std::vector<DenseTensor> &inputs) {
     utils::ShareIntoScope(info_->InputArgNames(), inputs, &scope_);
-    std::vector<std::string> input_var_names = info_->InputArgNames();
-    std::vector<std::string> output_var_names = info_->OutputArgNames();
-
-    if (end_op_index > start_op_index) {
-      auto cache_info = framework::GetExecutorInfoFromCache(program_desc,
-                                                            place_,
-                                                            start_op_index,
-                                                            end_op_index,
-                                                            /*is_grad=*/false,
-                                                            program_id,
-                                                            &scope_);
-      auto &parallel_executor = cache_info.first;
-      auto &skip_eager_delete_vars =
-          framework::ExecutorInfoCache::Instance().SkipEagerDeleteVars(
-              program_id, false);
-      if (cache_info.second /*is_new_created*/) {
-        parallel_executor->SkipMemoryReuse(/*scope_idx=*/0, input_var_names);
-        skip_eager_delete_vars.insert(skip_eager_delete_vars.end(),
-                                      output_var_names.begin(),
-                                      output_var_names.end());
-
-        framework::details::ParseSafeEagerDeletionSkipVars(
-            program_desc,
-            end_op_index,
-            output_var_names,
-            &skip_eager_delete_vars);
-      }
-      parallel_executor->RunWithoutFetch(skip_eager_delete_vars);
-    }
-    std::vector<DenseTensor> res;
-    utils::FetchOuts(info_->OutputArgNames(), scope_, &res);
-    return res;
+
+    // update op_handle scope_map in pe->executor_->Graph
+    std::unordered_map<framework::Scope *, framework::Scope *> scope_map = {
+        {inner_pe_->GetLocalScopes().front(), &scope_}};
+    inner_pe_->ResetOpHandleScopeMapOfGraphs(scope_map);
+    // need to recreate tmp variables in new scope
+    inner_pe_->PrepareVariables(&scope_);
+
+    inner_pe_->RunWithoutFetch(info_->OutputArgNames());
+
+    std::vector<DenseTensor> outputs;
+    utils::FetchOuts(info_->OutputArgNames(), scope_, &outputs);
+    scope_.DropKids();
+    return outputs;
   }
 
   const std::shared_ptr<FunctionInfo> &Info() const { return info_; }
@@ -105,6 +136,8 @@ class PEFunction : public BaseFunction {
   std::shared_ptr<FunctionInfo> info_;
   framework::Scope scope_;
   phi::Place place_;
+  std::shared_ptr<ParallelExecutor> inner_pe_;
+  std::shared_ptr<Graph> graph_;
 };
 
 }  // namespace jit
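
The memory-leak part of the fix is visible at the end of the new operator(): each run recreates temporary variables (see the "need to recreate tmp variables in new scope" comment), and the added scope_.DropKids() releases the child scopes hanging off scope_ once FetchOuts has copied the outputs, so repeated calls no longer accumulate state. A toy sketch of that cleanup pattern follows; this Scope class is an illustration, not Paddle's framework::Scope.

#include <cstddef>
#include <iostream>
#include <memory>
#include <vector>

// Toy scope tree: every run parks its temporaries in a fresh child scope;
// dropping the children after outputs are copied out keeps memory flat.
class Scope {
 public:
  Scope *NewChildScope() {
    kids_.push_back(std::make_unique<Scope>());
    return kids_.back().get();
  }
  void DropKids() { kids_.clear(); }
  std::size_t KidCount() const { return kids_.size(); }

 private:
  std::vector<std::unique_ptr<Scope>> kids_;
};

int main() {
  Scope root;
  for (int run = 0; run < 3; ++run) {
    Scope *tmp = root.NewChildScope();  // per-run temporaries live here
    (void)tmp;                          // ... run the graph, fetch outputs ...
    root.DropKids();                    // without this, kids_ grows every run
  }
  std::cout << "child scopes left: " << root.KidCount() << "\n";  // prints 0
}
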
