Skip to content

Commit

Permalink
Feature: add warehouse grammar and GUC variable (#111)
Browse files Browse the repository at this point in the history
Feature: add warehouse grammar and GUC variable

Since we are going to support storage/compute/catalog separation, we
need to add a warehouse logic to the system. Then we can use the
warehouse clause to specify the warehouse for a SQL query. A warehouse
is a group of segments, and we can create/alter/delete a warehouse.

This commit does the following things:

add the grammar for the warehouse clause.
add the GUC variable for the current warehouse.
add some fileter conditions for gp_segment_configuration table, since we
need to filter out the segments that are not in the current warehouse.
  • Loading branch information
roseduan authored Aug 10, 2023
1 parent a5eca8b commit 16b0b9d
Show file tree
Hide file tree
Showing 26 changed files with 444 additions and 120 deletions.
53 changes: 53 additions & 0 deletions doc/src/sgml/ref/create_warehouse.sgml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<!--
doc/src/sgml/ref/create_warehouse.sgml
PostgreSQL documentation
-->

<refentry id="sql-createwarehouse">
<indexterm zone="sql-createwarehouse">
<primary>CREATE WAREHOUSE</primary>
</indexterm>

<refmeta>
<refentrytitle>CREATE WAREHOUSE</refentrytitle>
<manvolnum>7</manvolnum>
<refmiscinfo>SQL - Language Statements</refmiscinfo>
</refmeta>

<refnamediv>
<refname>CREATE WAREHOUSE</refname>
<refpurpose>define a new warehouse for query processing</refpurpose>
</refnamediv>

<refsynopsisdiv>
<synopsis>
CREATE WAREHOUSE <replaceable class="parameter">name</replaceable>
[ WAREHOUSE_SIZE <replaceable class="parameter">size</replaceable> ]
</synopsis>
</refsynopsisdiv>

<refsect1>
<title>Description</title>

<para>
<command>CREATE WAREHOUSE</command> Creates a new virtual warehouse in the system.
Initial creation of a virtual warehouse might take some time to provision the compute resources.
</para>

</refsect1>

<refsect1>
<title>Parameters</title>
<variablelist>
<varlistentry>
<term><literal>WAREHOUSE_SIZE</literal></term>
<listitem>
<para>
The size of the warehouse.
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>

</refentry>
41 changes: 26 additions & 15 deletions gpcontrib/gp_inject_fault/gp_inject_fault.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,35 +119,46 @@ get_segment_configuration(int dbid, char **hostname, int *port, int *content)
SysScanDesc scan;
Datum attr;
bool isNull;
Oid warehouseid = InvalidOid;
bool find_config = false;

configrel = table_open(GpSegmentConfigRelationId, AccessShareLock);
ScanKeyInit(&scankey[0],
Anum_gp_segment_configuration_dbid,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(dbid));
scan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId, true,
scan = systable_beginscan(configrel, GpSegmentConfigDbidWarehouseIndexId, true,
NULL, 1, scankey);

tuple = systable_getnext(scan);

if (HeapTupleIsValid(tuple))
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
attr = heap_getattr(tuple, Anum_gp_segment_configuration_hostname,
attr = heap_getattr(tuple, Anum_gp_segment_configuration_warehouseid,
RelationGetDescr(configrel), &isNull);
Assert(!isNull);
*hostname = TextDatumGetCString(attr);
warehouseid = DatumGetObjectId(attr);

attr = heap_getattr(tuple, Anum_gp_segment_configuration_port,
RelationGetDescr(configrel), &isNull);
Assert(!isNull);
*port = DatumGetInt32(attr);
if (!OidIsValid(warehouseid) || warehouseid == GetCurrentWarehouseId())
{
attr = heap_getattr(tuple, Anum_gp_segment_configuration_hostname,
RelationGetDescr(configrel), &isNull);
Assert(!isNull);
*hostname = TextDatumGetCString(attr);

attr = heap_getattr(tuple, Anum_gp_segment_configuration_content,
RelationGetDescr(configrel), &isNull);
Assert(!isNull);
*content = DatumGetInt32(attr);
attr = heap_getattr(tuple, Anum_gp_segment_configuration_port,
RelationGetDescr(configrel), &isNull);
Assert(!isNull);
*port = DatumGetInt32(attr);

attr = heap_getattr(tuple, Anum_gp_segment_configuration_content,
RelationGetDescr(configrel), &isNull);
Assert(!isNull);
*content = DatumGetInt32(attr);

find_config = true;
break;
}
}
else
if (!find_config)
elog(ERROR, "dbid %d not found", dbid);

systable_endscan(scan);
Expand Down
1 change: 1 addition & 0 deletions src/backend/catalog/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ CATALOG_HEADERS := \
pg_resqueue.h pg_resqueuecapability.h pg_resourcetype.h \
pg_resgroup.h pg_resgroupcapability.h \
gp_configuration_history.h gp_id.h gp_distribution_policy.h gp_version_at_initdb.h \
gp_warehouse.h \
pg_appendonly.h \
gp_fastsequence.h pg_extprotocol.h \
pg_attribute_encoding.h \
Expand Down
13 changes: 11 additions & 2 deletions src/backend/catalog/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "catalog/gp_configuration_history.h"
#include "catalog/gp_id.h"
#include "catalog/gp_version_at_initdb.h"
#include "catalog/gp_warehouse.h"
#include "catalog/pg_event_trigger.h"
#include "catalog/pg_largeobject_metadata.h"
#include "catalog/pg_resourcetype.h"
Expand Down Expand Up @@ -459,8 +460,8 @@ IsSharedRelation(Oid relationId)
relationId == AuthIdRolResQueueIndexId ||
relationId == AuthIdRolResGroupIndexId ||
#ifdef USE_INTERNAL_FTS
relationId == GpSegmentConfigContentPreferred_roleIndexId ||
relationId == GpSegmentConfigDbidIndexId ||
relationId == GpSegmentConfigContentPreferred_roleWarehouseIndexId ||
relationId == GpSegmentConfigDbidWarehouseIndexId ||
#endif
relationId == AuthTimeConstraintAuthIdIndexId)
{
Expand Down Expand Up @@ -505,6 +506,14 @@ IsSharedRelation(Oid relationId)
return true;
}

/* warehouse table and its indexes */
if (relationId == GpWarehouseRelationId ||
relationId == GpWarehouseOidIndexId ||
relationId == GpWarehouseNameIndexId)
{
return true;
}

return false;
}

Expand Down
180 changes: 107 additions & 73 deletions src/backend/cdb/cdbutil.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ readGpSegConfigFromCatalog(int *total_dbs)
int array_size;
bool isNull;
Datum attr;
Oid warehouseid = InvalidOid;
Relation gp_seg_config_rel;
HeapTuple gp_seg_config_tuple = NULL;
SysScanDesc gp_seg_config_scan;
Expand All @@ -256,6 +257,13 @@ readGpSegConfigFromCatalog(int *total_dbs)

while (HeapTupleIsValid(gp_seg_config_tuple = systable_getnext(gp_seg_config_scan)))
{
/* warehouseid */
attr = heap_getattr(gp_seg_config_tuple, Anum_gp_segment_configuration_warehouseid, RelationGetDescr(gp_seg_config_rel), &isNull);
Assert(!isNull);
warehouseid = DatumGetObjectId(attr);
if (OidIsValid(warehouseid) && warehouseid != GetCurrentWarehouseId())
continue;

config = &configs[idx];

/* dbid */
Expand Down Expand Up @@ -1511,93 +1519,101 @@ dbid_get_dbinfo(int16 dbid)
Anum_gp_segment_configuration_dbid,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(dbid));
scan = systable_beginscan(rel, GpSegmentConfigDbidIndexId, true,
scan = systable_beginscan(rel, GpSegmentConfigDbidWarehouseIndexId, true,
NULL, 1, &scankey);

tuple = systable_getnext(scan);
if (HeapTupleIsValid(tuple))
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
Datum attr;
bool isNull;
Oid warehouseid = InvalidOid;

i = palloc(sizeof(GpSegConfigEntry));

/*
* dbid
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_dbid,
attr = heap_getattr(tuple, Anum_gp_segment_configuration_warehouseid,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->dbid = DatumGetInt16(attr);
warehouseid = DatumGetObjectId(attr);

/*
* content
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_content,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->segindex = DatumGetInt16(attr);
if (!OidIsValid(warehouseid) || warehouseid == GetCurrentWarehouseId())
{
i = palloc(sizeof(GpSegConfigEntry));

/*
* role
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_role,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->role = DatumGetChar(attr);
/*
* dbid
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_dbid,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->dbid = DatumGetInt16(attr);

/*
* preferred-role
*/
attr = heap_getattr(tuple,
Anum_gp_segment_configuration_preferred_role,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->preferred_role = DatumGetChar(attr);
/*
* content
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_content,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->segindex = DatumGetInt16(attr);

/*
* mode
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_mode,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->mode = DatumGetChar(attr);
/*
* role
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_role,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->role = DatumGetChar(attr);

/*
* status
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_status,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->status = DatumGetChar(attr);
/*
* preferred-role
*/
attr = heap_getattr(tuple,
Anum_gp_segment_configuration_preferred_role,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->preferred_role = DatumGetChar(attr);

/*
* hostname
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_hostname,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->hostname = TextDatumGetCString(attr);
/*
* mode
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_mode,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->mode = DatumGetChar(attr);

/*
* address
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_address,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->address = TextDatumGetCString(attr);
/*
* status
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_status,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->status = DatumGetChar(attr);

/*
* port
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_port,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->port = DatumGetInt32(attr);
/*
* hostname
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_hostname,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->hostname = TextDatumGetCString(attr);

Assert(systable_getnext(scan) == NULL); /* should be only 1 */
/*
* address
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_address,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->address = TextDatumGetCString(attr);

/*
* port
*/
attr = heap_getattr(tuple, Anum_gp_segment_configuration_port,
RelationGetDescr(rel), &isNull);
Assert(!isNull);
i->port = DatumGetInt32(attr);

break;
}
}
else
if (i == NULL)
{
elog(ERROR, "could not find configuration entry for dbid %i", dbid);
}
Expand All @@ -1619,7 +1635,8 @@ contentid_get_dbid(int16 contentid, char role, bool getPreferredRoleNotCurrentRo
{
int16 dbid = 0;
Relation rel;
ScanKeyData scankey[2];
ScanKeyData scankey[3];
int nkeys = 2;
SysScanDesc scan;
HeapTuple tup;

Expand Down Expand Up @@ -1648,8 +1665,17 @@ contentid_get_dbid(int16 contentid, char role, bool getPreferredRoleNotCurrentRo
Anum_gp_segment_configuration_preferred_role,
BTEqualStrategyNumber, F_CHAREQ,
CharGetDatum(role));
scan = systable_beginscan(rel, GpSegmentConfigContentPreferred_roleIndexId, true,
NULL, 2, scankey);
if (contentid != MASTER_CONTENT_ID)
{
nkeys++;
ScanKeyInit(&scankey[2],
Anum_gp_segment_configuration_warehouseid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(GetCurrentWarehouseId()));
}

scan = systable_beginscan(rel, GpSegmentConfigContentPreferred_roleWarehouseIndexId, true,
NULL, nkeys, scankey);
}
else
{
Expand All @@ -1665,9 +1691,17 @@ contentid_get_dbid(int16 contentid, char role, bool getPreferredRoleNotCurrentRo
Anum_gp_segment_configuration_role,
BTEqualStrategyNumber, F_CHAREQ,
CharGetDatum(role));
if (contentid != MASTER_CONTENT_ID)
{
nkeys++;
ScanKeyInit(&scankey[2],
Anum_gp_segment_configuration_warehouseid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(GetCurrentWarehouseId()));
}
/* no index */
scan = systable_beginscan(rel, InvalidOid, false,
NULL, 2, scankey);
NULL, nkeys, scankey);
}

tup = systable_getnext(scan);
Expand Down
Loading

0 comments on commit 16b0b9d

Please sign in to comment.