22
33#include < ydb/library/yql/core/yql_expr_optimize.h>
44#include < ydb/library/yql/minikql/mkql_node_serialization.h>
5+ #include < ydb/library/yql/minikql/mkql_runtime_version.h>
56#include < ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
67
78namespace NYql ::NDq {
89
910using namespace NKikimr ::NMiniKQL;
1011using namespace NYql ::NNodes;
1112
13+
14+ class TSpillingTransformProvider {
15+ public:
16+
17+ TSpillingTransformProvider (const TSpillingSettings& spillingSettings): SpillingSettings(spillingSettings){};
18+
19+ TCallableVisitFunc operator ()(TInternName name) {
20+ if (RuntimeVersion >= 50U && SpillingSettings.EnableSpillingInGraceJoin && (name == " GraceJoin" || name == " GraceSelfJoin" )) {
21+ return [name](NKikimr::NMiniKQL::TCallable& callable, const TTypeEnvironment& env) {
22+ TCallableBuilder callableBuilder (env,
23+ TStringBuilder () << callable.GetType ()->GetName () << " WithSpilling" ,
24+ callable.GetType ()->GetReturnType (), false );
25+ for (ui32 i = 0 ; i < callable.GetInputsCount (); ++i) {
26+ callableBuilder.Add (callable.GetInput (i));
27+ }
28+ return TRuntimeNode (callableBuilder.Build (), false );
29+ };
30+ }
31+
32+ return TCallableVisitFunc ();
33+ }
34+
35+ private:
36+
37+ TSpillingSettings SpillingSettings;
38+ };
39+
1240const TStructExprType* CollectParameters (NNodes::TCoLambda program, TExprContext& ctx) {
1341 TVector<const TItemExprType*> memberTypes;
1442
@@ -27,7 +55,7 @@ const TStructExprType* CollectParameters(NNodes::TCoLambda program, TExprContext
2755
2856TString BuildProgram (NNodes::TCoLambda program, const TStructExprType& paramsType,
2957 const NCommon::IMkqlCallableCompiler& compiler, const TTypeEnvironment& typeEnv,
30- const IFunctionRegistry& funcRegistry, TExprContext& exprCtx, const TVector<TExprBase>& reads)
58+ const IFunctionRegistry& funcRegistry, TExprContext& exprCtx, const TVector<TExprBase>& reads, const TSpillingSettings& spillingSettings )
3159{
3260 TProgramBuilder pgmBuilder (typeEnv, funcRegistry);
3361
@@ -49,6 +77,13 @@ TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsTyp
4977
5078 TRuntimeNode rootNode = MkqlBuildExpr (program.Body ().Ref (), ctx);
5179
80+ TExploringNodeVisitor explorer;
81+ if (spillingSettings) {
82+ explorer.Walk (rootNode.GetNode (), typeEnv);
83+ bool wereChanges = false ;
84+ rootNode = SinglePassVisitCallables (rootNode, explorer, TSpillingTransformProvider (spillingSettings), typeEnv, true , wereChanges);
85+ }
86+
5287 TStructLiteralBuilder structBuilder (typeEnv);
5388 structBuilder.Add (" Program" , rootNode);
5489 structBuilder.Add (" Inputs" , pgmBuilder.NewTuple (inputNodes));
@@ -64,7 +99,6 @@ TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsTyp
6499
65100 auto programNode = structBuilder.Build ();
66101
67- TExploringNodeVisitor explorer;
68102 explorer.Walk (programNode, typeEnv);
69103 ui32 uniqueId = 0 ;
70104 for (auto & node : explorer.GetNodes ()) {
0 commit comments