Skip to content

Commit

Permalink
Add support for more pushable / non-pushable insert .. select queries…
Browse files Browse the repository at this point in the history
… with null-shard-key tables (#6823)

* Add support for dist insert select by selecting from a reference
table.
  
  This was the only pushable insert .. select case that
  #6773 didn't cover.

* For the cases where we insert into a Citus table but the INSERT ..
SELECT
  query cannot be pushed down, allow pull-to-coordinator when possible.

  Remove the checks that we had at the very beginning of
  CreateInsertSelectPlanInternal so that we can try insert .. select via
  pull-to-coordinator for the cases where we cannot push-down the insert
  .. select query. What we support via pull-to-coordinator is still
  limited due to the lack of logical planner support for SELECT queries,
but this commit at least allows using pull-to-coordinator for the cases
  where the select query can be planned via router planner, without
  limiting ourselves to restrictive top-level checks.

  Also introduce some additional restrictions into
CreateDistributedInsertSelectPlan for the cases that it previously did not
  check for null-shard-key tables. Indeed, it would make more sense to have
those checks for distributed tables in general, via separate PRs against
  the main branch. See #6817.

* Add support for inserting into a Postgres table.
  • Loading branch information
onurctirtir authored Apr 24, 2023
1 parent dd346f6 commit 400f58d
Show file tree
Hide file tree
Showing 7 changed files with 1,484 additions and 128 deletions.
156 changes: 52 additions & 104 deletions src/backend/distributed/planner/insert_select_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
PlannerRestrictionContext *
plannerRestrictionContext,
ParamListInfo boundParams);
static void ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery);
static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
Expand Down Expand Up @@ -242,12 +241,6 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
RaiseDeferredError(deferredError, ERROR);
}

/*
* We support a limited set of INSERT .. SELECT queries if the query
* references a null-dist-key table.
*/
ErrorIfInsertSelectWithNullDistKeyNotSupported(originalQuery);

DistributedPlan *distributedPlan = CreateDistributedInsertSelectPlan(originalQuery,
plannerRestrictionContext);

Expand All @@ -267,74 +260,6 @@ CreateInsertSelectPlanInternal(uint64 planId, Query *originalQuery,
}


/*
* ErrorIfInsertSelectWithNullDistKeyNotSupported throws an error if the given
* INSERT .. SELECT query references a null-dist-key table (as the target table
* or in the SELECT clause) and is unsupported.
*
* Such an INSERT .. SELECT query is supported as long as it only references
* a "colocated" set of null-dist-key tables and no other relation rte types.
*
* Concretely, the function raises ERRCODE_FEATURE_NOT_SUPPORTED when:
*  - the target is not a null-dist-key table but the SELECT references one;
*  - the target is a null-dist-key table and the SELECT references a Postgres
*    local table, a reference table, a Citus local table, a distributed table
*    with a shard key, or a materialized view;
*  - the target is a null-dist-key table and the distributed relations in the
*    SELECT, together with the target, are not all colocated.
*
* In every other case the function returns without raising an error.
*/
static void
ErrorIfInsertSelectWithNullDistKeyNotSupported(Query *originalQuery)
{
/* collect which kinds of relations the SELECT part of the query references */
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
Query *subquery = subqueryRte->subquery;
RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(subquery);

RangeTblEntry *insertRte = ExtractResultRelationRTEOrError(originalQuery);
Oid targetRelationId = insertRte->relid;

/* selecting from a null-dist-key table into any other kind of target */
if (!IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE) &&
subqueryRteListProperties->hasDistTableWithoutShardKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot select from a distributed table that "
"does not have a shard key when inserting into "
"a different table type")));
}
else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE))
{
/* the target is a null-dist-key table: reject any other relation kind in the SELECT */
if (subqueryRteListProperties->hasPostgresLocalTable ||
subqueryRteListProperties->hasReferenceTable ||
subqueryRteListProperties->hasCitusLocalTable ||
subqueryRteListProperties->hasDistTableWithShardKey ||
subqueryRteListProperties->hasMaterializedView)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot select from different table types "
"when inserting into a distributed table "
"that does not have a shard key")));
}

if (!subqueryRteListProperties->hasDistTableWithoutShardKey)
{
/*
* This means that the SELECT doesn't reference any Citus tables,
* Postgres tables or materialized views but references a function
* call, a VALUES clause etc., or a cte from INSERT.
*
* In that case, we rely on the common restrictions enforced by the
* INSERT .. SELECT planners.
*/
Assert(!NeedsDistributedPlanning(subquery));
return;
}

/*
* The SELECT references null-dist-key tables only; all of them must be
* colocated with the target, so add the target to the list before checking.
*/
List *distributedRelationIdList = DistributedRelationIdList(subquery);
distributedRelationIdList = lappend_oid(distributedRelationIdList,
targetRelationId);

if (!AllDistributedRelationsInListColocated(distributedRelationIdList))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot select from a non-colocated distributed "
"table when inserting into a distributed table "
"that does not have a shard key")));
}
}
}


/*
* CreateDistributedInsertSelectPlan creates a DistributedPlan for distributed
* INSERT ... SELECT queries which could consist of multiple tasks.
Expand Down Expand Up @@ -454,16 +379,6 @@ CreateInsertSelectIntoLocalTablePlan(uint64 planId, Query *insertSelectQuery,
{
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);

RTEListProperties *selectRteListProperties =
GetRTEListPropertiesForQuery(selectRte->subquery);
if (selectRteListProperties->hasDistTableWithoutShardKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot select from a distributed table that "
"does not have a shard key when inserting into "
"a local table")));
}

PrepareInsertSelectForCitusPlanner(insertSelectQuery);

/* get the SELECT query (may have changed after PrepareInsertSelectForCitusPlanner) */
Expand Down Expand Up @@ -800,10 +715,6 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
NULL, NULL);
}
}
else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE))
{
/* we've already checked the subquery via ErrorIfInsertSelectWithNullDistKeyNotSupported */
}
else
{
/*
Expand All @@ -819,25 +730,49 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
"table", NULL, NULL);
}

/* ensure that INSERT's partition column comes from SELECT's partition column */
error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte,
&selectPartitionColumnTableId);
if (error)
if (!HasDistributionKey(targetRelationId) ||
subqueryRteListProperties->hasDistTableWithoutShardKey)
{
return error;
/*
* XXX: Better to check this regardless of whether the target table
* has a distribution column.
*/
List *distributedRelationIdList = DistributedRelationIdList(subquery);
distributedRelationIdList = lappend_oid(distributedRelationIdList,
targetRelationId);

if (!AllDistributedRelationsInListColocated(distributedRelationIdList))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot reference a "
"distributed table without a shard key together "
"with non-colocated distributed tables",
NULL, NULL);
}
}

/*
* We expect partition column values come from colocated tables. Note that we
* skip this check from the reference table case given that all reference tables
* are already (and by default) co-located.
*/
if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
if (HasDistributionKey(targetRelationId))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT target table and the source relation of the SELECT partition "
"column value must be colocated in distributed INSERT ... SELECT",
NULL, NULL);
/* ensure that INSERT's partition column comes from SELECT's partition column */
error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte,
&selectPartitionColumnTableId);
if (error)
{
return error;
}

/*
* We expect partition column values come from colocated tables. Note that we
* skip this check from the reference table case given that all reference tables
* are already (and by default) co-located.
*/
if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT target table and the source relation of the SELECT partition "
"column value must be colocated in distributed INSERT ... SELECT",
NULL, NULL);
}
}
}

Expand Down Expand Up @@ -1626,6 +1561,19 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
IsSupportedRedistributionTarget(targetRelationId);

/*
* Today it's not possible to generate a distributed plan for a SELECT
* with more than one task if it references a null-shard-key table.
* This is because we don't support queries beyond the router planner
* if the query references a null-shard-key table.
*
* For this reason, right now we don't expect an INSERT .. SELECT
* query to go through the repartitioned INSERT .. SELECT logic if the
* SELECT query references a null-shard-key table.
*/
Assert(!repartitioned ||
!GetRTEListPropertiesForQuery(selectQueryCopy)->hasDistTableWithoutShardKey);

distributedPlan->insertSelectQuery = insertSelectQuery;
distributedPlan->selectPlanForInsertSelect = selectPlan;
distributedPlan->insertSelectMethod = repartitioned ?
Expand Down
1 change: 0 additions & 1 deletion src/test/regress/expected/create_null_dist_key.out
Original file line number Diff line number Diff line change
Expand Up @@ -1803,7 +1803,6 @@ ALTER TABLE trigger_table_3 ENABLE TRIGGER ALL;
-- try a few simple queries at least to make sure that we don't crash
BEGIN;
INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1;
ERROR: cannot select from a non-colocated distributed table when inserting into a distributed table that does not have a shard key
ROLLBACK;
DROP TRIGGER IF EXISTS trigger_1 ON trigger_table_1;
DROP TRIGGER trigger_2 ON trigger_table_2 CASCADE;
Expand Down
Loading

0 comments on commit 400f58d

Please sign in to comment.