Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[XPUPS]Fix trace profile #14

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion paddle/fluid/framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ if(TRACE_PROFILE)
link_directories("${SCALOPUS_PATH}/scalopus/so/scalopus_tracing")
link_directories("${SCALOPUS_PATH}/scalopus/so/thirdparty/seasocks/src/main/c/")
endif()

#Windows treats a symbolic file as a real file, which is different from Unix.
#We create a hidden file and compile it instead of the original source file.
function(windows_symbolic TARGET)
Expand Down
16 changes: 15 additions & 1 deletion paddle/fluid/framework/boxps_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ limitations under the License. */
#include "paddle/fluid/platform/collective_helper.h"
#endif

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
Expand All @@ -43,6 +44,7 @@ limitations under the License. */
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif

DECLARE_bool(enable_sync_dense_moment);
DECLARE_bool(check_nan_inf);
Expand Down Expand Up @@ -742,9 +744,13 @@ void BoxPSWorker::TrainFilesWithProfiler() {
main_timer.Resume();

reader_timer.Resume();
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_START("PackBatchTask", dev_ctx_->Wait());
#endif
batch_size = PackBatchTask();
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_END("PackBatchTask", dev_ctx_->Wait());
#endif
reader_timer.Pause();
if (batch_size <= 0) {
break;
Expand All @@ -756,19 +762,27 @@ void BoxPSWorker::TrainFilesWithProfiler() {
cal_timer.Resume();
int op_id = 0;
dev_ctx_->Wait();
std::vector<std::string> op_names;
// std::vector<std::string> op_names;
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_START("ops run",);
#endif
for (auto& op : ops_) {
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
RUNTIME_TRACE_SCOPE_START((op->Type()+" run").c_str(),);
#endif
timeline.Start();
op->Run(*thread_scope_, place_);
dev_ctx_->Wait();
timeline.Pause();
op_total_time[op_id++] += timeline.ElapsedUS();
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
RUNTIME_TRACE_SCOPE_END((op->Type()+" run").c_str(),);
#endif
}
dev_ctx_->Wait();
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_END("ops run",);
#endif
cal_timer.Pause();
#if defined(PADDLE_WITH_CUDA)
if (FLAGS_check_nan_inf) {
Expand Down
9 changes: 8 additions & 1 deletion paddle/fluid/framework/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -561,22 +561,29 @@ void Executor::RunPartialPreparedContext(ExecutorPrepareContext* ctx,
}
}

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_START("executor ops run",);
#endif
for (int64_t i = start_op_index; i < end_op_index; ++i) {
auto& op = ctx->ops_[i];
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
xpu_wait();
RUNTIME_TRACE_SCOPE_START((op->Type()+" run").c_str(),);
#endif
op->Run(*local_scope, place_);
if (gc) {
platform::RecordEvent record(
"CheckGC", platform::TracerEventType::UserDefined, 10);
DeleteUnusedTensors(*local_scope, op.get(), ctx->unused_vars_, gc.get());
}
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
RUNTIME_TRACE_SCOPE_END((op->Type()+" run").c_str(),);
xpu_wait();
#endif
}
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_END("executor ops run",);

#endif
auto callback = [scope, local_scope, keep_kids]() {
if (local_scope != scope) {
VLOG(4) << "Delete scope: " << local_scope;
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/framework/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ limitations under the License. */
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/device_context.h"

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
Expand All @@ -36,6 +37,7 @@ limitations under the License. */
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif

namespace paddle {
namespace framework {
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/framework/fleet/box_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void BoxWrapper::EndPass(bool need_save_delta) {
<< "MB, available: " << (available >> 20) << "MB";
}
#endif
#ifdef TRACE_PROFILE
#if defined(TRACE_PROFILE) && defined(PADDLE_WITH_XPU_KP)
static int trace_pass_count = std::getenv("TRACE_PASS_NUM")!=NULL ?
std::stoi(std::string(std::getenv("TRACE_PASS_NUM"))):
1;
Expand Down
7 changes: 5 additions & 2 deletions paddle/fluid/framework/fleet/box_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ limitations under the License. */
#include "paddle/fluid/string/string_helper.h"
#include "paddle/fluid/framework/fleet/metrics.h"
#include "paddle/fluid/framework/fleet/box_wrapper_kernel.h"

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
Expand All @@ -56,6 +58,7 @@ limitations under the License. */
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif
#define BUF_SIZE 1024 * 1024

DECLARE_int32(fix_dayid);
Expand Down Expand Up @@ -393,7 +396,7 @@ class BoxWrapper {
use_xpu_sparse_map_ = true;
}
#endif
#ifdef TRACE_PROFILE
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// Client side to produce the tracepoints.
factory = std::make_shared<scalopus::TransportLoopbackFactory>();
const auto server = factory->serve();
Expand Down Expand Up @@ -909,7 +912,7 @@ class BoxWrapper {
std::set<std::string> slot_eval_set_;
std::atomic<uint16_t> dataset_id_{0};
std::atomic<uint16_t> round_id_{0};
#ifdef TRACE_PROFILE
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
scalopus::TransportLoopbackFactory::Ptr factory;
std::shared_ptr<scalopus::EndpointManagerPoll> manager;
scalopus::CatapultRecorder::Ptr catapult_recorder;
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/framework/fleet/box_wrapper_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License. */
#include <glog/logging.h>
#include <vector>

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
Expand All @@ -24,6 +25,7 @@ limitations under the License. */
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif

DECLARE_bool(enable_pullpush_dedup_keys);

Expand Down
8 changes: 8 additions & 0 deletions paddle/fluid/framework/naive_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,26 @@ void NaiveExecutor::Run() {
platform::RegisterModelLayout(ops_, place_);
#endif
platform::ScopedFlushDenormal flush;
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_START("naive_executor ops run",);
#endif
for (auto &op : ops_) {
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
xpu_wait();
RUNTIME_TRACE_SCOPE_START((op->Type()+" run").c_str(),);
#endif
VLOG(4) << std::this_thread::get_id() << " run "
<< op->DebugStringEx(scope_) << " on scope " << scope_;
op->SetIsCalledByExecutor(false);
op->Run(*scope_, place_);
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
RUNTIME_TRACE_SCOPE_END((op->Type()+" run").c_str(),);
xpu_wait();
#endif
}
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_END("naive_executor ops run",);
#endif
}

void NaiveExecutor::CreateVariables(const ProgramDesc &desc,
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/framework/naive_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
Expand All @@ -32,6 +33,7 @@
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif

namespace phi {
class DenseTensor;
Expand Down
6 changes: 6 additions & 0 deletions paddle/fluid/framework/operator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ class DenseTensor;
#ifdef PADDLE_WITH_XPU
#include "paddle/fluid/platform/device/xpu/xpu_info.h"
#include "paddle/fluid/platform/device/xpu/xpu_op_list.h"
#endif

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
Expand Down Expand Up @@ -1669,10 +1671,14 @@ void OperatorWithKernel::RunImpl(const Scope& scope,
1,
platform::EventRole::kInnerOp);
if (need_prepare_data_) {
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_START("PrepareData",);
#endif
transfer_scope = PrepareData(
scope, *kernel_type_, &transfered_inplace_vars, runtime_ctx);
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
TRACE_SCOPE_END("PrepareData",);  // TODO: confirm whether a device wait is needed before ending this scope
#endif
}
}
// exec scope is the scope that kernel actually executed on.
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/inference/api/analysis_predictor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1944,7 +1944,7 @@ AnalysisPredictor::~AnalysisPredictor() {
}
device_contexts_.clear();

#ifdef TRACE_PROFILE
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// need to guarantee we propagate the tracepoints before we stop the interval.
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
catapult_recorder->stopInterval();
Expand Down
4 changes: 2 additions & 2 deletions paddle/fluid/inference/api/analysis_predictor.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class AnalysisPredictor : public PaddlePredictor {
config_.EnableMemoryOptim(false);
}
predictor_id_ = inference::GetUniqueId();
#ifdef TRACE_PROFILE
#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// Client side to produce the tracepoints.
factory = std::make_shared<scalopus::TransportLoopbackFactory>();
const auto server = factory->serve();
Expand Down Expand Up @@ -556,7 +556,7 @@ class AnalysisPredictor : public PaddlePredictor {
#endif
friend class paddle_infer::experimental::InternalUtils;

#ifdef TRACE_PROFILE
#if defined(TRACE_PROFILE) && defined(PADDLE_WITH_XPU_KP)
scalopus::TransportLoopbackFactory::Ptr factory;
std::shared_ptr<scalopus::EndpointManagerPoll> manager;
scalopus::CatapultRecorder::Ptr catapult_recorder;
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/operators/collective/c_mixallgather_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ limitations under the License. */
#endif
#include "paddle/fluid/operators/tensor_formatter.h"

#if defined(TRACE_PROFILE) && (defined(PADDLE_WITH_XPU_KP) || defined(PADDLE_WITH_XPU))
// The producer side.
#include <scalopus_tracing/tracing.h>
#include <scalopus_transport/transport_loopback.h>
Expand All @@ -37,6 +38,7 @@ limitations under the License. */
#include <scalopus_general/endpoint_manager_poll.h>
#include <scalopus_general/general_provider.h>
#include <scalopus_tracing/native_trace_provider.h>
#endif

namespace paddle {
namespace operators {
Expand Down