Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions ydb/library/yql/providers/common/provider/yql_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1264,14 +1264,16 @@ void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& sta
writer.OnEndMap();
}

void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics) {
void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addExternalMap) {
if (statistics.empty()) {
return;
}

THashMap<TString, std::tuple<i64, i64, i64, TMaybe<i64>>> total; // sum, count, max, min

writer.OnBeginMap();
if (addExternalMap) {
writer.OnBeginMap();
}

for (const auto& opStatistics : statistics) {
for (auto& el : opStatistics.second.Entries) {
Expand Down Expand Up @@ -1331,8 +1333,9 @@ void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<
writer.OnEndMap();
}
writer.OnEndMap(); // total

writer.OnEndMap();
if (addExternalMap) {
writer.OnEndMap();
}
}

bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/common/provider/yql_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ void WriteStreams(NYson::TYsonWriter& writer, TStringBuf name, const NNodes::TCo

double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx);

void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics);
void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics, bool addExternalMap = true);
void WriteStatistics(NYson::TYsonWriter& writer, const TOperationStatistics& statistics);

bool ValidateCompressionForInput(std::string_view format, std::string_view compression, TExprContext& ctx);
Expand Down
11 changes: 10 additions & 1 deletion ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,16 @@ class TYtDataSink : public TDataProviderBase {
return false;
}

NCommon::WriteStatistics(writer, totalOnly, State_->Statistics);
writer.OnBeginMap();
NCommon::WriteStatistics(writer, totalOnly, State_->Statistics, false);
writer.OnKeyedItem("Hybrid");
writer.OnBeginMap();
for (const auto& [opName, stats] : State_->HybridStatistics) {
writer.OnKeyedItem(opName);
NCommon::WriteStatistics(writer, totalOnly, {{0, stats}});
}
writer.OnEndMap();
writer.OnEndMap();

return true;
}
Expand Down
30 changes: 22 additions & 8 deletions ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,23 @@ bool NeedFallback(const TIssues& issues) {

TIssue WrapIssuesOnHybridFallback(TPosition pos, const TIssues& issues) {
TIssue result(pos, "Hybrid execution fallback on YT");
result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING);
result.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO);

const std::function<void(TIssue& issue)> toWarning = [&](TIssue& issue) {
if (issue.Severity == TSeverityIds::S_ERROR || issue.Severity == TSeverityIds::S_FATAL) {
issue.Severity = TSeverityIds::S_WARNING;
const std::function<void(TIssue& issue)> toInfo = [&](TIssue& issue) {
if (issue.Severity == TSeverityIds::S_ERROR
|| issue.Severity == TSeverityIds::S_FATAL
|| issue.Severity == TSeverityIds::S_WARNING) {
issue.Severity = TSeverityIds::S_INFO;
}
for (const auto& subissue : issue.GetSubIssues()) {
toWarning(*subissue);
toInfo(*subissue);
}
};

for (const auto& issue : issues) {
TIssuePtr warning(new TIssue(issue));
toWarning(*warning);
result.AddSubIssue(std::move(warning));
TIssuePtr info(new TIssue(issue));
toInfo(*info);
result.AddSubIssue(std::move(info));
}

return result;
Expand Down Expand Up @@ -125,6 +127,11 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
static TExprNode::TPtr FinalizeOutputOp(const TYtState::TPtr& state, const TString& operationHash,
const IYtGateway::TRunResult& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx, bool markFinished)
{
if (markFinished && !TYtDqProcessWrite::Match(input.Get())) {
with_lock(state->StatisticsMutex) {
state->HybridStatistics[input->Content()].Entries.emplace_back(TString{"YtExecution"}, 0, 0, 0, 0, 1);
}
}
auto outSection = TYtOutputOpBase(input).Output();
YQL_ENSURE(outSection.Size() == res.OutTableStats.size(), "Invalid output table count in IYtGateway::TRunResult");
TExprNode::TListType newOutTables;
Expand Down Expand Up @@ -286,14 +293,21 @@ class TYtDataSinkExecTransformer : public TExecTransformerBase {
State_->Statistics[Max<ui32>()].Entries.emplace_back(TString{name}, 0, 0, 0, 0, 1);
}
};
// Appends an entry named `statName` to the hybrid statistics kept for the
// operation `opName`; the shared map is only touched inside
// with_lock(State_->StatisticsMutex).
// NOTE(review): the trailing (0, 0, 0, 0, 1) presumably encodes a single
// occurrence — confirm against the TOperationStatistics entry constructor.
auto hybridStatWriter = [this](TStringBuf statName, TStringBuf opName) {
    with_lock(State_->StatisticsMutex) {
        auto& perOperation = State_->HybridStatistics[opName];
        perOperation.Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1);
    }
};

switch (input->Head().GetState()) {
case TExprNode::EState::ExecutionComplete:
statWriter("HybridExecution");
hybridStatWriter("Execution", input->TailPtr()->Content());
output = input->HeadPtr();
break;
case TExprNode::EState::Error: {
statWriter("HybridFallback");
hybridStatWriter("Fallback", input->TailPtr()->Content());
if (State_->Configuration->HybridDqExecutionFallback.Get().GetOrElse(true)) {
output = input->TailPtr();
} else {
Expand Down
36 changes: 26 additions & 10 deletions ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,12 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
if (const auto stat = CanReadHybrid(sort.Input().Item(0))) {
if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) {
YQL_CLOG(INFO, ProviderYt) << "Sort on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
PushStat("Hybrid_Sort_try");
PushStat("HybridTry");
PushHybridStat("Try", node.Raw()->Content());
return MakeYtSortByDq(sort, ctx);
}
PushStat("Hybrid_Sort_over_limits");
PushStat("HybridSkipOverLimits");
PushHybridStat("SkipOverLimits", node.Raw()->Content());
}
}
}
Expand All @@ -294,10 +296,12 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
if (const auto stat = CanReadHybrid(merge.Input().Item(0))) {
if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) {
YQL_CLOG(INFO, ProviderYt) << "Merge on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
PushStat("Hybrid_Merge_try");
PushStat("HybridTry");
PushHybridStat("Try", node.Raw()->Content());
return MakeYtSortByDq(merge, ctx);
}
PushStat("Hybrid_Merge_over_limits");
PushStat("HybridSkipOverLimits");
PushHybridStat("SkipOverLimits", node.Raw()->Content());
}
}
}
Expand Down Expand Up @@ -347,7 +351,8 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) {
if (CanExecuteInHybrid(map.Mapper().Ptr(), chunksLimit, sizeLimit)) {
YQL_CLOG(INFO, ProviderYt) << "Map on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
PushStat("Hybrid_Map_try");
PushStat("HybridTry");
PushHybridStat("Try", node.Raw()->Content());
TSyncMap syncList;
const auto& paths = map.Input().Item(0).Paths();
for (auto i = 0U; i < paths.Size(); ++i) {
Expand Down Expand Up @@ -430,7 +435,8 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
.Done();
}
}
PushStat("Hybrid_Map_over_limits");
PushStat("HybridOverLimits");
PushHybridStat("SkipOverLimits", node.Raw()->Content());
}
}

Expand Down Expand Up @@ -626,12 +632,14 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
if (CanExecuteInHybrid(reduce.Reducer().Ptr(), chunksLimit, sizeLimit)) {
if (ETypeAnnotationKind::Struct == GetSeqItemType(*reduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) {
YQL_CLOG(INFO, ProviderYt) << "Reduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
PushStat("Hybrid_Reduce_try");
PushStat("HybridTry");
PushHybridStat("Try", node.Raw()->Content());
return MakeYtReduceByDq(reduce, ctx);
}
}
}
PushStat("Hybrid_Reduce_over_limits");
PushStat("HybridSkipOverLimits");
PushHybridStat("SkipOverLimits", node.Raw()->Content());
}
}

Expand All @@ -647,12 +655,14 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
if (CanExecuteInHybrid(mapReduce.Reducer().Ptr(), chunksLimit, sizeLimit) && CanExecuteInHybrid(mapReduce.Mapper().Ptr(), chunksLimit, sizeLimit)) {
if (ETypeAnnotationKind::Struct == GetSeqItemType(*mapReduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) {
YQL_CLOG(INFO, ProviderYt) << "MapReduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks.";
PushStat("Hybrid_MapReduce_try");
PushHybridStat("Try", node.Raw()->Content());
PushStat("HybridTry");
return MakeYtReduceByDq(mapReduce, ctx);
}
}
}
PushStat("Hybrid_MapReduce_over_limits");
PushHybridStat("SkipOverLimits", node.Raw()->Content());
PushStat("HybridOverLimits");
}
}

Expand All @@ -665,6 +675,12 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase {
}
};

// Adds an entry named `statName` to State_->HybridStatistics under the key
// `opName` (operator[] is used, so the bucket is created on first use).
// The update is performed inside with_lock(State_->StatisticsMutex).
// NOTE(review): the trailing (0, 0, 0, 0, 1) presumably encodes a single
// occurrence — confirm against the TOperationStatistics entry constructor.
void PushHybridStat(TStringBuf statName, TStringBuf opName) const {
    with_lock(State_->StatisticsMutex) {
        auto& opEntries = State_->HybridStatistics[opName].Entries;
        opEntries.emplace_back(TString{statName}, 0, 0, 0, 0, 1);
    }
}

const TYtState::TPtr State_;
const THolder<IGraphTransformer> Finalizer_;
};
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/yt/provider/yql_yt_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ struct TYtState : public TThrRefBase {
THashMap<std::pair<TString, TString>, TString> AnonymousLabels; // cluster + label -> name
std::unordered_map<ui64, TString> NodeHash; // unique id -> hash
THashMap<ui32, TOperationStatistics> Statistics; // public id -> stat
THashMap<TString, TOperationStatistics> HybridStatistics; // operation name -> stat
TMutex StatisticsMutex;
THashSet<std::pair<TString, TString>> Checkpoints; // Set of checkpoint tables
THolder<IDqIntegration> DqIntegration_;
Expand Down