Skip to content

Commit

Permalink
remove code duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
APozdniakov committed Sep 19, 2024
1 parent 8ae12f4 commit 7557e9e
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 774 deletions.
107 changes: 106 additions & 1 deletion ydb/library/yql/core/common_opt/yql_co_simple1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3302,6 +3302,111 @@ TExprNode::TPtr RemoveDeadPayloadColumns(const TCoAggregate& aggr, TExprContext&
return aggr.Ptr();
}

TExprNode::TPtr RewriteAsHoppingWindowFullOutput(const TCoAggregate& aggregate, TExprContext& ctx) {
const auto pos = aggregate.Pos();

NHopping::EnsureNotDistinct(aggregate);

const auto maybeHopTraits = NHopping::ExtractHopTraits(aggregate, ctx, false);
if (!maybeHopTraits) {
return nullptr;
}
const auto hopTraits = *maybeHopTraits;

const auto aggregateInputType = GetSeqItemType(*aggregate.Ptr()->Head().GetTypeAnn()).Cast<TStructExprType>();
NHopping::TKeysDescription keysDescription(*aggregateInputType, aggregate.Keys(), hopTraits.Column);

// if (keysDescription.NeedPickle()) {
// return Build<TCoMap>(ctx, pos)
// .Lambda(keysDescription.BuildUnpickleLambda(ctx, pos, *aggregateInputType))
// .Input<TCoAggregate>()
// .InitFrom(aggregate)
// .Input<TCoMap>()
// .Lambda(keysDescription.BuildPickleLambda(ctx, pos))
// .Input(aggregate.Input())
// .Build()
// .Settings(RemoveSetting(aggregate.Settings().Ref(), "output_columns", ctx))
// .Build()
// .Done()
// .Ptr();
// }

const auto keyLambda = keysDescription.GetKeySelector(ctx, pos, aggregateInputType);
const auto timeExtractorLambda = NHopping::BuildTimeExtractor(hopTraits.Traits, ctx);
const auto initLambda = NHopping::BuildInitHopLambda(aggregate, ctx);
const auto updateLambda = NHopping::BuildUpdateHopLambda(aggregate, ctx);
const auto saveLambda = NHopping::BuildSaveHopLambda(aggregate, ctx);
const auto loadLambda = NHopping::BuildLoadHopLambda(aggregate, ctx);
const auto mergeLambda = NHopping::BuildMergeHopLambda(aggregate, ctx);
const auto finishLambda = NHopping::BuildFinishHopLambda(aggregate, keysDescription.GetActualGroupKeys(), hopTraits.Column, ctx);

const auto streamArg = Build<TCoArgument>(ctx, pos).Name("stream").Done();
auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos)
.KeyExtractor(keyLambda)
.TimeExtractor(timeExtractorLambda)
.Hop(hopTraits.Traits.Hop())
.Interval(hopTraits.Traits.Interval())
.Delay(hopTraits.Traits.Delay())
.DataWatermarks(hopTraits.Traits.DataWatermarks())
.InitHandler(initLambda)
.UpdateHandler(updateLambda)
.MergeHandler(mergeLambda)
.FinishHandler(finishLambda)
.SaveHandler(saveLambda)
.LoadHandler(loadLambda)
.template WatermarkMode<TCoAtom>().Build(ToString(false));

return Build<TCoPartitionsByKeys>(ctx, pos)
.Input(aggregate.Input())
.KeySelectorLambda(keyLambda)
.SortDirections<TCoBool>()
.Literal()
.Value("true")
.Build()
.Build()
.SortKeySelectorLambda(timeExtractorLambda)
.ListHandlerLambda()
.Args(streamArg)
.template Body<TCoForwardList>()
.Stream(multiHoppingCoreBuilder
.template Input<TCoIterator>()
.List(streamArg)
.Build()
.Done())
.Build()
.Build()
.Done()
.Ptr();
}

TExprNode::TPtr RewriteAsHoppingWindow(TExprNode::TPtr node, TExprContext& ctx) {
const auto aggregate = TCoAggregate(node);

if (aggregate.Input().Ptr()->GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) {
return nullptr;
}

if (!GetSetting(aggregate.Settings().Ref(), "hopping")) {
return nullptr;
}

auto result = RewriteAsHoppingWindowFullOutput(aggregate, ctx);
if (!result) {
return result;
}

auto outputColumnSetting = GetSetting(aggregate.Settings().Ref(), "output_columns");
if (!outputColumnSetting) {
return result;
}

return Build<TCoExtractMembers>(ctx, aggregate.Pos())
.Input(result)
.Members(outputColumnSetting->ChildPtr(1))
.Done()
.Ptr();
}

TExprNode::TPtr PullAssumeColumnOrderOverEquiJoin(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
TVector<ui32> withAssume;
for (ui32 i = 0; i < node->ChildrenSize() - 2; i++) {
Expand Down Expand Up @@ -5081,7 +5186,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
return clean;
}

if (auto hopping = NHopping::RewriteAsHoppingWindow(node, ctx); hopping) {
if (auto hopping = RewriteAsHoppingWindow(node, ctx); hopping) {
return hopping;
}

Expand Down
Loading

0 comments on commit 7557e9e

Please sign in to comment.