Skip to content

Commit

Permalink
Add support for creating distributed tables with a null shard key (#6745
Browse files Browse the repository at this point in the history
)

With this PR, we allow creating distributed tables with without
specifying a shard key via create_distributed_table(). Here are the
the important details about those tables:
* Specifying `shard_count` is not allowed because it is assumed to be 1.
* We mostly call such tables as "null shard-key" table in code /
comments.
* To avoid doing a breaking layout change in create_distributed_table();
instead of throwing an error, it will inform the user that
`distribution_type`
  param is ignored unless it's explicitly set to NULL or  'h'.
* `colocate_with` param allows colocating such null shard-key tables to
  each other.
* We define this table type, i.e., NULL_SHARD_KEY_TABLE, as a subclass
of
  DISTRIBUTED_TABLE because we mostly want to treat them as distributed
  tables in terms of SQL / DDL / operation support.
* Metadata for such tables look like:
  - distribution method => DISTRIBUTE_BY_NONE
  - replication model => REPLICATION_MODEL_STREAMING
- colocation id => **!=** INVALID_COLOCATION_ID (distinguishes from
Citus local tables)
* We assign colocation groups for such tables to different nodes in a
  round-robin fashion based on the modulo of "colocation id".

Note that this PR doesn't care about DDL (except CREATE TABLE) / SQL /
operation (i.e., Citus UDFs) support for such tables but adds a
preliminary
API.
  • Loading branch information
onurctirtir authored and emelsimsek committed May 24, 2023
1 parent 3b10502 commit 252f364
Show file tree
Hide file tree
Showing 27 changed files with 2,909 additions and 75 deletions.
191 changes: 155 additions & 36 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,16 @@ static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static void CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName);
static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
DistributedTableParams *
distributedTableParams);
static void CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static void CreateNullShardKeyDistTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId);
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams,
Var *distributionColumn);
Expand Down Expand Up @@ -216,51 +219,86 @@ create_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
if (PG_ARGISNULL(0) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}

Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
text *distributionColumnText = PG_ARGISNULL(1) ? NULL : PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);

bool shardCountIsStrict = false;
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
if (distributionColumnText)
{
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
if (PG_ARGISNULL(2))
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
PG_RETURN_VOID();
}

shardCount = PG_GETARG_INT32(4);
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (!IsColocateWithDefault(colocateWithTableName) &&
!IsColocateWithNone(colocateWithTableName))
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}

/*
* if shard_count parameter is given than we have to
* make sure table has that many shards
*/
shardCountIsStrict = true;
}
shardCount = PG_GETARG_INT32(4);

char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);
/*
* If shard_count parameter is given, then we have to
* make sure table has that many shards.
*/
shardCountIsStrict = true;
}

char distributionMethod = LookupDistributionMethod(distributionMethodOid);
char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);

if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
char distributionMethod = LookupDistributionMethod(distributionMethodOid);

if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
}

CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
}
else
{
if (!PG_ARGISNULL(4))
{
ereport(ERROR, (errmsg("shard_count can't be specified when the "
"distribution column is null because in "
"that case it's automatically set to 1")));
}

if (!PG_ARGISNULL(2) &&
LookupDistributionMethod(PG_GETARG_OID(2)) != DISTRIBUTE_BY_HASH)
{
/*
* As we do for shard_count parameter, we could throw an error if
* distribution_type is not NULL when creating a null-shard-key table.
* However, this requires changing the default value of distribution_type
* parameter to NULL and this would mean a breaking change for most
* users because they're mostly using this API to create sharded
* tables. For this reason, here we instead do nothing if the distribution
* method is DISTRIBUTE_BY_HASH.
*/
ereport(ERROR, (errmsg("distribution_type can't be specified "
"when the distribution column is null ")));
}

CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
CreateNullShardKeyDistTable(relationId, colocateWithTableName);
}

PG_RETURN_VOID();
}
Expand All @@ -276,11 +314,18 @@ create_distributed_table_concurrently(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
if (PG_ARGISNULL(0) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}

if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("cannot use create_distributed_table_concurrently "
"to create a distributed table with a null shard "
"key, consider using create_distributed_table()")));
}

Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
char *distributionColumnName = text_to_cstring(distributionColumnText);
Expand Down Expand Up @@ -982,6 +1027,23 @@ CreateReferenceTable(Oid relationId)
}


/*
* CreateNullShardKeyDistTable is a wrapper around CreateCitusTable that creates a
* single shard distributed table that doesn't have a shard key.
*/
static void
CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName)
{
DistributedTableParams distributedTableParams = {
.colocateWithTableName = colocateWithTableName,
.shardCount = 1,
.shardCountIsStrict = true,
.distributionColumnName = NULL
};
CreateCitusTable(relationId, NULL_KEY_DISTRIBUTED_TABLE, &distributedTableParams);
}


/*
* CreateCitusTable is the internal method that creates a Citus table in
* given configuration.
Expand All @@ -1000,7 +1062,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams)
{
if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED ||
tableType == RANGE_DISTRIBUTED) != (distributedTableParams != NULL))
tableType == RANGE_DISTRIBUTED || tableType == NULL_KEY_DISTRIBUTED_TABLE) !=
(distributedTableParams != NULL))
{
ereport(ERROR, (errmsg("distributed table params must be provided "
"when creating a distributed table and must "
Expand Down Expand Up @@ -1078,7 +1141,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
PropagatePrerequisiteObjectsForDistributedTable(relationId);

Var *distributionColumn = NULL;
if (distributedTableParams)
if (distributedTableParams && distributedTableParams->distributionColumnName)
{
distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributedTableParams->
Expand Down Expand Up @@ -1150,6 +1213,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
{
CreateReferenceTableShard(relationId);
}
else if (tableType == NULL_KEY_DISTRIBUTED_TABLE)
{
CreateNullShardKeyDistTableShard(relationId, colocatedTableId,
colocationId);
}

if (ShouldSyncTableMetadata(relationId))
{
Expand Down Expand Up @@ -1204,7 +1272,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
}

/* copy over data for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED || tableType == REFERENCE_TABLE)
if (tableType == HASH_DISTRIBUTED || tableType == NULL_KEY_DISTRIBUTED_TABLE ||
tableType == REFERENCE_TABLE)
{
if (RegularTable(relationId))
{
Expand Down Expand Up @@ -1268,6 +1337,13 @@ DecideCitusTableParams(CitusTableType tableType,
break;
}

case NULL_KEY_DISTRIBUTED_TABLE:
{
citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
citusTableParams.replicationModel = REPLICATION_MODEL_STREAMING;
break;
}

case REFERENCE_TABLE:
{
citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
Expand Down Expand Up @@ -1630,6 +1706,41 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
}


/*
* CreateHashDistributedTableShards creates the shard of given null-shard-key
* distributed table.
*/
static void
CreateNullShardKeyDistTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId)
{
if (colocatedTableId != InvalidOid)
{
/*
* We currently allow concurrent distribution of colocated tables (which
* we probably should not be allowing because of foreign keys /
* partitioning etc).
*
* We also prevent concurrent shard moves / copy / splits) while creating
* a colocated table.
*/
AcquirePlacementColocationLock(colocatedTableId, ShareLock,
"colocate distributed table");

/*
* We don't need to force using exclusive connections because we're anyway
* creating a single shard.
*/
bool useExclusiveConnection = false;
CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
}
else
{
CreateNullKeyShardWithRoundRobinPolicy(relationId, colocationId);
}
}


/*
* ColocationIdForNewTable returns a colocation id for given table
* according to given configuration. If there is no such configuration, it
Expand Down Expand Up @@ -1662,8 +1773,8 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
errdetail("Currently, colocate_with option is not supported "
"for append / range distributed tables.")));
}

return colocationId;
Expand All @@ -1679,10 +1790,11 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
* can be sure that there will no modifications on the colocation table
* until this transaction is committed.
*/
Assert(citusTableParams.distributionMethod == DISTRIBUTE_BY_HASH);

Oid distributionColumnType = distributionColumn->vartype;
Oid distributionColumnCollation = get_typcollation(distributionColumnType);
Oid distributionColumnType =
distributionColumn ? distributionColumn->vartype : InvalidOid;
Oid distributionColumnCollation =
distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid;

/* get an advisory lock to serialize concurrent default group creations */
if (IsColocateWithDefault(distributedTableParams->colocateWithTableName))
Expand Down Expand Up @@ -1871,8 +1983,15 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
*/
if (PartitionedTableNoLock(relationId))
{
/* distributing partitioned tables in only supported for hash-distribution */
if (distributionMethod != DISTRIBUTE_BY_HASH)
/*
* Distributing partitioned tables is only supported for hash-distribution
* or null-shard-key tables.
*/
bool isNullShardKeyTable =
distributionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_STREAMING &&
colocationId != INVALID_COLOCATION_ID;
if (distributionMethod != DISTRIBUTE_BY_HASH && !isNullShardKeyTable)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables in only supported "
Expand Down
12 changes: 11 additions & 1 deletion src/backend/distributed/commands/foreign_constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
/*
* Foreign keys from citus local tables or reference tables to distributed
* tables are not supported.
*
* We could support foreign keys from references tables to null-shard-key
* tables but this doesn't seem useful a lot. However, if we decide supporting
* this, then we need to expand relation access tracking check for the null-shard-key
* tables too.
*/
if (referencingIsCitusLocalOrRefTable && !referencedIsCitusLocalOrRefTable)
{
Expand Down Expand Up @@ -361,7 +366,12 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
* if tables are hash-distributed and colocated, we need to make sure that
* the distribution key is included in foreign constraint.
*/
if (!referencedIsCitusLocalOrRefTable && !foreignConstraintOnDistKey)
bool referencedIsNullShardKeyTable =
IsNullShardKeyTableByDistParams(referencedDistMethod,
referencedReplicationModel,
referencedColocationId);
if (!referencedIsCitusLocalOrRefTable && !referencedIsNullShardKeyTable &&
!foreignConstraintOnDistKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/commands/multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2146,6 +2146,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
}

if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) &&
!IsCitusTableTypeCacheEntry(cacheEntry, NULL_KEY_DISTRIBUTED_TABLE) &&
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
Expand Down
3 changes: 2 additions & 1 deletion src/backend/distributed/commands/truncate.c
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
{
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);

if (IsCitusTable(relationId) && !HasDistributionKey(relationId) &&
if ((IsCitusTableType(relationId, REFERENCE_TABLE) ||
IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) &&
TableReferenced(relationId))
{
char *relationName = get_rel_name(relationId);
Expand Down
Loading

0 comments on commit 252f364

Please sign in to comment.