-
Notifications
You must be signed in to change notification settings - Fork 686
/
Copy pathdistributed_planner.c
2737 lines (2319 loc) · 79.6 KB
/
distributed_planner.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*-------------------------------------------------------------------------
*
* distributed_planner.c
* General Citus planner code.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include <float.h>
#include <limits.h>
#include "postgres.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_class.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "optimizer/optimizer.h"
#include "optimizer/pathnode.h"
#include "optimizer/plancat.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "parser/parse_type.h"
#include "parser/parsetree.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "pg_version_constants.h"
#include "distributed/citus_depended_object.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/combine_query_planner.h"
#include "distributed/commands.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/cte_inline.h"
#include "distributed/distributed_planner.h"
#include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h"
#include "distributed/intermediate_result_pruning.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/merge_planner.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/query_utils.h"
#include "distributed/recursive_planning.h"
#include "distributed/shard_utils.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "distributed/version_compat.h"
#include "distributed/worker_shard_visibility.h"
#if PG_VERSION_NUM >= PG_VERSION_16
#include "parser/parse_relation.h"
#endif
/*
 * RouterPlanType is used to determine the router plan to invoke.
 *
 * The value is computed by GetRouterPlanType() and consumed by the switch
 * statement in CreateDistributedPlan().
 */
typedef enum RouterPlanType
{
/* modify command for which InsertSelectIntoCitusTable() holds */
INSERT_SELECT_INTO_CITUS_TABLE,
/* modify command for which InsertSelectIntoLocalTable() holds */
INSERT_SELECT_INTO_LOCAL_TABLE,
/* any other modify command (none of the more specific checks matched) */
DML_QUERY,
/* the original query is not a modify command, see IsModifyCommand() */
SELECT_QUERY,
/* modify command for which IsMergeQuery() holds */
MERGE_QUERY,
/*
 * INSERT .. SELECT or MERGE that still has unresolved parameters; returned
 * instead of the more specific type in that case
 */
REPLAN_WITH_BOUND_PARAMETERS
} RouterPlanType;
/*
 * List of planner restriction contexts. distributed_planner() asks
 * CreateAndPushPlannerRestrictionContext() for a new context before planning
 * and calls PopPlannerRestrictionContext() afterwards; presumably those
 * helpers maintain this list (their definitions are further down the file).
 */
static List *plannerRestrictionContextList = NIL;
int MultiTaskQueryLogLevel = CITUS_LOG_LEVEL_OFF; /* multi-task query log level */
/* next plan identifier; CreateDistributedPlannedStmt() reads and increments it */
static uint64 NextPlanId = 1;
/*
 * keep track of planner call stack levels; distributed_planner() increments
 * this before planning and decrements it once planning finished or failed
 */
int PlannerLevel = 0;
/*
 * Forward declarations of the file-local (static) helper functions, so that
 * they can be used before their definitions appear.
 */
static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable);
static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
DistributedPlanningContext
*planContext);
static PlannedStmt * TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
Query *originalQuery,
Query *query, ParamListInfo
boundParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
relationId);
static int AssignRTEIdentities(List *rangeTableList, int rteIdCounter);
static void AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier);
static void AdjustPartitioningForDistributedPlanning(List *rangeTableList,
bool setPartitionedTablesInherited);
static bool RTEWentThroughAdjustPartitioning(RangeTblEntry *rangeTableEntry);
static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan,
DistributedPlan *distributedPlan,
CustomScan *customScan);
static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan);
static AppendRelInfo * FindTargetAppendRelInfo(PlannerInfo *root, int relationRteIndex);
static List * makeTargetListFromCustomScanList(List *custom_scan_tlist);
static List * makeCustomScanTargetlistFromExistingTargetList(List *existingTargetlist);
static int32 BlessRecordExpressionList(List *exprs);
static void CheckNodeIsDumpable(Node *node);
static Node * CheckNodeCopyAndSerialization(Node *node);
static void AdjustReadIntermediateResultCost(RangeTblEntry *rangeTableEntry,
RelOptInfo *relOptInfo);
static void AdjustReadIntermediateResultArrayCost(RangeTblEntry *rangeTableEntry,
RelOptInfo *relOptInfo);
static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
List *columnTypes,
int resultIdCount,
Datum *resultIds,
Const *resultFormatConst);
static List * OuterPlanParamsList(PlannerInfo *root);
static List * CopyPlanParamList(List *originalPlanParamList);
static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
Node *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
static RouterPlanType GetRouterPlanType(Query *query,
Query *originalQuery,
bool hasUnresolvedParams);
static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan,
PlannedStmt *concatPlan);
/*
 * Distributed planner hook.
 *
 * distributed_planner decides whether the query needs distributed planning
 * and produces the PlannedStmt accordingly:
 *  - fast path router queries are handed to PlanFastPathDistributedStmt(),
 *  - otherwise standard_planner() runs first and, when distributed planning
 *    is needed, PlanDistributedStmt() builds the plan on top of its output,
 *  - queries that need no distributed planning are offered to
 *    TryToDelegateFunctionCall() and fall back to the standard_planner() plan.
 *
 * parse is the query tree to plan; it is modified in place (RTE identities,
 * partitioning adjustments, and whatever standard_planner() does to it).
 * query_string is only passed to AttributeQueryIfAnnotated(); note that
 * standard_planner() is called with a NULL query string.
 * cursorOptions and boundParams are forwarded to the planners.
 *
 * Returns the resulting plan. Errors raised during planning are re-thrown
 * after the restriction context has been popped and PlannerLevel restored.
 */
PlannedStmt *
distributed_planner(Query *parse,
const char *query_string,
int cursorOptions,
ParamListInfo boundParams)
{
bool needsDistributedPlanning = false;
bool fastPathRouterQuery = false;
Node *distributionKeyValue = NULL;
List *rangeTableList = ExtractRangeTableEntryList(parse);
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
{
/* this cursor flag could only be set when Citus has been loaded */
Assert(CitusHasBeenLoaded());
/*
 * We cannot have merge command for this path as well because
 * there cannot be recursively planned merge command.
 */
Assert(!IsMergeQuery(parse));
needsDistributedPlanning = true;
}
else if (CitusHasBeenLoaded())
{
bool maybeHasForeignDistributedTable = false;
needsDistributedPlanning =
ListContainsDistributedTableRTE(rangeTableList,
&maybeHasForeignDistributedTable);
if (needsDistributedPlanning)
{
fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
if (maybeHasForeignDistributedTable)
{
WarnIfListHasForeignDistributedTable(rangeTableList);
}
}
}
int rteIdCounter = 1;
DistributedPlanningContext planContext = {
.query = parse,
.cursorOptions = cursorOptions,
.boundParams = boundParams,
};
if (needsDistributedPlanning)
{
/*
 * standard_planner scribbles on its input, but for deparsing we need the
 * unmodified form. Before copying we call AssignRTEIdentities to be able
 * to match RTEs in the rewritten query tree with those in the original
 * tree.
 */
rteIdCounter = AssignRTEIdentities(rangeTableList, rteIdCounter);
planContext.originalQuery = copyObject(parse);
if (!fastPathRouterQuery)
{
/*
 * When there are partitioned tables (not applicable to fast path),
 * pretend that they are regular tables to avoid unnecessary work
 * in standard_planner.
 */
bool setPartitionedTablesInherited = false;
AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited);
}
}
/*
 * Make sure that we hide shard names on the Citus MX worker nodes. See comments in
 * HideShardsFromSomeApplications() for the details.
 */
HideShardsFromSomeApplications(parse);
/*
 * If GUC is set, we prevent queries, which contain pg meta relations, from
 * showing any citus dependent object. The flag is expected to be set only before
 * postgres vanilla tests.
 */
HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL);
/* create a restriction context and put it at the end of the context list */
planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
/*
 * We keep track of how many times we've recursed into the planner, primarily
 * to detect whether we are in a function call. We need to make sure that the
 * PlannerLevel is decremented exactly once at the end of the next PG_TRY
 * block, both in the happy case and when an error occurs.
 */
PlannerLevel++;
PlannedStmt *result = NULL;
PG_TRY();
{
if (fastPathRouterQuery)
{
result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
}
else
{
/*
 * Call into standard_planner because the Citus planner relies on both the
 * restriction information per table and parse tree transformations made by
 * postgres' planner.
 */
planContext.plan = standard_planner(planContext.query, NULL,
planContext.cursorOptions,
planContext.boundParams);
if (needsDistributedPlanning)
{
result = PlanDistributedStmt(&planContext, rteIdCounter);
}
else if ((result = TryToDelegateFunctionCall(&planContext)) == NULL)
{
/* no delegation happened, use the plan from standard_planner as is */
result = planContext.plan;
}
}
}
PG_CATCH();
{
/* undo the bookkeeping done before PG_TRY, then propagate the error */
PopPlannerRestrictionContext();
PlannerLevel--;
PG_RE_THROW();
}
PG_END_TRY();
PlannerLevel--;
/* remove the context from the context list */
PopPlannerRestrictionContext();
/*
 * In some cases, for example; parameterized SQL functions, we may miss that
 * there is a need for distributed planning. Such cases only become clear after
 * standard_planner performs some modifications on parse tree. In such cases
 * we will simply error out.
 */
if (!needsDistributedPlanning && NeedsDistributedPlanning(parse))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning on this "
"query because parameterized queries for SQL "
"functions referencing distributed tables are "
"not supported"),
errhint("Consider using PL/pgSQL functions instead.")));
}
/*
 * We annotate the query for tenant statistics.
 */
AttributeQueryIfAnnotated(query_string, parse->commandType);
return result;
}
/*
 * ExtractRangeTableEntryList gathers the range table entries found in the
 * given query tree. The traversal itself is delegated to
 * ExtractRangeTableEntryWalker, which fills in the list that is returned
 * here; the result stays NIL when the walker adds nothing.
 */
List *
ExtractRangeTableEntryList(Query *query)
{
	List *collectedRteList = NIL;

	ExtractRangeTableEntryWalker((Node *) query, &collectedRteList);

	return collectedRteList;
}
/*
 * NeedsDistributedPlanning returns true when Citus is loaded, the query is a
 * SELECT, INSERT, UPDATE or DELETE, and at least one of its range table
 * entries refers to a Citus table. Every other command type yields false.
 *
 * Queries that also reference local tables are let through on purpose; how
 * to deal with local tables is decided within the planner.
 */
bool
NeedsDistributedPlanning(Query *query)
{
	if (!CitusHasBeenLoaded())
	{
		return false;
	}

	switch (query->commandType)
	{
		case CMD_SELECT:
		case CMD_INSERT:
		case CMD_UPDATE:
		case CMD_DELETE:
		{
			/* command types that are inspected further below */
			break;
		}

		default:
		{
			return false;
		}
	}

	return ListContainsDistributedTableRTE(ExtractRangeTableEntryList(query), NULL);
}
/*
 * ListContainsDistributedTableRTE walks the given range table entries and
 * returns true as soon as it finds a relation RTE that is a Citus table,
 * false if there is none.
 *
 * maybeHasForeignDistributedTable may be NULL. When it is not NULL, it is set
 * to true if the Citus table that ends the search is a foreign table; the
 * flag is never reset to false here, and entries after the first Citus table
 * are not inspected.
 */
static bool
ListContainsDistributedTableRTE(List *rangeTableList,
								bool *maybeHasForeignDistributedTable)
{
	ListCell *rteCell = NULL;

	foreach(rteCell, rangeTableList)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);

		/* only plain relations can be Citus tables */
		if (rte->rtekind != RTE_RELATION)
		{
			continue;
		}

		/*
		 * The Postgres tidscan.sql test shows the locks taken in serializable
		 * isolation mode; the IsCitusTable() call below would add an extra
		 * lock to that output, so pg_locks is skipped in that setup.
		 */
		bool skipPgLocksTable = HideCitusDependentObjects &&
								IsolationIsSerializable() &&
								IsPgLocksTable(rte);
		if (skipPgLocksTable)
		{
			continue;
		}

		if (!IsCitusTable(rte->relid))
		{
			continue;
		}

		if (maybeHasForeignDistributedTable != NULL && IsForeignTable(rte->relid))
		{
			*maybeHasForeignDistributedTable = true;
		}

		return true;
	}

	return false;
}
/*
 * AssignRTEIdentities tags every RTE_RELATION in the list that does not carry
 * an identity yet (values_lists is still NIL) with a fresh identifier.
 *
 * PostgreSQL copies RTEs during planning, so we need a way to tell whether
 * two RTEs are copies of the same original one; numbering them consecutively
 * gives us that. Only RTE_RELATIONs are of interest, other RTE kinds are left
 * alone.
 *
 * The query tree is modified as little as possible, since any change in how
 * PostgreSQL uses the modified fields may break this logic.
 *
 * rteIdCounter is the first identifier to hand out (1 for a fresh list). The
 * return value is the next unused identifier, which allows calling this again
 * on a list that has been partially assigned.
 */
static int
AssignRTEIdentities(List *rangeTableList, int rteIdCounter)
{
	ListCell *rteCell = NULL;

	foreach(rteCell, rangeTableList)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);

		if (rte->rtekind != RTE_RELATION)
		{
			continue;
		}

		/* an identity was assigned by an earlier call, keep it */
		if (rte->values_lists != NIL)
		{
			continue;
		}

		AssignRTEIdentity(rte, rteIdCounter);
		rteIdCounter++;
	}

	return rteIdCounter;
}
/*
 * AdjustPartitioningForDistributedPlanning rewrites the inh flag and the
 * relkind of every partitioned table in the given range table list.
 *
 * With setPartitionedTablesInherited = false the tables are disguised as
 * regular relations (inh = false, relkind = RELKIND_RELATION) so that
 * Postgres does not expand them to their partitions, which breaks Citus
 * planning in different ways; anything related to partitioning happens on
 * the shards. With setPartitionedTablesInherited = true the flags are set
 * back to those of a partitioned table (inh = true,
 * relkind = RELKIND_PARTITIONED_TABLE).
 *
 * The query tree is modified as little as possible, since any change in how
 * PostgreSQL uses the modified fields may break this logic.
 */
static void
AdjustPartitioningForDistributedPlanning(List *rangeTableList,
										 bool setPartitionedTablesInherited)
{
	/* the relkind to install only depends on the requested direction */
	char adjustedRelkind = setPartitionedTablesInherited ?
						   RELKIND_PARTITIONED_TABLE : RELKIND_RELATION;

	ListCell *rteCell = NULL;

	foreach(rteCell, rangeTableList)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);

		if (rte->rtekind != RTE_RELATION || !PartitionedTable(rte->relid))
		{
			continue;
		}

		rte->inh = setPartitionedTablesInherited;
		rte->relkind = adjustedRelkind;
	}
}
/*
 * RTEWentThroughAdjustPartitioning tells whether the given range table entry
 * looks like it has been modified by AdjustPartitioningForDistributedPlanning:
 * it is a relation RTE for a partitioned table whose inh flag is false.
 */
static bool
RTEWentThroughAdjustPartitioning(RangeTblEntry *rangeTableEntry)
{
	if (rangeTableEntry->rtekind != RTE_RELATION)
	{
		return false;
	}

	if (!PartitionedTable(rangeTableEntry->relid))
	{
		return false;
	}

	return !rangeTableEntry->inh;
}
/*
 * AssignRTEIdentity stores rteIdentifier in the given range table entry.
 *
 * Postgres' query planning copies, duplicates and modifies RTEs, yet we
 * sometimes need to know whether two RTEs originate from the same RTE. To
 * make that possible we (hackishly) reuse values_lists, a field that is
 * normally unused in RTE_RELATION entries: it becomes a two element integer
 * list holding the identifier followed by the current value of the inh flag.
 *
 * The identifier should be unique within a plan tree.
 */
static void
AssignRTEIdentity(RangeTblEntry *rangeTableEntry, int rteIdentifier)
{
	Assert(rangeTableEntry->rtekind == RTE_RELATION);

	/* remembered next to the identifier, read back via GetOriginalInh() */
	int originalInh = rangeTableEntry->inh;

	rangeTableEntry->values_lists = list_make2_int(rteIdentifier, originalInh);
}
/*
 * GetRTEIdentity returns the identifier that AssignRTEIdentity stored in the
 * given relation RTE.
 *
 * An error is raised when the RTE does not carry the two element list written
 * by AssignRTEIdentity. That happens when standard_planner in-lines an SQL
 * function: the RTEs that come from the function never got an identity. The
 * other checks for SQL functions do not cover in-lining, so the case is
 * caught here.
 */
int
GetRTEIdentity(RangeTblEntry *rte)
{
	Assert(rte->rtekind == RTE_RELATION);

	List *identityList = rte->values_lists;
	bool identityAssigned = (list_length(identityList) == 2);

	if (!identityAssigned)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot perform distributed planning on this "
							   "query because parameterized queries for SQL "
							   "functions referencing distributed tables are "
							   "not supported"),
						errhint("Consider using PL/pgSQL functions instead.")));
	}

	Assert(IsA(identityList, IntList));

	/* the identifier is the first element, the original inh flag the second */
	return linitial_int(identityList);
}
/*
 * GetOriginalInh returns the value of the inheritance flag that
 * AssignRTEIdentity recorded as the second element of values_lists. The
 * planner resets the flag in the rewritten query, but deparsing needs the
 * original value.
 *
 * The list length is not checked here, so the RTE is expected to have gone
 * through AssignRTEIdentity.
 */
bool
GetOriginalInh(RangeTblEntry *rte)
{
	List *identityList = rte->values_lists;

	return lsecond_int(identityList);
}
/*
 * GetQueryLockMode returns the lock mode that needs to be acquired for the
 * given query (see the comment on RangeTblEntry->rellockmode):
 * RowExclusiveLock for modify commands, RowShareLock for queries with
 * hasForUpdate set, and AccessShareLock for everything else.
 */
LOCKMODE
GetQueryLockMode(Query *query)
{
	if (IsModifyCommand(query))
	{
		return RowExclusiveLock;
	}

	return query->hasForUpdate ? RowShareLock : AccessShareLock;
}
/*
 * IsModifyCommand returns true if the command type of the query is INSERT,
 * UPDATE, DELETE or MERGE, and false for any other command type.
 */
bool
IsModifyCommand(Query *query)
{
	switch (query->commandType)
	{
		case CMD_INSERT:
		case CMD_UPDATE:
		case CMD_DELETE:
		case CMD_MERGE:
		{
			return true;
		}

		default:
		{
			return false;
		}
	}
}
/*
 * IsMultiTaskPlan returns true if the distributed plan has a worker job whose
 * task list holds more than one task. A plan without a worker job is not a
 * multi task plan.
 */
bool
IsMultiTaskPlan(DistributedPlan *distributedPlan)
{
	Job *workerJob = distributedPlan->workerJob;

	return workerJob != NULL && list_length(workerJob->taskList) > 1;
}
/*
 * PlanFastPathDistributedStmt creates a distributed planned statement with
 * the help of FastPathPlanner.
 *
 * The fast path restriction context of planContext is marked as a fast path
 * router query. distributionKeyValue may be NULL; a Const is remembered as
 * the distribution key value, and a Param is recorded by setting
 * distributionKeyHasParam. Any other node type is ignored.
 */
static PlannedStmt *
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
							Node *distributionKeyValue)
{
	FastPathRestrictionContext *fastPathContext =
		planContext->plannerRestrictionContext->fastPathRestrictionContext;

	fastPathContext->fastPathRouterQuery = true;

	if (distributionKeyValue != NULL)
	{
		if (IsA(distributionKeyValue, Const))
		{
			fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
		}
		else if (IsA(distributionKeyValue, Param))
		{
			fastPathContext->distributionKeyHasParam = true;
		}
	}

	planContext->plan = FastPathPlanner(planContext->originalQuery,
										planContext->query,
										planContext->boundParams);

	return CreateDistributedPlannedStmt(planContext);
}
/*
 * PlanDistributedStmt creates a distributed planned statement on top of the
 * output of the PG planner.
 *
 * rteIdCounter is the first RTE identifier that is still unused. Once the
 * distributed plan is built, the partitioned tables in the query are flagged
 * as inherited again.
 */
static PlannedStmt *
PlanDistributedStmt(DistributedPlanningContext *planContext,
					int rteIdCounter)
{
	/* relation RTEs may have been in-lined, those still need an identity */
	List *plannedRteList = ExtractRangeTableEntryList(planContext->query);
	rteIdCounter = AssignRTEIdentities(plannedRteList, rteIdCounter);

	PlannedStmt *distributedStmt = CreateDistributedPlannedStmt(planContext);

	/* setPartitionedTablesInherited = true */
	AdjustPartitioningForDistributedPlanning(plannedRteList, true);

	return distributedStmt;
}
/*
 * DissuadePlannerFromUsingPlan makes the given plan unattractive by
 * overwriting the total cost of its top level plan node. It is used for
 * plans whose planning potentially failed due to unresolved prepared
 * statement parameters.
 *
 * plan must not be NULL.
 */
void
DissuadePlannerFromUsingPlan(PlannedStmt *plan)
{
	Assert(plan != NULL);

	/*
	 * The cost is arbitrarily high, yet low enough that choose_custom_plan()
	 * can add it up without overflowing.
	 */
	plan->planTree->total_cost = FLT_MAX / 100000000;
}
/*
 * CreateDistributedPlannedStmt encapsulates the logic needed to transform a
 * particular query into a distributed plan that is wrapped in a PlannedStmt.
 *
 * planContext carries the original and the rewritten query, the bound
 * parameters, the local plan and the planner restriction context.
 *
 * Returns the final plan. Raises the deferred planning error when no usable
 * plan could be created and the failure cannot be attributed to unresolved
 * parameters.
 */
static PlannedStmt *
CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
{
	uint64 planId = NextPlanId++;

	if (QueryTreeContainsInlinableCTE(planContext->originalQuery))
	{
		/*
		 * CTEs that are inlined as subqueries may take part in query pushdown
		 * planning, which is much more efficient than planning them
		 * recursively. Hence distributed planning is first attempted on the
		 * query tree with inlined CTEs.
		 *
		 * If that attempt fails we fall through and plan with the CTEs left
		 * as they are, because recursively planning CTEs can provide full SQL
		 * coverage, although it might be slow.
		 */
		PlannedStmt *inlinedCtePlan =
			InlineCtesAndCreateDistributedPlannedStmt(planId, planContext);
		if (inlinedCtePlan != NULL)
		{
			return inlinedCtePlan;
		}
	}

	bool hasUnresolvedParams =
		HasUnresolvedExternParamsWalker((Node *) planContext->originalQuery,
										planContext->boundParams);

	DistributedPlan *distributedPlan =
		CreateDistributedPlan(planId,
							  true, /* allowRecursivePlanning */
							  planContext->originalQuery,
							  planContext->query,
							  planContext->boundParams,
							  hasUnresolvedParams,
							  planContext->plannerRestrictionContext);

	if (distributedPlan == NULL)
	{
		/*
		 * No plan was generated, so prepare a generic error to be emitted.
		 * Normally the user never gets to see it: the usual cause is
		 * unresolved prepared statement parameters, and then the logic below
		 * forces a custom plan (i.e. one with parameters bound to specific
		 * values). SQL (not plpgsql) functions unfortunately do not go
		 * through a codepath that supports custom plans, so the error still
		 * has to be prepared.
		 */

		/* currently always should have a more specific error otherwise */
		Assert(hasUnresolvedParams);

		distributedPlan = CitusMakeNode(DistributedPlan);
		distributedPlan->planningError =
			DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
						  "could not create distributed plan",
						  "Possibly this is caused by the use of parameters in SQL "
						  "functions, which is not supported in Citus.",
						  "Consider using PL/pgSQL functions instead.");
	}

	/*
	 * Error out if none of the planners came up with a usable plan, unless
	 * missing parameters may be to blame. Postgres re-plans prepared
	 * statements for the first five executions (producing custom plans) and
	 * afterwards compares the cost of a generic plan with the average custom
	 * plan cost. Otherwise unsupported prepared statement parameters are
	 * supported by giving the unsupported query an exorbitant cost, which
	 * leads to the custom plan being chosen. That logic is only reached if we
	 * do not error out here.
	 */
	if (distributedPlan->planningError && !hasUnresolvedParams)
	{
		RaiseDeferredError(distributedPlan->planningError, ERROR);
	}

	/* remember the plan's identifier for identifying subplans */
	distributedPlan->planId = planId;

	/* create final plan by combining local plan with distributed plan */
	PlannedStmt *finalPlan = FinalizePlan(planContext->plan, distributedPlan);

	/*
	 * As explained above, the planning cost is forced to be unrealistically
	 * high when planning (possibly) failed due to prepared statement
	 * parameters, or when the query got planned as a multi shard modify
	 * query; in both cases only if there are unresolved parameters.
	 */
	bool planShouldBeAvoided =
		distributedPlan->planningError ||
		(UpdateOrDeleteOrMergeQuery(planContext->originalQuery) &&
		 IsMultiTaskPlan(distributedPlan));

	if (planShouldBeAvoided && hasUnresolvedParams)
	{
		DissuadePlannerFromUsingPlan(finalPlan);
	}

	return finalPlan;
}
/*
 * InlineCtesAndCreateDistributedPlannedStmt inlines the eligible CTEs in a
 * copy of the original query and then attempts distributed planning on that
 * copy through TryCreateDistributedPlannedStmt(), which runs
 * CreateDistributedPlannedStmt() inside a PG_TRY() block.
 *
 * The original query in planContext is left untouched, so that the caller can
 * fall back to recursive planning of the CTEs. planId is accepted to match
 * the caller but is not used in this function.
 *
 * Returns the planned statement, or NULL if planning the query with inlined
 * CTEs fails.
 */
static PlannedStmt *
InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
										  DistributedPlanningContext *planContext)
{
	/* work on a copy, the original is needed if we have to fall back */
	Query *inlinedQuery = copyObject(planContext->originalQuery);

	RecursivelyInlineCtesInQueryTree(inlinedQuery);

	/* after inlining, we shouldn't have any inlinable CTEs */
	Assert(!QueryTreeContainsInlinableCTE(inlinedQuery));

	/* simply recurse into CreateDistributedPlannedStmt() in a PG_TRY() block */
	return TryCreateDistributedPlannedStmt(planContext->plan,
										   inlinedQuery,
										   planContext->query,
										   planContext->boundParams,
										   planContext->plannerRestrictionContext);
}
/*
 * TryCreateDistributedPlannedStmt is a wrapper around
 * CreateDistributedPlannedStmt, simply calling it in a PG_TRY()/PG_CATCH()
 * block.
 *
 * localPlan, originalQuery, query, boundParams and plannerRestrictionContext
 * are packed into a fresh DistributedPlanningContext that is handed to
 * CreateDistributedPlannedStmt.
 *
 * Returns the PlannedStmt if the input query can be planned by Citus. If
 * planning raises an ERROR, the error is swallowed, a DEBUG4 message with the
 * reason for the failure is generated and NULL is returned. An error of any
 * other level is re-thrown untouched.
 */
static PlannedStmt *
TryCreateDistributedPlannedStmt(PlannedStmt *localPlan,
								Query *originalQuery,
								Query *query, ParamListInfo boundParams,
								PlannerRestrictionContext *plannerRestrictionContext)
{
	MemoryContext savedContext = CurrentMemoryContext;
	PlannedStmt *result = NULL;

	DistributedPlanningContext *planContext = palloc0(sizeof(DistributedPlanningContext));

	planContext->plan = localPlan;
	planContext->boundParams = boundParams;
	planContext->originalQuery = originalQuery;
	planContext->query = query;
	planContext->plannerRestrictionContext = plannerRestrictionContext;

	PG_TRY();
	{
		result = CreateDistributedPlannedStmt(planContext);
	}
	PG_CATCH();
	{
		/* the error may have been raised in a different memory context */
		MemoryContextSwitchTo(savedContext);
		ErrorData *edata = CopyErrorData();

		/*
		 * Don't try to intercept PANIC or FATAL, let those breeze past us.
		 *
		 * This has to be decided before FlushErrorState(): flushing discards
		 * the error data that PG_RE_THROW() propagates, so re-throwing after
		 * the flush would hand an empty error state to the outer handlers.
		 */
		if (edata->elevel != ERROR)
		{
			FreeErrorData(edata);
			PG_RE_THROW();
		}

		/* we handle the error ourselves, so leave the error handling system */
		FlushErrorState();

		ereport(DEBUG4, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("Planning after CTEs inlined failed with "
								"\nmessage: %s\ndetail: %s\nhint: %s",
								edata->message ? edata->message : "",
								edata->detail ? edata->detail : "",
								edata->hint ? edata->hint : "")));

		FreeErrorData(edata);

		result = NULL;
	}
	PG_END_TRY();

	return result;
}
/*
 * GetRouterPlanType inspects the parse tree and returns the plan type that
 * CreateDistributedPlan() should go for.
 *
 * Queries that are not modify commands are SELECT_QUERY. For modify commands
 * the result relation of query is first checked with
 * EnsureModificationsCanRunOnRelation() and
 * EnsurePartitionTableNotReplicated(). INSERT .. SELECT (into a Citus or a
 * local table) and MERGE then map to their own plan type, or to
 * REPLAN_WITH_BOUND_PARAMETERS when hasUnresolvedParams is set. All remaining
 * modify commands are DML_QUERY, regardless of hasUnresolvedParams.
 */
static RouterPlanType
GetRouterPlanType(Query *query, Query *originalQuery, bool hasUnresolvedParams)
{
	if (!IsModifyCommand(originalQuery))
	{
		return SELECT_QUERY;
	}

	Oid targetRelationId = ModifyQueryResultRelationId(query);

	EnsureModificationsCanRunOnRelation(targetRelationId);
	EnsurePartitionTableNotReplicated(targetRelationId);

	/* Check the type of modification being done */
	RouterPlanType planTypeWithBoundParams = DML_QUERY;

	if (InsertSelectIntoCitusTable(originalQuery))
	{
		planTypeWithBoundParams = INSERT_SELECT_INTO_CITUS_TABLE;
	}
	else if (InsertSelectIntoLocalTable(originalQuery))
	{
		planTypeWithBoundParams = INSERT_SELECT_INTO_LOCAL_TABLE;
	}
	else if (IsMergeQuery(originalQuery))
	{
		planTypeWithBoundParams = MERGE_QUERY;
	}
	else
	{
		/* plain DML does not depend on whether parameters are resolved */
		return DML_QUERY;
	}

	return hasUnresolvedParams ? REPLAN_WITH_BOUND_PARAMETERS :
		   planTypeWithBoundParams;
}
/*
* CreateDistributedPlan generates a distributed plan for a query.
* It goes through 3 steps:
*
* 1. Try router planner
* 2. Generate subplans for CTEs and complex subqueries
* - If any, go back to step 1 by calling itself recursively
* 3. Logical planner
*/
DistributedPlan *
CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *originalQuery,
Query *query, ParamListInfo boundParams, bool hasUnresolvedParams,
PlannerRestrictionContext *plannerRestrictionContext)
{
DistributedPlan *distributedPlan = NULL;
bool hasCtes = originalQuery->cteList != NIL;
/* Step 1: Try router planner */
RouterPlanType routerPlan = GetRouterPlanType(query, originalQuery,
hasUnresolvedParams);
switch (routerPlan)
{
case INSERT_SELECT_INTO_CITUS_TABLE:
{
distributedPlan =
CreateInsertSelectPlan(planId,
originalQuery,
plannerRestrictionContext,
boundParams);
break;
}
case INSERT_SELECT_INTO_LOCAL_TABLE:
{
distributedPlan =