Add support for creating distributed tables with a null shard key
onurctirtir committed Mar 13, 2023
1 parent 3eb6355 commit efe2124
Showing 24 changed files with 2,606 additions and 73 deletions.
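In user-facing terms, the change lets create_distributed_table() accept NULL for the distribution column, creating a single-shard distributed table with no shard key. A minimal usage sketch (the table name null_key_tab is illustrative, not taken from this commit):

CREATE TABLE null_key_tab (a int, b text);

-- A NULL distribution column creates a distributed table with exactly
-- one shard and no shard key:
SELECT create_distributed_table('null_key_tab', null);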
188 changes: 149 additions & 39 deletions src/backend/distributed/commands/create_distributed_table.c
@@ -134,13 +134,16 @@ static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static void CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName);
static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
DistributedTableParams *
distributedTableParams);
static void CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static void CreateNullShardKeyDistTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId);
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams,
Var *distributionColumn);
@@ -216,37 +219,17 @@ create_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
if (PG_ARGISNULL(0) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}

Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
text *distributionColumnText = PG_ARGISNULL(1) ? NULL : PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);

bool shardCountIsStrict = false;
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}

shardCount = PG_GETARG_INT32(4);

/*
* if shard_count parameter is given than we have to
* make sure table has that many shards
*/
shardCountIsStrict = true;
}

EnsureCitusTableCanBeCreated(relationId);

/* enable create_distributed_table on an empty node */
@@ -266,20 +249,74 @@

relation_close(relation, NoLock);

char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);
bool shardCountIsStrict = false;
if (distributionColumnText)
{
if (PG_ARGISNULL(2))
{
PG_RETURN_VOID();
}

char distributionMethod = LookupDistributionMethod(distributionMethodOid);
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (!IsColocateWithDefault(colocateWithTableName) &&
!IsColocateWithNone(colocateWithTableName))
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}

if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
shardCount = PG_GETARG_INT32(4);

/*
* If shard_count parameter is given, then we have to
* make sure table has that many shards.
*/
shardCountIsStrict = true;
}

char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);

char distributionMethod = LookupDistributionMethod(distributionMethodOid);

if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
}

CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
}
else
{
if (!PG_ARGISNULL(4))
{
ereport(ERROR, (errmsg("shard_count can't be specified when the "
"distribution column is null because in "
"that case it's automatically set to 1")));
}

CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
if (!PG_ARGISNULL(2))
{
/*
* As we do for shard_count parameter, we could throw an error if
* distribution_type is not NULL when creating a null-shard-key table.
* However, this requires changing the default value of distribution_type
* parameter to NULL and this would mean a breaking change for most
* users because they're mostly using this API to create sharded
* tables. For this reason, here we instead silently ignore
* distribution_type parameter for null-shard-key tables.
*/
ereport(NOTICE, (errmsg("ignoring distribution_type for the "
"tables that don't have a shard key")));
}

CreateNullShardKeyDistTable(relationId, colocateWithTableName);
}

PG_RETURN_VOID();
}
@@ -1033,6 +1070,23 @@ CreateReferenceTable(Oid relationId)
}


/*
* CreateNullShardKeyDistTable is a wrapper around CreateCitusTable that creates a
single-shard distributed table that doesn't have a shard key.
*/
static void
CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName)
{
DistributedTableParams distributedTableParams = {
.colocateWithTableName = colocateWithTableName,
.shardCount = 1,
.shardCountIsStrict = true,
.distributionColumnName = NULL
};
CreateCitusTable(relationId, NULL_KEY_DISTRIBUTED_TABLE, &distributedTableParams);
}


/*
* CreateCitusTable is the internal method that creates a Citus table in
* given configuration.
@@ -1051,7 +1105,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams)
{
if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED ||
tableType == RANGE_DISTRIBUTED) !=
tableType == RANGE_DISTRIBUTED || tableType == NULL_KEY_DISTRIBUTED_TABLE) !=
(distributedTableParams != NULL))
{
ereport(ERROR, (errmsg("distributed table params must be provided "
@@ -1188,6 +1242,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
{
CreateReferenceTableShard(relationId);
}
else if (tableType == NULL_KEY_DISTRIBUTED_TABLE)
{
CreateNullShardKeyDistTableShard(relationId, colocatedTableId,
colocationId);
}

if (ShouldSyncTableMetadata(relationId))
{
@@ -1242,7 +1301,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
}

/* copy over data for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED || tableType == REFERENCE_TABLE)
if (tableType == HASH_DISTRIBUTED || tableType == NULL_KEY_DISTRIBUTED_TABLE ||
tableType == REFERENCE_TABLE)
{
if (RegularTable(relationId))
{
@@ -1306,6 +1366,13 @@ DecideCitusTableParams(CitusTableType tableType,
break;
}

case NULL_KEY_DISTRIBUTED_TABLE:
{
citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
citusTableParams.replicationModel = REPLICATION_MODEL_STREAMING;
break;
}

case REFERENCE_TABLE:
{
citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
@@ -1668,6 +1735,41 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
}


/*
* CreateNullShardKeyDistTableShard creates the single shard of the given
* null-shard-key distributed table.
*/
static void
CreateNullShardKeyDistTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId)
{
if (colocatedTableId != InvalidOid)
{
/*
* We currently allow concurrent distribution of colocated tables (which
* we probably should not be allowing because of foreign keys /
* partitioning etc).
*
* We also prevent concurrent shard moves / copies / splits while creating
* a colocated table.
*/
AcquirePlacementColocationLock(colocatedTableId, ShareLock,
"colocate distributed table");

/*
* We don't need to force using exclusive connections because we're anyway
* creating a single shard.
*/
bool useExclusiveConnection = false;
CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
}
else
{
CreateNullKeyShardWithRoundRobinPolicy(relationId, colocationId);
}
}


/*
* ColocationIdForNewTable returns a colocation id for given table
* according to given configuration. If there is no such configuration, it
@@ -1717,10 +1819,11 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
* can be sure that there will be no modifications on the colocation table
* until this transaction is committed.
*/
Assert(citusTableParams.distributionMethod == DISTRIBUTE_BY_HASH);

Oid distributionColumnType = distributionColumn->vartype;
Oid distributionColumnCollation = get_typcollation(distributionColumnType);
Oid distributionColumnType =
distributionColumn ? distributionColumn->vartype : InvalidOid;
Oid distributionColumnCollation =
distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid;

/* get an advisory lock to serialize concurrent default group creations */
if (IsColocateWithDefault(distributedTableParams->colocateWithTableName))
@@ -1907,8 +2010,15 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
*/
if (PartitionedTableNoLock(relationId))
{
/* distributing partitioned tables in only supported for hash-distribution */
if (distributionMethod != DISTRIBUTE_BY_HASH)
/*
* Distributing partitioned tables is only supported for hash-distribution
* or null-shard-key tables.
*/
bool isNullShardKeyTable =
distributionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_STREAMING &&
colocationId != INVALID_COLOCATION_ID;
if (distributionMethod != DISTRIBUTE_BY_HASH && !isNullShardKeyTable)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables in only supported "
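Taken together, the new argument handling in this file implies the following user-visible behavior; a sketch based on the ereport() strings in the create_distributed_table() hunks above, reusing the illustrative null_key_tab table:

-- shard_count cannot be combined with a NULL distribution column,
-- because such tables always get exactly one shard:
SELECT create_distributed_table('null_key_tab', null, shard_count := 4);
-- ERROR:  shard_count can't be specified when the distribution column
-- is null because in that case it's automatically set to 1

-- distribution_type is ignored rather than rejected; since its default
-- in the SQL signature is non-NULL ('hash'), the NOTICE appears even
-- when the argument is not passed explicitly:
SELECT create_distributed_table('null_key_tab', null);
-- NOTICE:  ignoring distribution_type for the tables that don't have a shard key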
7 changes: 6 additions & 1 deletion src/backend/distributed/commands/foreign_constraint.c
@@ -361,7 +361,12 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
* if tables are hash-distributed and colocated, we need to make sure that
* the distribution key is included in foreign constraint.
*/
if (!referencedIsCitusLocalOrRefTable && !foreignConstraintOnDistKey)
bool referencedIsNullShardKeyTable =
IsNullShardKeyTableByDistParams(referencedDistMethod,
referencedReplicationModel,
referencedColocationId);
if (!referencedIsCitusLocalOrRefTable && !referencedIsNullShardKeyTable &&
!foreignConstraintOnDistKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
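The added referencedIsNullShardKeyTable condition relaxes the colocation check: a foreign key that references a null-shard-key table no longer has to cover a distribution key, since there is none. An illustrative sketch, assuming both tables are created as colocated null-shard-key tables (the names and the ordering shown are hypothetical):

CREATE TABLE orders (id bigint PRIMARY KEY);
CREATE TABLE order_items (order_id bigint, item text);
SELECT create_distributed_table('orders', null);
SELECT create_distributed_table('order_items', null, colocate_with := 'orders');
ALTER TABLE order_items
  ADD CONSTRAINT order_items_order_id_fkey
  FOREIGN KEY (order_id) REFERENCES orders (id);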
1 change: 1 addition & 0 deletions src/backend/distributed/commands/multi_copy.c
@@ -2140,6 +2140,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
}

if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) &&
!IsCitusTableTypeCacheEntry(cacheEntry, NULL_KEY_DISTRIBUTED_TABLE) &&
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
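The extra NULL_KEY_DISTRIBUTED_TABLE check exempts these tables from the COPY-time requirement that the partition column be present in the column list, so a plain COPY now works for them; an illustrative session with the table from above:

COPY null_key_tab (a, b) FROM STDIN WITH (FORMAT csv);
1,foo
2,bar
\.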
27 changes: 26 additions & 1 deletion src/backend/distributed/metadata/metadata_cache.c
@@ -508,11 +508,21 @@ IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
return partitionMethod == DISTRIBUTE_BY_RANGE;
}

case NULL_KEY_DISTRIBUTED_TABLE:
{
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID;
}

case DISTRIBUTED_TABLE:
{
return partitionMethod == DISTRIBUTE_BY_HASH ||
partitionMethod == DISTRIBUTE_BY_RANGE ||
partitionMethod == DISTRIBUTE_BY_APPEND;
partitionMethod == DISTRIBUTE_BY_APPEND ||
(partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID);
}

case STRICTLY_PARTITIONED_DISTRIBUTED_TABLE:
@@ -803,6 +813,21 @@ IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel,
}


/*
* IsNullShardKeyTableByDistParams returns true if given partitionMethod,
* replicationModel and colocationId would identify a distributed table that
* has a null shard key.
*/
bool
IsNullShardKeyTableByDistParams(char partitionMethod, char replicationModel,
uint32 colocationId)
{
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID;
}


/*
* CitusTableList returns a list that includes all the valid distributed table
* cache entries.
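Per IsNullShardKeyTableByDistParams(), such a table is identified by DISTRIBUTE_BY_NONE, a non-2PC replication model, and a valid colocation id. Those fields can be inspected in pg_dist_partition; a sketch assuming Citus' conventional catalog letter codes 'n' (none) and 's' (streaming), which this commit does not change:

SELECT partmethod, repmodel, colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'null_key_tab'::regclass;
-- expected: partmethod = 'n', repmodel = 's', colocationid <> 0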
11 changes: 6 additions & 5 deletions src/backend/distributed/metadata/metadata_sync.c
@@ -520,7 +520,7 @@ ShouldSyncUserCommandForObject(ObjectAddress objectAddress)
/*
* ShouldSyncTableMetadata checks if the metadata of a distributed table should be
* propagated to metadata workers, i.e. the table is a hash distributed table or
* reference/citus local table.
* a Citus table that doesn't have a shard key.
*/
bool
ShouldSyncTableMetadata(Oid relationId)
@@ -542,10 +542,11 @@ ShouldSyncTableMetadata(Oid relationId)


/*
* ShouldSyncTableMetadataViaCatalog checks if the metadata of a distributed table should
* be propagated to metadata workers, i.e. the table is an MX table or reference table.
* ShouldSyncTableMetadataViaCatalog checks if the metadata of a Citus table should
* be propagated to metadata workers, i.e. the table is an MX table or a Citus
* table that doesn't have a shard key.
* Tables with streaming replication model (which means RF=1) and hash distribution are
* considered as MX tables while tables with none distribution are reference tables.
* considered as MX tables.
*
* ShouldSyncTableMetadataViaCatalog does not use the CitusTableCache and instead reads
* from catalog tables directly.
@@ -1144,7 +1145,7 @@ EnsureObjectMetadataIsSane(int distributionArgumentIndex, int colocationId)

/*
* DistributionCreateCommand generates a command that can be
* executed to replicate the metadata for a distributed table.
* executed to replicate the metadata for a Citus table.
*/
char *
DistributionCreateCommand(CitusTableCacheEntry *cacheEntry)
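Because ShouldSyncTableMetadata() now also returns true for Citus tables without a shard key, their metadata is propagated to metadata workers like other MX tables. A quick illustrative check, using the existing run_command_on_workers() helper and the hypothetical table from earlier:

SELECT run_command_on_workers(
  $$ SELECT count(*) FROM pg_dist_partition
     WHERE logicalrelid = 'null_key_tab'::regclass $$);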