diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp index 90cd077c9133..9342e1227053 100644 --- a/ydb/library/yql/core/yql_opt_utils.cpp +++ b/ydb/library/yql/core/yql_opt_utils.cpp @@ -1779,8 +1779,7 @@ TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, con return {}; } -TExprNode::TPtr FindNonYieldTransparentNode(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx) { - TNodeSet flowSources; +TExprNode::TPtr FindNonYieldTransparentNode(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx, TNodeSet flowSources) { TExprNode::TPtr from = root; if (root->IsLambda()) { if (IsIdentityLambda(*root)) { diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h index c89e705e96b6..2980de885f1c 100644 --- a/ydb/library/yql/core/yql_opt_utils.h +++ b/ydb/library/yql/core/yql_opt_utils.h @@ -135,7 +135,7 @@ bool IsIdentityLambda(const TExprNode& lambda); TExprNode::TPtr MakeExpandMap(TPositionHandle pos, const TVector& columns, const TExprNode::TPtr& input, TExprContext& ctx); TExprNode::TPtr MakeNarrowMap(TPositionHandle pos, const TVector& columns, const TExprNode::TPtr& input, TExprContext& ctx); -TExprNode::TPtr FindNonYieldTransparentNode(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx); +TExprNode::TPtr FindNonYieldTransparentNode(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx, TNodeSet flowSources = TNodeSet()); bool IsYieldTransparent(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx); bool IsStrict(const TExprNode::TPtr& node); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index 1ced14fa6f39..7a8e6088f735 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -64,109 +64,162 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { } bool CanReplaceOnHybrid(const TYtOutputOpBase& operation) const { - if (!State_->IsHybridEnabledForCluster(operation.DataSink().Cluster().Value())) + if (!State_->IsHybridEnabledForCluster(operation.DataSink().Cluster().Value())) { + PushHybridStat("SkipDisabledCluster", operation.Raw()->Content()); return false; + } if (operation.Ref().StartsExecution() || operation.Ref().HasResult()) return false; - if (operation.Output().Size() != 1U) + if (operation.Output().Size() != 1U) { + PushHybridStat("SkipMultipleOutputs", operation.Raw()->Content()); + return false; + } + + if (const auto& trans = operation.Maybe(); trans && trans.Cast().Input().Size() != 1U) { + PushHybridStat("SkipMultipleInputs", operation.Raw()->Content()); return false; + } - if (const auto& trans = operation.Maybe(); trans && trans.Cast().Input().Size() != 1U) + const auto& settings = *operation.Ref().Child(4U); + if (HasSettingsExcept(settings, DqOpSupportedSettings)) { + if (!NYql::HasSetting(settings, EYtSettingType::NoDq)) { + PushHybridStat("SkipUnsupportedDqOpSettings", operation.Raw()->Content()); + } return false; + } - return !HasSettingsExcept(*operation.Ref().Child(4U), DqOpSupportedSettings); + return true; } bool HasDescOrderOutput(const TYtOutputOpBase& operation) const { TYqlRowSpecInfo outRowSpec(operation.Output().Item(0).RowSpec()); - return outRowSpec.IsSorted() && outRowSpec.HasAuxColumns(); + if (outRowSpec.IsSorted() && outRowSpec.HasAuxColumns()) { + PushHybridStat("SkipDescSort", operation.Raw()->Content()); + return true; + } + return false; } - std::optional> CanReadHybrid(const TYtSection& section) const { - if (HasSettingsExcept(section.Settings().Ref(), DqReadSupportedSettings)) - return std::nullopt; + bool CanReadHybrid(const TYtSection& section, const TStringBuf& nodeName, bool orderedInput) const { + if (HasSettingsExcept(section.Settings().Ref(), DqReadSupportedSettings)) { + PushHybridStat("SkipUnsupportedDqReadSettings", nodeName); + return false; + } - std::array stat = {{0ULL, 0ULL}}; + ui64 dataSize = 0ULL, dataChunks = 0ULL; for (const auto& path : section.Paths()) { - if (const TYtPathInfo info(path); info.Ranges) - return std::nullopt; - else if (const auto& tableInfo = info.Table; - !tableInfo || !tableInfo->Stat || !tableInfo->Meta || !tableInfo->RowSpec || tableInfo->Meta->IsDynamic || NYql::HasSetting(tableInfo->Settings.Ref(), EYtSettingType::WithQB)) - return std::nullopt; - else { - auto tableSize = tableInfo->Stat->DataSize; - if (tableInfo->Meta->Attrs.Value("erasure_codec", "none") != "none") { - if (const auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(tableInfo->Cluster)) { - tableSize *=* codecCpu; - } + const TYtPathInfo info(path); + const auto& tableInfo = info.Table; + if (!tableInfo || !tableInfo->Stat || !tableInfo->Meta || !tableInfo->RowSpec) { + return false; + } + const auto canUseYtPartitioningApi = State_->Configuration->_EnableYtPartitioning.Get(tableInfo->Cluster).GetOrElse(false); + if ((info.Ranges || tableInfo->Meta->IsDynamic) && !canUseYtPartitioningApi) { + return false; + } + if (NYql::HasSetting(tableInfo->Settings.Ref(), EYtSettingType::WithQB)) { + PushHybridStat("SkipWithQB", nodeName); + return false; + } + auto tableSize = tableInfo->Stat->DataSize; + if (tableInfo->Meta->Attrs.Value("erasure_codec", "none") != "none") { + if (const auto codecCpu = State_->Configuration->ErasureCodecCpuForDq.Get(tableInfo->Cluster)) { + tableSize *=* codecCpu; } - - stat.front() += tableSize; - stat.back() += tableInfo->Stat->ChunkCount; } + + dataSize += tableSize; + dataChunks += tableInfo->Stat->ChunkCount; + } + const auto chunksLimit = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); + + const auto sizeLimit = orderedInput ? + State_->Configuration->HybridDqDataSizeLimitForOrdered.Get().GetOrElse(DefaultHybridDqDataSizeLimitForOrdered): + State_->Configuration->HybridDqDataSizeLimitForUnordered.Get().GetOrElse(DefaultHybridDqDataSizeLimitForUnordered); + + if (dataSize > sizeLimit || dataChunks > chunksLimit) { + PushHybridStat("SkipOverLimits", nodeName); + return false; } - return stat; + + return true; } - TMaybeNode TryYtFillByDq(TExprBase node, TExprContext& ctx) const { - if (const auto fill = node.Cast(); CanReplaceOnHybrid(fill)) { - const auto sizeLimit = State_->Configuration->HybridDqDataSizeLimitForOrdered.Get().GetOrElse(DefaultHybridDqDataSizeLimitForOrdered); - const auto chunksLimit = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); - if (bool flow = false; !FindNode(fill.Content().Ptr(), [&flow, sizeLimit, chunksLimit, this] (const TExprNode::TPtr& node) { - if (node->IsCallable(TCoForwardList::CallableName())) - return true; - if (node->IsCallable(TCoCollect::CallableName()) && ETypeAnnotationKind::List != node->Head().GetTypeAnn()->GetKind()) + bool CanExecuteInHybrid(const TExprNode::TPtr& handler, const TStringBuf& nodeName, bool orderedInput) const { + bool flow = false; + auto sources = FindNodes(handler, + [](const TExprNode::TPtr& node) { + return !TYtOutput::Match(node.Get()); + }, + [](const TExprNode::TPtr& node) { + return TYtTableContent::Match(node.Get()); + }); + TNodeSet flowSources; + std::for_each(sources.cbegin(), sources.cend(), [&flowSources](const TExprNode::TPtr& node) { flowSources.insert(node.Get()); }); + return !FindNonYieldTransparentNode(handler, *State_->Types, flowSources) && + !FindNode(handler, [&flow, this, &nodeName, orderedInput] (const TExprNode::TPtr& node) { + if (TCoScriptUdf::Match(node.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(node->Head().Content()))) { return true; - if (const auto tableContent = TMaybeNode(node)) { + } + + if (const auto& tableContent = TMaybeNode(node)) { if (!flow) return true; if (const auto& maybeRead = tableContent.Cast().Input().Maybe()) { - if (const auto& read = maybeRead.Cast(); 1U != read.Input().Size()) + const auto& read = maybeRead.Cast(); + if (1U != read.Input().Size()) { + PushHybridStat("SkipMultipleInputs", nodeName); return true; - else { - const auto stat = CanReadHybrid(read.Input().Item(0)); - return !stat || stat->front() > sizeLimit || stat->back() > chunksLimit; } + if(!CanReadHybrid(read.Input().Item(0), nodeName, orderedInput)) { + return true; + } + return false; } } flow = node->IsCallable(TCoToFlow::CallableName()) && node->Head().IsCallable(TYtTableContent::CallableName()); return false; - })) { - return Build(ctx, fill.Pos()) - .First() - .World(fill.World()) - .DataSink(fill.DataSink()) - .Output(fill.Output()) - .Input() - .Output() - .Stage() - .Inputs().Build() - .Program() - .Args({}) - .Body() - .Input(CloneCompleteFlow(fill.Content().Body().Ptr(), ctx)) - .Provider().Value(YtProviderName).Build() - .Settings().Build() - .Build() + }); + } + + TMaybeNode TryYtFillByDq(TExprBase node, TExprContext& ctx) const { + const TStringBuf nodeName = node.Raw()->Content(); + const auto fill = node.Cast(); + if (CanReplaceOnHybrid(fill) && CanExecuteInHybrid(fill.Content().Ptr(), nodeName, true)) { + YQL_CLOG(INFO, ProviderYt) << "Rewrite " << nodeName << " node by hybrid"; + return Build(ctx, fill.Pos()) + .First() + .World(fill.World()) + .DataSink(fill.DataSink()) + .Output(fill.Output()) + .Input() + .Output() + .Stage() + .Inputs().Build() + .Program() + .Args({}) + .Body() + .Input(CloneCompleteFlow(fill.Content().Body().Ptr(), ctx)) + .Provider().Value(YtProviderName).Build() + .Settings().Build() .Build() - .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, fill.Pos())) .Build() - .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) + .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, fill.Pos())) .Build() - .ColumnHints().Build() + .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) .Build() - .Flags().Add(GetHybridFlags(fill, ctx)).Build() + .ColumnHints().Build() .Build() - .Second() - .InitFrom(fill) - .Settings(NYql::AddSetting(fill.Settings().Ref(), EYtSettingType::NoDq, {}, ctx)) - .Build() - .Done(); - } + .Flags().Add(GetHybridFlags(fill, ctx)).Build() + .Build() + .Second() + .InitFrom(fill) + .Settings(NYql::AddSetting(fill.Settings().Ref(), EYtSettingType::NoDq, {}, ctx)) + .Build() + .Done(); } - return node; } @@ -265,182 +318,117 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { } TMaybeNode TryYtSortByDq(TExprBase node, TExprContext& ctx) const { - if (const auto sort = node.Cast(); CanReplaceOnHybrid(sort)) { - if (const auto sizeLimit = State_->Configuration->HybridDqDataSizeLimitForOrdered.Get().GetOrElse(DefaultHybridDqDataSizeLimitForOrdered)) { - const auto info = TYtTableBaseInfo::Parse(sort.Input().Item(0).Paths().Item(0).Table()); - const auto chunksLimit = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); - if (const auto stat = CanReadHybrid(sort.Input().Item(0))) { - if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { - if (HasDescOrderOutput(sort)) { - PushStat("HybridSkipDescSort"); - return node; - } - YQL_CLOG(INFO, ProviderYt) << "Sort on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("HybridTry"); - PushHybridStat("Try", node.Raw()->Content()); - return MakeYtSortByDq(sort, ctx); - } - PushStat("HybridSkipOverLimits"); - PushHybridStat("SkipOverLimits", node.Raw()->Content()); - } - } + const auto sort = node.Cast(); + const TStringBuf nodeName = node.Raw()->Content(); + if (CanReplaceOnHybrid(sort) && CanReadHybrid(sort.Input().Item(0), node.Raw()->Content(), true) && !HasDescOrderOutput(sort)) { + PushHybridStat("Try", nodeName); + YQL_CLOG(INFO, ProviderYt) << "Rewrite " << nodeName << " node by hybrid"; + return MakeYtSortByDq(sort, ctx); } - return node; } TMaybeNode TryYtMergeByDq(TExprBase node, TExprContext& ctx) const { - if (const auto merge = node.Cast(); CanReplaceOnHybrid(merge)) { - if (const auto sizeLimit = State_->Configuration->HybridDqDataSizeLimitForOrdered.Get().GetOrElse(DefaultHybridDqDataSizeLimitForOrdered)) { - const auto info = TYtTableBaseInfo::Parse(merge.Input().Item(0).Paths().Item(0).Table()); - const auto chunksLimit = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); - if (const auto stat = CanReadHybrid(merge.Input().Item(0))) { - if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { - if (HasDescOrderOutput(merge)) { - PushStat("HybridSkipDescSort"); - return node; - } - YQL_CLOG(INFO, ProviderYt) << "Merge on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("HybridTry"); - PushHybridStat("Try", node.Raw()->Content()); - return MakeYtSortByDq(merge, ctx); - } - PushStat("HybridSkipOverLimits"); - PushHybridStat("SkipOverLimits", node.Raw()->Content()); - } - } + const auto merge = node.Cast(); + const TStringBuf nodeName = node.Raw()->Content(); + if (CanReplaceOnHybrid(merge) && CanReadHybrid(merge.Input().Item(0), node.Raw()->Content(), true) && !HasDescOrderOutput(merge)) { + PushHybridStat("Try", nodeName); + YQL_CLOG(INFO, ProviderYt) << "Rewrite " << nodeName << " node by hybrid"; + return MakeYtSortByDq(merge, ctx); } - return node; } - bool CanExecuteInHybrid(const TExprNode::TPtr& handler, ui64 chunksLimit, ui64 sizeLimit) const { - bool flow = false; - return IsYieldTransparent(handler, *State_->Types) && - !FindNode(handler, [&flow, sizeLimit, chunksLimit, this] (const TExprNode::TPtr& node) { - if (TCoScriptUdf::Match(node.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(node->Head().Content()))) { - return true; - } - - if (const auto& tableContent = TMaybeNode(node)) { - if (!flow) - return true; - if (const auto& maybeRead = tableContent.Cast().Input().Maybe()) { - if (const auto& read = maybeRead.Cast(); 1U != read.Input().Size()) - return true; - else { - const auto stat = CanReadHybrid(read.Input().Item(0)); - return !stat || stat->front() > sizeLimit || stat->back() > chunksLimit; - } - } - } - flow = node->IsCallable(TCoToFlow::CallableName()) && node->Head().IsCallable(TYtTableContent::CallableName()); - return false; - }); - } - TMaybeNode TryYtMapByDq(TExprBase node, TExprContext& ctx) const { - if (const auto map = node.Cast(); CanReplaceOnHybrid(map)) { - const auto chunksLimit = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); - bool ordered = NYql::HasSetting(map.Settings().Ref(), EYtSettingType::Ordered); - if (!ordered) { - auto setting = NYql::GetSetting(map.Settings().Ref(), EYtSettingType::JobCount); - if (setting && FromString(setting->Child(1)->Content()) == 1) { - ordered = true; + const TStringBuf& nodeName = node.Raw()->Content(); + const auto map = node.Cast(); + bool ordered = NYql::HasSetting(map.Settings().Ref(), EYtSettingType::Ordered); + if (!ordered) { + auto setting = NYql::GetSetting(map.Settings().Ref(), EYtSettingType::JobCount); + if (setting && FromString(setting->Child(1)->Content()) == 1) { + ordered = true; + } + } + if (CanReplaceOnHybrid(map) && CanReadHybrid(map.Input().Item(0), nodeName, ordered) && CanExecuteInHybrid(map.Mapper().Ptr(), nodeName, ordered)) { + YQL_CLOG(INFO, ProviderYt) << "Rewrite " << nodeName << " node by hybrid"; + PushHybridStat("Try", nodeName); + TSyncMap syncList; + const auto& paths = map.Input().Item(0).Paths(); + for (auto i = 0U; i < paths.Size(); ++i) { + if (const auto mayOut = paths.Item(i).Table().Maybe()) { + syncList.emplace(GetOutputOp(mayOut.Cast()).Ptr(), syncList.size()); } } - const auto sizeLimit = ordered ? - State_->Configuration->HybridDqDataSizeLimitForOrdered.Get().GetOrElse(DefaultHybridDqDataSizeLimitForOrdered): - State_->Configuration->HybridDqDataSizeLimitForUnordered.Get().GetOrElse(DefaultHybridDqDataSizeLimitForUnordered); - if (const auto stat = CanReadHybrid(map.Input().Item(0))) { - if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { - if (CanExecuteInHybrid(map.Mapper().Ptr(), chunksLimit, sizeLimit)) { - YQL_CLOG(INFO, ProviderYt) << "Map on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("HybridTry"); - PushHybridStat("Try", node.Raw()->Content()); - TSyncMap syncList; - const auto& paths = map.Input().Item(0).Paths(); - for (auto i = 0U; i < paths.Size(); ++i) { - if (const auto mayOut = paths.Item(i).Table().Maybe()) { - syncList.emplace(GetOutputOp(mayOut.Cast()).Ptr(), syncList.size()); - } - } - auto newWorld = ApplySyncListToWorld(map.World().Ptr(), syncList, ctx); + auto newWorld = ApplySyncListToWorld(map.World().Ptr(), syncList, ctx); - auto settings = ctx.NewList(map.Input().Pos(), {}); - if (!ordered) { - settings = NYql::AddSetting(*settings, EYtSettingType::Split, nullptr, ctx); - } + auto settings = ctx.NewList(map.Input().Pos(), {}); + if (!ordered) { + settings = NYql::AddSetting(*settings, EYtSettingType::Split, nullptr, ctx); + } - auto stage = Build(ctx, map.Pos()) - .Inputs().Build() - .Program() - .Args({}) - .Body() - .Input() - .Apply(map.Mapper()) - .With(0) - .Input() - .Input() - .World().Build() - .DataSource() - .Category(map.DataSink().Category()) - .Cluster(map.DataSink().Cluster()) - .Build() - .Input(map.Input()) - .Build() - .Settings(std::move(settings)) - .Build() + auto stage = Build(ctx, map.Pos()) + .Inputs().Build() + .Program() + .Args({}) + .Body() + .Input() + .Apply(map.Mapper()) + .With(0) + .Input() + .Input() + .World().Build() + .DataSource() + .Category(map.DataSink().Category()) + .Cluster(map.DataSink().Cluster()) .Build() + .Input(map.Input()) .Build() - .Provider().Value(YtProviderName).Build() - .Settings().Build() + .Settings(std::move(settings)) .Build() .Build() - .Settings(TDqStageSettings{.PartitionMode = ordered ? TDqStageSettings::EPartitionMode::Single : TDqStageSettings::EPartitionMode::Default}.BuildNode(ctx, map.Pos())) - .Done(); - - if (!ordered) { - stage = Build(ctx, map.Pos()) - .Inputs() - .Add() - .Output() - .Stage(std::move(stage)) - .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) - .Build() - .Build() - .Build() - .Program().Args({"pass"}).Body("pass").Build() - .Settings(TDqStageSettings().BuildNode(ctx, map.Pos())) - .Done(); - } + .Build() + .Provider().Value(YtProviderName).Build() + .Settings().Build() + .Build() + .Build() + .Settings(TDqStageSettings{.PartitionMode = ordered ? TDqStageSettings::EPartitionMode::Single : TDqStageSettings::EPartitionMode::Default}.BuildNode(ctx, map.Pos())) + .Done(); - return Build(ctx, map.Pos()) - .First() - .World(std::move(newWorld)) - .DataSink(map.DataSink()) - .Output(map.Output()) - .Input() - .Output() - .Stage(std::move(stage)) - .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) - .Build() - .ColumnHints().Build() - .Build() - .Flags().Add(GetHybridFlags(map, ctx)).Build() - .Build() - .Second() - .InitFrom(map) - .Settings(NYql::AddSetting(map.Settings().Ref(), EYtSettingType::NoDq, {}, ctx)) + if (!ordered) { + stage = Build(ctx, map.Pos()) + .Inputs() + .Add() + .Output() + .Stage(std::move(stage)) + .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) .Build() - .Done(); - } - } - PushStat("HybridOverLimits"); - PushHybridStat("SkipOverLimits", node.Raw()->Content()); + .Build() + .Build() + .Program().Args({"pass"}).Body("pass").Build() + .Settings(TDqStageSettings().BuildNode(ctx, map.Pos())) + .Done(); } - } + return Build(ctx, map.Pos()) + .First() + .World(std::move(newWorld)) + .DataSink(map.DataSink()) + .Output(map.Output()) + .Input() + .Output() + .Stage(std::move(stage)) + .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) + .Build() + .ColumnHints().Build() + .Build() + .Flags().Add(GetHybridFlags(map, ctx)).Build() + .Build() + .Second() + .InitFrom(map) + .Settings(NYql::AddSetting(map.Settings().Ref(), EYtSettingType::NoDq, {}, ctx)) + .Build() + .Done(); + } return node; } @@ -621,60 +609,40 @@ class TYtDqHybridTransformer : public TOptimizeTransformerBase { } TMaybeNode TryYtReduceByDq(TExprBase node, TExprContext& ctx) const { - if (const auto reduce = node.Cast(); CanReplaceOnHybrid(reduce)) { - const auto chunksLimit = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); - const auto sizeLimit = State_->Configuration->HybridDqDataSizeLimitForOrdered.Get().GetOrElse(DefaultHybridDqDataSizeLimitForOrdered); - if (const auto stat = CanReadHybrid(reduce.Input().Item(0))) { - if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { - if (CanExecuteInHybrid(reduce.Reducer().Ptr(), chunksLimit, sizeLimit)) { - if (ETypeAnnotationKind::Struct == GetSeqItemType(*reduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) { - YQL_CLOG(INFO, ProviderYt) << "Reduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushStat("HybridTry"); - PushHybridStat("Try", node.Raw()->Content()); - return MakeYtReduceByDq(reduce, ctx); - } - } - } - PushStat("HybridSkipOverLimits"); - PushHybridStat("SkipOverLimits", node.Raw()->Content()); + const TStringBuf& nodeName = node.Raw()->Content(); + const auto reduce = node.Cast(); + if (CanReplaceOnHybrid(reduce) && CanReadHybrid(reduce.Input().Item(0), nodeName, true) && CanExecuteInHybrid(reduce.Reducer().Ptr(), nodeName, true)) { + if (ETypeAnnotationKind::Struct != GetSeqItemType(*reduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) { + PushHybridStat("SkipNotStructReducerType", nodeName); + return node; } + YQL_CLOG(INFO, ProviderYt) << "Rewrite " << nodeName << " node by hybrid"; + PushHybridStat("Try", nodeName); + return MakeYtReduceByDq(reduce, ctx); } - return node; } TMaybeNode TryYtMapReduceByDq(TExprBase node, TExprContext& ctx) const { - if (const auto mapReduce = node.Cast(); CanReplaceOnHybrid(mapReduce)) { - const auto chunksLimit = State_->Configuration->MaxChunksForDqRead.Get().GetOrElse(DEFAULT_MAX_CHUNKS_FOR_DQ_READ); - const auto sizeLimit = State_->Configuration->HybridDqDataSizeLimitForOrdered.Get().GetOrElse(DefaultHybridDqDataSizeLimitForOrdered); - if (const auto stat = CanReadHybrid(mapReduce.Input().Item(0))) { - if (stat->front() <= sizeLimit && stat->back() <= chunksLimit) { - if (CanExecuteInHybrid(mapReduce.Reducer().Ptr(), chunksLimit, sizeLimit) && CanExecuteInHybrid(mapReduce.Mapper().Ptr(), chunksLimit, sizeLimit)) { - if (ETypeAnnotationKind::Struct == GetSeqItemType(*mapReduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) { - YQL_CLOG(INFO, ProviderYt) << "MapReduce on DQ with equivalent input size " << stat->front() << " and " << stat->back() << " chunks."; - PushHybridStat("Try", node.Raw()->Content()); - PushStat("HybridTry"); - return MakeYtReduceByDq(mapReduce, ctx); - } - } - } - PushHybridStat("SkipOverLimits", node.Raw()->Content()); - PushStat("HybridOverLimits"); + const TStringBuf& nodeName = node.Raw()->Content(); + const auto mapReduce = node.Cast(); + if (CanReplaceOnHybrid(mapReduce) && CanReadHybrid(mapReduce.Input().Item(0), nodeName, true) && + CanExecuteInHybrid(mapReduce.Reducer().Ptr(), nodeName, true) && CanExecuteInHybrid(mapReduce.Mapper().Ptr(), nodeName, true)) { + if (ETypeAnnotationKind::Struct != GetSeqItemType(*mapReduce.Reducer().Args().Arg(0).Ref().GetTypeAnn()).GetKind()) { + PushHybridStat("SkipNotStructReducerType", nodeName); + return node; } + YQL_CLOG(INFO, ProviderYt) << "Rewrite " << nodeName << " node by hybrid"; + PushHybridStat("Try", nodeName); + return MakeYtReduceByDq(mapReduce, ctx); } - return node; } - void PushStat(const std::string_view& name) const { - with_lock(State_->StatisticsMutex) { - State_->Statistics[Max()].Entries.emplace_back(TString{name}, 0, 0, 0, 0, 1); - } - }; - - void PushHybridStat(TStringBuf statName, TStringBuf opName) const { + void PushHybridStat(const TStringBuf& statName, const TStringBuf& nodeName) const { with_lock(State_->StatisticsMutex) { - State_->HybridStatistics[opName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1); + State_->Statistics[Max()].Entries.emplace_back("Hybrid" + TString{statName}, 0, 0, 0, 0, 1); + State_->HybridStatistics[nodeName].Entries.emplace_back(TString{statName}, 0, 0, 0, 0, 1); } };