Skip to content

Commit

Permalink
Add support for creating distributed tables with a null shard key (#6745
Browse files Browse the repository at this point in the history
)

With this PR, we allow creating distributed tables with without
specifying a shard key via create_distributed_table(). Here are the
the important details about those tables:
* Specifying `shard_count` is not allowed because it is assumed to be 1.
* We mostly call such tables as "null shard-key" table in code /
comments.
* To avoid doing a breaking layout change in create_distributed_table();
instead of throwing an error, it will inform the user that
`distribution_type`
  param is ignored unless it's explicitly set to NULL or  'h'.
* `colocate_with` param allows colocating such null shard-key tables to
  each other.
* We define this table type, i.e., NULL_SHARD_KEY_TABLE, as a subclass
of
  DISTRIBUTED_TABLE because we mostly want to treat them as distributed
  tables in terms of SQL / DDL / operation support.
* Metadata for such tables look like:
  - distribution method => DISTRIBUTE_BY_NONE
  - replication model => REPLICATION_MODEL_STREAMING
- colocation id => **!=** INVALID_COLOCATION_ID (distinguishes from
Citus local tables)
* We assign colocation groups for such tables to different nodes in a
  round-robin fashion based on the modulo of "colocation id".

Note that this PR doesn't care about DDL (except CREATE TABLE) / SQL /
operation (i.e., Citus UDFs) support for such tables but adds a
preliminary
API.
  • Loading branch information
onurctirtir committed Mar 27, 2023
1 parent 372a93b commit cb3289e
Show file tree
Hide file tree
Showing 27 changed files with 2,896 additions and 82 deletions.
205 changes: 162 additions & 43 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,16 @@ static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static void CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName);
static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
DistributedTableParams *
distributedTableParams);
static void CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static void CreateNullShardKeyDistTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId);
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams,
Var *distributionColumn);
Expand Down Expand Up @@ -216,37 +219,17 @@ create_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
if (PG_ARGISNULL(0) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}

Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
text *distributionColumnText = PG_ARGISNULL(1) ? NULL : PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);

bool shardCountIsStrict = false;
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}

shardCount = PG_GETARG_INT32(4);

/*
* if shard_count parameter is given than we have to
* make sure table has that many shards
*/
shardCountIsStrict = true;
}

EnsureCitusTableCanBeCreated(relationId);

/* enable create_distributed_table on an empty node */
Expand All @@ -266,20 +249,75 @@ create_distributed_table(PG_FUNCTION_ARGS)

relation_close(relation, NoLock);

char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);
bool shardCountIsStrict = false;
if (distributionColumnText)
{
if (PG_ARGISNULL(2))
{
PG_RETURN_VOID();
}

char distributionMethod = LookupDistributionMethod(distributionMethodOid);
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (!IsColocateWithDefault(colocateWithTableName) &&
!IsColocateWithNone(colocateWithTableName))
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}

if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
shardCount = PG_GETARG_INT32(4);

/*
* If shard_count parameter is given, then we have to
* make sure table has that many shards.
*/
shardCountIsStrict = true;
}

char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);

char distributionMethod = LookupDistributionMethod(distributionMethodOid);

if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
}

CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
}
else
{
if (!PG_ARGISNULL(4))
{
ereport(ERROR, (errmsg("shard_count can't be specified when the "
"distribution column is null because in "
"that case it's automatically set to 1")));
}

CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
if (!PG_ARGISNULL(2) &&
LookupDistributionMethod(PG_GETARG_OID(2)) != DISTRIBUTE_BY_HASH)
{
/*
* As we do for shard_count parameter, we could throw an error if
* distribution_type is not NULL when creating a null-shard-key table.
* However, this requires changing the default value of distribution_type
* parameter to NULL and this would mean a breaking change for most
* users because they're mostly using this API to create sharded
* tables. For this reason, here we instead do nothing if the distribution
* method is DISTRIBUTE_BY_HASH.
*/
ereport(ERROR, (errmsg("distribution_type can't be specified "
"when the distribution column is null ")));
}

CreateNullShardKeyDistTable(relationId, colocateWithTableName);
}

PG_RETURN_VOID();
}
Expand All @@ -295,11 +333,18 @@ create_distributed_table_concurrently(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
if (PG_ARGISNULL(0) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}

if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("cannot use create_distributed_table_concurrently "
"to create a distributed table with a null shard "
"key, consider using create_distributed_table()")));
}

Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
char *distributionColumnName = text_to_cstring(distributionColumnText);
Expand Down Expand Up @@ -1033,6 +1078,23 @@ CreateReferenceTable(Oid relationId)
}


/*
* CreateNullShardKeyDistTable is a wrapper around CreateCitusTable that creates a
* single shard distributed table that doesn't have a shard key.
*/
static void
CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName)
{
DistributedTableParams distributedTableParams = {
.colocateWithTableName = colocateWithTableName,
.shardCount = 1,
.shardCountIsStrict = true,
.distributionColumnName = NULL
};
CreateCitusTable(relationId, NULL_KEY_DISTRIBUTED_TABLE, &distributedTableParams);
}


/*
* CreateCitusTable is the internal method that creates a Citus table in
* given configuration.
Expand All @@ -1051,7 +1113,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams)
{
if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED ||
tableType == RANGE_DISTRIBUTED) != (distributedTableParams != NULL))
tableType == RANGE_DISTRIBUTED || tableType == NULL_KEY_DISTRIBUTED_TABLE) !=
(distributedTableParams != NULL))
{
ereport(ERROR, (errmsg("distributed table params must be provided "
"when creating a distributed table and must "
Expand Down Expand Up @@ -1115,7 +1178,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
PropagatePrerequisiteObjectsForDistributedTable(relationId);

Var *distributionColumn = NULL;
if (distributedTableParams)
if (distributedTableParams && distributedTableParams->distributionColumnName)
{
distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributedTableParams->
Expand Down Expand Up @@ -1187,6 +1250,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
{
CreateReferenceTableShard(relationId);
}
else if (tableType == NULL_KEY_DISTRIBUTED_TABLE)
{
CreateNullShardKeyDistTableShard(relationId, colocatedTableId,
colocationId);
}

if (ShouldSyncTableMetadata(relationId))
{
Expand Down Expand Up @@ -1241,7 +1309,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
}

/* copy over data for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED || tableType == REFERENCE_TABLE)
if (tableType == HASH_DISTRIBUTED || tableType == NULL_KEY_DISTRIBUTED_TABLE ||
tableType == REFERENCE_TABLE)
{
if (RegularTable(relationId))
{
Expand Down Expand Up @@ -1305,6 +1374,13 @@ DecideCitusTableParams(CitusTableType tableType,
break;
}

case NULL_KEY_DISTRIBUTED_TABLE:
{
citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
citusTableParams.replicationModel = REPLICATION_MODEL_STREAMING;
break;
}

case REFERENCE_TABLE:
{
citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
Expand Down Expand Up @@ -1667,6 +1743,41 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
}


/*
* CreateHashDistributedTableShards creates the shard of given null-shard-key
* distributed table.
*/
static void
CreateNullShardKeyDistTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId)
{
if (colocatedTableId != InvalidOid)
{
/*
* We currently allow concurrent distribution of colocated tables (which
* we probably should not be allowing because of foreign keys /
* partitioning etc).
*
* We also prevent concurrent shard moves / copy / splits) while creating
* a colocated table.
*/
AcquirePlacementColocationLock(colocatedTableId, ShareLock,
"colocate distributed table");

/*
* We don't need to force using exclusive connections because we're anyway
* creating a single shard.
*/
bool useExclusiveConnection = false;
CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
}
else
{
CreateNullKeyShardWithRoundRobinPolicy(relationId, colocationId);
}
}


/*
* ColocationIdForNewTable returns a colocation id for given table
* according to given configuration. If there is no such configuration, it
Expand Down Expand Up @@ -1699,8 +1810,8 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
errdetail("Currently, colocate_with option is not supported "
"for append / range distributed tables.")));
}

return colocationId;
Expand All @@ -1716,10 +1827,11 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
* can be sure that there will no modifications on the colocation table
* until this transaction is committed.
*/
Assert(citusTableParams.distributionMethod == DISTRIBUTE_BY_HASH);

Oid distributionColumnType = distributionColumn->vartype;
Oid distributionColumnCollation = get_typcollation(distributionColumnType);
Oid distributionColumnType =
distributionColumn ? distributionColumn->vartype : InvalidOid;
Oid distributionColumnCollation =
distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid;

/* get an advisory lock to serialize concurrent default group creations */
if (IsColocateWithDefault(distributedTableParams->colocateWithTableName))
Expand Down Expand Up @@ -1906,8 +2018,15 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
*/
if (PartitionedTableNoLock(relationId))
{
/* distributing partitioned tables in only supported for hash-distribution */
if (distributionMethod != DISTRIBUTE_BY_HASH)
/*
* Distributing partitioned tables is only supported for hash-distribution
* or null-shard-key tables.
*/
bool isNullShardKeyTable =
distributionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_STREAMING &&
colocationId != INVALID_COLOCATION_ID;
if (distributionMethod != DISTRIBUTE_BY_HASH && !isNullShardKeyTable)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables in only supported "
Expand Down
12 changes: 11 additions & 1 deletion src/backend/distributed/commands/foreign_constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
/*
* Foreign keys from citus local tables or reference tables to distributed
* tables are not supported.
*
* We could support foreign keys from references tables to null-shard-key
* tables but this doesn't seem useful a lot. However, if we decide supporting
* this, then we need to expand relation access tracking check for the null-shard-key
* tables too.
*/
if (referencingIsCitusLocalOrRefTable && !referencedIsCitusLocalOrRefTable)
{
Expand Down Expand Up @@ -361,7 +366,12 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
* if tables are hash-distributed and colocated, we need to make sure that
* the distribution key is included in foreign constraint.
*/
if (!referencedIsCitusLocalOrRefTable && !foreignConstraintOnDistKey)
bool referencedIsNullShardKeyTable =
IsNullShardKeyTableByDistParams(referencedDistMethod,
referencedReplicationModel,
referencedColocationId);
if (!referencedIsCitusLocalOrRefTable && !referencedIsNullShardKeyTable &&
!foreignConstraintOnDistKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/commands/multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2140,6 +2140,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
}

if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) &&
!IsCitusTableTypeCacheEntry(cacheEntry, NULL_KEY_DISTRIBUTED_TABLE) &&
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
Expand Down
3 changes: 2 additions & 1 deletion src/backend/distributed/commands/truncate.c
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
{
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);

if (IsCitusTable(relationId) && !HasDistributionKey(relationId) &&
if ((IsCitusTableType(relationId, REFERENCE_TABLE) ||
IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) &&
TableReferenced(relationId))
{
char *relationName = get_rel_name(relationId);
Expand Down
Loading

0 comments on commit cb3289e

Please sign in to comment.