Skip to content

Commit

Permalink
Support partitioning for dist tables with null dist keys (#6778)
Browse files Browse the repository at this point in the history
DESCRIPTION: Support partitioning for distributed tables with null
distribution keys

Adds support for:
* Creating new partitions after the parent table has been distributed
with a null distribution key
* Attaching partitions to a distributed table with a null distribution key
(the new partition is automatically distributed with a null key as well)
* Detaching partitions from such a table
  • Loading branch information
agedemenli authored Mar 21, 2023
1 parent a4cfc32 commit 0be33c2
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 13 deletions.
3 changes: 1 addition & 2 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static void CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName);
static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
DistributedTableParams *
distributedTableParams);
Expand Down Expand Up @@ -1082,7 +1081,7 @@ CreateReferenceTable(Oid relationId)
* CreateNullShardKeyDistTable is a wrapper around CreateCitusTable that creates a
* single shard distributed table that doesn't have a shard key.
*/
static void
void
CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName)
{
DistributedTableParams distributedTableParams = {
Expand Down
19 changes: 18 additions & 1 deletion src/backend/distributed/commands/table.c
Original file line number Diff line number Diff line change
Expand Up @@ -384,18 +384,30 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
*/
if (IsCitusTable(parentRelationId))
{
/*
* We can create Citus local tables and distributed tables with null shard keys
* right away, without switching to sequential mode, because they are going to
* have only one shard.
*/
if (IsCitusTableType(parentRelationId, CITUS_LOCAL_TABLE))
{
CreateCitusLocalTablePartitionOf(createStatement, relationId,
parentRelationId);
return;
}

char *parentRelationName = generate_qualified_relation_name(parentRelationId);

if (IsCitusTableType(parentRelationId, NULL_KEY_DISTRIBUTED_TABLE))
{
CreateNullShardKeyDistTable(relationId, parentRelationName);
return;
}

Var *parentDistributionColumn = DistPartitionKeyOrError(parentRelationId);
char *distributionColumnName =
ColumnToColumnName(parentRelationId, (Node *) parentDistributionColumn);
char parentDistributionMethod = DISTRIBUTE_BY_HASH;
char *parentRelationName = generate_qualified_relation_name(parentRelationId);

SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(parentRelationId,
relationId);
Expand Down Expand Up @@ -525,6 +537,11 @@ PreprocessAttachPartitionToCitusTable(Oid parentRelationId, Oid partitionRelatio
CreateCitusLocalTable(partitionRelationId, cascadeViaForeignKeys,
autoConverted);
}
else if (IsCitusTableType(parentRelationId, NULL_KEY_DISTRIBUTED_TABLE))
{
char *parentName = generate_qualified_relation_name(parentRelationId);
CreateNullShardKeyDistTable(partitionRelationId, parentName);
}
else if (IsCitusTableType(parentRelationId, DISTRIBUTED_TABLE))
{
DistributePartitionUsingParent(parentRelationId, partitionRelationId);
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/planner/multi_join_order.c
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ DistPartitionKeyOrError(Oid relationId)
if (partitionKey == NULL)
{
ereport(ERROR, (errmsg(
"no distribution column found for relation %d, because it is a reference table",
"no distribution column found for relation %d",
relationId)));
}

Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/metadata_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardRow(uint64 shardId);
extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
extern void DeleteShardPlacementRow(uint64 placementId);
extern void CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName);
extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName);
Expand Down
99 changes: 90 additions & 9 deletions src/test/regress/expected/create_null_dist_key.out
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,87 @@ SELECT create_distributed_table('sensors', NULL, distribution_type=>null);

(1 row)

-- verify we can create new partitions after distributing the parent table
CREATE TABLE sensors_2001 PARTITION OF sensors FOR VALUES FROM ('2001-01-01') TO ('2002-01-01');
-- verify we can attach to a null dist key table
CREATE TABLE sensors_2002 (measureid integer, eventdatetime date, measure_data jsonb, PRIMARY KEY (measureid, eventdatetime, measure_data));
ALTER TABLE sensors ATTACH PARTITION sensors_2002 FOR VALUES FROM ('2002-01-01') TO ('2003-01-01');
-- verify we can detach from a null dist key table
ALTER TABLE sensors DETACH PARTITION sensors_2001;
-- error out when attaching a noncolocated partition
CREATE TABLE sensors_2003 (measureid integer, eventdatetime date, measure_data jsonb, PRIMARY KEY (measureid, eventdatetime, measure_data));
SELECT create_distributed_table('sensors_2003', NULL, distribution_type=>null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------

(1 row)

ALTER TABLE sensors ATTACH PARTITION sensors_2003 FOR VALUES FROM ('2003-01-01') TO ('2004-01-01');
ERROR: distributed tables cannot have non-colocated distributed tables as a partition
DROP TABLE sensors_2003;
-- verify we can attach after distributing, if the parent and partition are colocated
CREATE TABLE sensors_2004 (measureid integer, eventdatetime date, measure_data jsonb, PRIMARY KEY (measureid, eventdatetime, measure_data));
SELECT create_distributed_table('sensors_2004', NULL, distribution_type=>null, colocate_with=>'sensors');
create_distributed_table
---------------------------------------------------------------------

(1 row)

ALTER TABLE sensors ATTACH PARTITION sensors_2004 FOR VALUES FROM ('2004-01-01') TO ('2005-01-01');
-- check metadata
-- check all partitions and the parent on pg_dist_partition
SELECT logicalrelid::text FROM pg_dist_partition WHERE logicalrelid::text IN ('sensors', 'sensors_2000', 'sensors_2001', 'sensors_2002', 'sensors_2004') ORDER BY logicalrelid::text;
logicalrelid
---------------------------------------------------------------------
sensors
sensors_2000
sensors_2001
sensors_2002
sensors_2004
(5 rows)

-- verify they are all colocated
SELECT COUNT(DISTINCT(colocationid)) FROM pg_dist_partition WHERE logicalrelid::text IN ('sensors', 'sensors_2000', 'sensors_2001', 'sensors_2002', 'sensors_2004');
count
---------------------------------------------------------------------
1
(1 row)

-- verify all partitions are placed on the same node
SELECT COUNT(DISTINCT(groupid)) FROM pg_dist_placement WHERE shardid IN
(SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text IN ('sensors', 'sensors_2000', 'sensors_2001', 'sensors_2002', 'sensors_2004'));
count
---------------------------------------------------------------------
1
(1 row)

-- verify the shard of sensors_2000 is attached to the parent shard, on the worker node
SELECT COUNT(*) FROM run_command_on_workers($$
SELECT relpartbound FROM pg_class WHERE relname LIKE 'sensors_2000_1______';$$)
WHERE length(result) > 0;
count
---------------------------------------------------------------------
1
(1 row)

-- verify the shard of sensors_2001 is detached from the parent shard, on the worker node
SELECT COUNT(*) FROM run_command_on_workers($$
SELECT relpartbound FROM pg_class WHERE relname LIKE 'sensors_2001_1______';$$)
WHERE length(result) > 0;
count
---------------------------------------------------------------------
0
(1 row)

-- verify the shard of sensors_2002 is attached to the parent shard, on the worker node
SELECT COUNT(*) FROM run_command_on_workers($$
SELECT relpartbound FROM pg_class WHERE relname LIKE 'sensors_2002_1______';$$)
WHERE length(result) > 0;
count
---------------------------------------------------------------------
1
(1 row)

CREATE TABLE multi_level_partitioning_parent(
measureid integer,
eventdatetime date,
Expand Down Expand Up @@ -761,7 +842,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730049"
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730053"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
Expand Down Expand Up @@ -807,7 +888,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730085"
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730089"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
Expand Down Expand Up @@ -925,8 +1006,8 @@ SELECT result, success FROM run_command_on_workers($$
$$);
result | success
---------------------------------------------------------------------
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730102" | f
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730102" | f
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730106" | f
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730106" | f
(2 rows)

DROP TABLE referencing_table, referenced_table;
Expand All @@ -941,8 +1022,8 @@ SELECT create_distributed_table('self_fkey_test', NULL, distribution_type=>null)

INSERT INTO self_fkey_test VALUES (1, 1); -- ok
INSERT INTO self_fkey_test VALUES (2, 3); -- fails
ERROR: insert or update on table "self_fkey_test_1730103" violates foreign key constraint "self_fkey_test_b_fkey_1730103"
DETAIL: Key (b)=(3) is not present in table "self_fkey_test_1730103".
ERROR: insert or update on table "self_fkey_test_1730107" violates foreign key constraint "self_fkey_test_b_fkey_1730107"
DETAIL: Key (b)=(3) is not present in table "self_fkey_test_1730107".
CONTEXT: while executing command on localhost:xxxxx
-- similar foreign key tests but this time create the referencing table later on
-- referencing table is a null shard key table
Expand All @@ -966,7 +1047,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730105"
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730109"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
Expand All @@ -989,7 +1070,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (2, 1);
-- fails
INSERT INTO referencing_table VALUES (1, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_b_fkey_1730107"
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_b_fkey_1730111"
DETAIL: Key (a, b)=(1, 2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
Expand Down Expand Up @@ -1078,7 +1159,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730146"
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730150"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
Expand Down
45 changes: 45 additions & 0 deletions src/test/regress/sql/create_null_dist_key.sql
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,51 @@ SELECT create_distributed_table('sensors_2000', NULL, distribution_type=>null);

SELECT create_distributed_table('sensors', NULL, distribution_type=>null);

-- verify we can create new partitions after distributing the parent table
CREATE TABLE sensors_2001 PARTITION OF sensors FOR VALUES FROM ('2001-01-01') TO ('2002-01-01');

-- verify we can attach to a null dist key table
CREATE TABLE sensors_2002 (measureid integer, eventdatetime date, measure_data jsonb, PRIMARY KEY (measureid, eventdatetime, measure_data));
ALTER TABLE sensors ATTACH PARTITION sensors_2002 FOR VALUES FROM ('2002-01-01') TO ('2003-01-01');

-- verify we can detach from a null dist key table
ALTER TABLE sensors DETACH PARTITION sensors_2001;

-- error out when attaching a noncolocated partition
CREATE TABLE sensors_2003 (measureid integer, eventdatetime date, measure_data jsonb, PRIMARY KEY (measureid, eventdatetime, measure_data));
SELECT create_distributed_table('sensors_2003', NULL, distribution_type=>null, colocate_with=>'none');
ALTER TABLE sensors ATTACH PARTITION sensors_2003 FOR VALUES FROM ('2003-01-01') TO ('2004-01-01');
DROP TABLE sensors_2003;

-- verify we can attach after distributing, if the parent and partition are colocated
CREATE TABLE sensors_2004 (measureid integer, eventdatetime date, measure_data jsonb, PRIMARY KEY (measureid, eventdatetime, measure_data));
SELECT create_distributed_table('sensors_2004', NULL, distribution_type=>null, colocate_with=>'sensors');
ALTER TABLE sensors ATTACH PARTITION sensors_2004 FOR VALUES FROM ('2004-01-01') TO ('2005-01-01');

-- check metadata
-- check all partitions and the parent on pg_dist_partition
SELECT logicalrelid::text FROM pg_dist_partition WHERE logicalrelid::text IN ('sensors', 'sensors_2000', 'sensors_2001', 'sensors_2002', 'sensors_2004') ORDER BY logicalrelid::text;
-- verify they are all colocated
SELECT COUNT(DISTINCT(colocationid)) FROM pg_dist_partition WHERE logicalrelid::text IN ('sensors', 'sensors_2000', 'sensors_2001', 'sensors_2002', 'sensors_2004');
-- verify all partitions are placed on the same node
SELECT COUNT(DISTINCT(groupid)) FROM pg_dist_placement WHERE shardid IN
(SELECT shardid FROM pg_dist_shard WHERE logicalrelid::text IN ('sensors', 'sensors_2000', 'sensors_2001', 'sensors_2002', 'sensors_2004'));

-- verify the shard of sensors_2000 is attached to the parent shard, on the worker node
SELECT COUNT(*) FROM run_command_on_workers($$
SELECT relpartbound FROM pg_class WHERE relname LIKE 'sensors_2000_1______';$$)
WHERE length(result) > 0;

-- verify the shard of sensors_2001 is detached from the parent shard, on the worker node
SELECT COUNT(*) FROM run_command_on_workers($$
SELECT relpartbound FROM pg_class WHERE relname LIKE 'sensors_2001_1______';$$)
WHERE length(result) > 0;

-- verify the shard of sensors_2002 is attached to the parent shard, on the worker node
SELECT COUNT(*) FROM run_command_on_workers($$
SELECT relpartbound FROM pg_class WHERE relname LIKE 'sensors_2002_1______';$$)
WHERE length(result) > 0;

CREATE TABLE multi_level_partitioning_parent(
measureid integer,
eventdatetime date,
Expand Down

0 comments on commit 0be33c2

Please sign in to comment.