Skip to content

Commit

Permalink
Make the definition of reference & citus local tables more strict
Browse files Browse the repository at this point in the history
Now that we will soon add another table type having DISTRIBUTE_BY_NONE
as distribution method and that we want the code interpret such tables
mostly as distributed tables, let's make the definition of those other
two table types more strict.
  • Loading branch information
onurctirtir committed Mar 8, 2023
1 parent e3cf7ac commit 538541d
Show file tree
Hide file tree
Showing 19 changed files with 83 additions and 49 deletions.
20 changes: 15 additions & 5 deletions src/backend/distributed/commands/foreign_constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
if (!referencedIsCitus && !selfReferencingTable)
{
if (IsCitusLocalTableByDistParams(referencingDistMethod,
referencingReplicationModel))
referencingReplicationModel,
referencingColocationId))
{
ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(referencedTableId);
}
Expand All @@ -246,7 +247,7 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
{
referencedDistMethod = PartitionMethod(referencedTableId);
referencedDistKey = IsCitusTableType(referencedTableId,
CITUS_TABLE_WITH_NO_DIST_KEY) ?
CITUS_LOCAL_OR_REFERENCE_TABLE) ?
NULL :
DistPartitionKey(referencedTableId);
referencedColocationId = TableColocationId(referencedTableId);
Expand Down Expand Up @@ -278,9 +279,17 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
}

bool referencingIsCitusLocalOrRefTable =
(referencingDistMethod == DISTRIBUTE_BY_NONE);
IsCitusLocalTableByDistParams(referencingDistMethod,
referencingReplicationModel,
referencingColocationId) ||
IsReferenceTableByDistParams(referencingDistMethod,
referencingReplicationModel);
bool referencedIsCitusLocalOrRefTable =
(referencedDistMethod == DISTRIBUTE_BY_NONE);
IsCitusLocalTableByDistParams(referencedDistMethod,
referencedReplicationModel,
referencedColocationId) ||
IsReferenceTableByDistParams(referencedDistMethod,
referencedReplicationModel);
if (referencingIsCitusLocalOrRefTable && referencedIsCitusLocalOrRefTable)
{
EnsureSupportedFKeyBetweenCitusLocalAndRefTable(constraintForm,
Expand Down Expand Up @@ -313,7 +322,8 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
* reference table is referenced.
*/
bool referencedIsReferenceTable =
(referencedReplicationModel == REPLICATION_MODEL_2PC);
IsReferenceTableByDistParams(referencedDistMethod,
referencedReplicationModel);
if (!referencedIsReferenceTable && (
referencingColocationId == INVALID_COLOCATION_ID ||
referencingColocationId != referencedColocationId))
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/index.c
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
* Non-distributed tables do not have partition key, and unique constraints
* are allowed for them. Thus, we added a short-circuit for non-distributed tables.
*/
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ CitusCopyFrom(CopyStmt *copyStatement, QueryCompletion *completionTag)
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
CopyToExistingShards(copyStatement, completionTag);
}
Expand Down
6 changes: 3 additions & 3 deletions src/backend/distributed/commands/table.c
Original file line number Diff line number Diff line change
Expand Up @@ -1607,9 +1607,9 @@ AlterTableDefinesFKeyBetweenPostgresAndNonDistTable(AlterTableStmt *alterTableSt
if (!IsCitusTable(leftRelationId))
{
return RelationIdListContainsCitusTableType(rightRelationIdList,
CITUS_TABLE_WITH_NO_DIST_KEY);
CITUS_LOCAL_OR_REFERENCE_TABLE);
}
else if (IsCitusTableType(leftRelationId, CITUS_TABLE_WITH_NO_DIST_KEY))
else if (IsCitusTableType(leftRelationId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
return RelationIdListContainsPostgresTable(rightRelationIdList);
}
Expand Down Expand Up @@ -3666,7 +3666,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
* sequential mode.
*/
if (executeSequentially &&
!IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY) &&
!IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE) &&
ParallelQueryExecutedInTransaction())
{
char *relationName = get_rel_name(relationId);
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/truncate.c
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
{
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);

if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY) &&
if (IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE) &&
TableReferenced(relationId))
{
char *relationName = get_rel_name(relationId);
Expand Down
44 changes: 33 additions & 11 deletions src/backend/distributed/metadata/metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ static void InvalidateDistTableCache(void);
static void InvalidateDistObjectCache(void);
static bool InitializeTableCacheEntry(int64 shardId, bool missingOk);
static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
CitusTableType tableType);
uint32 colocationId, CitusTableType tableType);
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry, bool
missingOk);

Expand Down Expand Up @@ -450,7 +450,8 @@ bool
IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
{
return IsCitusTableTypeInternal(tableEntry->partitionMethod,
tableEntry->replicationModel, tableType);
tableEntry->replicationModel,
tableEntry->colocationId, tableType);
}


Expand All @@ -460,7 +461,7 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl
*/
static bool
IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
CitusTableType tableType)
uint32 colocationId, CitusTableType tableType)
{
switch (tableType)
{
Expand Down Expand Up @@ -501,12 +502,16 @@ IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
case CITUS_LOCAL_TABLE:
{
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC;
replicationModel != REPLICATION_MODEL_2PC &&
colocationId == INVALID_COLOCATION_ID;
}

case CITUS_TABLE_WITH_NO_DIST_KEY:
case CITUS_LOCAL_OR_REFERENCE_TABLE:
{
return partitionMethod == DISTRIBUTE_BY_NONE;
return partitionMethod == DISTRIBUTE_BY_NONE &&
(replicationModel == REPLICATION_MODEL_2PC ||
(replicationModel != REPLICATION_MODEL_2PC &&
colocationId == INVALID_COLOCATION_ID));
}

case ANY_CITUS_TABLE_TYPE:
Expand Down Expand Up @@ -765,14 +770,28 @@ PgDistPartitionTupleViaCatalog(Oid relationId)


/*
* IsCitusLocalTableByDistParams returns true if given partitionMethod and
* replicationModel would identify a citus local table.
* IsReferenceTableByDistParams returns true if given partitionMethod and
* replicationModel would identify a reference table.
*/
bool
IsReferenceTableByDistParams(char partitionMethod, char replicationModel)
{
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_2PC;
}


/*
* IsCitusLocalTableByDistParams returns true if given partitionMethod,
* replicationModel and colocationId would identify a citus local table.
*/
bool
IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel)
IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel,
uint32 colocationId)
{
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC;
replicationModel != REPLICATION_MODEL_2PC &&
colocationId == INVALID_COLOCATION_ID;
}


Expand Down Expand Up @@ -4837,11 +4856,14 @@ CitusTableTypeIdList(CitusTableType citusTableType)

Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1];
Datum replicationModelDatum = datumArray[Anum_pg_dist_partition_repmodel - 1];
Datum colocationIdDatum = datumArray[Anum_pg_dist_partition_colocationid - 1];

Oid partitionMethod = DatumGetChar(partMethodDatum);
Oid replicationModel = DatumGetChar(replicationModelDatum);
uint32 colocationId = DatumGetUInt32(colocationIdDatum);

if (IsCitusTableTypeInternal(partitionMethod, replicationModel, citusTableType))
if (IsCitusTableTypeInternal(partitionMethod, replicationModel, colocationId,
citusTableType))
{
Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1];

Expand Down
4 changes: 2 additions & 2 deletions src/backend/distributed/metadata/metadata_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ ShouldSyncTableMetadata(Oid relationId)

bool hashDistributed = IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED);
bool citusTableWithNoDistKey =
IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY);
IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_OR_REFERENCE_TABLE);

return ShouldSyncTableMetadataInternal(hashDistributed, citusTableWithNoDistKey);
}
Expand Down Expand Up @@ -1158,7 +1158,7 @@ DistributionCreateCommand(CitusTableCacheEntry *cacheEntry)
char replicationModel = cacheEntry->replicationModel;
StringInfo tablePartitionKeyNameString = makeStringInfo();

if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
appendStringInfo(tablePartitionKeyNameString, "NULL");
}
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/metadata/node_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
errmsg("relation is not distributed")));
}

if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
List *shardIntervalList = LoadShardIntervalList(relationId);
if (shardIntervalList == NIL)
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/planner/multi_join_order.c
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,7 @@ DistPartitionKey(Oid relationId)
CitusTableCacheEntry *partitionEntry = GetCitusTableCacheEntry(relationId);

/* non-distributed tables do not have partition column */
if (IsCitusTableTypeCacheEntry(partitionEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableTypeCacheEntry(partitionEntry, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/planner/multi_logical_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
* If the expression belongs to a non-distributed table continue searching for
* other partition keys.
*/
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
continue;
}
Expand Down
8 changes: 4 additions & 4 deletions src/backend/distributed/planner/multi_physical_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -2199,7 +2199,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
Oid relationId = relationRestriction->relationId;

CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
continue;
}
Expand Down Expand Up @@ -2377,7 +2377,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
nonReferenceRelations = lappend_oid(nonReferenceRelations,
relationId);
}
else if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
else if (IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
/* do not need to handle non-distributed tables */
continue;
Expand Down Expand Up @@ -2482,7 +2482,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
ShardInterval *shardInterval = NULL;

CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
/* non-distributed tables have only one shard */
shardInterval = cacheEntry->sortedShardIntervalArray[0];
Expand Down Expand Up @@ -3697,7 +3697,7 @@ PartitionedOnColumn(Var *column, List *rangeTableList, List *dependentJobList)
Var *partitionColumn = PartitionColumn(relationId, rangeTableId);

/* non-distributed tables do not have partition columns */
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
return false;
}
Expand Down
8 changes: 4 additions & 4 deletions src/backend/distributed/planner/multi_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -2675,7 +2675,7 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery,
{
Oid relationId = ExtractFirstCitusTableId(query);

if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
/* we don't need to do shard pruning for non-distributed tables */
return list_make1(LoadShardIntervalList(relationId));
Expand Down Expand Up @@ -2968,7 +2968,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
Assert(query->commandType == CMD_INSERT);

/* reference tables and citus local tables can only have one shard */
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
List *shardIntervalList = LoadShardIntervalList(distributedTableId);

Expand Down Expand Up @@ -3509,7 +3509,7 @@ ExtractInsertPartitionKeyValue(Query *query)
uint32 rangeTableId = 1;
Const *singlePartitionValueConst = NULL;

if (IsCitusTableType(distributedTableId, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableType(distributedTableId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
return NULL;
}
Expand Down Expand Up @@ -3830,7 +3830,7 @@ ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree)
GetCitusTableCacheEntry(distributedTableId);

if (IsCitusTableTypeCacheEntry(modificationTableCacheEntry,
CITUS_TABLE_WITH_NO_DIST_KEY))
CITUS_LOCAL_OR_REFERENCE_TABLE))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot router plan modification of a non-distributed table",
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/planner/query_colocation_checker.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ AnchorRte(Query *subquery)
{
Oid relationId = currentRte->relid;

if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
/*
* Non-distributed tables should not be the anchor rte since they
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,

/* we shouldn't check for the equality of non-distributed tables */
if (IsCitusTableType(relationRestriction->relationId,
CITUS_TABLE_WITH_NO_DIST_KEY))
CITUS_LOCAL_OR_REFERENCE_TABLE))
{
continue;
}
Expand Down Expand Up @@ -1933,7 +1933,7 @@ AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictio
{
Oid relationId = relationRestriction->relationId;

if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/planner/shard_pruning.c
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList,
}

/* short circuit for non-distributed tables such as reference table */
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
prunedList = ShardArrayToList(cacheEntry->sortedShardIntervalArray,
cacheEntry->shardIntervalArrayLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ RecordRelationAccessIfNonDistTable(Oid relationId, ShardPlacementAccessType acce
* recursively calling RecordRelationAccessBase(), so becareful about
* removing this check.
*/
if (!IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
if (!IsCitusTableType(relationId, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
return;
}
Expand Down Expand Up @@ -732,7 +732,7 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access

CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);

if (!(IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY) &&
if (!(IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_OR_REFERENCE_TABLE) &&
cacheEntry->referencingRelationsViaForeignKey != NIL))
{
return;
Expand Down Expand Up @@ -931,7 +931,7 @@ HoldsConflictingLockWithReferencedRelations(Oid relationId, ShardPlacementAccess
* We're only interested in foreign keys to reference tables and citus
* local tables.
*/
if (!IsCitusTableType(referencedRelation, CITUS_TABLE_WITH_NO_DIST_KEY))
if (!IsCitusTableType(referencedRelation, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
continue;
}
Expand Down Expand Up @@ -993,7 +993,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
bool holdsConflictingLocks = false;

Assert(IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY));
Assert(IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_OR_REFERENCE_TABLE));

Oid referencingRelation = InvalidOid;
foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey)
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/utils/colocation_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShard
return HashPartitionedShardIntervalsEqual(leftShardInterval, rightShardInterval);
}
else if (IsCitusTableType(leftShardInterval->relationId,
CITUS_TABLE_WITH_NO_DIST_KEY))
CITUS_LOCAL_OR_REFERENCE_TABLE))
{
/*
* Reference tables has only a single shard and all reference tables
Expand Down
Loading

0 comments on commit 538541d

Please sign in to comment.