Skip to content

Commit

Permalink
Add hstackBlocks for Block to join blocks by columns (#6803)
Browse files Browse the repository at this point in the history
close #5829
  • Loading branch information
Lloyd-Pottiger authored Feb 13, 2023
1 parent e665e59 commit 01a5ddd
Show file tree
Hide file tree
Showing 15 changed files with 50 additions and 24 deletions.
30 changes: 26 additions & 4 deletions dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,29 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons
return ReturnType(true);
}

Block mergeBlocks(Blocks && blocks)
/// join blocks by columns
Block hstackBlocks(Blocks && blocks, const Block & header)
{
if (blocks.empty())
return {};

Block res = header.cloneEmpty();

size_t num_rows = blocks.front().rows();
for (const auto & block : blocks)
{
RUNTIME_CHECK_MSG(block.rows() == num_rows, "Cannot hstack blocks with different number of rows");
for (const auto & elem : block)
{
res.getByName(elem.name).column = std::move(elem.column);
}
}

return res;
}

/// join blocks by rows
Block vstackBlocks(Blocks && blocks)
{
if (blocks.empty())
{
Expand All @@ -526,13 +548,13 @@ Block mergeBlocks(Blocks && blocks)
return std::move(blocks[0]);
}

auto & first_block = blocks[0];
size_t result_rows = 0;
for (const auto & block : blocks)
{
result_rows += block.rows();
}

auto & first_block = blocks.front();
MutableColumns dst_columns(first_block.columns());

for (size_t i = 0; i < first_block.columns(); ++i)
Expand All @@ -546,9 +568,9 @@ Block mergeBlocks(Blocks && blocks)
if (likely(blocks[i].rows() > 0))
{
assert(blocksHaveEqualStructure(first_block, blocks[i]));
for (size_t column = 0; column < blocks[i].columns(); ++column)
for (size_t idx = 0; idx < blocks[i].columns(); ++idx)
{
dst_columns[column]->insertRangeFrom(*blocks[i].getByPosition(column).column, 0, blocks[i].rows());
dst_columns[idx]->insertRangeFrom(*blocks[i].getByPosition(idx).column, 0, blocks[i].rows());
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ using Blocks = std::vector<Block>;
using BlocksList = std::list<Block>;
using BucketBlocksListMap = std::map<Int32, BlocksList>;

Block mergeBlocks(Blocks && blocks);
/// join blocks by columns
Block hstackBlocks(Blocks && blocks, const Block & header);

/// join blocks by rows
Block vstackBlocks(Blocks && blocks);

Block popBlocksListFront(BlocksList & blocks);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
BlocksToMerge blocks_to_merge = getNextBlocksToMerge();
if (blocks_to_merge && !blocks_to_merge->empty())
{
current_result = aggregator.mergeBlocks(*blocks_to_merge, final);
current_result = aggregator.vstackBlocks(*blocks_to_merge, final);
auto block = popBlocksListFront(current_result);
assert(block);
return block;
Expand Down Expand Up @@ -380,7 +380,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::mergeThread()
}

/// At this point, several merge threads may work in parallel.
BlocksList res = aggregator.mergeBlocks(*blocks_to_merge, final);
BlocksList res = aggregator.vstackBlocks(*blocks_to_merge, final);
assert(!res.empty());

{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/SquashingHashJoinBlockTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void SquashingHashJoinBlockTransform::appendBlock(Block & block)

Block SquashingHashJoinBlockTransform::getFinalOutputBlock()
{
Block final_block = mergeBlocks(std::move(blocks));
Block final_block = vstackBlocks(std::move(blocks));
reset();
handleOverLimitBlock();
return final_block;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class SimpleOperatorTestRunner : public DB::tests::ExecutorTest
while (op_pipeline->execute() != OperatorStatus::FINISHED)
{
}
ASSERT_COLUMNS_EQ_UR(expect_columns, mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName());
ASSERT_COLUMNS_EQ_UR(expect_columns, vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName());
}
};

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/tests/gtest_spill_aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ try
{
ASSERT_EQ(block.rows() <= small_max_block_size, true);
}
ASSERT_COLUMNS_EQ_UR(ref_columns, mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName());
ASSERT_COLUMNS_EQ_UR(ref_columns, vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName());
blocks = getExecuteStreamsReturnBlocks(request, original_max_streams, true);
for (auto & block : blocks)
{
ASSERT_EQ(block.rows() <= small_max_block_size, true);
}
ASSERT_COLUMNS_EQ_UR(ref_columns, mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName());
ASSERT_COLUMNS_EQ_UR(ref_columns, vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName());
}
CATCH
} // namespace tests
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2255,9 +2255,9 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
}


BlocksList Aggregator::mergeBlocks(BlocksList & blocks, bool final)
BlocksList Aggregator::vstackBlocks(BlocksList & blocks, bool final)
{
RUNTIME_CHECK_MSG(!blocks.empty(), "The input blocks list for Aggregator::mergeBlocks must be non-empty");
RUNTIME_CHECK_MSG(!blocks.empty(), "The input blocks list for Aggregator::vstackBlocks must be non-empty");

auto bucket_num = blocks.front().info.bucket_num;

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ class Aggregator
using BucketToBlocks = std::map<Int32, BlocksList>;

/// Merge several partially aggregated blocks into one.
BlocksList mergeBlocks(BlocksList & blocks, bool final);
BlocksList vstackBlocks(BlocksList & blocks, bool final);

/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
* This is needed to simplify merging of that data with other results, that are already two-level.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1953,7 +1953,7 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
}
else
{
block = mergeBlocks(std::move(result_blocks));
block = vstackBlocks(std::move(result_blocks));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,9 @@ Block SegmentTestBasic::prepareWriteBlock(Int64 start_key, Int64 end_key, bool i
is_deleted);
}

Block sortMergeBlocks(std::vector<Block> && blocks)
Block sortvstackBlocks(std::vector<Block> && blocks)
{
auto accumulated_block = mergeBlocks(std::move(blocks));
auto accumulated_block = vstackBlocks(std::move(blocks));

SortDescription sort;
sort.emplace_back(EXTRA_HANDLE_COLUMN_NAME, 1, 0);
Expand Down Expand Up @@ -375,7 +375,7 @@ Block SegmentTestBasic::prepareWriteBlockInSegmentRange(PageIdU64 segment_id, UI
remaining_rows);
}

return sortMergeBlocks(std::move(blocks));
return sortvstackBlocks(std::move(blocks));
}

void SegmentTestBasic::writeSegment(PageIdU64 segment_id, UInt64 write_rows, std::optional<Int64> start_at)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic
ColumnPtr getSegmentHandle(PageIdU64 segment_id, const RowKeyRanges & ranges);
void writeSegmentWithDeleteRange(PageIdU64 segment_id, Int64 begin, Int64 end);
RowKeyValue buildRowKeyValue(Int64 key);
RowKeyRange buildRowKeyRange(Int64 begin, Int64 end);
static RowKeyRange buildRowKeyRange(Int64 begin, Int64 end);

protected:
std::mt19937 random;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ ColumnPtr SegmentTestBasic::getSegmentHandle(PageIdU64 segment_id, const RowKeyR
}
else
{
auto block = mergeBlocks(std::move(blks));
auto block = vstackBlocks(std::move(blks));
RUNTIME_CHECK(block.has(EXTRA_HANDLE_COLUMN_NAME));
RUNTIME_CHECK(block.segmentRowIdCol() == nullptr);
return block.getByName(EXTRA_HANDLE_COLUMN_NAME).column;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/TestUtils/ExecutorTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ DB::ColumnsWithTypeAndName readBlocks(std::vector<BlockInputStreamPtr> streams)
Blocks actual_blocks;
for (const auto & stream : streams)
readStream(actual_blocks, stream);
return mergeBlocks(std::move(actual_blocks)).getColumnsWithTypeAndName();
return vstackBlocks(std::move(actual_blocks)).getColumnsWithTypeAndName();
}

void ExecutorTest::enablePlanner(bool is_enable)
Expand Down Expand Up @@ -244,7 +244,7 @@ ColumnsWithTypeAndName ExecutorTest::executeStreams(DAGContext * dag_context, bo
// Currently, don't care about regions information in tests.
Blocks blocks;
queryExecute(context.context, /*internal=*/!enable_memory_tracker)->execute([&blocks](const Block & block) { blocks.push_back(block); }).verify();
return mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName();
return vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName();
}

Blocks ExecutorTest::getExecuteStreamsReturnBlocks(const std::shared_ptr<tipb::DAGRequest> & request,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TestUtils/ExecutorTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class ExecutorTest : public ::testing::Test
}
}

ColumnsWithTypeAndName executeStreams(DAGContext * dag_context, bool enalbe_memory_tracker = false);
ColumnsWithTypeAndName executeStreams(DAGContext * dag_context, bool enable_memory_tracker = false);

ColumnsWithTypeAndName executeStreams(
const std::shared_ptr<tipb::DAGRequest> & request,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TestUtils/MPPTaskTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ ColumnsWithTypeAndName extractColumns(Context & context, const std::shared_ptr<t
auto schema = getSelectSchema(context);
for (const auto & chunk : dag_response->chunks())
blocks.emplace_back(codec->decode(chunk.rows_data(), schema));
return mergeBlocks(std::move(blocks)).getColumnsWithTypeAndName();
return vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName();
}

ColumnsWithTypeAndName MPPTaskTestUtils::executeCoprocessorTask(std::shared_ptr<tipb::DAGRequest> & dag_request)
Expand Down

0 comments on commit 01a5ddd

Please sign in to comment.