Commit e9365fa

onurctirtir committed Apr 19, 2023
1 parent b335134 commit e9365fa
Showing 31 changed files with 1,431 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/backend/distributed/commands/distribute_object_ops.c
@@ -294,8 +294,8 @@ static DistributeObjectOps Any_CreateForeignServer = {
static DistributeObjectOps Any_CreateSchema = {
.deparse = DeparseCreateSchemaStmt,
.qualify = NULL,
.preprocess = PreprocessCreateSchemaStmt,
.postprocess = NULL,
.preprocess = NULL,
.postprocess = PostprocessCreateSchemaStmt,
.operationType = DIST_OPS_CREATE,
.address = CreateSchemaStmtObjectAddress,
.markDistributed = true,
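
For context, each DistributeObjectOps entry tells Citus which hooks to run around the local execution of a utility statement, so moving the CREATE SCHEMA handler from .preprocess to .postprocess means it now runs after the schema exists locally, which the new handler relies on when it looks up the schema's oid. The snippet below is a minimal sketch of that hook-bracketing pattern, not the actual Citus dispatcher; the struct and helper names ending in "Sketch" are hypothetical.

#include "postgres.h"
#include "nodes/pg_list.h"
#include "tcop/utility.h"

/* Sketch only: how pre/post hooks bracket local execution of a utility statement. */
typedef struct DistributeObjectOpsSketch
{
    List *(*preprocess)(Node *node, const char *queryString,
                        ProcessUtilityContext context);
    List *(*postprocess)(Node *node, const char *queryString);
} DistributeObjectOpsSketch;

static void RunLocallySketch(Node *parsetree, const char *queryString);
static void ExecuteDDLJobsSketch(List *ddlJobs);

static void
DispatchUtilitySketch(Node *parsetree, const char *queryString,
                      ProcessUtilityContext context,
                      const DistributeObjectOpsSketch *ops)
{
    List *ddlJobs = NIL;

    if (ops->preprocess != NULL)
    {
        /* runs before the local command; the object does not exist yet */
        ddlJobs = ops->preprocess(parsetree, queryString, context);
    }

    RunLocallySketch(parsetree, queryString);

    if (ops->postprocess != NULL)
    {
        /* runs after the local command, so get_namespace_oid() can succeed */
        ddlJobs = ops->postprocess(parsetree, queryString);
    }

    ExecuteDDLJobsSketch(ddlJobs);
}
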
28 changes: 25 additions & 3 deletions src/backend/distributed/commands/schema.c
@@ -33,6 +33,7 @@
#include "distributed/resource_lock.h"
#include <distributed/remote_commands.h>
#include "distributed/tenant_schema_metadata.h"
#include "distributed/version_compat.h"
#include "nodes/parsenodes.h"
#include "utils/fmgroids.h"
@@ -48,13 +49,14 @@ static List * GetGrantCommandsFromCreateSchemaStmt(Node *node);


/*
* PreprocessCreateSchemaStmt is called during the planning phase for
* PostprocessCreateSchemaStmt is called during the planning phase for
* CREATE SCHEMA ..
*/
List *
PreprocessCreateSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
PostprocessCreateSchemaStmt(Node *node, const char *queryString)
{
CreateSchemaStmt *createSchemaStmt = castNode(CreateSchemaStmt, node);

if (!ShouldPropagateCreateSchemaStmt())
{
return NIL;
@@ -74,6 +76,19 @@ PreprocessCreateSchemaStmt(Node *node, const char *queryString,

commands = list_concat(commands, GetGrantCommandsFromCreateSchemaStmt(node));

if (ShouldUseSchemaAsTenantSchema(createSchemaStmt->schemaname))
{
bool missingOk = false;
Oid schemaId = get_namespace_oid(createSchemaStmt->schemaname, missingOk);

/*
* Register the tenant schema on the coordinator and get the command
* to register it on the workers.
*/
char *remoteRegisterCommand = RegisterTenantSchema(schemaId);
commands = lappend(commands, remoteRegisterCommand);
}

commands = lappend(commands, ENABLE_DDL_PROPAGATION);

return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
@@ -110,6 +125,13 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString,
String *schemaVal = NULL;
foreach_ptr(schemaVal, distributedSchemas)
{
bool missingOk = false;
Oid schemaId = get_namespace_oid(strVal(schemaVal), missingOk);
if (IsTenantSchema(schemaId))
{
UnregisterTenantSchema(schemaId);
}

if (SchemaHasDistributedTableWithFKey(strVal(schemaVal)))
{
MarkInvalidateForeignKeyGraph();
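
The command list built in PostprocessCreateSchemaStmt above is bracketed by DDL-propagation toggles so that the worker nodes replay the statement without trying to propagate it again themselves. The function below is an illustrative sketch of that list shape only, not code from this commit; it assumes DISABLE_DDL_PROPAGATION is the counterpart of the ENABLE_DDL_PROPAGATION constant used above and that both live in distributed/commands/utility_hook.h.

#include "postgres.h"
#include "nodes/pg_list.h"
#include "distributed/commands/utility_hook.h"

/*
 * Sketch only: rough shape of the worker command list for a propagated
 * CREATE SCHEMA. Pass tenantRegisterCommand as NULL for a regular schema.
 */
static List *
BuildCreateSchemaWorkerCommandsSketch(char *deparsedCreateSchemaCommand,
                                      List *grantCommands,
                                      char *tenantRegisterCommand)
{
    /* keep citus.enable_ddl_propagation off while replaying the DDL */
    List *commands = list_make1(DISABLE_DDL_PROPAGATION);

    commands = lappend(commands, deparsedCreateSchemaCommand);
    commands = list_concat(commands, grantCommands);

    if (tenantRegisterCommand != NULL)
    {
        /* registers the schema in pg_dist_tenant_schema on each worker */
        commands = lappend(commands, tenantRegisterCommand);
    }

    commands = lappend(commands, ENABLE_DDL_PROPAGATION);

    return commands;
}
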
233 changes: 233 additions & 0 deletions src/backend/distributed/commands/schema_based_sharding.c
@@ -0,0 +1,233 @@
/*-------------------------------------------------------------------------
* schema_based_sharding.c
*
* Routines for schema-based sharding.
*
*-------------------------------------------------------------------------
*/

#include "postgres.h"
#include "miscadmin.h"
#include "catalog/pg_namespace_d.h"
#include "distributed/backend_data.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/metadata/distobject.h"
#include "utils/lsyscache.h"


/* controlled via citus.enable_schema_based_sharding GUC */
bool EnableSchemaBasedSharding = false;


static void SetTenantSchemaColocationId(Oid schemaId, uint32 colocationId);


/*
* ShouldUseSchemaAsTenantSchema returns true if the schema with the given
* name should be used as a tenant schema.
*/
bool
ShouldUseSchemaAsTenantSchema(char *schemaName)
{
if (!EnableSchemaBasedSharding)
{
return false;
}

if (IsBinaryUpgrade)
{
return false;
}

/*
* CREATE SCHEMA commands issued by internal backends are not meant to
* create tenant schemas but to sync metadata.
*/
if (IsCitusInternalBackend() || IsRebalancerInternalBackend())
{
return false;
}

/*
* We don't do an oid comparison based on PG_PUBLIC_NAMESPACE because we
* want to treat the "public" schema the same way even if it has been
* recreated.
*/
if (strcmp(schemaName, "public") == 0)
{
return false;
}

return true;
}


/*
* ShouldCreateTenantTable returns true if the newly created table with the
* given relationId should be treated as a tenant table.
*/
bool
ShouldCreateTenantTable(Oid relationId)
{
if (IsBinaryUpgrade)
{
return false;
}

/*
* CREATE TABLE commands issued by internal backends are not meant to
* create tenant tables but to sync metadata.
*/
if (IsCitusInternalBackend() || IsRebalancerInternalBackend())
{
return false;
}

Oid schemaId = get_rel_namespace(relationId);
ObjectAddress *schemaAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*schemaAddress, NamespaceRelationId, schemaId);
if (IsAnyObjectAddressOwnedByExtension(list_make1(schemaAddress), NULL))
{
return false;
}

return IsTenantSchema(schemaId);
}


/*
* CreateTenantTable creates a tenant table with given relationId.
*
* This means creating a single shard distributed table without a shard
* key and colocating it with the other tables in its schema.
*/
void
CreateTenantTable(Oid relationId)
{
CheckCitusVersion(ERROR);

if (!IsCoordinator())
{
/*
* We don't support creating tenant tables from workers. We could
* let ShouldCreateTenantTable() return false to allow users to
* create a local table as usual but that would be confusing because
* it might sound like we allow creating tenant tables from workers.
* For this reason, we prefer to throw an error instead.
*
* Indeed, CreateNullShardKeyDistTable() would already do so but we
* prefer to throw an error with a more meaningful message, rather
* than saying "operation is not allowed on this node".
*/
ereport(ERROR, (errmsg("cannot create a tenant table from a worker node"),
errhint("Connect to the coordinator node and try again.")));
}

/*
* Default the colocate_with parameter to "none"; if the schema already
* has a colocation group, we instead colocate with the table that has
* the lowest oid in that group (decided below).
*/
char *colocateWithTableName = "none";

/*
* Acquire default colocation lock to prevent concurrently forming
* multiple colocation groups for the same schema.
*
* Note that the lock is based on schemaId to avoid serializing
* default colocation group creation for all tenant schemas.
*/
Oid schemaId = get_rel_namespace(relationId);
AcquireCitusTenantSchemaDefaultColocationLock(schemaId);

uint32 colocationId = SchemaIdGetTenantColocationId(schemaId);
bool firstTableInSchema = (colocationId == INVALID_COLOCATION_ID);
if (!firstTableInSchema)
{
/*
* Release the lock if the schema is already associated with a
* colocation group.
*/
ReleaseCitusTenantSchemaDefaultColocationLock(schemaId);

Oid colocateWithTableId = ColocationGroupGetTableWithLowestOid(colocationId);
colocateWithTableName = generate_qualified_relation_name(colocateWithTableId);
}

CreateNullShardKeyDistTable(relationId, colocateWithTableName);

if (firstTableInSchema)
{
/*
* Save it into pg_dist_tenant_schema if this is the first tenant
* table in the schema.
*/
SetTenantSchemaColocationId(schemaId, TableColocationId(relationId));
}
}


/*
* RegisterTenantSchema registers given schema as a tenant schema locally and
* returns the command to do the same on the workers.
*/
char *
RegisterTenantSchema(Oid schemaId)
{
CheckCitusVersion(ERROR);

/* don't assign a colocation id until the first table is created */
uint32 colocationId = INVALID_COLOCATION_ID;

InsertTenantSchemaLocally(schemaId, colocationId);

return TenantSchemaInsertCommand(schemaId, colocationId);
}


/*
* UnregisterTenantSchema deletes tenant schema metadata related to given
* schema.
*/
void
UnregisterTenantSchema(Oid schemaId)
{
DeleteTenantSchemaLocally(schemaId);

SendCommandToWorkersWithMetadataViaSuperUser(
TenantSchemaDeleteCommand(schemaId));
}


/*
* DisassociateTenantSchemaIfAny disassociates given colocation id from its
* tenant schema if any.
*/
void
DisassociateTenantSchemaIfAny(uint32 colocationId)
{
Oid schemaId = ColocationIdGetTenantSchemaId(colocationId);
if (OidIsValid(schemaId))
{
SetTenantSchemaColocationId(schemaId, INVALID_COLOCATION_ID);
}
}


/*
* SetTenantSchemaColocationId sets the colocation id of given tenant schema.
*
* If colocationId is INVALID_COLOCATION_ID, then the colocation_id column
* is set to NULL.
*/
static void
SetTenantSchemaColocationId(Oid schemaId, uint32 colocationId)
{
SetTenantSchemaColocationIdLocally(schemaId, colocationId);

SendCommandToWorkersWithMetadataViaSuperUser(
TenantSchemaSetColocationIdCommand(schemaId, colocationId));
}
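
The EnableSchemaBasedSharding flag declared at the top of this file is described as being controlled by the citus.enable_schema_based_sharding GUC, but the GUC registration itself is not part of this excerpt. The sketch below shows how such a boolean GUC is typically wired up with PostgreSQL's DefineCustomBoolVariable; the description, context, and flags here are assumptions for illustration, not the actual Citus registration.

#include "postgres.h"
#include "utils/guc.h"

extern bool EnableSchemaBasedSharding;    /* declared in schema_based_sharding.c */

/* Sketch only: typical GUC registration, e.g. called from the extension's init path. */
static void
RegisterSchemaBasedShardingGucSketch(void)
{
    DefineCustomBoolVariable(
        "citus.enable_schema_based_sharding",
        gettext_noop("When on, newly created schemas become tenant schemas."),
        NULL,                           /* no long description */
        &EnableSchemaBasedSharding,
        false,                          /* off by default */
        PGC_USERSET,                    /* assumed GUC context */
        0,                              /* no special GUC flags assumed */
        NULL, NULL, NULL);              /* no check/assign/show hooks */
}
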
11 changes: 10 additions & 1 deletion src/backend/distributed/commands/utility_hook.c
@@ -346,7 +346,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,

/*
* We're only interested in top-level CREATE TABLE commands
* to create a Citus managed table.
* to create a tenant table or a Citus managed table.
*/
Oid createdRelationId = InvalidOid;
if (context == PROCESS_UTILITY_TOPLEVEL &&
@@ -367,10 +367,19 @@ multi_ProcessUtility(PlannedStmt *pstmt,
NoLock, missingOk);
}

/*
* Check ShouldCreateTenantTable() before ShouldAddNewTableToMetadata()
* because we don't want to unnecessarily add the table into metadata
* (as a Citus managed table) before distributing it as a tenant table.
*/
if (!OidIsValid(createdRelationId))
{
/* no tables were created by this command, or it wasn't a top-level one */
}
else if (ShouldCreateTenantTable(createdRelationId))
{
CreateTenantTable(createdRelationId);
}
else if (ShouldAddNewTableToMetadata(createdRelationId))
{
/*
36 changes: 36 additions & 0 deletions src/backend/distributed/metadata/metadata_cache.c
@@ -178,6 +178,7 @@ typedef struct MetadataCacheData
Oid distColocationRelationId;
Oid distColocationConfigurationIndexId;
Oid distPartitionRelationId;
Oid distTenantSchemaRelationId;
Oid distPartitionLogicalRelidIndexId;
Oid distPartitionColocationidIndexId;
Oid distShardLogicalRelidIndexId;
@@ -188,6 +189,8 @@
Oid distPlacementGroupidIndexId;
Oid distTransactionRelationId;
Oid distTransactionGroupIndexId;
Oid distTenantSchemaPrimaryKeyIndexId;
Oid distTenantSchemaUniqueColocationIdIndexId;
Oid citusCatalogNamespaceId;
Oid copyFormatTypeId;
Oid readIntermediateResultFuncId;
@@ -2843,6 +2846,39 @@ DistColocationConfigurationIndexId(void)
}


/* return oid of pg_dist_tenant_schema relation */
Oid
DistTenantSchemaRelationId(void)
{
CachedRelationLookup("pg_dist_tenant_schema",
&MetadataCache.distTenantSchemaRelationId);

return MetadataCache.distTenantSchemaRelationId;
}


/* return oid of pg_dist_tenant_schema_pkey index */
Oid
DistTenantSchemaPrimaryKeyIndexId(void)
{
CachedRelationLookup("pg_dist_tenant_schema_pkey",
&MetadataCache.distTenantSchemaPrimaryKeyIndexId);

return MetadataCache.distTenantSchemaPrimaryKeyIndexId;
}


/* return oid of pg_dist_tenant_schema_unique_colocationid_index index */
Oid
DistTenantSchemaUniqueColocationIdIndexId(void)
{
CachedRelationLookup("pg_dist_tenant_schema_unique_colocationid_index",
&MetadataCache.distTenantSchemaUniqueColocationIdIndexId);

return MetadataCache.distTenantSchemaUniqueColocationIdIndexId;
}


/* return oid of pg_dist_partition relation */
Oid
DistPartitionRelationId(void)
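
The new accessors above follow the file's lazy lookup-and-cache pattern: an oid field in MetadataCacheData starts out invalid, gets resolved by relation name on first use, and is then served from the cache. CachedRelationLookup() itself is not shown in this excerpt, so the helper below is only a sketch of that general shape; the pg_catalog namespace and the error wording are assumptions.

#include "postgres.h"
#include "catalog/pg_namespace_d.h"
#include "utils/lsyscache.h"

/*
 * Sketch only: lazily resolve a catalog relation's oid by name and cache it,
 * so later accessor calls skip the catalog lookup.
 */
static void
CachedRelationLookupSketch(const char *relationName, Oid *cachedOid)
{
    if (!OidIsValid(*cachedOid))
    {
        *cachedOid = get_relname_relid(relationName, PG_CATALOG_NAMESPACE);

        if (!OidIsValid(*cachedOid))
        {
            ereport(ERROR, (errmsg("cache lookup failed for %s", relationName)));
        }
    }
}
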