Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use FDW to query multiple servers as shards #320

Merged
merged 5 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/postgres_fdw/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ SHLIB_LINK_INTERNAL = -Wl,-Bsymbolic -Wl,-Bstatic -Wl,-Bstatic $(libpq) -Wl,-Bdy
EXTENSION = postgres_fdw
DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql

REGRESS = gp_postgres_fdw #postgres_fdw
REGRESS = gp_postgres_fdw mpp_postgres_fdw #postgres_fdw

ifdef USE_PGXS
PG_CONFIG = pg_config
Expand Down
389 changes: 389 additions & 0 deletions contrib/postgres_fdw/expected/mpp_postgres_fdw.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,389 @@
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
NOTICE: extension "postgres_fdw" already exists, skipping
CREATE SERVER testserver2 FOREIGN DATA WRAPPER postgres_fdw;
DO $d$
BEGIN
EXECUTE $$CREATE SERVER mpps1 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (dbname 'fdw1',
port '$$||current_setting('port')||$$'
)$$;
EXECUTE $$CREATE SERVER mpps2 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (dbname 'fdw2',
port '$$||current_setting('port')||$$'
)$$;
EXECUTE $$CREATE SERVER mpps3 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (dbname 'fdw3',
port '$$||current_setting('port')||$$'
)$$;
END;
$d$;

CREATE USER MAPPING FOR CURRENT_USER SERVER mpps1;
CREATE USER MAPPING FOR CURRENT_USER SERVER mpps2;
CREATE USER MAPPING FOR CURRENT_USER SERVER mpps3;
DROP DATABASE IF EXISTS fdw1;
DROP DATABASE IF EXISTS fdw2;
DROP DATABASE IF EXISTS fdw3;
CREATE DATABASE fdw1;
CREATE DATABASE fdw2;
CREATE DATABASE fdw3;
\c fdw1
create table t1(a int, b text);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
create table t2(a int, b text);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
insert into t1 values(1, 'fdw1');
insert into t2 values(1, 'fdw1');
\c fdw2
create table t1(a int, b text);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
create table t2(a int, b text);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
insert into t1 values(1, 'fdw2');
insert into t2 values(1, 'fdw2');
\c fdw3
create table t1(a int, b text);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
create table t2(a int, b text);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Cloudberry Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
insert into t1 values(1, 'fdw3');
insert into t2 values(1, 'fdw3');
\c contrib_regression
CREATE FOREIGN TABLE fs1 (
a int,
b text
)
SERVER mpps1
OPTIONS (schema_name 'public', table_name 't1', mpp_execute 'all segments');
ADD FOREIGN SEGMENT FROM SERVER mpps1 INTO fs1;
explain (costs off) select * from fs1;
QUERY PLAN
------------------------------------------
Gather Motion 1:1 (slice1; segments: 1)
-> Foreign Scan on fs1
Optimizer: Postgres query optimizer
(3 rows)

select * from fs1;
a | b
---+------
1 | fdw1
(1 row)

ADD FOREIGN SEGMENT FROM SERVER mpps2 INTO fs1;
explain (costs off) select * from fs1;
QUERY PLAN
------------------------------------------
Gather Motion 2:1 (slice1; segments: 2)
-> Foreign Scan on fs1
Optimizer: Postgres query optimizer
(3 rows)

select * from fs1;
a | b
---+------
1 | fdw1
1 | fdw2
(2 rows)

explain (costs off) select count(*) from fs1;
QUERY PLAN
------------------------------------------------
Finalize Aggregate
-> Gather Motion 2:1 (slice1; segments: 2)
-> Foreign Scan
Relations: Aggregate on (fs1)
Optimizer: Postgres query optimizer
(5 rows)

select count(*) from fs1;
count
-------
2
(1 row)

select count(*),b from fs1 group by b;
count | b
-------+------
1 | fdw2
1 | fdw1
(2 rows)

ADD FOREIGN SEGMENT FROM SERVER mpps3 INTO fs1;
explain (costs off) select * from fs1;
QUERY PLAN
------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Foreign Scan on fs1
Optimizer: Postgres query optimizer
(3 rows)

select * from fs1;
oppenheimer01 marked this conversation as resolved.
Show resolved Hide resolved
a | b
---+------
1 | fdw2
1 | fdw1
1 | fdw3
(3 rows)

explain (costs off) select count(*) from fs1;
oppenheimer01 marked this conversation as resolved.
Show resolved Hide resolved
QUERY PLAN
------------------------------------------------
Finalize Aggregate
-> Gather Motion 3:1 (slice1; segments: 3)
-> Foreign Scan
Relations: Aggregate on (fs1)
Optimizer: Postgres query optimizer
(5 rows)

select count(*) from fs1;
count
-------
3
(1 row)

select count(*),b from fs1 group by b;
count | b
-------+------
1 | fdw2
1 | fdw1
1 | fdw3
(3 rows)

----------------------
-- Test join push down
----------------------
CREATE FOREIGN TABLE fs2 (
a int,
b text
)
SERVER mpps1
OPTIONS (schema_name 'public', table_name 't2', mpp_execute 'all segments');
ADD FOREIGN SEGMENT FROM SERVER mpps1 INTO fs2;
ADD FOREIGN SEGMENT FROM SERVER mpps2 INTO fs2;
ADD FOREIGN SEGMENT FROM SERVER mpps3 INTO fs2;
explain (costs off) select * from fs1, fs2 where fs1.a = fs2.a;
QUERY PLAN
------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Hash Join
Hash Cond: (fs1.a = fs2.a)
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: fs1.a
-> Foreign Scan on fs1
-> Hash
-> Redistribute Motion 3:3 (slice3; segments: 3)
Hash Key: fs2.a
-> Foreign Scan on fs2
Optimizer: Postgres query optimizer
(11 rows)

select * from fs1,fs2 where fs1.a = fs2.a;
a | b | a | b
---+------+---+------
1 | fdw1 | 1 | fdw2
1 | fdw1 | 1 | fdw1
1 | fdw1 | 1 | fdw3
1 | fdw2 | 1 | fdw2
1 | fdw2 | 1 | fdw1
1 | fdw2 | 1 | fdw3
1 | fdw3 | 1 | fdw2
1 | fdw3 | 1 | fdw1
1 | fdw3 | 1 | fdw3
(9 rows)

explain (costs off) select * from fs1, fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
QUERY PLAN
-------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Foreign Scan
Relations: (fs1) INNER JOIN (fs2)
Optimizer: Postgres query optimizer
(4 rows)

select * from fs1,fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
a | b | a | b
---+------+---+------
1 | fdw3 | 1 | fdw3
1 | fdw1 | 1 | fdw1
1 | fdw2 | 1 | fdw2
(3 rows)

explain (costs off) select count(*) from fs1, fs2 where fs1.a = fs2.a;
QUERY PLAN
------------------------------------------------------------------------------
Finalize Aggregate
-> Gather Motion 3:1 (slice1; segments: 3)
-> Partial Aggregate
-> Hash Join
Hash Cond: (fs1.a = fs2.a)
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: fs1.a
-> Foreign Scan on fs1
-> Hash
-> Redistribute Motion 3:3 (slice3; segments: 3)
Hash Key: fs2.a
-> Foreign Scan on fs2
Optimizer: Postgres query optimizer
(13 rows)

select count(*) from fs1,fs2 where fs1.a = fs2.a;
count
-------
9
(1 row)

explain (costs off) select count(*) from fs1, fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
QUERY PLAN
----------------------------------------------------------------
Finalize Aggregate
-> Gather Motion 3:1 (slice1; segments: 3)
-> Foreign Scan
Relations: Aggregate on ((fs1) INNER JOIN (fs2))
Optimizer: Postgres query optimizer
(5 rows)

select count(*) from fs1,fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
count
-------
3
(1 row)

----------------------------
-- Test with enable parallel
----------------------------
set enable_parallel to true;
explain (costs off) select * from fs1;
oppenheimer01 marked this conversation as resolved.
Show resolved Hide resolved
QUERY PLAN
------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Foreign Scan on fs1
Optimizer: Postgres query optimizer
(3 rows)

select * from fs1;
a | b
---+------
1 | fdw1
1 | fdw2
1 | fdw3
(3 rows)

explain (costs off) select count(*) from fs1;
QUERY PLAN
------------------------------------------------
Finalize Aggregate
-> Gather Motion 3:1 (slice1; segments: 3)
-> Foreign Scan
Relations: Aggregate on (fs1)
Optimizer: Postgres query optimizer
(5 rows)

select count(*) from fs1;
count
-------
3
(1 row)

explain (costs off) select * from fs1, fs2 where fs1.a = fs2.a;
QUERY PLAN
------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Hash Join
Hash Cond: (fs1.a = fs2.a)
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: fs1.a
-> Foreign Scan on fs1
-> Hash
-> Redistribute Motion 3:3 (slice3; segments: 3)
Hash Key: fs2.a
-> Foreign Scan on fs2
Optimizer: Postgres query optimizer
(11 rows)

select * from fs1,fs2 where fs1.a = fs2.a;
a | b | a | b
---+------+---+------
1 | fdw1 | 1 | fdw1
1 | fdw1 | 1 | fdw3
1 | fdw1 | 1 | fdw2
1 | fdw3 | 1 | fdw1
1 | fdw3 | 1 | fdw3
1 | fdw3 | 1 | fdw2
1 | fdw2 | 1 | fdw1
1 | fdw2 | 1 | fdw3
1 | fdw2 | 1 | fdw2
(9 rows)

select count(*),b from fs1 group by b;
count | b
-------+------
1 | fdw2
1 | fdw1
1 | fdw3
(3 rows)

explain (costs off) select * from fs1, fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
QUERY PLAN
-------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
-> Foreign Scan
Relations: (fs1) INNER JOIN (fs2)
Optimizer: Postgres query optimizer
(4 rows)

select * from fs1,fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
a | b | a | b
---+------+---+------
1 | fdw3 | 1 | fdw3
1 | fdw2 | 1 | fdw2
1 | fdw1 | 1 | fdw1
(3 rows)

explain (costs off) select count(*) from fs1, fs2 where fs1.a = fs2.a;
QUERY PLAN
------------------------------------------------------------------------------
Finalize Aggregate
-> Gather Motion 3:1 (slice1; segments: 3)
-> Partial Aggregate
-> Hash Join
Hash Cond: (fs1.a = fs2.a)
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: fs1.a
-> Foreign Scan on fs1
-> Hash
-> Redistribute Motion 3:3 (slice3; segments: 3)
Hash Key: fs2.a
-> Foreign Scan on fs2
Optimizer: Postgres query optimizer
(13 rows)

select count(*) from fs1,fs2 where fs1.a = fs2.a;
count
-------
9
(1 row)

explain (costs off) select count(*) from fs1, fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
QUERY PLAN
----------------------------------------------------------------
Finalize Aggregate
-> Gather Motion 3:1 (slice1; segments: 3)
-> Foreign Scan
Relations: Aggregate on ((fs1) INNER JOIN (fs2))
Optimizer: Postgres query optimizer
(5 rows)

select count(*) from fs1,fs2 where fs1.a = fs2.a and fs1.gp_foreign_server = fs2.gp_foreign_server;
count
-------
3
(1 row)

reset enable_parallel;
Loading
Loading