Skip to content

Commit 45cfef9

Browse files
authored
Merge ced693c into cb97bcd
2 parents cb97bcd + ced693c commit 45cfef9

File tree

8 files changed

+921
-761
lines changed

8 files changed

+921
-761
lines changed

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <ydb/library/yql/core/yql_opt_utils.h>
1010
#include <ydb/library/yql/dq/opt/dq_opt_join.h>
1111
#include <ydb/library/yql/dq/opt/dq_opt_log.h>
12+
#include <ydb/library/yql/dq/opt/dq_opt_hopping.h>
1213
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
1314
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
1415

@@ -111,8 +112,28 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
111112
}
112113

113114
TMaybeNode<TExprBase> RewriteAggregate(TExprBase node, TExprContext& ctx) {
114-
TExprBase output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey());
115-
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Ptr(), ctx);
115+
TMaybeNode<TExprBase> output;
116+
auto aggregate = node.Cast<TCoAggregateBase>();
117+
auto hopSetting = GetSetting(aggregate.Settings().Ref(), "hopping");
118+
if (hopSetting) {
119+
auto input = aggregate.Input().Maybe<TDqConnection>();
120+
if (!input) {
121+
return node;
122+
}
123+
output = NHopping::RewriteAsHoppingWindow(
124+
node,
125+
ctx,
126+
input.Cast(),
127+
false,
128+
TDuration::MilliSeconds(TDqSettings::TDefault::WatermarksLateArrivalDelayMs),
129+
true,
130+
false);
131+
} else {
132+
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey());
133+
}
134+
if (output) {
135+
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx);
136+
}
116137
return output;
117138
}
118139

ydb/core/kqp/ut/opt/kqp_agg_ut.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,31 @@ Y_UNIT_TEST_SUITE(KqpAgg) {
8989
[["Value3"];[1]]
9090
])", FormatResultSetYson(result.GetResultSet(0)));
9191
}
92+
93+
Y_UNIT_TEST(AggWithHop) {
94+
TKikimrRunner kikimr;
95+
auto db = kikimr.GetTableClient();
96+
auto session = db.CreateSession().GetValueSync().GetSession();
97+
98+
auto result = session.ExecuteDataQuery(R"(
99+
--!syntax_v1
100+
101+
SELECT
102+
Text,
103+
CAST(COUNT(*) as Int32) as Count,
104+
SUM(Data)
105+
FROM EightShard
106+
GROUP BY HOP(CAST(Key AS Timestamp?), "PT1M", "PT1M", "PT1M"), Text
107+
ORDER BY Text;
108+
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
109+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
110+
CompareYson(R"([
111+
[["Value1"];[8];[15]];
112+
[["Value2"];[8];[16]];
113+
[["Value3"];[8];[17]]
114+
])", FormatResultSetYson(result.GetResultSet(0)));
115+
}
116+
92117
}
93118

94119
} // namespace NKikimr::NKqp

0 commit comments

Comments
 (0)