Expand partition table leaves in parallel.
This commit brings back the ability to redistribute the leaf partitions
of the same partitioned table in parallel during gpexpand's second
stage. It was implemented as several small, focused commits (finally
squashed and committed as this one big commit), so for details please
refer to the GitHub PR page:

This feature is important not only for the speedup from the extra
concurrency, but also because it saves a lot of disk space. Expanding
a table to the new cluster size is implemented via CTAS in Greenplum,
and CTAS can only remove the old data at the end of the transaction.
Expanding one leaf per transaction ensures that stale data is removed
quickly.
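
A minimal sketch of the disk-usage difference (the table, leaf, and
column names are hypothetical; the REORGANIZE form is the one the new
gpexpand code issues):

    -- Whole hierarchy in one transaction: every leaf is rewritten, and
    -- the old files of all leaves survive until the single COMMIT.
    BEGIN;
    ALTER TABLE sales EXPAND TABLE;
    COMMIT;  -- old files of every leaf are dropped only here

    -- One leaf per transaction: each leaf's old files are dropped as
    -- soon as its own transaction commits, and leaves run in parallel.
    BEGIN;
    ALTER TABLE sales_1_prt_1 SET WITH (REORGANIZE=true) DISTRIBUTED BY (id);
    COMMIT;  -- old files of this one leaf are dropped immediately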

The key idea here is the Parent-Child Distribution Policy Invariant;
for details, please refer to Heikki's comments in
https://groups.google.com/a/greenplum.org/g/gpdb-dev/c/rSacd_vI-fM/m/pkAW-Z-lCgAJ
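
To illustrate the invariant (restated in the `_populate_partitioned_tables`
docstring below), assume a root `t` hash distributed on `(i)`; the leaf
and key names are hypothetical:

    -- Allowed leaf policies under a hash-distributed root:
    --   * hash distributed on the same key (i), same numsegments
    --   * randomly distributed, same numsegments
    ALTER TABLE t_1_prt_1 SET DISTRIBUTED RANDOMLY;  -- OK
    ALTER TABLE t_1_prt_1 SET DISTRIBUTED BY (i);    -- OK, matches the root
    ALTER TABLE t_1_prt_1 SET DISTRIBUTED BY (j);    -- violates the invariant
    -- Under a randomly distributed root, every leaf must be randomly
    -- distributed as well.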

The following lists what this commit does:

1. Introduce a new statement, `ALTER TABLE EXPAND PARTITION PREPARE`.

    This statement can only be used on the root of a partitioned table
    (before expanding to the new cluster size). For a hash-distributed
    partitioned table, it makes the root (and any intermediate levels)
    hash distributed across the full cluster, and all leaves randomly
    distributed across the full cluster, in a single transaction. The
    heaviest locks are held in this step, and because no data is
    touched, this transaction is fast (see the first sketch after this
    list).

2. Modify the implementation of `ALTER TABLE SET DISTRIBUTED BY`
   to allow operating on leaves.

    The constraint now is to obey the Parent-Child Distribution Policy
    Invariant (illustrated in the sketch above), so you can operate
    directly on a leaf if the target policy of the leaf matches the
    root's. This is what later lets us expand leaves concurrently.

3. Change the gpexpand script to use the new flow end to end.

    First, it invokes `EXPAND PARTITION PREPARE` to change the policies;
    then, while the status_detail table is built (1st stage of gpexpand),
    we do not put the root in the queue of tables to expand. We put the
    leaves in instead. Later, when the leaves are expanded in the 2nd
    stage, we use `ALTER TABLE SET DISTRIBUTED BY` on them, because
    their numsegments already equal the full cluster size.

4. Finish writable external leaf partitions during the prepare stage.

    Among foreign/external tables, only writable external tables have a
    gp_policy entry, and expanding them is just a catalog change with no
    data movement. So any writable foreign/external leaf partition is
    expanded during the prepare stage, and the others are simply ignored
    (see the second sketch after this list).
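
Putting steps 1-3 together, a sketch of the SQL the new flow issues
(the table and key names are hypothetical; `EXPAND PARTITION PREPARE`,
`pg_get_table_distributedby()`, and the REORGANIZE form are the ones
the new gpexpand code uses):

    -- Stage 1 (prepare): one fast, catalog-only transaction on the root.
    ALTER TABLE sales EXPAND PARTITION PREPARE;
    -- Afterwards: root (and intermediate levels) hash distributed on
    -- the full cluster; every leaf randomly distributed on the full
    -- cluster.

    -- Stage 2 (redistribute): one leaf per transaction, in parallel.
    -- gpexpand reads the target policy from the root ...
    SELECT pg_get_table_distributedby('sales'::regclass);
    --  => e.g. DISTRIBUTED BY (id)
    -- ... and applies it to each leaf with forced data movement:
    ALTER TABLE sales_1_prt_1 SET WITH (REORGANIZE=true) DISTRIBUTED BY (id);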
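
And for step 4: since expanding a writable external leaf is a pure
catalog change, one way to confirm it happened at prepare time is to
check its gp_distribution_policy row (the external leaf name below is
hypothetical):

    -- After EXPAND PARTITION PREPARE, a writable external leaf should
    -- already have numsegments equal to the new cluster size.
    SELECT localoid::regclass, numsegments
    FROM gp_distribution_policy
    WHERE localoid = 'sales_1_prt_ext'::regclass;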

Authored-by: Xuejing Zhao <zxuejing@vmware.com>
Authored-by: Jian Guo <xihuke@gmail.com>
Authored-by: Xuebin Su <sxuebin@vmware.com>
Authored-by: Yini Li <yinil@vmware.com>
Authored-by: Zhenghua Lyu <kainwen@gmail.com>
Reviewed-by: Alexandra Wang <walexandra@vmware.com>
Inspired-by: uglthinx
zxuejing authored and kainwen committed Nov 16, 2021
1 parent 248c1c7 commit a45be43
Showing 14 changed files with 2,021 additions and 75 deletions.
124 changes: 74 additions & 50 deletions gpMgmt/bin/gpexpand
@@ -277,7 +277,7 @@ status_detail_table_sql = """CREATE TABLE gpexpand.status_detail
( dbname text,
fq_name text,
table_oid oid,
root_partition_name text,
root_partition_oid oid,
rank int,
external_writable bool,
status text,
@@ -1582,7 +1582,7 @@ class gpexpand:
current_database(),
quote_ident(n.nspname) || '.' || quote_ident(c.relname) as fq_name,
c.oid as tableoid,
NULL as root_partition_name,
NULL as root_partition_oid,
2 as rank,
pe.writable is not null as external_writable,
'%s' as undone_status,
@@ -1628,14 +1628,17 @@ WHERE

def _populate_partitioned_tables(self, dbname):
"""
Populate status_detail for partitioned tables. A leaf partition cannot
have a numsegments different from its root partition; we need to expand
the root partition in one shot, so just populate the root partition for now.
TODO:
We used to use a tricky but effective way to expand leaf partitions
in parallel; that approach is still under discussion. Keep the old
method here in case we need to bring it back someday.
The policy of a leaf can differ from the policy of the root, but it must
follow these rules:
If a partitioned table is hash distributed, then all its leaf partitions
must also be hash distributed on the same distribution key, with the
same 'numsegments', or randomly distributed with the same 'numsegments'.
If a partitioned table is randomly distributed, then all the leaves must
be randomly distributed as well.
Populate status_detail for partitioned tables. A leaf partition can
have a different policy from its root partition; we need to expand leaf
partitions separately, in parallel.
Step1:
BEGIN;
@@ -1644,44 +1647,60 @@ WHERE
Change all leaf partitions to randomly distributed;
COMMIT;
Step2:
Change each leaf partition's policy back to the old policy with a
mandatory data movement.
Change each leaf partition's policy back to the parent's policy with SET
DISTRIBUTED ... WITH (REORGANIZE=true)
"""
src_bytes_str = "0" if self.options.simple_progress else "pg_relation_size(quote_ident(n.nspname) || '.' || quote_ident(c.relname))"
sql = """
SELECT
current_database(),
quote_ident(n.nspname) || '.' || quote_ident(c.relname) as fq_name,
c.oid as tableoid,
quote_ident(n.nspname) || '.' || quote_ident(c.relname) as root_partition_name,
2 as rank,
false as external_writable,
'%s' as undone_status,
NULL as expansion_started,
NULL as expansion_finished,
%s as source_bytes
FROM
pg_class c,
pg_namespace n,
pg_partitioned_table p,
gp_distribution_policy d
WHERE
c.relnamespace = n.oid
AND p.partrelid = c.oid
AND NOT c.relispartition
AND d.localoid = c.oid
ORDER BY fq_name, tableoid desc
""" % (undone_status, src_bytes_str)
self.logger.debug(sql)
table_conn = self.connect_database(dbname)

cursor = dbconn.query(table_conn, """
SELECT partrelid::regclass AS relname
FROM pg_partitioned_table, pg_class
WHERE partrelid = pg_class.oid AND relispartition = FALSE;
""")
for row in cursor:
prepare_cmd = """
ALTER TABLE %s EXPAND PARTITION PREPARE;
""" % (row.relname)
self.logger.debug(prepare_cmd)
dbconn.execSQL(table_conn, prepare_cmd, autocommit=False)

src_bytes_str = "0" if self.options.simple_progress else "pg_relation_size(c.oid)"
get_status_detail_cmd = """
SELECT
current_database(),
quote_ident(n.nspname) || '.' || quote_ident(c.relname) as fq_name,
c.oid as tableoid,
d.oid as root_partition_oid,
2 as rank,
false as external_writable,
'%s' as undone_status,
NULL as expansion_started,
NULL as expansion_finished,
%s as source_bytes
FROM
pg_inherits a,
pg_partitioned_table b,
pg_class c,
pg_class d,
pg_namespace n
WHERE
a.inhparent=b.partrelid and
a.inhrelid = c.oid and
a.inhparent = d.oid and
c.relnamespace = n.oid and
c.relkind != 'p' and
c.relkind != 'f'
""" % (undone_status, src_bytes_str)
self.logger.debug(get_status_detail_cmd)

try:
data_file = os.path.abspath('./status_detail.dat')
self.logger.debug('status_detail data file: %s' % data_file)
copySQL = """COPY (%s) TO '%s'""" % (sql, data_file)
copySQL = """COPY (%s) TO '%s'""" % (get_status_detail_cmd, data_file)

self.logger.debug(copySQL)
dbconn.execSQL(table_conn, copySQL)
table_conn.commit()
table_conn.close()
except Exception as e:
raise ExpansionError(e)
@@ -1907,22 +1926,19 @@ ORDER BY fq_name, tableoid desc
class ExpandTable():
def __init__(self, options, row=None):
self.options = options
self.is_root_partition = False
if row is not None:
(self.dbname, self.fq_name, self.table_oid,
self.root_partition_name,
self.root_partition_oid,
self.rank, self.external_writable, self.status,
self.expansion_started, self.expansion_finished,
self.source_bytes) = row
if self.fq_name == self.root_partition_name:
self.is_root_partition = True

def add_table(self, conn):
insertSQL = """INSERT INTO gpexpand.status_detail
VALUES ('%s','%s',%s,
'%s',%d,'%s','%s','%s','%s',%d)
'%d',%d,'%s','%s','%s','%s',%d)
""" % (self.dbname.replace("'", "''"), self.fq_name.replace("'", "''"), self.table_oid,
self.root_partition_name.replace("'", "''"),
self.root_partition_oid,
self.rank, self.external_writable, self.status,
self.expansion_started, self.expansion_finished,
self.source_bytes)
@@ -1960,11 +1976,19 @@ class ExpandTable():
dbconn.execSQL(status_conn, sql)

def expand(self, table_conn, cancel_flag):
# for a root partition, we want to expand the whole partition in one shot
# TODO: expand leaf partitions separately in parallel
only_str = "" if self.is_root_partition else "ONLY"
external_str = "EXTERNAL" if self.external_writable else ""
sql = 'ALTER %s TABLE %s %s EXPAND TABLE' % (external_str, only_str, self.fq_name)
# expand leaf partitions separately, in parallel
# FIXME: ALTER TABLE on an external table does not throw
# a warning here, but it throws an error on 6X;
# do we still need to use ALTER EXTERNAL TABLE?
if self.root_partition_oid is not None:
get_dist_cmd = """
select pg_get_table_distributedby(%d) as distribution_policy;
""" % (self.root_partition_oid)
res = dbconn.queryRow(table_conn, get_dist_cmd)
sql = "ALTER TABLE %s SET WITH (REORGANIZE=true) %s" % (self.fq_name, res.distribution_policy)
else:
# FIXME: Can "ONLY" be allowed in "EXPAND TABLE"?
sql = 'ALTER TABLE %s EXPAND TABLE' % self.fq_name

logger.info('Expanding %s.%s' % (self.dbname, self.fq_name))
logger.debug("Expand SQL: %s" % sql)
2 changes: 1 addition & 1 deletion gpMgmt/bin/gppylib/test/unit/test_unit_gpexpand_status.py
@@ -118,7 +118,7 @@ def setUp(self):
distribution_policy_names text,
distribution_policy_coloids text,
distribution_policy_type text,
root_partition_name text,
root_partition_oid oid,
storage_options text,
rank int,
status text,
19 changes: 19 additions & 0 deletions gpMgmt/test/behave/mgmt_utils/gpexpand.feature
@@ -440,3 +440,22 @@ Feature: expand the cluster by adding more segments
When the user runs gpexpand to redistribute
Then the numsegments of table "public.test_matview" is 4
And distribution information from table "public.test_matview" and "public.test_matview_base" in "gptest" are the same

@gpexpand_verify_partition_table
Scenario: Verify should succeed when expanding a partition table
Given the database is not running
And a working directory of the test as '/data/gpdata/gpexpand'
And a temporary directory under "/data/gpdata/gpexpand/expandedData" to expand into
And the cluster is generated with "1" primaries only
And database "gptest" exists
And the user creates a partition table with name "partition_test"
And distribution information from table "partition_test" with data in "gptest" is saved
And there are no gpexpand_inputfiles
And the cluster is setup for an expansion on hosts "localhost"
When the user runs gpexpand interview to add 3 new segment and 0 new host "ignored.host"
Then the number of segments have been saved
When the user runs gpexpand with the latest gpexpand_inputfile with additional parameters "--silent"
Then verify that the cluster has 3 new segments
When the user runs gpexpand to redistribute
Then the numsegments of table "partition_test" is 4
And distribution information from table "partition_test" with data in "gptest" is verified against saved data
2 changes: 1 addition & 1 deletion gpMgmt/test/behave/mgmt_utils/gpstate.feature
@@ -436,7 +436,7 @@ Feature: gpstate tests
distribution_policy_names text,
distribution_policy_coloids text,
distribution_policy_type text,
root_partition_name text,
root_partition_oid oid,
storage_options text,
rank int,
status text,
28 changes: 28 additions & 0 deletions gpMgmt/test/behave/mgmt_utils/steps/mgmt_utils.py
Expand Up @@ -191,6 +191,16 @@ def impl(conetxt, tabname):
dbconn.execSQL(conn, sql)
conn.commit()

@given('the user creates a partition table with name "{tabname}"')
def impl(context, tabname):
dbname = 'gptest'
with closing(dbconn.connect(dbconn.DbURL(dbname=dbname), unsetSearchPath=False)) as conn:
sql = "create table {tabname}(i int) partition by range(i) (start(0) end(10001) every(1000)) distributed by (i)".format(tabname=tabname)
dbconn.execSQL(conn, sql)
sql = "INSERT INTO {tabname} SELECT generate_series(1, 10000)".format(tabname=tabname)
dbconn.execSQL(conn, sql)
conn.commit()


@given('the user executes "{sql}" with named connection "{cname}"')
def impl(context, cname, sql):
@@ -2957,11 +2967,14 @@ def impl(context, table_name, dbname):
@given('distribution information from table "{table}" with data in "{dbname}" is saved')
def impl(context, table, dbname):
context.pre_redistribution_row_count = _get_row_count_per_segment(table, dbname)
context.pre_redistribution_dist_policy = _get_dist_policy_per_partition(table, dbname)

@then('distribution information from table "{table}" with data in "{dbname}" is verified against saved data')
def impl(context, table, dbname):
pre_distribution_row_count = context.pre_redistribution_row_count
pre_redistribution_dist_policy = context.pre_redistribution_dist_policy
post_distribution_row_count = _get_row_count_per_segment(table, dbname)
post_distribution_dist_policy = _get_dist_policy_per_partition(table, dbname)

if len(pre_distribution_row_count) >= len(post_distribution_row_count):
raise Exception("Failed to redistribute table. Expected to have more than %d segments, got %d segments" % (len(pre_distribution_row_count), len(post_distribution_row_count)))
@@ -2988,6 +3001,14 @@ def impl(context, table, dbname):
raise Exception("Unexpected variance for redistributed data in table %s. Relative standard error %f exceeded tolerance factor of %f." %
(table, relative_std_error, tolerance))

for i in range(len(post_distribution_dist_policy)):
if (post_distribution_dist_policy[i][0] == pre_redistribution_dist_policy[i][0] or
post_distribution_dist_policy[i][1] != pre_redistribution_dist_policy[i][1] or
post_distribution_dist_policy[i][2] != pre_redistribution_dist_policy[i][2]):
raise Exception("""Redistributed policy does not match the pre-redistribution policy.
before expansion: %s, after expansion: %s""" % (",".join(map(str, pre_redistribution_dist_policy[i])),
",".join(map(str, post_distribution_dist_policy[i]))))


@then('the row count from table "{table_name}" in "{dbname}" is verified against the saved data')
def impl(context, table_name, dbname):
@@ -3012,6 +3033,13 @@ def _get_row_count_per_segment(table, dbname):
rows = cursor.fetchall()
return [row[1] for row in rows] # indices are the gp segment id's, so no need to store them explicitly

def _get_dist_policy_per_partition(table, dbname):
with closing(dbconn.connect(dbconn.DbURL(dbname=dbname), unsetSearchPath=False)) as conn:
query = "select * from gp_distribution_policy where localoid::regclass::text like '%s%%' order by localoid;" % table
cursor = dbconn.query(conn, query)
rows = cursor.fetchall()
return [row[2:5] for row in rows] # we only need numsegments, distkey, distclass

@given('run rollback')
@then('run rollback')
@when('run rollback')
@@ -54,11 +54,11 @@
</row>
<row>
<entry colname="col1">
<codeph>root_partition_name</codeph>
<codeph>root_partition_oid</codeph>
</entry>
<entry colname="col2">text</entry>
<entry colname="col2">oid</entry>
<entry colname="col3"/>
<entry colname="col4">For a partitioned table, the name of the root partition.
<entry colname="col4">For a partitioned table, the OID of the root partition.
Otherwise, <codeph>None</codeph>.</entry>
</row>
<row>
Expand Down