Skip to content

Commit

Permalink
[feat][AutoParallel] Visualize flow parallel timing diagram in static…
Browse files Browse the repository at this point in the history
… graph mode (#58313)

* merge from openvino master

* add InterpreterRunTime() to record interpreter's run time

* add profiler helper static to produce json file

* add color map and support perfetto format

* recover codes

* control include env for gpu_timer.h

* fix logic for profiler_helper_static.py

* fix build error

* fix build error

* recover thirdparty

* add flag control: not support new ir now

* set auto_parallel_profiler flag to false

* fix

* add auto_parallel_profiler as command parameter

* fix value name

* support gettimeofday for win env

* fix win build error

* fix win build error

* use job_type_to_id

* Fixed repeatedly timing the same stream

* add step line for timeline

* add step timeline and fix logic when job overlap

* update time record logic

* fix bug when start profile start from none zero step

* fix note

* remove FLAGS_auto_parallel_profiler

* use run config instead FLAGS_auto_parallelxx

* fix color map logic

* fix color map logic

* fix bug when log step does not start from 0

* fix

* fix

* don't use set_enable_auto_parallel_profiler

* fix bug

* disable auto_parallel_profiler when not open flag by command line

* fix bug

* remove resettime

* fix build bug

* fix

* remove set enable

* fix build error

* fix build error

* fix build error

* fix ci error

* fix

* fix run error

* fix

* fix

* fix calculate_stream_timer logic

* remove fluid head

* fix build error

* set default value for enable_job_schedule_profiler
  • Loading branch information
AndSonder authored Nov 21, 2023
1 parent 16aa5c4 commit 192a5f8
Show file tree
Hide file tree
Showing 15 changed files with 467 additions and 21 deletions.
6 changes: 5 additions & 1 deletion paddle/fluid/framework/new_executor/interpreter_base_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ class InterpreterBaseImpl {
bool need_fetch = true) = 0;

virtual paddle::framework::FetchList Run(
const std::vector<std::string>& feed_names, bool need_fetch = true) = 0;
const std::vector<std::string>& feed_names,
bool need_fetch = true,
bool enable_job_schedule_profiler = false) = 0;

virtual void ShareWorkQueueFrom(InterpreterBaseImpl* src) = 0;

Expand Down Expand Up @@ -104,6 +106,8 @@ class InterpreterBaseImpl {
std::vector<paddle::framework::OpFuncNode>* op_func_nodes) = 0;

virtual bool IsStaticBuild() const = 0;

virtual std::tuple<double, double> InterpreterRunTime() = 0;
};

inline void SetDeviceId(const platform::Place& place) {
Expand Down
9 changes: 7 additions & 2 deletions paddle/fluid/framework/new_executor/interpretercore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ FetchList InterpreterCore::Run(
}

FetchList InterpreterCore::Run(const std::vector<std::string>& feed_names,
bool need_fetch) {
return impl_->Run(feed_names, need_fetch);
bool need_fetch,
bool enable_job_schedule_profiler) {
return impl_->Run(feed_names, need_fetch, enable_job_schedule_profiler);
}

void InterpreterCore::ShareWorkQueueFrom(std::shared_ptr<InterpreterCore> src) {
Expand Down Expand Up @@ -130,5 +131,9 @@ void InterpreterCore::Build(

bool InterpreterCore::IsStaticBuild() const { return impl_->IsStaticBuild(); }

std::tuple<double, double> InterpreterCore::InterpreterRunTime() {
return impl_->InterpreterRunTime();
}

} // namespace framework
} // namespace paddle
5 changes: 4 additions & 1 deletion paddle/fluid/framework/new_executor/interpretercore.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class InterpreterCore {
bool need_fetch = true);

paddle::framework::FetchList Run(const std::vector<std::string>& feed_names,
bool need_fetch = true);
bool need_fetch = true,
bool enable_job_schedule_profiler = false);

void ShareWorkQueueFrom(std::shared_ptr<InterpreterCore> src);

Expand Down Expand Up @@ -80,6 +81,8 @@ class InterpreterCore {

bool IsStaticBuild() const;

std::tuple<double, double> InterpreterRunTime();

private:
DISABLE_COPY_AND_ASSIGN(InterpreterCore);

Expand Down
8 changes: 7 additions & 1 deletion paddle/fluid/framework/new_executor/pir_interpreter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ void PirInterpreter::ShareBuildResultsFrom(const InterpreterBaseImpl& src) {
<< ") to InterpreterCore(" << this << ")";
}

std::tuple<double, double> PirInterpreter::InterpreterRunTime() {
PADDLE_THROW(platform::errors::Unimplemented(
"PirInterpreter::InterpreterRunTime is not implemented."));
}

const interpreter::PirDependencyBuilder&
PirInterpreter::GetPirDependencyBuilder() const {
return ir_dependency_builder_;
Expand Down Expand Up @@ -1188,7 +1193,8 @@ paddle::framework::FetchList PirInterpreter::Run(
}

FetchList PirInterpreter::Run(const std::vector<std::string>& feed_names,
bool need_fetch) {
bool need_fetch,
bool enable_job_schedule_profiler) {
SetDeviceId(place_);
CheckCUDAGraphBeforeRun(feed_names);

Expand Down
8 changes: 6 additions & 2 deletions paddle/fluid/framework/new_executor/pir_interpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ class PirInterpreter : public InterpreterBaseImpl {
const std::vector<phi::DenseTensor>& feed_tensors,
bool need_fetch = true) override;

paddle::framework::FetchList Run(const std::vector<std::string>& feed_names,
bool need_fetch = true) override;
paddle::framework::FetchList Run(
const std::vector<std::string>& feed_names,
bool need_fetch = true,
bool enable_job_schedule_profiler = false) override;

void ShareWorkQueueFrom(InterpreterBaseImpl* src) override;

void ShareBuildResultsFrom(const InterpreterBaseImpl& src) override;

std::tuple<double, double> InterpreterRunTime() override;

std::shared_ptr<std::vector<size_t>> GetDependencyCount() const override;

bool IsSharedResultsBuild() const override;
Expand Down
53 changes: 51 additions & 2 deletions paddle/fluid/framework/new_executor/program_interpreter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ ProgramInterpreter::ProgramInterpreter(const platform::Place& place,
block_(block),
stream_analyzer_(place),
execution_config_(execution_config),
var_scope_(scope) {
var_scope_(scope),
enable_job_schedule_profiler_(false) {
VLOG(4) << "ProgramInterpreter(): " << this << " on " << place_;

exception_notifier_ = main_thread_blocker_.RegisterEvent(kExceptionCaught);
Expand Down Expand Up @@ -90,6 +91,10 @@ ProgramInterpreter::ProgramInterpreter(const platform::Place& place,
};

PrepareForCUDAGraphCapture();

#if defined(PADDLE_WITH_CUDA)
calculate_stream_timer_ = std::make_unique<phi::CalculateStreamTimer>(place);
#endif
}

ProgramInterpreter::~ProgramInterpreter() {
Expand Down Expand Up @@ -126,6 +131,7 @@ void ProgramInterpreter::RunImpl() {
async_work_queue_ = GetWorkQueue();
ExecuteInstructionList(vec_instruction_);
}

#ifdef PADDLE_WITH_CUSTOM_DEVICE
if (platform::is_custom_place(place_)) {
platform::DeviceContextPool::Instance().Get(place_)->Wait();
Expand All @@ -134,7 +140,10 @@ void ProgramInterpreter::RunImpl() {
}

FetchList ProgramInterpreter::Run(const std::vector<std::string>& feed_names,
bool need_fetch) {
bool need_fetch,
bool enable_job_schedule_profiler) {
enable_job_schedule_profiler_ = enable_job_schedule_profiler;

std::vector<paddle::framework::OpFuncNode> op_func_nodes;
Build(feed_names, &op_func_nodes);

Expand Down Expand Up @@ -633,6 +642,15 @@ void ProgramInterpreter::ClearLoDTensorArrayInLocalScope() {
}
}

std::tuple<double, double> ProgramInterpreter::InterpreterRunTime() {
double start_time = 0, end_time = 0;
#if defined(PADDLE_WITH_CUDA)
start_time = calculate_stream_timer_->StartTime();
end_time = calculate_stream_timer_->EndTime();
#endif
return std::make_tuple(start_time, end_time);
}

void ProgramInterpreter::Convert(
std::vector<paddle::framework::OpFuncNode>* op_func_nodes) {
auto& vec_meta_info = var_scope_.MutableVecMetaInfo();
Expand Down Expand Up @@ -1040,6 +1058,15 @@ void ProgramInterpreter::RunInstruction(const Instruction& instr_node) {

try {
instr_node.WaitEvent(place_);
#if defined(PADDLE_WITH_CUDA)
if (enable_job_schedule_profiler_) {
if (!calculate_stream_timer_->IsStarted() &&
!interpreter::IsCommunicationOp(instr_node)) {
VLOG(3) << "Start calculated stream timer from op: " << op->Type();
calculate_stream_timer_->Start();
}
}
#endif

if (!instr_node.IsArtificial()) {
RunOperator(instr_node);
Expand Down Expand Up @@ -1094,6 +1121,17 @@ void ProgramInterpreter::ExecuteInstructionList(

exception_holder_.Clear();

if (enable_job_schedule_profiler_) {
for (int i = vec_instr.size() - 1; i >= 0; --i) {
auto& instr_node = vec_instr[i];
if (!interpreter::IsCommunicationOp(instr_node)) {
VLOG(3) << "Last calculated op type: " << instr_node.OpBase()->Type();
last_calculate_instr_id_ = i;
break;
}
}
}

for (size_t i = 0; i < dependecy_count_->size(); ++i) {
if ((*dependecy_count_)[i] == 0) {
// NOTE(zhiqiu): hot fix for jit input var
Expand Down Expand Up @@ -1205,6 +1243,17 @@ void ProgramInterpreter::RunInstructionAsync(size_t instr_id) {

RunInstruction(instr_node);

#if defined(PADDLE_WITH_CUDA)
if (enable_job_schedule_profiler_) {
if (instr_id == last_calculate_instr_id_ &&
calculate_stream_timer_->IsStarted()) {
VLOG(3) << "Stop calculated stream timer from op: "
<< instr_node.OpBase()->Type();
calculate_stream_timer_->Stop();
}
}
#endif

if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(4) << "Exception caught";
if (exception_notifier_ != nullptr) {
Expand Down
18 changes: 16 additions & 2 deletions paddle/fluid/framework/new_executor/program_interpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

#include "paddle/fluid/framework/new_executor/interpreter_base_impl.h"

#if defined(PADDLE_WITH_CUDA)
#include "paddle/phi/kernels/autotune/gpu_timer.h"
#endif

namespace paddle {
namespace framework {

Expand Down Expand Up @@ -46,8 +50,10 @@ class ProgramInterpreter : public InterpreterBaseImpl {
const std::vector<phi::DenseTensor>& feed_tensors,
bool need_fetch = true) override;

paddle::framework::FetchList Run(const std::vector<std::string>& feed_names,
bool need_fetch = true) override;
paddle::framework::FetchList Run(
const std::vector<std::string>& feed_names,
bool need_fetch = true,
bool enable_job_schedule_profiler = false) override;

void Build(
const std::vector<std::string>& feed_names,
Expand Down Expand Up @@ -99,6 +105,8 @@ class ProgramInterpreter : public InterpreterBaseImpl {

bool IsStaticBuild() const override { return static_build_; }

std::tuple<double, double> InterpreterRunTime() override;

private:
// build graph
void Convert(std::vector<paddle::framework::OpFuncNode>* op_func_nodes);
Expand Down Expand Up @@ -211,6 +219,12 @@ class ProgramInterpreter : public InterpreterBaseImpl {
InstructionSchedulingPriorityLess instruction_scheduling_priority_less;

std::vector<HookFunc> hookfuncs_;

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
std::unique_ptr<phi::CalculateStreamTimer> calculate_stream_timer_;
#endif
size_t last_calculate_instr_id_;
bool enable_job_schedule_profiler_;
};

} // namespace framework
Expand Down
38 changes: 34 additions & 4 deletions paddle/fluid/framework/new_executor/standalone_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ StandaloneExecutor::StandaloneExecutor(const platform::Place& place,
}

paddle::framework::FetchList StandaloneExecutor::Run(
const std::vector<std::string>& feed_names) {
const std::vector<std::string>& feed_names,
const bool enable_job_schedule_profiler) {
platform::RecordEvent record_event(
"StandaloneExecutor::run", platform::TracerEventType::UserDefined, 1);

Expand Down Expand Up @@ -190,7 +191,7 @@ paddle::framework::FetchList StandaloneExecutor::Run(
VLOG(6) << "Run job (" << job_idx << "), type = " << job_type
<< ", micro_batch_id =" << job->MicroBatchId();

// Note(sonder): Share build results don't work for new IR now.
// NOTE(sonder): Share build results don't work for new IR now.
if (type_to_first_id.count(job_type) != 0 &&
!FLAGS_enable_pir_in_executor) {
interpretercores_[job_idx]->ShareBuildResultsFrom(
Expand All @@ -211,13 +212,42 @@ paddle::framework::FetchList StandaloneExecutor::Run(
if (jobs.size() > 1 && job_type != "forward") {
const std::vector<std::string> tmp_feed_names = {};
interpretercores_[job_idx]->Run(tmp_feed_names,
/*need_fetch = */ false);
/*need_fetch = */ false,
/*enable_job_schedule_profiler = */
enable_job_schedule_profiler);
} else {
interpretercores_[job_idx]->Run(feed_names, /*need_fetch = */ false);
interpretercores_[job_idx]->Run(feed_names,
/*need_fetch = */ false,
/*enable_job_schedule_profiler = */
enable_job_schedule_profiler);
}
}
}

// record each job's run time
#if defined(PADDLE_WITH_CUDA)
if (enable_job_schedule_profiler) {
for (size_t job_idx = 0; job_idx < jobs.size(); ++job_idx) {
const auto& job = jobs[job_idx];
const std::string& job_type = job->Type();
double start_time, end_time;
std::tie(start_time, end_time) =
interpretercores_[job_idx]->InterpreterRunTime();

// Note(sonder): Used to record the runtime of each job in order to
// generate a parallel pipeline timeline. Job runtime information can be
// extracted from the logs using the scripts "profiler_helper_static.py".
// Do not modify, as it may affect the results of regular expression
// matching.
VLOG(0) << "Profiler Info: Job (" << job->MicroBatchId()
<< "), type = " << job_type
<< ", micro_batch_id = " << job->MicroBatchId()
<< ", job_start_time = " << std::to_string(start_time)
<< ", job_end_time = " << std::to_string(end_time);
}
}
#endif

// return Fetch Tensors
if (FLAGS_enable_pir_in_executor) {
framework::FetchList fetch_res;
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/framework/new_executor/standalone_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class StandaloneExecutor {

~StandaloneExecutor() {}

paddle::framework::FetchList Run(const std::vector<std::string>& feed_names);
paddle::framework::FetchList Run(
const std::vector<std::string>& feed_names,
const bool enable_job_schedule_profiler = false);

private:
bool is_interpretercore_build_result_shared_{false};
Expand Down
6 changes: 4 additions & 2 deletions paddle/fluid/pybind/pybind.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2012,11 +2012,13 @@ All parameter, weight, gradient are variables in Paddle.
const interpreter::Plan &,
Scope *>())
.def("run",
[](StandaloneExecutor &self, std::vector<std::string> feed_names) {
[](StandaloneExecutor &self,
std::vector<std::string> feed_names,
bool enable_job_schedule_profiler = false) {
paddle::framework::FetchList ret;
{
pybind11::gil_scoped_release release;
ret = self.Run(feed_names);
ret = self.Run(feed_names, enable_job_schedule_profiler);
}
return py::cast(std::move(ret));
});
Expand Down
Loading

0 comments on commit 192a5f8

Please sign in to comment.