Add support for schema-based-sharding via a GUC #6866

Merged

merged 31 commits into main from schema-based-sharding-via-guc on May 26, 2023

Changes from 26 commits

Commits (31)
21bc76b
Refactor the logic that automatically creates Citus managed tables
onurctirtir Apr 19, 2023
7041e5d
Refactor CreateSingleShardTable() to allow specifying colocation id i…
onurctirtir May 5, 2023
b2421b1
Add support for schema-based-sharding via a GUC
onurctirtir May 4, 2023
200cddb
rename citus_internal_insert_tenant_schema to citus_internal_add_tena…
onurctirtir May 5, 2023
0506131
early assign and keep the colocation id for tenant schemas
onurctirtir May 5, 2023
a0e3504
rename colocation_id -> colocationid & schema_id -> schemaid
onurctirtir May 10, 2023
85ea03e
address comments
onurctirtir May 10, 2023
cf2f870
improve
onurctirtir May 10, 2023
c59180e
remove ColocationGroupGetTableWithLowestOid as it's not used anymore
onurctirtir May 10, 2023
3bae55a
refactor auto-convert-table logic
onurctirtir May 16, 2023
db6b27d
delete tenant schema record on schema-drop
onurctirtir May 16, 2023
96770f7
improve tests
onurctirtir May 16, 2023
ac4c1b0
fix drop owned by
onurctirtir May 16, 2023
eddf40a
improve tests
onurctirtir May 16, 2023
3abcbbd
remove superuser checks
onurctirtir May 17, 2023
7d9ee62
fix metadata sync
onurctirtir May 17, 2023
45b631f
fix a distributed deadlock
onurctirtir May 17, 2023
5162a89
Revert "fix a distributed deadlock"
onurctirtir May 17, 2023
5865531
locally cleanup pg_dist_colocation
onurctirtir May 22, 2023
fc4a4f1
Revert "locally cleanup pg_dist_colocation"
onurctirtir May 23, 2023
8b9af32
use a global variant
onurctirtir May 23, 2023
670f868
address feedback
onurctirtir May 23, 2023
ef269ee
address feedback
onurctirtir May 23, 2023
9aa978b
address feedback
onurctirtir May 23, 2023
6692f3a
Merge branch 'main' into schema-based-sharding-via-guc
onurctirtir May 23, 2023
e8e1f41
Merge branch 'main' into schema-based-sharding-via-guc
onurctirtir May 23, 2023
6ca3ffa
properly escape schema name
onurctirtir May 24, 2023
bdd0783
fix/test partitioning and test foreign/temp tables
onurctirtir May 24, 2023
b355e65
more tests
onurctirtir May 24, 2023
23687e7
handle / throw an error for more create table / create schema variants
onurctirtir May 25, 2023
8eefa99
fix typos and remove an unnecessary #include
onurctirtir May 25, 2023
@@ -18,6 +18,7 @@
*/

#include "postgres.h"
#include "miscadmin.h"

#include "access/genam.h"
#include "access/htup_details.h"
@@ -1500,3 +1501,38 @@ FinalizeCitusLocalTableCreation(Oid relationId)
InvalidateForeignKeyGraph();
}
}


/*
* ShouldAddNewTableToMetadata takes a relationId and returns true if we need to add a
* newly created table to metadata, false otherwise.
* For partitions and temporary tables, ShouldAddNewTableToMetadata returns false.
* For other tables, it returns true if we are on a coordinator that is added as a
* worker node, and if the GUC use_citus_managed_tables is set to on.
*/
bool
ShouldAddNewTableToMetadata(Oid relationId)
{
if (get_rel_persistence(relationId) == RELPERSISTENCE_TEMP ||
PartitionTableNoLock(relationId))
{
/*
* Shouldn't add table to metadata if it's a temp table, or a partition.
* Creating partitions of a table that is added to metadata is already handled.
*/
return false;
}

if (AddAllLocalTablesToMetadata && !IsBinaryUpgrade &&
IsCoordinator() && CoordinatorAddedAsWorkerNode())
{
/*
* We have verified that the GUC is set to true, and we are not upgrading,
* and we are on the coordinator that is added as worker node.
* So return true here, to add this newly created table to metadata.
*/
return true;
}

return false;
}
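
For context, a short SQL sketch of the behavior this helper gates. It assumes the GUC citus.use_citus_managed_tables and a coordinator that has been added as a worker node; the exact statements are an illustration, not part of this diff.

-- Sketch: with citus.use_citus_managed_tables on (and the coordinator added as
-- a worker node), a plain CREATE TABLE is added to Citus metadata as a managed
-- table. Temporary tables and partitions are skipped, as described above.
SET citus.use_citus_managed_tables TO on;
CREATE TABLE events (id bigint, payload jsonb);   -- added to metadata
CREATE TEMP TABLE scratch (id bigint);            -- not added to metadata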
77 changes: 59 additions & 18 deletions src/backend/distributed/commands/create_distributed_table.c
@@ -111,8 +111,8 @@ typedef struct
{
int shardCount;
bool shardCountIsStrict;
char *colocateWithTableName;
char *distributionColumnName;
ColocationParam colocationParam;
} DistributedTableParams;


@@ -296,7 +296,11 @@ create_distributed_table(PG_FUNCTION_ARGS)
"when the distribution column is null ")));
}

CreateSingleShardTable(relationId, colocateWithTableName);
ColocationParam colocationParam = {
.colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT,
.colocateWithTableName = colocateWithTableName,
};
CreateSingleShardTable(relationId, colocationParam);
}

PG_RETURN_VOID();
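
The branch above is the null-shard-key path of the create_distributed_table() UDF. A hedged SQL sketch of how it is reached, assuming the NULL distribution column syntax available in recent Citus releases:

-- Sketch: passing NULL as the distribution column creates a single-shard
-- distributed table; colocate_with is carried into CreateSingleShardTable()
-- as a COLOCATE_WITH_TABLE_LIKE_OPT colocation parameter.
CREATE TABLE settings (key text PRIMARY KEY, value jsonb);
SELECT create_distributed_table('settings', NULL, colocate_with => 'none');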
@@ -1006,7 +1010,10 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
}

DistributedTableParams distributedTableParams = {
.colocateWithTableName = colocateWithTableName,
.colocationParam = {
.colocateWithTableName = colocateWithTableName,
.colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT
},
.shardCount = shardCount,
.shardCountIsStrict = shardCountIsStrict,
.distributionColumnName = distributionColumnName
@@ -1031,10 +1038,10 @@
* single shard distributed table that doesn't have a shard key.
*/
void
CreateSingleShardTable(Oid relationId, char *colocateWithTableName)
CreateSingleShardTable(Oid relationId, ColocationParam colocationParam)
{
DistributedTableParams distributedTableParams = {
.colocateWithTableName = colocateWithTableName,
.colocationParam = colocationParam,
.shardCount = 1,
.shardCountIsStrict = true,
.distributionColumnName = NULL
@@ -1155,9 +1162,23 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
* ColocationIdForNewTable assumes caller acquires lock on relationId. In our case,
* our caller already acquired lock on relationId.
*/
uint32 colocationId = ColocationIdForNewTable(relationId, tableType,
distributedTableParams,
distributionColumn);
uint32 colocationId = INVALID_COLOCATION_ID;
if (distributedTableParams &&
distributedTableParams->colocationParam.colocationParamType ==
COLOCATE_WITH_COLOCATION_ID)
{
colocationId = distributedTableParams->colocationParam.colocationId;
}
else
{
/*
* ColocationIdForNewTable assumes caller acquires lock on relationId. In our case,
* our caller already acquired lock on relationId.
*/
colocationId = ColocationIdForNewTable(relationId, tableType,
distributedTableParams,
distributionColumn);
}

EnsureRelationCanBeDistributed(relationId, distributionColumn,
citusTableParams.distributionMethod,
@@ -1257,7 +1278,10 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
MemoryContextReset(citusPartitionContext);

DistributedTableParams childDistributedTableParams = {
.colocateWithTableName = parentRelationName,
.colocationParam = {
.colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT,
.colocateWithTableName = parentRelationName,
},
.shardCount = distributedTableParams->shardCount,
.shardCountIsStrict = false,
.distributionColumnName = distributedTableParams->distributionColumnName,
@@ -1308,30 +1332,39 @@ DecideCitusTableParams(CitusTableType tableType,
{
case HASH_DISTRIBUTED:
{
Assert(distributedTableParams->colocationParam.colocationParamType ==
COLOCATE_WITH_TABLE_LIKE_OPT);

citusTableParams.distributionMethod = DISTRIBUTE_BY_HASH;
citusTableParams.replicationModel =
DecideDistTableReplicationModel(DISTRIBUTE_BY_HASH,
distributedTableParams->
distributedTableParams->colocationParam.
colocateWithTableName);
break;
}

case APPEND_DISTRIBUTED:
{
Assert(distributedTableParams->colocationParam.colocationParamType ==
COLOCATE_WITH_TABLE_LIKE_OPT);

citusTableParams.distributionMethod = DISTRIBUTE_BY_APPEND;
citusTableParams.replicationModel =
DecideDistTableReplicationModel(APPEND_DISTRIBUTED,
distributedTableParams->
distributedTableParams->colocationParam.
colocateWithTableName);
break;
}

case RANGE_DISTRIBUTED:
{
Assert(distributedTableParams->colocationParam.colocationParamType ==
COLOCATE_WITH_TABLE_LIKE_OPT);

citusTableParams.distributionMethod = DISTRIBUTE_BY_RANGE;
citusTableParams.replicationModel =
DecideDistTableReplicationModel(RANGE_DISTRIBUTED,
distributedTableParams->
distributedTableParams->colocationParam.
colocateWithTableName);
break;
}
@@ -1768,7 +1801,11 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,

if (tableType == APPEND_DISTRIBUTED || tableType == RANGE_DISTRIBUTED)
{
if (!IsColocateWithDefault(distributedTableParams->colocateWithTableName))
Assert(distributedTableParams->colocationParam.colocationParamType ==
COLOCATE_WITH_TABLE_LIKE_OPT);
char *colocateWithTableName =
distributedTableParams->colocationParam.colocateWithTableName;
if (!IsColocateWithDefault(colocateWithTableName))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
@@ -1795,8 +1832,13 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
Oid distributionColumnCollation =
distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid;

Assert(distributedTableParams->colocationParam.colocationParamType ==
COLOCATE_WITH_TABLE_LIKE_OPT);
char *colocateWithTableName =
distributedTableParams->colocationParam.colocateWithTableName;

/* get an advisory lock to serialize concurrent default group creations */
if (IsColocateWithDefault(distributedTableParams->colocateWithTableName))
if (IsColocateWithDefault(colocateWithTableName))
{
AcquireColocationDefaultLock();
}
@@ -1808,10 +1850,9 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
distributedTableParams->shardCount,
distributedTableParams->
shardCountIsStrict,
distributedTableParams->
colocateWithTableName);

if (IsColocateWithDefault(distributedTableParams->colocateWithTableName) &&
if (IsColocateWithDefault(colocateWithTableName) &&
(colocationId != INVALID_COLOCATION_ID))
{
/*
@@ -1824,7 +1865,7 @@

if (colocationId == INVALID_COLOCATION_ID)
{
if (IsColocateWithDefault(distributedTableParams->colocateWithTableName))
if (IsColocateWithDefault(colocateWithTableName))
{
/*
* Generate a new colocation ID and insert a pg_dist_colocation
@@ -1835,7 +1876,7 @@
distributionColumnType,
distributionColumnCollation);
}
else if (IsColocateWithNone(distributedTableParams->colocateWithTableName))
else if (IsColocateWithNone(colocateWithTableName))
{
/*
* Generate a new colocation ID and insert a pg_dist_colocation
4 changes: 2 additions & 2 deletions src/backend/distributed/commands/distribute_object_ops.c
@@ -294,8 +294,8 @@ static DistributeObjectOps Any_CreateForeignServer = {
static DistributeObjectOps Any_CreateSchema = {
.deparse = DeparseCreateSchemaStmt,
.qualify = NULL,
.preprocess = PreprocessCreateSchemaStmt,
.postprocess = NULL,
.preprocess = NULL,
.postprocess = PostprocessCreateSchemaStmt,
.operationType = DIST_OPS_CREATE,
.address = CreateSchemaStmtObjectAddress,
.markDistributed = true,
22 changes: 21 additions & 1 deletion src/backend/distributed/commands/drop_distributed_table.c
@@ -90,7 +90,27 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS)

DeletePartitionRow(relationId);

DeleteColocationGroupIfNoTablesBelong(colocationId);
/*
* We want to keep using the same colocation group for the tenant even if
* all the tables that belong to it are dropped and new tables are created
* for the tenant etc. For this reason, if a colocation group belongs to a
* tenant schema, we don't delete the colocation group even if there are no
* tables that belong to it.
*
* However, if the system catalog cannot find the schema of the table, this
* means that the whole schema is being dropped. In that case, we do want to
* delete the colocation group, regardless of whether the schema is a tenant
* schema or not. Moreover, calling IsTenantSchema() with InvalidOid would
* cause an error, hence we check whether the schema is valid beforehand.
*/
bool missingOk = true;
Oid schemaId = get_namespace_oid(schemaName, missingOk);
if (!OidIsValid(schemaId) || !IsTenantSchema(schemaId))
{
DeleteColocationGroupIfNoTablesBelong(colocationId);
}

PG_RETURN_VOID();
}
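
A sketch of the behavior the comment above describes, assuming schema-based sharding is enabled via citus.enable_schema_based_sharding; the statements are illustrative only.

-- Sketch: dropping every table in a tenant schema keeps the schema's
-- colocation group, so later tables re-use the same colocation id; dropping
-- the schema itself is what releases the tenant schema record.
SET citus.enable_schema_based_sharding TO on;
CREATE SCHEMA tenant_2;
CREATE TABLE tenant_2.a (id bigint);
DROP TABLE tenant_2.a;                -- colocation group is kept
CREATE TABLE tenant_2.b (id bigint);  -- re-uses the tenant's colocation id
DROP SCHEMA tenant_2 CASCADE;         -- tenant schema record is removed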
31 changes: 28 additions & 3 deletions src/backend/distributed/commands/schema.c
@@ -19,6 +19,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "catalog/pg_namespace.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include <distributed/connection_management.h>
#include "distributed/commands/utility_hook.h"
@@ -33,6 +34,7 @@
#include "distributed/resource_lock.h"
#include <distributed/remote_commands.h>
#include <distributed/remote_commands.h>
#include "distributed/tenant_schema_metadata.h"
#include "distributed/version_compat.h"
#include "nodes/parsenodes.h"
#include "utils/fmgroids.h"
@@ -48,13 +50,14 @@ static List * GetGrantCommandsFromCreateSchemaStmt(Node *node);


/*
* PreprocessCreateSchemaStmt is called during the planning phase for
* PostprocessCreateSchemaStmt is called after standard process utility for
* CREATE SCHEMA ..
*/
List *
PreprocessCreateSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
PostprocessCreateSchemaStmt(Node *node, const char *queryString)
{
CreateSchemaStmt *createSchemaStmt = castNode(CreateSchemaStmt, node);

if (!ShouldPropagateCreateSchemaStmt())
{
return NIL;
@@ -74,6 +77,28 @@

commands = list_concat(commands, GetGrantCommandsFromCreateSchemaStmt(node));

if (ShouldUseSchemaBasedSharding(createSchemaStmt->schemaname))
{
bool missingOk = false;
Oid schemaId = get_namespace_oid(createSchemaStmt->schemaname, missingOk);

/*
* Register the tenant schema on the coordinator and save the command
* to register it on the workers.
*/
int shardCount = 1;
int replicationFactor = 1;
Oid distributionColumnType = InvalidOid;
Oid distributionColumnCollation = InvalidOid;
uint32 colocationId = CreateColocationGroup(
shardCount, replicationFactor, distributionColumnType,
distributionColumnCollation);

InsertTenantSchemaLocally(schemaId, colocationId);

commands = lappend(commands, TenantSchemaInsertCommand(schemaId, colocationId));
}

commands = lappend(commands, ENABLE_DDL_PROPAGATION);

return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
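
To make the effect of the block above concrete, a hedged example that inspects colocation after creating a tenant schema. It assumes the citus_tables view exposes a colocation_id column, as in current Citus releases; the expected output is an assumption.

-- Sketch: with the GUC on, CREATE SCHEMA registers a tenant schema and eagerly
-- creates a colocation group for it; tables created in the schema then share
-- that colocation id.
SET citus.enable_schema_based_sharding TO on;
CREATE SCHEMA tenant_3;
CREATE TABLE tenant_3.t1 (id bigint);
CREATE TABLE tenant_3.t2 (id bigint);

SELECT table_name, colocation_id
FROM citus_tables
WHERE table_name::text LIKE 'tenant_3.%';
-- Expected (assumption): both rows report the same colocation_id.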