Skip to content

Commit

Permalink
Add support for creating distributed tables with a null shard key
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Mar 8, 2023
1 parent 538541d commit 8dce689
Show file tree
Hide file tree
Showing 22 changed files with 2,333 additions and 84 deletions.
216 changes: 165 additions & 51 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,16 @@ static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static void CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName);
static void CreateCitusTable(Oid relationId, char *distributionColumnName,
char distributionMethod,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName,
char replicationModel);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static void CreateNullShardKeyDistTableShard(Oid relationId, Oid colocatedTableId,
bool localTableEmpty);
static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
char distributionMethod, char replicationModel,
int shardCount, bool shardCountIsStrict,
Expand Down Expand Up @@ -195,37 +198,17 @@ create_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
if (PG_ARGISNULL(0) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}

Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
text *distributionColumnText = PG_ARGISNULL(1) ? NULL : PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);

bool shardCountIsStrict = false;
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}

shardCount = PG_GETARG_INT32(4);

/*
* if shard_count parameter is given than we have to
* make sure table has that many shards
*/
shardCountIsStrict = true;
}

EnsureCitusTableCanBeCreated(relationId);

/* enable create_distributed_table on an empty node */
Expand All @@ -245,20 +228,74 @@ create_distributed_table(PG_FUNCTION_ARGS)

relation_close(relation, NoLock);

char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);
bool shardCountIsStrict = false;
if (distributionColumnText)
{
if (PG_ARGISNULL(2))
{
PG_RETURN_VOID();
}

char distributionMethod = LookupDistributionMethod(distributionMethodOid);
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (!IsColocateWithDefault(colocateWithTableName) &&
!IsColocateWithNone(colocateWithTableName))
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}

if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
shardCount = PG_GETARG_INT32(4);

/*
* If shard_count parameter is given, then we have to
* make sure table has that many shards.
*/
shardCountIsStrict = true;
}

char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);

char distributionMethod = LookupDistributionMethod(distributionMethodOid);

if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
}

CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
}
else
{
if (!PG_ARGISNULL(4))
{
ereport(ERROR, (errmsg("shard_count can't be specified when dist "
"column is null because in that case it's "
"automatically set to 1 ")));
}

if (!PG_ARGISNULL(2))
{
/*
* As we do for shard_count parameter, we could throw an error if
* distribution_type is not NULL when creating a null-shard-key table.
* However, this requires changing the default value of distribution_type
* parameter to NULL and this would mean a breaking change for most
* users because they're mostly using this API to create sharded
* tables. For this reason, here we instead silently ignore
* distribution_type parameter for null-shard-key tables.
*/
ereport(NOTICE, (errmsg(
"ignoring distribution_type for null-shard-key tables")));
}

CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
CreateNullShardKeyDistTable(relationId, colocateWithTableName);
}

PG_RETURN_VOID();
}
Expand Down Expand Up @@ -993,6 +1030,25 @@ CreateReferenceTable(Oid relationId)
}


/*
 * CreateNullShardKeyDistTable creates a distributed table that has no
 * distribution (shard) key. It delegates to CreateCitusTable with a NULL
 * distribution column, a strict shard count of 1 and streaming replication.
 */
static void
CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName)
{
	/* a null-shard-key table always consists of exactly one shard */
	const int singleShardCount = 1;
	const bool enforceShardCount = true;

	CreateCitusTable(relationId, /* distributionColumnName */ NULL,
					 DISTRIBUTE_BY_NONE, singleShardCount,
					 enforceShardCount, colocateWithTableName,
					 REPLICATION_MODEL_STREAMING);
}


/*
* CreateCitusTable is the internal method that creates a Citus table in
* given configuration.
Expand Down Expand Up @@ -1034,11 +1090,9 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
* that ALTER TABLE hook does the necessary job, which means converting
* local tables to citus local tables to properly support such foreign
* keys.
*
* This function does not expect to create Citus local table, so we blindly
* create reference table when the method is DISTRIBUTE_BY_NONE.
*/
else if (distributionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_2PC &&
ShouldEnableLocalReferenceForeignKeys() &&
HasForeignKeyWithLocalTable(relationId))
{
Expand Down Expand Up @@ -1128,11 +1182,19 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
}
else if (distributionMethod == DISTRIBUTE_BY_NONE)
{
/*
* This function does not expect to create Citus local table, so we blindly
* create reference table when the method is DISTRIBUTE_BY_NONE.
*/
CreateReferenceTableShard(relationId);
if (replicationModel == REPLICATION_MODEL_2PC)
{
CreateReferenceTableShard(relationId);
}
else if (replicationModel == REPLICATION_MODEL_STREAMING)
{
CreateNullShardKeyDistTableShard(relationId, colocatedTableId,
localTableEmpty);
}
else
{
Assert(false);
}
}

if (ShouldSyncTableMetadata(relationId))
Expand Down Expand Up @@ -1173,9 +1235,9 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
{
MemoryContextReset(citusPartitionContext);

CreateDistributedTable(partitionRelationId, distributionColumnName,
distributionMethod, shardCount, false,
parentRelationName);
CreateCitusTable(partitionRelationId, distributionColumnName,
distributionMethod, shardCount, false,
parentRelationName, replicationModel);
}

MemoryContextSwitchTo(oldContext);
Expand Down Expand Up @@ -1546,6 +1608,49 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
}


/*
 * CreateNullShardKeyDistTableShard creates the single shard of the given
 * null-shard-key distributed table. If colocatedTableId is valid, the shard
 * is placed according to the colocated table's shard placement; otherwise a
 * fresh single shard is created.
 */
static void
CreateNullShardKeyDistTableShard(Oid relationId, Oid colocatedTableId,
								 bool localTableEmpty)
{
	/*
	 * Decide whether to use exclusive connections per placement or not. Note that
	 * if the local table is not empty, we cannot use sequential mode since the COPY
	 * operation that'd load the data into shards currently requires exclusive
	 * connections.
	 */
	bool useExclusiveConnection = false;
	if (RegularTable(relationId))
	{
		useExclusiveConnection = CanUseExclusiveConnections(relationId,
															localTableEmpty);
	}

	if (colocatedTableId != InvalidOid)
	{
		/*
		 * We currently allow concurrent distribution of colocated tables (which
		 * we probably should not be allowing because of foreign keys /
		 * partitioning etc).
		 *
		 * We also prevent concurrent shard moves / copy / splits while creating
		 * a colocated table.
		 */
		AcquirePlacementColocationLock(colocatedTableId, ShareLock,
									   "colocate distributed table");

		CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
	}
	else
	{
		/* no colocated table to follow; create the single shard from scratch */
		CreateNullKeyShard(relationId, useExclusiveConnection);
	}
}


/*
* ColocationIdForNewTable returns a colocation id for hash-distributed table
* according to given configuration. If there is no such configuration, it
Expand All @@ -1572,13 +1677,14 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
errdetail("Currently, colocate_with option is not supported "
"for append / range distributed tables.")));
}

return colocationId;
}
else if (distributionMethod == DISTRIBUTE_BY_NONE)
else if (distributionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_2PC)
{
return CreateReferenceTableColocationId();
}
Expand All @@ -1589,10 +1695,11 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
* can be sure that there will no modifications on the colocation table
* until this transaction is committed.
*/
Assert(distributionMethod == DISTRIBUTE_BY_HASH);

Oid distributionColumnType = distributionColumn->vartype;
Oid distributionColumnCollation = get_typcollation(distributionColumnType);
Oid distributionColumnType =
distributionColumn ? distributionColumn->vartype : InvalidOid;
Oid distributionColumnCollation =
distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid;

/* get an advisory lock to serialize concurrent default group creations */
if (IsColocateWithDefault(colocateWithTableName))
Expand Down Expand Up @@ -1775,8 +1882,15 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
*/
if (PartitionedTableNoLock(relationId))
{
/* distributing partitioned tables in only supported for hash-distribution */
if (distributionMethod != DISTRIBUTE_BY_HASH)
/*
* Distributing partitioned tables is only supported for hash-distribution
* or null-shard-key tables.
*/
bool isNullShardKeyTable =
distributionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_STREAMING &&
colocationId != INVALID_COLOCATION_ID;
if (distributionMethod != DISTRIBUTE_BY_HASH && !isNullShardKeyTable)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables in only supported "
Expand Down
7 changes: 6 additions & 1 deletion src/backend/distributed/commands/foreign_constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,12 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
* if tables are hash-distributed and colocated, we need to make sure that
* the distribution key is included in foreign constraint.
*/
if (!referencedIsCitusLocalOrRefTable && !foreignConstraintOnDistKey)
bool referencedIsNullShardKeyTable =
IsNullShardKeyTableByDistParams(referencedDistMethod,
referencedReplicationModel,
referencedColocationId);
if (!referencedIsCitusLocalOrRefTable && !referencedIsNullShardKeyTable &&
!foreignConstraintOnDistKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
Expand Down
2 changes: 2 additions & 0 deletions src/backend/distributed/commands/multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ CitusCopyFrom(CopyStmt *copyStatement, QueryCompletion *completionTag)
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(cacheEntry, NULL_KEY_DISTRIBUTED_TABLE) ||
IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_OR_REFERENCE_TABLE))
{
CopyToExistingShards(copyStatement, completionTag);
Expand Down Expand Up @@ -2140,6 +2141,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
}

if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) &&
!IsCitusTableTypeCacheEntry(cacheEntry, NULL_KEY_DISTRIBUTED_TABLE) &&
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
Expand Down
27 changes: 26 additions & 1 deletion src/backend/distributed/metadata/metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -480,11 +480,21 @@ IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
return partitionMethod == DISTRIBUTE_BY_RANGE;
}

case NULL_KEY_DISTRIBUTED_TABLE:
{
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID;
}

case DISTRIBUTED_TABLE:
{
return partitionMethod == DISTRIBUTE_BY_HASH ||
partitionMethod == DISTRIBUTE_BY_RANGE ||
partitionMethod == DISTRIBUTE_BY_APPEND;
partitionMethod == DISTRIBUTE_BY_APPEND ||
(partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID);
}

case STRICTLY_PARTITIONED_DISTRIBUTED_TABLE:
Expand Down Expand Up @@ -795,6 +805,21 @@ IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel,
}


/*
 * IsNullShardKeyTableByDistParams decides whether the given metadata triple
 * (partitionMethod, replicationModel, colocationId) identifies a distributed
 * table that has a null shard key.
 */
bool
IsNullShardKeyTableByDistParams(char partitionMethod, char replicationModel,
								uint32 colocationId)
{
	if (partitionMethod != DISTRIBUTE_BY_NONE)
	{
		return false;
	}

	/* DISTRIBUTE_BY_NONE with 2PC replication identifies a reference table */
	if (replicationModel == REPLICATION_MODEL_2PC)
	{
		return false;
	}

	return colocationId != INVALID_COLOCATION_ID;
}


/*
* CitusTableList returns a list that includes all the valid distributed table
* cache entries.
Expand Down
Loading

0 comments on commit 8dce689

Please sign in to comment.