Support multi stage hashagg for groupingsets with unsortable refs #810

Merged: 8 commits, Dec 25, 2024
6 changes: 0 additions & 6 deletions gpMgmt/bin/gpcheckcat
@@ -3256,12 +3256,6 @@ def checkcatReport():
# Expand partition tables
db = connect2(GV.cfg[GV.coordinator_dbid], utilityMode=False)

# GPDB_12_MERGE_FIXME: We used to have a query here, to fetch
# some extra informationa about partitioned tables. I (Heikki)
# didn't understand what it did. I ripped it out, and in the
# limited tests I did, I got the same result without it.
# What was it for? Do we need to put something back?

GV.missing_attr_tables = list(set(GV.missing_attr_tables))
if len(GV.missing_attr_tables) > 0:
myprint('----------------------------------------------------')
5 changes: 5 additions & 0 deletions gpMgmt/test/behave/mgmt_utils/gpinitsystem.feature
@@ -24,6 +24,7 @@ Feature: gpinitsystem tests
Then gpinitsystem should return a return code of 0
Given the user runs "gpstate"
Then gpstate should return a return code of 0
And "LC_ALL" environment variable should be restored

Scenario: gpinitsystem creates a cluster when the user set -n or --locale parameter
Given create demo cluster config
@@ -39,6 +40,7 @@
Then gpinitsystem should return a return code of 0
Given the user runs "gpstate"
Then gpstate should return a return code of 0
And "LC_MONETARY" environment variable should be restored

Scenario: gpinitsystem exits with status 0 when the user set locale parameters
Given create demo cluster config
@@ -213,6 +215,7 @@ Feature: gpinitsystem tests
And the database timezone matches "HST"
And the startup timezone is saved
And the startup timezone matches "HST"
And "TZ" environment variable should be restored

Scenario: gpinitsystem should print FQDN in pg_hba.conf when HBA_HOSTNAMES=1
Given the cluster config is generated with HBA_HOSTNAMES "1"
@@ -284,6 +287,8 @@ Feature: gpinitsystem tests
And the database locales "lc_collate" match the locale "C"
And the database locales "lc_ctype" match the installed UTF locale
And the database locales "lc_messages,lc_monetary,lc_numeric,lc_time" match the system locale
And "LC_COLLATE" environment variable should be restored
And "LC_CTYPE" environment variable should be restored

@backup_restore_bashrc
Scenario: gpinitsystem succeeds if there is banner on host
13 changes: 0 additions & 13 deletions src/backend/cdb/cdbgroupingpaths.c
@@ -413,19 +413,6 @@ cdb_create_multistage_grouping_paths(PlannerInfo *root,
List *gcls;
List *tlist;

/* GPDB_12_MERGE_FIXME: For now, bail out if there are any unsortable
* refs. PostgreSQL supports hashing with grouping sets nowadays, but
* the code in this file hasn't been updated to deal with it yet.
*/
ListCell *lc;
foreach(lc, parse->groupClause)
{
SortGroupClause *gc = lfirst_node(SortGroupClause, lc);

if (!OidIsValid(gc->sortop))
return;
}

gsetid = makeNode(GroupingSetId);
grouping_sets_tlist = copyObject(root->processed_tlist);
ctx.gsetid_sortref = add_gsetid_tlist(grouping_sets_tlist);
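
The guard deleted above refused multi-stage grouping-set planning as soon as any grouping column lacked a sort operator. A minimal Python sketch of that old check, using a hypothetical SortGroupClause record where sortop == 0 stands in for InvalidOid:

from dataclasses import dataclass

# Hypothetical stand-in for the planner's SortGroupClause node;
# sortop == 0 plays the role of InvalidOid (no sort operator).
@dataclass
class SortGroupClause:
    tle_sort_group_ref: int
    sortop: int

def has_unsortable_refs(group_clause):
    """The removed bail-out: any grouping column without a sort
    operator (e.g. a column of type xid) disabled multi-stage paths."""
    return any(gc.sortop == 0 for gc in group_clause)

# The second ref is unsortable, so the old code would have returned early.
print(has_unsortable_refs([SortGroupClause(1, 97), SortGroupClause(2, 0)]))  # True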
20 changes: 14 additions & 6 deletions src/backend/optimizer/plan/planner.c
@@ -8935,21 +8935,29 @@ make_new_rollups_for_hash_grouping_set(PlannerInfo *root,
{
RollupData *rollup = lfirst_node(RollupData, lc);

/*
* If there are any empty grouping sets and all non-empty grouping
* sets are unsortable, there will be a rollup containing only
* empty groups. We handle those specially below.
* Note: This case only holds when path is equal to null.
*/
if (rollup->groupClause == NIL)
{
unhashed_rollup = rollup;
break;
}

/*
* If we find an unhashable rollup that's not been skipped by the
* "actually sorted" check above, we can't cope; we'd need sorted
* input (with a different sort order) but we can't get that here.
* So bail out; we'll get a valid path from the is_sorted case
* instead.
*
* The mere presence of empty grouping sets doesn't make a rollup
* unhashable (see preprocess_grouping_sets), we handle those
* specially below.
*/
if (!rollup->hashable)
return NULL;
else
sets_data = list_concat(sets_data, list_copy(rollup->gsets_data));

sets_data = list_concat(sets_data, list_copy(rollup->gsets_data));
}
foreach(lc, sets_data)
{
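
For readers skimming the hunk, this is the control flow it settles on, sketched in Python with a hypothetical Rollup record in place of the planner's RollupData (a sketch of the logic, not the real API):

from dataclasses import dataclass, field

# Hypothetical stand-in for the planner's RollupData node.
@dataclass
class Rollup:
    group_clause: list                        # empty list ~ NIL
    hashable: bool
    gsets_data: list = field(default_factory=list)

def split_rollups_for_hashing(rollups):
    """A rollup with no grouping columns holds only empty grouping
    sets and is set aside for special handling; any other unhashable
    rollup makes us give up (None), since it would need sorted input;
    the rest contribute their sets to a single hashed rollup."""
    unhashed_rollup = None
    sets_data = []
    for rollup in rollups:
        if not rollup.group_clause:
            unhashed_rollup = rollup
            break
        if not rollup.hashable:
            return None
        sets_data.extend(rollup.gsets_data)
    return unhashed_rollup, sets_data

# e.g. GROUPING SETS ((a),(b),()): the () rollup is set aside, the rest hashed.
print(split_rollups_for_hashing([Rollup(['a'], True, ['{a}']),
                                 Rollup(['b'], True, ['{b}']),
                                 Rollup([], False, ['{}'])]))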
53 changes: 27 additions & 26 deletions src/test/isolation2/sql_isolation_testcase.py
@@ -78,11 +78,11 @@ def get_hostname_port(name, role_name):
for content, host, port, role in conn_map:
if real_content_id == content and role == role_name:
return (host, port)
raise Exception("Cannont find a connection with content_id=%d, role=%c" % (content_id, role_name))
raise Exception("Cannot find a connection with content_id=%d, role=%c" % (content_id, role_name))


class GlobalShellExecutor(object):
BASH_PS1 = 'test_sh$>'
BASH_PS1 = 'GlobalShellExecutor>'

class ExecutionError(Exception):
""
@@ -98,6 +98,7 @@ def __init__(self, output_file='', initfile_prefix=''):
stdin=self.slave_fd,
stdout=self.slave_fd,
stderr=self.slave_fd,
start_new_session=True,
universal_newlines=True)
self.bash_log_file = open("%s.log" % self.initfile_prefix, "w+")
self.__run_command("export PS1='%s'" % GlobalShellExecutor.BASH_PS1)
@@ -110,7 +111,7 @@ def terminate(self, with_error = False):
# If write the matchsubs section directly to the output, the generated token id will be compared by gpdiff.pl
# so here just write all matchsubs section into an auto generated init file when this test case file finished.
if not with_error and self.initfile_prefix != None and len(self.initfile_prefix) > 1:
output_init_file = "%s.ini" % self.initfile_prefix
output_init_file = "%s.initfile" % self.initfile_prefix
cmd = ''' [ ! -z "${MATCHSUBS}" ] && echo "-- start_matchsubs ${NL} ${MATCHSUBS} ${NL}-- end_matchsubs" > %s ''' % output_init_file
self.exec_global_shell(cmd, False)

@@ -119,48 +120,48 @@ def terminate(self, with_error = False):
try:
self.sh_proc.terminate()
except OSError as e:
# Ignore the exception if the process doesn't exist.
# Log then ignore the exception if the process doesn't exist.
print("Failed to terminate bash process: %s" % e)
pass
self.sh_proc = None

def __run_command(self, sh_cmd):
# Strip the newlines at the end. It will be added later.
# Strip extra new lines
sh_cmd = sh_cmd.rstrip()
bytes_written = os.write(self.master_fd, sh_cmd.encode())
bytes_written += os.write(self.master_fd, b'\n')
os.write(self.master_fd, sh_cmd.encode()+b'\n')

output = ""
while self.sh_proc.poll() is None:
# If not returns in 10 seconds, consider it as an fatal error.
r, w, e = select.select([self.master_fd], [], [self.master_fd], 30)
# If times out, consider it as an fatal error.
r, _, e = select.select([self.master_fd], [], [self.master_fd], 30)
if e:
# Terminate the shell when we get any output from stderr
o = os.read(self.master_fd, 10240)
self.bash_log_file.write(o)
self.bash_log_file.flush()
self.terminate(True)
raise GlobalShellExecutor.ExecutionError("Error happened to the bash daemon, see %s for details." % self.bash_log_file.name)
raise GlobalShellExecutor.ExecutionError("Error happened to the bash process, see %s for details." % self.bash_log_file.name)

if r:
o = os.read(self.master_fd, 10240).decode()
self.bash_log_file.write(o)
self.bash_log_file.flush()
output += o
if o.endswith(GlobalShellExecutor.BASH_PS1):
if output.endswith(GlobalShellExecutor.BASH_PS1):
lines = output.splitlines()
return lines[len(sh_cmd.splitlines()):len(lines) - 1]

if not r and not e:
self.terminate(True)
raise GlobalShellExecutor.ExecutionError("Timeout happened to the bash daemon, see %s for details." % self.bash_log_file.name)
raise GlobalShellExecutor.ExecutionError("Timeout happened to the bash process, see %s for details." % self.bash_log_file.name)

self.terminate(True)
raise GlobalShellExecutor.ExecutionError("Bash daemon has been stopped, see %s for details." % self.bash_log_file.name)
raise GlobalShellExecutor.ExecutionError("Bash process has been stopped, see %s for details." % self.bash_log_file.name)

# execute global shell cmd in bash deamon, and fetch result without blocking
# execute global shell cmd in bash process, and fetch result without blocking
def exec_global_shell(self, sh_cmd, is_trip_output_end_blanklines):
if self.sh_proc == None:
raise GlobalShellExecutor.ExecutionError("The bash daemon has been terminated abnormally, see %s for details." % self.bash_log_file.name)
raise GlobalShellExecutor.ExecutionError("The bash process has been terminated abnormally, see %s for details." % self.bash_log_file.name)

# get the output of shell command
output = self.__run_command(sh_cmd)
@@ -173,7 +174,7 @@ def exec_global_shell(self, sh_cmd, is_trip_output_end_blanklines):

return output

# execute gobal shell:
# execute global shell:
# 1) set input stream -> $RAW_STR
# 2) execute shell command from input
# if error, write error message to err_log_file
@@ -203,7 +204,7 @@ def extract_sh_cmd(self, header, input_str):
for i in range(start, len(input_str)):
if end == 0 and input_str[i] == '\'':
if not is_start:
# find shell begin postion
# find shell begin position
is_start = True
start = i+1
continue
Expand All @@ -215,7 +216,7 @@ def extract_sh_cmd(self, header, input_str):
break
if cnt % 2 == 1:
continue
# find shell end postion
# find shell end position
res_cmd = input_str[start: i]
end = i
continue
@@ -658,7 +659,7 @@ def process_command(self, command, output_file, global_sh_executor):
con_mode = "mirror"
sql = m.groups()[2]
sql = sql.lstrip()
# If db_name is specifed , it should be of the following syntax:
# If db_name is specified , it should be of the following syntax:
# 1:@db_name <db_name>: <sql>
if sql.startswith('@db_name'):
sql_parts = sql.split(':', 2)
@@ -803,7 +804,7 @@ def process_isolation_file(self, sql_file, output_file, initfile_prefix):
print((" " if command and not newline else "") + line.strip(), end="", file=output_file)
newline = False
if line[0] == "!":
command_part = line # shell commands can use -- for multichar options like --include
command_part = line # shell commands can use -- for long options like --include
elif re.match(r";.*--", line) or re.match(r"^--", line):
command_part = line.partition("--")[0] # remove comment from line
else:
@@ -816,7 +817,7 @@ def process_isolation_file(self, sql_file, output_file, initfile_prefix):
try:
self.process_command(command, output_file, shell_executor)
except GlobalShellExecutor.ExecutionError as e:
# error in the daemon shell cannot be recovered
# error of the GlobalShellExecutor cannot be recovered
raise
except Exception as e:
print("FAILED: ", e, file=output_file)
@@ -864,7 +865,7 @@ class SQLIsolationTestCase:
U&: expect blocking behavior in utility mode (does not currently support an asterisk target)
U<: join an existing utility mode session (does not currently support an asterisk target)

R|R&|R<: similar to 'U' meaning execept that the connect is in retrieve mode, here don't
R|R&|R<: similar to 'U' meaning except that the connect is in retrieve mode, here don't
thinking about retrieve mode authentication, just using the normal authentication directly.

An example is:
@@ -878,7 +879,7 @@

The isolation tests are specified identical to sql-scripts in normal
SQLTestCases. However, it is possible to prefix a SQL line with
an tranaction identifier followed by a colon (":").
an transaction identifier followed by a colon (":").
The above example would be defined by
1: BEGIN;
2: BEGIN;
@@ -956,9 +957,9 @@ class SQLIsolationTestCase:
Shell Execution for SQL or Output:

@pre_run can be used for executing shell command to change input (i.e. each SQL statement) or get input info;
@post_run can be used for executing shell command to change ouput (i.e. the result set printed for each SQL execution)
@post_run can be used for executing shell command to change output (i.e. the result set printed for each SQL execution)
or get output info. Just use the env variable ${RAW_STR} to refer to the input/out stream before shell execution,
and the output of the shell command will be used as the SQL exeucted or output printed into results file.
and the output of the shell command will be used as the SQL executed or output printed into results file.

1: @post_run ' TOKEN1=` echo "${RAW_STR}" | awk \'NR==3\' | awk \'{print $1}\'` && export MATCHSUBS="${MATCHSUBS}${NL}m/${TOKEN1}/${NL}s/${TOKEN1}/token_id1/${NL}" && echo "${RAW_STR}" ': SELECT token,hostname,status FROM GP_ENDPOINTS WHERE cursorname='c1';
2R: @pre_run ' echo "${RAW_STR}" | sed "s#@TOKEN1#${TOKEN1}#" ': RETRIEVE ALL FROM "@TOKEN1";
@@ -967,7 +968,7 @@ class SQLIsolationTestCase:
- Sample 1: set env variable ${TOKEN1} to the cell (row 3, col 1) of the result set, and print the raw result.
The env var ${MATCHSUBS} is used to store the matchsubs section so that we can store it into initfile when
this test case file is finished executing.
- Sample 2: replaceing "@TOKEN1" by generated token which is fetch in sample1
- Sample 2: replace "@TOKEN1" by generated token which is fetched in sample1

There are some helper functions which will be sourced automatically to make above
cases easier. See global_sh_executor.sh for more information.
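
The pattern this file relies on, a long-lived bash on a pseudo-terminal whose distinctive PS1 doubles as an end-of-output sentinel, can be sketched standalone in Python (an illustration under assumed defaults; run(), SENTINEL, and the bash flags here are not the class's real interface):

import os
import pty
import select
import subprocess

SENTINEL = 'GlobalShellExecutor>'  # same idea as the class's BASH_PS1

# One bash serves a whole test file; start_new_session detaches it from
# our controlling terminal, as the patched __init__ now does.
master_fd, slave_fd = pty.openpty()
proc = subprocess.Popen(['bash', '--norc'],
                        stdin=slave_fd, stdout=slave_fd, stderr=slave_fd,
                        start_new_session=True)

def run(cmd, timeout=30):
    """Send one command and read until the prompt sentinel reappears."""
    os.write(master_fd, (cmd.rstrip() + '\n').encode())
    output = ''
    while proc.poll() is None:
        r, _, _ = select.select([master_fd], [], [], timeout)
        if not r:
            raise TimeoutError('bash did not answer within %ss' % timeout)
        output += os.read(master_fd, 10240).decode()
        if output.endswith(SENTINEL):
            # Drop the echoed command and the trailing prompt line.
            lines = output.splitlines()
            return lines[len(cmd.splitlines()):-1]
    raise RuntimeError('bash exited unexpectedly')

run("export PS1='%s'" % SENTINEL)  # install the sentinel prompt
print(run('echo hello'))           # -> ['hello']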
81 changes: 81 additions & 0 deletions src/test/regress/expected/groupingsets.out
@@ -82,6 +82,13 @@ values (1,1,b'0000','1'), (2,2,b'0001','1'),
(3,4,b'0010','2'), (4,8,b'0011','2'),
(5,16,b'0000','2'), (6,32,b'0001','2'),
(7,64,b'0010','1'), (8,128,b'0011','1');
create temp table gstest5(id integer, v integer,
unsortable_col1 xid, unsortable_col2 xid);
insert into gstest5
values (1,1,'3','1'), (2,2,'3','1'),
(3,4,'4','2'), (4,8,'4','2'),
(5,16,'4','2'), (6,32,'4','2'),
(7,64,'3','1'), (8,128,'3','1');
create temp table gstest_empty (a integer, b integer, v integer);
create function gstest_data(v integer, out a integer, out b integer)
returns setof record
@@ -1324,6 +1331,80 @@
Optimizer: Postgres query optimizer
(13 rows)

select unsortable_col1, unsortable_col2,
grouping(unsortable_col1, unsortable_col2),
count(*), sum(v)
from gstest5 group by grouping sets ((unsortable_col1),(unsortable_col2))
order by 3,5;
unsortable_col1 | unsortable_col2 | grouping | count | sum
-----------------+-----------------+----------+-------+-----
4 | | 1 | 4 | 60
3 | | 1 | 4 | 195
| 2 | 2 | 4 | 60
| 1 | 2 | 4 | 195
(4 rows)

explain (costs off)
select unsortable_col1, unsortable_col2,
grouping(unsortable_col1, unsortable_col2),
count(*), sum(v)
from gstest5 group by grouping sets ((unsortable_col1),(unsortable_col2))
order by 3,5;
QUERY PLAN
-------------------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
Merge Key: (GROUPING(unsortable_col1, unsortable_col2)), (sum(v))
-> Sort
Sort Key: (GROUPING(unsortable_col1, unsortable_col2)), (sum(v))
-> Finalize HashAggregate
Group Key: unsortable_col1, unsortable_col2, (GROUPINGSET_ID())
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: (GROUPINGSET_ID())
-> Partial HashAggregate
Hash Key: unsortable_col1
Hash Key: unsortable_col2
-> Seq Scan on gstest5
Optimizer: Postgres query optimizer
(13 rows)

select unsortable_col1, unsortable_col2,
grouping(unsortable_col1, unsortable_col2),
count(*), sum(v)
from gstest5 group by grouping sets ((unsortable_col1),(unsortable_col2),())
order by 3,5;
unsortable_col1 | unsortable_col2 | grouping | count | sum
-----------------+-----------------+----------+-------+-----
4 | | 1 | 4 | 60
3 | | 1 | 4 | 195
| 2 | 2 | 4 | 60
| 1 | 2 | 4 | 195
| | 3 | 8 | 255
(5 rows)

explain (costs off)
select unsortable_col1, unsortable_col2,
grouping(unsortable_col1, unsortable_col2),
count(*), sum(v)
from gstest5 group by grouping sets ((unsortable_col1),(unsortable_col2),())
order by 3,5;
QUERY PLAN
-------------------------------------------------------------------------------
Gather Motion 3:1 (slice1; segments: 3)
Merge Key: (GROUPING(unsortable_col1, unsortable_col2)), (sum(v))
-> Sort
Sort Key: (GROUPING(unsortable_col1, unsortable_col2)), (sum(v))
-> Finalize HashAggregate
Group Key: unsortable_col1, unsortable_col2, (GROUPINGSET_ID())
-> Redistribute Motion 3:3 (slice2; segments: 3)
Hash Key: (GROUPINGSET_ID())
-> Partial MixedAggregate
Hash Key: unsortable_col1
Hash Key: unsortable_col2
Group Key: ()
-> Seq Scan on gstest5
Optimizer: Postgres query optimizer
(14 rows)

-- empty input: first is 0 rows, second 1, third 3 etc.
select a, b, sum(v), count(*) from gstest_empty group by grouping sets ((a,b),a);
a | b | sum | count