Skip to content

Commit

Permalink
Fix incorrect window function cancellation handling (#5818) (#5826)
Browse files Browse the repository at this point in the history
close #5814
  • Loading branch information
ti-chi-bot committed Sep 9, 2022
1 parent 5d607e7 commit d16dde3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
27 changes: 17 additions & 10 deletions dbms/src/DataStreams/WindowBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,28 @@ void WindowBlockInputStream::initialWorkspaces()
only_have_pure_window = onlyHaveRowNumberAndRank();
}

bool WindowBlockInputStream::returnIfCancelledOrKilled()
{
if (isCancelledOrThrowIfKilled())
{
if (!window_blocks.empty())
window_blocks.erase(window_blocks.begin(), window_blocks.end());
input_is_finished = true;
return true;
}
return false;
}

Block WindowBlockInputStream::readImpl()
{
const auto & stream = children.back();
while (!input_is_finished)
{
if (returnIfCancelledOrKilled())
return {};

if (Block output_block = tryGetOutputBlock())
{
return output_block;
}

Block block = stream->read();
if (!block)
Expand All @@ -93,6 +106,8 @@ Block WindowBlockInputStream::readImpl()
tryCalculate();
}

if (returnIfCancelledOrKilled())
return {};
// return last partition block, if already return then return null
return tryGetOutputBlock();
}
Expand Down Expand Up @@ -360,14 +375,6 @@ void WindowBlockInputStream::writeOutCurrentRow()

Block WindowBlockInputStream::tryGetOutputBlock()
{
if (isCancelledOrThrowIfKilled())
{
if (!window_blocks.empty())
window_blocks.erase(window_blocks.begin(), window_blocks.end());
input_is_finished = true;
return {};
}

assert(first_not_ready_row.block >= first_block_number);
// The first_not_ready_row might be past-the-end if we have already
// calculated the window functions for all input rows. That's why the
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/WindowBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ class WindowBlockInputStream : public IProfilingBlockInputStream
protected:
Block readImpl() override;

bool returnIfCancelledOrKilled();

LoggerPtr log;

public:
Expand Down
16 changes: 11 additions & 5 deletions format-diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,22 @@ def main():

if args.check_formatted:
diff_res = run_cmd('git diff --name-only')
if diff_res:
files_not_in_contrib = [f for f in diff_res if not f.startswith('contrib')]
files_contrib = [f for f in diff_res if f.startswith('contrib')]
if files_not_in_contrib:
print('')
print('Error: found files NOT formatted')
print(''.join(diff_res))
print(''.join(files_not_in_contrib))
exit(-1)
elif files_contrib:
print('')
print('Warn: found contrib changed')
print(''.join(files_contrib))
print('')
print(''.join(run_cmd('git status')))
else:
print("Format check passed")
else:
cmd = 'clang-format -i {}'.format(' '.join(files_to_format))
if subprocess.Popen(cmd, shell=True, cwd=tics_repo_path).wait():
exit(-1)
print("Finish code format")
else:
print('No file to format')
Expand Down

0 comments on commit d16dde3

Please sign in to comment.