Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CreateDistributedTable() #6742

Merged
merged 4 commits into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/backend/distributed/commands/alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -1348,8 +1348,7 @@ CreateCitusTableLike(TableConversionState *con)
}
else if (IsCitusTableType(con->relationId, REFERENCE_TABLE))
{
CreateDistributedTable(con->newRelationId, NULL, DISTRIBUTE_BY_NONE, 0, false,
NULL);
CreateReferenceTable(con->newRelationId);
}
else if (IsCitusTableType(con->relationId, CITUS_LOCAL_TABLE))
{
Expand Down
95 changes: 68 additions & 27 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,17 @@ static void CreateDistributedTableConcurrently(Oid relationId,
char *colocateWithTableName,
int shardCount,
bool shardCountIsStrict);
static char DecideReplicationModel(char distributionMethod, char *colocateWithTableName);
static char DecideDistTableReplicationModel(char distributionMethod,
char *colocateWithTableName);
static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static void CreateCitusTable(Oid relationId, char *distributionColumnName,
char distributionMethod,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName,
char replicationModel);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
Expand Down Expand Up @@ -377,8 +383,8 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,

EnsureForeignKeysForDistributedTableConcurrently(relationId);

char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName);
char replicationModel = DecideDistTableReplicationModel(distributionMethod,
colocateWithTableName);

/*
* we fail transaction before local table conversion if the table could not be colocated with
Expand Down Expand Up @@ -622,8 +628,8 @@ static void
EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
char *distributionColumnName, char *colocateWithTableName)
{
char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName);
char replicationModel = DecideDistTableReplicationModel(distributionMethod,
colocateWithTableName);

/*
* we fail transaction before local table conversion if the table could not be colocated with
Expand Down Expand Up @@ -860,9 +866,6 @@ create_reference_table(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);

char *colocateWithTableName = NULL;
char *distributionColumnName = NULL;

EnsureCitusTableCanBeCreated(relationId);

/* enable create_reference_table on an empty node */
Expand Down Expand Up @@ -895,8 +898,7 @@ create_reference_table(PG_FUNCTION_ARGS)
errdetail("There are no active worker nodes.")));
}

CreateDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_NONE,
ShardCount, false, colocateWithTableName);
CreateReferenceTable(relationId);
PG_RETURN_VOID();
}

Expand Down Expand Up @@ -951,17 +953,61 @@ EnsureRelationExists(Oid relationId)


/*
* CreateDistributedTable creates distributed table in the given configuration.
* CreateReferenceTable is a wrapper around CreateCitusTable that creates a
* distributed table.
*/
void
CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName)
{
Assert(distributionMethod != DISTRIBUTE_BY_NONE);

char replicationModel = DecideDistTableReplicationModel(distributionMethod,
colocateWithTableName);
CreateCitusTable(relationId, distributionColumnName,
distributionMethod, shardCount,
shardCountIsStrict, colocateWithTableName,
replicationModel);
}


/*
* CreateReferenceTable is a wrapper around CreateCitusTable that creates a
* reference table.
*/
void
CreateReferenceTable(Oid relationId)
{
char *distributionColumnName = NULL;
char distributionMethod = DISTRIBUTE_BY_NONE;
int shardCount = 1;
bool shardCountIsStrict = true;
char *colocateWithTableName = NULL;
char replicationModel = REPLICATION_MODEL_2PC;
CreateCitusTable(relationId, distributionColumnName,
distributionMethod, shardCount,
shardCountIsStrict, colocateWithTableName,
replicationModel);
}


/*
* CreateCitusTable is the internal method that creates a Citus table in
* given configuration.
*
* This functions contains all necessary logic to create distributed tables. It
* performs necessary checks to ensure distributing the table is safe. If it is
* safe to distribute the table, this function creates distributed table metadata,
* creates shards and copies local data to shards. This function also handles
* partitioned tables by distributing its partitions as well.
*/
void
CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName)
static void
CreateCitusTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName,
char replicationModel)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably switch to a struct at some point, but not necessary now

{
/*
* EnsureTableNotDistributed errors out when relation is a citus table but
Expand Down Expand Up @@ -1022,9 +1068,6 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,

PropagatePrerequisiteObjectsForDistributedTable(relationId);

char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName);

Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributionColumnName,
NoLock);
Expand Down Expand Up @@ -1420,18 +1463,16 @@ DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag)


/*
* DecideReplicationModel function decides which replication model should be
* used depending on given distribution configuration.
* DecideDistTableReplicationModel function decides which replication model should be
* used for a distributed table depending on given distribution configuration.
*/
static char
DecideReplicationModel(char distributionMethod, char *colocateWithTableName)
DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTableName)
{
if (distributionMethod == DISTRIBUTE_BY_NONE)
{
return REPLICATION_MODEL_2PC;
}
else if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
!IsColocateWithNone(colocateWithTableName))
Assert(distributionMethod != DISTRIBUTE_BY_NONE);

if (!IsColocateWithDefault(colocateWithTableName) &&
!IsColocateWithNone(colocateWithTableName))
{
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
Oid colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false);
Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/metadata_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ extern void DeleteShardPlacementRow(uint64 placementId);
extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName);
extern void CreateReferenceTable(Oid relationId);
extern void CreateTruncateTrigger(Oid relationId);
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);

Expand Down