Skip to content

Commit

Permalink
Add initial sql support for distributed tables that don't have a shar…
Browse files Browse the repository at this point in the history
…d key (#6773/#6822)

Enable router planner and a limited version of INSERT .. SELECT planner
for the queries that reference colocated null shard key tables.

* SELECT / UPDATE / DELETE / MERGE are supported as long as the query is a
  router query.
* INSERT .. SELECT is supported as long as it only references colocated
  null shard key tables.

Note that this is not limited to distributed INSERT .. SELECT; it also
covers a limited set of query types that require pull-to-coordinator, e.g.,
due to a LIMIT clause, generate_series() etc. ...
(Ideally, distributed INSERT .. SELECT could handle such queries too, e.g.,
when we're only referencing tables that don't have a shard key, but today
this is not the case. See #6773 (comment).)
  • Loading branch information
onurctirtir committed Apr 19, 2023
1 parent d66b35f commit dd346f6
Show file tree
Hide file tree
Showing 16 changed files with 3,375 additions and 21 deletions.
32 changes: 32 additions & 0 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,17 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
{
return distributedPlan;
}
else if (ContainsNullDistKeyTable(originalQuery))
{
/*
* We only support router queries if the query contains reference to
* a null-dist-key table. This temporary restriction will be removed
* once we support recursive planning for the queries that reference
* null-dist-key tables.
*/
WrapRouterErrorForNullDistKeyTable(distributedPlan->planningError);
RaiseDeferredError(distributedPlan->planningError, ERROR);
}
else
{
RaiseDeferredError(distributedPlan->planningError, DEBUG2);
Expand Down Expand Up @@ -2462,6 +2473,18 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
}


/*
 * ContainsNullDistKeyTable reports whether the given query references a
 * distributed table that does not have a shard key.
 *
 * The answer is read from the hasDistTableWithoutShardKey flag of the
 * RTEListProperties computed for the query.
 */
bool
ContainsNullDistKeyTable(Query *query)
{
	return GetRTEListPropertiesForQuery(query)->hasDistTableWithoutShardKey;
}


/*
* GetRTEListPropertiesForQuery is a wrapper around GetRTEListProperties that
* returns RTEListProperties for the rte list retrieved from query.
Expand Down Expand Up @@ -2538,6 +2561,15 @@ GetRTEListProperties(List *rangeTableList)
else if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE))
{
rteListProperties->hasDistributedTable = true;

if (!HasDistributionKeyCacheEntry(cacheEntry))
{
rteListProperties->hasDistTableWithoutShardKey = true;
}
else
{
rteListProperties->hasDistTableWithShardKey = true;
}
}
else
{
Expand Down
17 changes: 10 additions & 7 deletions src/backend/distributed/planner/fast_path_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
return false;
}

/*
* If the table doesn't have a distribution column, we don't need to
* check anything further.
*/
Var *distributionKey = PartitionColumn(distributedTableId, 1);
if (!distributionKey)
{
return true;
}

/* WHERE clause should not be empty for distributed tables */
if (joinTree == NULL ||
(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals ==
Expand All @@ -220,13 +230,6 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
return false;
}

/* if that's a reference table, we don't need to check anything further */
Var *distributionKey = PartitionColumn(distributedTableId, 1);
if (!distributionKey)
{
return true;
}

/* convert list of expressions into expression tree for further processing */
quals = joinTree->quals;
if (quals != NULL && IsA(quals, List))
Expand Down
92 changes: 87 additions & 5 deletions src/backend/distributed/planner/insert_select_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
PlannerRestrictionContext *
plannerRestrictionContext,
ParamListInfo boundParams);
static void ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery);
static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
Expand Down Expand Up @@ -241,6 +242,12 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
RaiseDeferredError(deferredError, ERROR);
}

/*
* We support a limited set of INSERT .. SELECT queries if the query
* references a null-dist-key table.
*/
ErrorIfInsertSelectWithNullDistKeyNotSupported(originalQuery);

DistributedPlan *distributedPlan = CreateDistributedInsertSelectPlan(originalQuery,
plannerRestrictionContext);

Expand All @@ -260,6 +267,74 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
}


/*
 * ErrorIfInsertSelectWithNullDistKeyNotSupported raises an error for the
 * INSERT .. SELECT queries that involve a null-dist-key table (either as the
 * target table or within the SELECT part) in a way that is not supported.
 *
 * The rules enforced here are:
 *  - If the target is not a null-dist-key table, then the SELECT must not
 *    reference any null-dist-key tables.
 *  - If the target is a null-dist-key table, then the SELECT must not
 *    reference Postgres local tables, reference tables, Citus local tables,
 *    distributed tables that have a shard key or materialized views, and
 *    every distributed table it references must be colocated with the
 *    target table.
 *
 * Returns normally when none of these rules is violated.
 */
static void
ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery)
{
	RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(originalQuery);
	Query *selectQuery = selectRte->subquery;
	RTEListProperties *selectProperties = GetRTEListPropertiesForQuery(selectQuery);

	RangeTblEntry *resultRte = ExtractResultRelationRTEOrError(originalQuery);
	Oid targetRelationId = resultRte->relid;
	bool targetIsNullKeyTable =
		IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE);

	if (!targetIsNullKeyTable)
	{
		/* other target types cannot pull data from a null-dist-key table */
		if (selectProperties->hasDistTableWithoutShardKey)
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("cannot select from a distributed table that "
								   "does not have a shard key when inserting into "
								   "a different table type")));
		}

		return;
	}

	/* from here on, the target is known to be a null-dist-key table */
	bool selectHasOtherTableTypes =
		selectProperties->hasPostgresLocalTable ||
		selectProperties->hasReferenceTable ||
		selectProperties->hasCitusLocalTable ||
		selectProperties->hasDistTableWithShardKey ||
		selectProperties->hasMaterializedView;

	if (selectHasOtherTableTypes)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot select from different table types "
							   "when inserting into a distributed table "
							   "that does not have a shard key")));
	}

	if (!selectProperties->hasDistTableWithoutShardKey)
	{
		/*
		 * The SELECT references neither Citus tables, Postgres tables nor
		 * materialized views; so it must be reading from something else,
		 * such as a function call, a VALUES clause or a CTE of the INSERT.
		 *
		 * For those, the common restrictions enforced by the INSERT ..
		 * SELECT planners are sufficient.
		 */
		Assert(!NeedsDistributedPlanning(selectQuery));
		return;
	}

	/* the target must be colocated with everything the SELECT references */
	List *relationIdList = lappend_oid(DistributedRelationIdList(selectQuery),
									   targetRelationId);

	if (AllDistributedRelationsInListColocated(relationIdList))
	{
		return;
	}

	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot select from a non-colocated distributed "
						   "table when inserting into a distributed table "
						   "that does not have a shard key")));
}


/*
* CreateDistributedInsertSelectPlan creates a DistributedPlan for distributed
* INSERT ... SELECT queries which could consist of multiple tasks.
Expand Down Expand Up @@ -379,6 +454,16 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
{
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);

RTEListProperties *selectRteListProperties =
GetRTEListPropertiesForQuery(selectRte->subquery);
if (selectRteListProperties->hasDistTableWithoutShardKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot select from a distributed table that "
"does not have a shard key when inserting into "
"a local table")));
}

PrepareInsertSelectForCitusPlanner(insertSelectQuery);

/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
Expand Down Expand Up @@ -717,10 +802,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
}
else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot target a distributed "
"table with a null shard key",
NULL, NULL);
/* we've already checked the subquery via ErrorIfInsertSelectWithNullDistKeyNotSupported */
}
else
{
Expand Down Expand Up @@ -874,7 +956,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery,
*/
RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(
copiedSubquery);
if (subqueryRteListProperties->hasDistributedTable)
if (subqueryRteListProperties->hasDistTableWithShardKey)
{
AddPartitionKeyNotNullFilterToSelect(copiedSubquery);
}
Expand Down
5 changes: 5 additions & 0 deletions src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,11 @@ InsertDistributionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
return NULL;
}

if (!HasDistributionKey(resultRte->relid))
{
return NULL;
}

bool foundDistributionColumn = false;
MergeAction *action = NULL;
foreach_ptr(action, query->mergeActionList)
Expand Down
16 changes: 15 additions & 1 deletion src/backend/distributed/planner/multi_logical_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
if (!targetListOnPartitionColumn)
{
if (!FindNodeMatchingCheckFunctionInRangeTableList(query->rtable,
IsDistributedTableRTE))
IsTableWithDistKeyRTE))
{
targetListOnPartitionColumn = true;
}
Expand Down Expand Up @@ -379,6 +379,20 @@ IsReferenceTableRTE(Node *node)
}


/*
 * IsTableWithDistKeyRTE returns true only when NodeTryGetRteRelid() yields a
 * valid relation id for the given node and that relation is a Citus table
 * that has a distribution key. Returns false in all other cases.
 */
bool
IsTableWithDistKeyRTE(Node *node)
{
	Oid rteRelationId = NodeTryGetRteRelid(node);

	if (rteRelationId == InvalidOid)
	{
		/* no relation id could be extracted from the node */
		return false;
	}

	if (!IsCitusTable(rteRelationId))
	{
		return false;
	}

	return HasDistributionKey(rteRelationId);
}


/*
* FullCompositeFieldList gets a composite field list, and checks if all fields
* of composite type are used in the list.
Expand Down
24 changes: 23 additions & 1 deletion src/backend/distributed/planner/multi_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,22 @@ CreateModifyPlan(Query *originalQuery, Query *query,
}


/*
 * WrapRouterErrorForNullDistKeyTable rewrites the given planning error in
 * place: the current error message is moved to the detail field and the
 * message is replaced with a generic one about queries that reference a
 * distributed table without a shard key.
 *
 * This function does not inspect any query; deciding whether the error
 * should be wrapped is up to the caller.
 */
void
WrapRouterErrorForNullDistKeyTable(DeferredErrorMessage *planningError)
{
	static const char genericMessage[] =
		"queries that reference a distributed "
		"table without a shard key can only "
		"reference colocated distributed "
		"tables or reference tables";

	/* preserve the original message as the detail before overwriting it */
	planningError->detail = planningError->message;
	planningError->message = pstrdup(genericMessage);
}


/*
* CreateSingleTaskRouterSelectPlan creates a physical plan for given SELECT query.
* The returned plan is a router task that returns query results from a single worker.
Expand Down Expand Up @@ -1870,6 +1886,11 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
*/
if (IsMergeQuery(originalQuery))
{
if (ContainsNullDistKeyTable(originalQuery))
{
WrapRouterErrorForNullDistKeyTable(*planningError);
}

RaiseDeferredError(*planningError, ERROR);
}
else
Expand Down Expand Up @@ -3854,7 +3875,8 @@ ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree)
CitusTableCacheEntry *modificationTableCacheEntry =
GetCitusTableCacheEntry(distributedTableId);

if (!HasDistributionKeyCacheEntry(modificationTableCacheEntry))
if (!IsCitusTableTypeCacheEntry(modificationTableCacheEntry,
DISTRIBUTED_TABLE))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot router plan modification of a non-distributed table",
Expand Down
13 changes: 12 additions & 1 deletion src/include/distributed/distributed_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,19 @@ typedef struct RTEListProperties
bool hasReferenceTable;
bool hasCitusLocalTable;

/* includes hash, append and range partitioned tables */
/* includes hash, null dist key, append and range partitioned tables */
bool hasDistributedTable;

/*
* Effectively, hasDistributedTable is equal to
* "hasDistTableWithShardKey || hasDistTableWithoutShardKey".
*
* We provide below two for the callers that want to know what kind of
* distributed tables that given query has references to.
*/
bool hasDistTableWithShardKey;
bool hasDistTableWithoutShardKey;

/* union of hasReferenceTable, hasCitusLocalTable and hasDistributedTable */
bool hasCitusTable;

Expand Down Expand Up @@ -243,6 +253,7 @@ extern int32 BlessRecordExpression(Expr *expr);
extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan);
extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
struct DistributedPlan *distributedPlan);
extern bool ContainsNullDistKeyTable(Query *query);
extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query);


Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/multi_logical_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ extern bool IsCitusTableRTE(Node *node);
extern bool IsDistributedOrReferenceTableRTE(Node *node);
extern bool IsDistributedTableRTE(Node *node);
extern bool IsReferenceTableRTE(Node *node);
extern bool IsTableWithDistKeyRTE(Node *node);
extern bool IsCitusExtraDataContainerRelation(RangeTblEntry *rte);
extern bool ContainsReadIntermediateResultFunction(Node *node);
extern bool ContainsReadIntermediateResultArrayFunction(Node *node);
Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/multi_router_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query,
extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *
plannerRestrictionContext);
extern void WrapRouterErrorForNullDistKeyTable(DeferredErrorMessage *planningError);
extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext,
Expand Down
6 changes: 0 additions & 6 deletions src/test/regress/citus_tests/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,6 @@ def __init__(self, arguments):
# group 8
"function_create",
"functions",
# group 9
"merge_arbitrary_create",
"merge_arbitrary",
# group 10
"arbitrary_configs_router_create",
"arbitrary_configs_router",
#
# ii) Skip the following test as it requires support for create_distributed_function.
"nested_execution",
Expand Down
1 change: 1 addition & 0 deletions src/test/regress/expected/create_null_dist_key.out
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,7 @@ ALTER TABLE trigger_table_3 ENABLE TRIGGER ALL;
-- try a few simple queries at least to make sure that we don't crash
BEGIN;
INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1;
ERROR: cannot select from a non-colocated distributed table when inserting into a distributed table that does not have a shard key
ROLLBACK;
DROP TRIGGER IF EXISTS trigger_1 ON trigger_table_1;
DROP TRIGGER trigger_2 ON trigger_table_2 CASCADE;
Expand Down
Loading

0 comments on commit dd346f6

Please sign in to comment.