From 3a8355fbf2ad599931d69f1396d78923159aa0a2 Mon Sep 17 00:00:00 2001 From: Maxim Kovalev Date: Wed, 20 Dec 2023 07:26:11 +0000 Subject: [PATCH 1/2] YQL-17250: Add operation information into hybrid statistics --- .../providers/yt/provider/yql_yt_datasink.cpp | 12 +++++++- .../yt/provider/yql_yt_datasink_exec.cpp | 29 +++++++++++-------- .../yt/provider/yql_yt_dq_hybrid.cpp | 24 +++++++-------- .../providers/yt/provider/yql_yt_provider.h | 1 + 4 files changed, 41 insertions(+), 25 deletions(-) diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp index b76aeaba4d65..0517acb9edf5 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp @@ -160,7 +160,17 @@ class TYtDataSink : public TDataProviderBase { return false; } - NCommon::WriteStatistics(writer, totalOnly, State_->Statistics); + writer.OnBeginMap(); + writer.OnKeyedItem("All"); + NCommon::WriteStatistics(writer, totalOnly, State_->Statistics); + writer.OnKeyedItem("Hybrid"); + writer.OnBeginMap(); + for (const auto& [opName, stats] : State_->HybridStatistics) { + writer.OnKeyedItem(opName); + NCommon::WriteStatistics(writer, totalOnly, {{0, stats}}); + } + writer.OnEndMap(); + writer.OnEndMap(); return true; } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index 617a7197708d..23f2c16d4fef 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -57,21 +57,23 @@ bool NeedFallback(const TIssues& issues) { TIssue WrapIssuesOnHybridFallback(TPosition pos, const TIssues& issues) { TIssue result(pos, "Hybrid execution fallback on YT"); - result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING); + result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO); - const std::function toWarning = [&](TIssue& issue) { - if (issue.Severity == TSeverityIds::S_ERROR || issue.Severity == TSeverityIds::S_FATAL) { - issue.Severity = TSeverityIds::S_WARNING; + const std::function toInfo = [&](TIssue& issue) { + if (issue.Severity == TSeverityIds::S_ERROR + || issue.Severity == TSeverityIds::S_FATAL + || issue.Severity == TSeverityIds::S_WARNING) { + issue.Severity = TSeverityIds::S_INFO; } for (const auto& subissue : issue.GetSubIssues()) { - toWarning(*subissue); + toInfo(*subissue); } }; for (const auto& issue : issues) { - TIssuePtr warning(new TIssue(issue)); - toWarning(*warning); - result.AddSubIssue(std::move(warning)); + TIssuePtr info(new TIssue(issue)); + toInfo(*info); + result.AddSubIssue(std::move(info)); } return result; @@ -125,6 +127,9 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { static TExprNode::TPtr FinalizeOutputOp(const TYtState::TPtr& state, const TString& operationHash, const IYtGateway::TRunResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, bool markFinished) { + with_lock(state->StatisticsMutex) { + state->HybridStatistics[input->Content()].Entries.emplace_back(TString{"YtExecution"}, 0, 0, 0, 0, 1); + } auto outSection = TYtOutputOpBase(input).Output(); YQL_ENSURE(outSection.Size() == res.OutTableStats.size(), "Invalid output table count in IYtGateway::TRunResult"); TExprNode::TListType newOutTables; @@ -281,19 +286,19 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { } TStatusCallbackPair HandleTryFirst(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext&) { - auto statWriter = [this](TStringBuf name) { + auto statWriter = [this](TStringBuf statName, TStringBuf opName) { with_lock(State_->StatisticsMutex) { - State_->Statistics[Max()].Entries.emplace_back(TString{name}, 0, 0, 0, 0, 1); + State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1); } }; switch (input->Head().GetState()) { case TExprNode::EState::ExecutionComplete: - statWriter("HybridExecution"); + statWriter("Execution", input->TailPtr()->Content()); output = input->HeadPtr(); break; case TExprNode::EState::Error: { - statWriter("HybridFallback"); + statWriter("Fallback", input->TailPtr()->Content()); if (State_->Configuration->HybridDqExecutionFallback.Get().GetOrElse(true)) { output = input->TailPtr(); } else { diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index 5ca774525f42..433a2f58c4d8 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -275,10 +275,10 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { if (const auto stat = CanReadHybrid(sort.Input().Item(0))) { if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { YQL_CLOG(INFO, ProviderYt) << "Sort on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Hybrid_Sort_try"); + PushStat("Try", node.Raw()->Content()); return MakeYtSortByDq(sort, ctx); } - PushStat("Hybrid_Sort_over_limits"); + PushStat("SkipOverLimits", node.Raw()->Content()); } } } @@ -294,10 +294,10 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { if (const auto stat = CanReadHybrid(merge.Input().Item(0))) { if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { YQL_CLOG(INFO, ProviderYt) << "Merge on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Hybrid_Merge_try"); + PushStat("Try", node.Raw()->Content()); return MakeYtSortByDq(merge, ctx); } - PushStat("Hybrid_Merge_over_limits"); + PushStat("SkipOverLimits", node.Raw()->Content()); } } } @@ -347,7 +347,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { if (CanExecuteInHybrid(map.Mapper().Ptr(), chunksLimit, sizeLimit)) { YQL_CLOG(INFO, ProviderYt) << "Map on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Hybrid_Map_try"); + PushStat("Try", node.Raw()->Content()); TSyncMap syncList; const auto& paths = map.Input().Item(0).Paths(); for (auto i = 0U; i < paths.Size(); ++i) { @@ -430,7 +430,7 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .Done(); } } - PushStat("Hybrid_Map_over_limits"); + PushStat("SkipOverLimits", node.Raw()->Content()); } } @@ -626,12 +626,12 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { if (CanExecuteInHybrid(reduce.Reducer().Ptr(), chunksLimit, sizeLimit)) { if (ETypeAnnotationKind::Struct == GetSeqItemType(*reduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) { YQL_CLOG(INFO, ProviderYt) << "Reduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Hybrid_Reduce_try"); + PushStat("Try", node.Raw()->Content()); return MakeYtReduceByDq(reduce, ctx); } } } - PushStat("Hybrid_Reduce_over_limits"); + PushStat("SkipOverLimits", node.Raw()->Content()); } } @@ -647,21 +647,21 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { if (CanExecuteInHybrid(mapReduce.Reducer().Ptr(), chunksLimit, sizeLimit) && CanExecuteInHybrid(mapReduce.Mapper().Ptr(), chunksLimit, sizeLimit)) { if (ETypeAnnotationKind::Struct == GetSeqItemType(*mapReduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) { YQL_CLOG(INFO, ProviderYt) << "MapReduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Hybrid_MapReduce_try"); + PushStat("Try", node.Raw()->Content()); return MakeYtReduceByDq(mapReduce, ctx); } } } - PushStat("Hybrid_MapReduce_over_limits"); + PushStat("SkipOverLimits", node.Raw()->Content()); } } return node; } - void PushStat(const std::string_view& name) const { + void PushStat(TStringBuf statName, TStringBuf opName) const { with_lock(State_->StatisticsMutex) { - State_->Statistics[Max()].Entries.emplace_back(TString{name}, 0, 0, 0, 0, 1); + State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1); } }; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h index 90ad1d58fa27..06d9d8f747a6 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h @@ -96,6 +96,7 @@ struct TYtState : public TThrRefBase { THashMap, TString> AnonymousLabels; // cluster + label -> name std::unordered_map NodeHash; // unique id -> hash THashMap Statistics; // public id -> stat + THashMap HybridStatistics; // operation name -> stat TMutex StatisticsMutex; THashSet> Checkpoints; // Set of checkpoint tables THolder DqIntegration_; From b333f2c9616ab97422d252256770222cea759b7d Mon Sep 17 00:00:00 2001 From: Maxim Kovalev Date: Wed, 20 Dec 2023 13:39:00 +0000 Subject: [PATCH 2/2] Fix review comments --- .../common/provider/yql_provider.cpp | 11 ++++-- .../providers/common/provider/yql_provider.h | 2 +- .../providers/yt/provider/yql_yt_datasink.cpp | 3 +- .../yt/provider/yql_yt_datasink_exec.cpp | 19 +++++++--- .../yt/provider/yql_yt_dq_hybrid.cpp | 38 +++++++++++++------ 5 files changed, 50 insertions(+), 23 deletions(-) diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index fc62e922c874..b2bbb2933f23 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -1264,14 +1264,16 @@ void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& sta writer.OnEndMap(); } -void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap& statistics) { +void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap& statistics, bool addExternalMap) { if (statistics.empty()) { return; } THashMap>> total; // sum, count, max, min - writer.OnBeginMap(); + if (addExternalMap) { + writer.OnBeginMap(); + } for (const auto& opStatistics : statistics) { for (auto& el : opStatistics.second.Entries) { @@ -1331,8 +1333,9 @@ void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap< writer.OnEndMap(); } writer.OnEndMap(); // total - - writer.OnEndMap(); + if (addExternalMap) { + writer.OnEndMap(); + } } bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx) { diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index a6ba6d285d79..ca75ad6909e7 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -179,7 +179,7 @@ void WriteStreams(NYson::TYsonWriter& writer, TStringBuf name, const NNodes::TCo double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx); -void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap& statistics); +void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap& statistics, bool addExternalMap = true); void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& statistics); bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp index 0517acb9edf5..ac8b7b9d4465 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp @@ -161,8 +161,7 @@ class TYtDataSink : public TDataProviderBase { } writer.OnBeginMap(); - writer.OnKeyedItem("All"); - NCommon::WriteStatistics(writer, totalOnly, State_->Statistics); + NCommon::WriteStatistics(writer, totalOnly, State_->Statistics, false); writer.OnKeyedItem("Hybrid"); writer.OnBeginMap(); for (const auto& [opName, stats] : State_->HybridStatistics) { diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index 23f2c16d4fef..74e69aed0838 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -127,8 +127,10 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { static TExprNode::TPtr FinalizeOutputOp(const TYtState::TPtr& state, const TString& operationHash, const IYtGateway::TRunResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, bool markFinished) { - with_lock(state->StatisticsMutex) { - state->HybridStatistics[input->Content()].Entries.emplace_back(TString{"YtExecution"}, 0, 0, 0, 0, 1); + if (markFinished && !TYtDqProcessWrite::Match(input.Get())) { + with_lock(state->StatisticsMutex) { + state->HybridStatistics[input->Content()].Entries.emplace_back(TString{"YtExecution"}, 0, 0, 0, 0, 1); + } } auto outSection = TYtOutputOpBase(input).Output(); YQL_ENSURE(outSection.Size() == res.OutTableStats.size(), "Invalid output table count in IYtGateway::TRunResult"); @@ -286,7 +288,12 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { } TStatusCallbackPair HandleTryFirst(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext&) { - auto statWriter = [this](TStringBuf statName, TStringBuf opName) { + auto statWriter = [this](TStringBuf name) { + with_lock(State_->StatisticsMutex) { + State_->Statistics[Max()].Entries.emplace_back(TString{name}, 0, 0, 0, 0, 1); + } + }; + auto hybridStatWriter = [this](TStringBuf statName, TStringBuf opName) { with_lock(State_->StatisticsMutex) { State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1); } @@ -294,11 +301,13 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase { switch (input->Head().GetState()) { case TExprNode::EState::ExecutionComplete: - statWriter("Execution", input->TailPtr()->Content()); + statWriter("HybridExecution"); + hybridStatWriter("Execution", input->TailPtr()->Content()); output = input->HeadPtr(); break; case TExprNode::EState::Error: { - statWriter("Fallback", input->TailPtr()->Content()); + statWriter("HybridFallback"); + hybridStatWriter("Fallback", input->TailPtr()->Content()); if (State_->Configuration->HybridDqExecutionFallback.Get().GetOrElse(true)) { output = input->TailPtr(); } else { diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index 433a2f58c4d8..7f79548d85d6 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -275,10 +275,12 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { if (const auto stat = CanReadHybrid(sort.Input().Item(0))) { if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { YQL_CLOG(INFO, ProviderYt) << "Sort on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Try", node.Raw()->Content()); + PushStat("HybridTry"); + PushHybridStat("Try", node.Raw()->Content()); return MakeYtSortByDq(sort, ctx); } - PushStat("SkipOverLimits", node.Raw()->Content()); + PushStat("HybridSkipOverLimits"); + PushHybridStat("SkipOverLimits", node.Raw()->Content()); } } } @@ -294,10 +296,12 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { if (const auto stat = CanReadHybrid(merge.Input().Item(0))) { if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { YQL_CLOG(INFO, ProviderYt) << "Merge on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Try", node.Raw()->Content()); + PushStat("HybridTry"); + PushHybridStat("Try", node.Raw()->Content()); return MakeYtSortByDq(merge, ctx); } - PushStat("SkipOverLimits", node.Raw()->Content()); + PushStat("HybridSkipOverLimits"); + PushHybridStat("SkipOverLimits", node.Raw()->Content()); } } } @@ -347,7 +351,8 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { if (CanExecuteInHybrid(map.Mapper().Ptr(), chunksLimit, sizeLimit)) { YQL_CLOG(INFO, ProviderYt) << "Map on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Try", node.Raw()->Content()); + PushStat("HybridTry"); + PushHybridStat("Try", node.Raw()->Content()); TSyncMap syncList; const auto& paths = map.Input().Item(0).Paths(); for (auto i = 0U; i < paths.Size(); ++i) { @@ -430,7 +435,8 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { .Done(); } } - PushStat("SkipOverLimits", node.Raw()->Content()); + PushStat("HybridOverLimits"); + PushHybridStat("SkipOverLimits", node.Raw()->Content()); } } @@ -626,12 +632,14 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { if (CanExecuteInHybrid(reduce.Reducer().Ptr(), chunksLimit, sizeLimit)) { if (ETypeAnnotationKind::Struct == GetSeqItemType(*reduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) { YQL_CLOG(INFO, ProviderYt) << "Reduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Try", node.Raw()->Content()); + PushStat("HybridTry"); + PushHybridStat("Try", node.Raw()->Content()); return MakeYtReduceByDq(reduce, ctx); } } } - PushStat("SkipOverLimits", node.Raw()->Content()); + PushStat("HybridSkipOverLimits"); + PushHybridStat("SkipOverLimits", node.Raw()->Content()); } } @@ -647,19 +655,27 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { if (CanExecuteInHybrid(mapReduce.Reducer().Ptr(), chunksLimit, sizeLimit) && CanExecuteInHybrid(mapReduce.Mapper().Ptr(), chunksLimit, sizeLimit)) { if (ETypeAnnotationKind::Struct == GetSeqItemType(*mapReduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) { YQL_CLOG(INFO, ProviderYt) << "MapReduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("Try", node.Raw()->Content()); + PushHybridStat("Try", node.Raw()->Content()); + PushStat("HybridTry"); return MakeYtReduceByDq(mapReduce, ctx); } } } - PushStat("SkipOverLimits", node.Raw()->Content()); + PushHybridStat("SkipOverLimits", node.Raw()->Content()); + PushStat("HybridOverLimits"); } } return node; } - void PushStat(TStringBuf statName, TStringBuf opName) const { + void PushStat(const std::string_view& name) const { + with_lock(State_->StatisticsMutex) { + State_->Statistics[Max()].Entries.emplace_back(TString{name}, 0, 0, 0, 0, 1); + } + }; + + void PushHybridStat(TStringBuf statName, TStringBuf opName) const { with_lock(State_->StatisticsMutex) { State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1); }