-
Notifications
You must be signed in to change notification settings - Fork 671
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for schema-based-sharding via a GUC
Add citus.enable_schema_based_sharding GUC to enable schema-based sharding. Each schema created while this GUC is one will be considered as a tenant schema. Later on, regardless of whether the GUC is on or off, any table created in a tenant schema will be converted to a distributed table that don't have a shard key. All the tenant tables that belong to a particular schema will be co-located with each other and will have a shard count of 1. We introduce a new metadata table --pg_dist_tenant_schema-- to do the bookkeeping of tenant schemas: ```sql psql> \d pg_dist_tenant_schema Table "pg_catalog.pg_dist_tenant_schema" ┌───────────────┬─────────┬───────────┬──────────┬─────────┐ │ Column │ Type │ Collation │ Nullable │ Default │ ├───────────────┼─────────┼───────────┼──────────┼─────────┤ │ schema_id │ oid │ │ not null │ │ │ colocation_id │ integer │ │ │ │ └───────────────┴─────────┴───────────┴──────────┴─────────┘ Indexes: "pg_dist_tenant_schema_pkey" PRIMARY KEY, btree (schema_id) "pg_dist_tenant_schema_unique_colocationid_index" UNIQUE, btree (colocation_id) psql> table pg_dist_tenant_schema; ┌───────────┬───────────────┐ │ schema_id │ colocation_id │ ├───────────┼───────────────┤ │ 41963 │ │ │ 41962 │ 90 │ └───────────┴───────────────┘ (2 rows) ``` Colocation id column of pg_dist_tenant_schema is set to NULL for the tenant schemas that don't have a tenant table yet, and is set to the colocation id used to form a colocation group for the tenant tables of that schema when the first tenant table is created. When a tenant schema is dropped, we delete the corresponding row from pg_dist_tenant_schema. And when all the tenant tables of a tenant schema are dropped, we set the colocation id of the tenant back to NULL. We're building schema-based sharding on top of the infrastructure that adds support for creating distributed tables without a shard key. However, not all the operations that can be done on distributed tables without a shard key necessarily make sense (in the same way) in the context of schema-based sharding. For example, we need to think about what happens if user attempts altering schema of a tenant table. We will tackle such scenarios in a future PR. We will also add a new UDF --citus.schema_tenant_set() or such-- to allow users to use an existing tenant as a tenant schema, and another one --citus.schema_tenant_unset() or such-- to stop using a schema as a tenant schema in future PRs.
- Loading branch information
1 parent
8b45df4
commit dab92d7
Showing
32 changed files
with
1,445 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
233 changes: 233 additions & 0 deletions
233
src/backend/distributed/commands/schema_based_sharding.c
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,233 @@ | ||
/*------------------------------------------------------------------------- | ||
* schema_based_sharding.c | ||
* | ||
* Routines for schema-based sharding. | ||
* | ||
*------------------------------------------------------------------------- | ||
*/ | ||
|
||
#include "postgres.h" | ||
#include "miscadmin.h" | ||
#include "catalog/pg_namespace_d.h" | ||
#include "distributed/backend_data.h" | ||
#include "distributed/colocation_utils.h" | ||
#include "distributed/commands.h" | ||
#include "distributed/metadata_cache.h" | ||
#include "distributed/metadata_sync.h" | ||
#include "distributed/tenant_schema_metadata.h" | ||
#include "distributed/metadata/distobject.h" | ||
#include "utils/lsyscache.h" | ||
|
||
|
||
/* controlled via citus.enable_schema_based_sharding GUC */ | ||
bool EnableSchemaBasedSharding = false; | ||
|
||
|
||
static void SetTenantSchemaColocationId(Oid schemaId, uint32 colocationId); | ||
|
||
|
||
/* | ||
* ShouldUseSchemaAsTenantSchema returns true if schema given name should be | ||
* used as a tenant schema. | ||
*/ | ||
bool | ||
ShouldUseSchemaAsTenantSchema(char *schemaName) | ||
{ | ||
if (!EnableSchemaBasedSharding) | ||
{ | ||
return false; | ||
} | ||
|
||
if (IsBinaryUpgrade) | ||
{ | ||
return false; | ||
} | ||
|
||
/* | ||
* CREATE SCHEMA commands issued by internal backends are not meant to | ||
* create tenant schemas but to sync metadata. | ||
*/ | ||
if (IsCitusInternalBackend() || IsRebalancerInternalBackend()) | ||
{ | ||
return false; | ||
} | ||
|
||
/* | ||
* Not do an oid comparison based on PG_PUBLIC_NAMESPACE because | ||
* we want to treat "public" schema in the same way even if it's | ||
* recreated. | ||
*/ | ||
if (strcmp(schemaName, "public") == 0) | ||
{ | ||
return false; | ||
} | ||
|
||
return true; | ||
} | ||
|
||
|
||
/* | ||
* ShouldCreateTenantTable returns true if given parsetree is a CREATE TABLE | ||
* statement and the table should be treated as a tenant table. | ||
*/ | ||
bool | ||
ShouldCreateTenantTable(Oid relationId) | ||
{ | ||
if (IsBinaryUpgrade) | ||
{ | ||
return false; | ||
} | ||
|
||
/* | ||
* CREATE TABLE commands issued by internal backends are not meant to | ||
* create tenant tables but to sync metadata. | ||
*/ | ||
if (IsCitusInternalBackend() || IsRebalancerInternalBackend()) | ||
{ | ||
return false; | ||
} | ||
|
||
Oid schemaId = get_rel_namespace(relationId); | ||
ObjectAddress *schemaAddress = palloc0(sizeof(ObjectAddress)); | ||
ObjectAddressSet(*schemaAddress, NamespaceRelationId, schemaId); | ||
if (IsAnyObjectAddressOwnedByExtension(list_make1(schemaAddress), NULL)) | ||
{ | ||
return false; | ||
} | ||
|
||
return IsTenantSchema(schemaId); | ||
} | ||
|
||
|
||
/* | ||
* CreateTenantTable creates a tenant table with given relationId. | ||
* | ||
* This means creating a single shard distributed table without a shard | ||
* key and colocating it with the other tables in its schema. | ||
*/ | ||
void | ||
CreateTenantTable(Oid relationId) | ||
{ | ||
CheckCitusVersion(ERROR); | ||
|
||
if (!IsCoordinator()) | ||
{ | ||
/* | ||
* We don't support creating tenant tables from workers. We could | ||
* let ShouldCreateTenantTable() to return false to allow users to | ||
* create a local table as usual but that would be confusing because | ||
* it might sound like we allow creating tenant tables from workers. | ||
* For this reason, we prefer to throw an error instead. | ||
* | ||
* Indeed, CreateNullShardKeyDistTable() would already do so but we | ||
* prefer to throw an error with a more meaningful message, rather | ||
* than saying "operation is not allowed on this node". | ||
*/ | ||
ereport(ERROR, (errmsg("cannot create a tenant table from a worker node"), | ||
errhint("Connect to the coordinator node and try again."))); | ||
} | ||
|
||
/* | ||
* Decide name of the table with lowest oid in the colocation group | ||
* and use it as the colocate_with parameter. | ||
*/ | ||
char *colocateWithTableName = "none"; | ||
|
||
/* | ||
* Acquire default colocation lock to prevent concurrently forming | ||
* multiple colocation groups for the same schema. | ||
* | ||
* Note that the lock is based on schemaId to avoid serializing | ||
* default colocation group creation for all tenant schemas. | ||
*/ | ||
Oid schemaId = get_rel_namespace(relationId); | ||
AcquireCitusTenantSchemaDefaultColocationLock(schemaId); | ||
|
||
uint32 colocationId = SchemaIdGetTenantColocationId(schemaId); | ||
bool firstTableInSchema = (colocationId == INVALID_COLOCATION_ID); | ||
if (!firstTableInSchema) | ||
{ | ||
/* | ||
* Release the lock if the schema is already associated with a | ||
* colocation group. | ||
*/ | ||
ReleaseCitusTenantSchemaDefaultColocationLock(schemaId); | ||
|
||
Oid colocateWithTableId = ColocationGroupGetTableWithLowestOid(colocationId); | ||
colocateWithTableName = generate_qualified_relation_name(colocateWithTableId); | ||
} | ||
|
||
CreateNullShardKeyDistTable(relationId, colocateWithTableName); | ||
|
||
if (firstTableInSchema) | ||
{ | ||
/* | ||
* Save it into pg_dist_tenant_schema if this is the first tenant | ||
* table in the schema. | ||
*/ | ||
SetTenantSchemaColocationId(schemaId, TableColocationId(relationId)); | ||
} | ||
} | ||
|
||
|
||
/* | ||
* RegisterTenantSchema registers given schema as a tenant schema locally and | ||
* returns the command to do the same on the workers. | ||
*/ | ||
char * | ||
RegisterTenantSchema(Oid schemaId) | ||
{ | ||
CheckCitusVersion(ERROR); | ||
|
||
/* not assign a colocation id until creating the first table */ | ||
uint32 colocationId = INVALID_COLOCATION_ID; | ||
|
||
InsertTenantSchemaLocally(schemaId, colocationId); | ||
|
||
return TenantSchemaInsertCommand(schemaId, colocationId); | ||
} | ||
|
||
|
||
/* | ||
* UnregisterTenantSchema deletes tenant schema metadata related to given | ||
* schema. | ||
*/ | ||
void | ||
UnregisterTenantSchema(Oid schemaId) | ||
{ | ||
DeleteTenantSchemaLocally(schemaId); | ||
|
||
SendCommandToWorkersWithMetadataViaSuperUser( | ||
TenantSchemaDeleteCommand(schemaId)); | ||
} | ||
|
||
|
||
/* | ||
* DisassociateTenantSchemaIfAny disassociates given colocation id from its | ||
* tenant schema if any. | ||
*/ | ||
void | ||
DisassociateTenantSchemaIfAny(uint32 colocationId) | ||
{ | ||
Oid schemaId = ColocationIdGetTenantSchemaId(colocationId); | ||
if (OidIsValid(schemaId)) | ||
{ | ||
SetTenantSchemaColocationId(schemaId, INVALID_COLOCATION_ID); | ||
} | ||
} | ||
|
||
|
||
/* | ||
* SetTenantSchemaColocationId sets the colocation id of given tenant schema. | ||
* | ||
* If colocationId is INVALID_COLOCATION_ID, then the colocation_id column | ||
* is set to NULL. | ||
*/ | ||
static void | ||
SetTenantSchemaColocationId(Oid schemaId, uint32 colocationId) | ||
{ | ||
SetTenantSchemaColocationIdLocally(schemaId, colocationId); | ||
|
||
SendCommandToWorkersWithMetadataViaSuperUser( | ||
TenantSchemaSetColocationIdCommand(schemaId, colocationId)); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.