Skip to content

Commit

Permalink
mock exchange receiver with fine grained enable support different max_streams (#6999)
Browse files Browse the repository at this point in the history

ref #6528
  • Loading branch information
windtalker authored Mar 9, 2023
1 parent f604100 commit 2444393
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 27 deletions.
57 changes: 43 additions & 14 deletions dbms/src/DataStreams/MockExchangeReceiverInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,61 @@
namespace DB
{
/// Builds a mock receiver stream whose schema is derived from a tipb::ExchangeReceiver.
///
/// One column named "exchange_receiver_<i>" is created per entry of `receiver.field_types()`,
/// and the resulting column set is stored as a single entry of `columns_vector`.
/// `rows` is taken verbatim from `rows_` (it is not computed from column data here), and
/// `source_num` mirrors `receiver.encoded_task_meta_size()`.
///
/// NOTE(review): this listing comes from a rendered diff. The first two initializer lines
/// below (`output_index(0)` and the first `max_block_size`) look like the pre-change side
/// that the commit removed — confirm against the repository.
MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(const tipb::ExchangeReceiver & receiver, size_t max_block_size, size_t rows_)
: output_index(0)
, max_block_size(max_block_size)
: max_block_size(max_block_size)
, rows(rows_)
, source_num(static_cast<size_t>(receiver.encoded_task_meta_size()))
{
ColumnsWithTypeAndName columns;
// The schema is built from the receiver's field types, so at least one is required.
assert(receiver.field_types_size() > 0);
for (int i = 0; i < receiver.field_types_size(); ++i)
{
// Each column gets the data type mapped from the i-th field type and a positional name.
columns.emplace_back(
getDataTypeByColumnInfoForComputingLayer(TiDB::fieldTypeToColumnInfo(receiver.field_types(i))),
fmt::format("exchange_receiver_{}", i));
}
// This constructor always produces exactly one column set.
columns_vector.push_back(std::move(columns));
}

/// NOTE(review): rendered-diff text. The constructor signature and its three initializers
/// directly below appear to be removed pre-change code, fused here with the newly added
/// initTotalRows(); the loop over the `columns` member and the `rows`-based assert further
/// down likewise look like removed lines. Confirm against the repository.
MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(ColumnsWithTypeAndName columns, size_t max_block_size)
: columns(columns)
, output_index(0)
, max_block_size(max_block_size)
/// Recomputes `rows` as the sum, over every column set in `columns_vector`, of that set's
/// row count. A set's row count is read from those of its columns whose `column` pointer is
/// non-null; the assert checks that such columns within one set agree on their size.
void MockExchangeReceiverInputStream::initTotalRows()
{
rows = 0;
for (const auto & elem : columns)
for (const auto & columns : columns_vector)
{
if (elem.column)
// Row count of the current column set only; reset for each set.
size_t current_rows = 0;
for (const auto & elem : columns)
{
assert(rows == 0 || rows == elem.column->size());
rows = elem.column->size();
// Columns without data are skipped and do not contribute to the count.
if (elem.column)
{
assert(current_rows == 0 || current_rows == elem.column->size());
current_rows = elem.column->size();
}
}
// Accumulate this set's rows into the stream-wide total.
rows += current_rows;
}
}

/// Wraps one set of columns as the only entry of `columns_vector`.
/// `columns` must not be empty (checked by assert); the total row count is then derived
/// by initTotalRows().
MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(const ColumnsWithTypeAndName & columns, size_t max_block_size)
    : columns_vector(1, columns)
    , max_block_size(max_block_size)
{
    assert(!columns_vector.front().empty());
    initTotalRows();
}

/// Copies several column sets into `columns_vector`.
/// At least one set must be supplied and the first set must contain columns (both checked
/// by assert); the total row count is then derived by initTotalRows().
MockExchangeReceiverInputStream::MockExchangeReceiverInputStream(const std::vector<ColumnsWithTypeAndName> & columns_vector_, size_t max_block_size)
    : columns_vector(columns_vector_)
    , max_block_size(max_block_size)
{
    assert(!columns_vector.empty());
    assert(!columns_vector.front().empty());
    initTotalRows();
}

ColumnPtr MockExchangeReceiverInputStream::makeColumn(ColumnWithTypeAndName elem) const
{
auto column = elem.type->createColumn();
size_t row_count = 0;
for (size_t i = output_index; i < rows && i < elem.column->size() && row_count < max_block_size; ++i)
size_t current_output_rows = output_rows;
for (size_t i = output_index_in_current_columns; current_output_rows < rows && i < elem.column->size() && row_count < max_block_size; ++i, ++current_output_rows)
{
column->insert((*elem.column)[i]);
++row_count;
Expand All @@ -61,14 +82,22 @@ ColumnPtr MockExchangeReceiverInputStream::makeColumn(ColumnWithTypeAndName elem

/// Returns the next block of mocked exchange data, or an empty Block once the stream is drained.
///
/// Column sets in `columns_vector` are served one after another. Each call asks makeColumn()
/// for a slice of every column of the current set, starting at
/// `output_index_in_current_columns`, and moves on to the next set once the current one has
/// been emitted completely.
Block MockExchangeReceiverInputStream::readImpl()
{
    if (output_rows >= rows)
        return {};

    // Skip column sets that hold no columns at all, so the front() accesses below are safe.
    while (output_columns_index < columns_vector.size() && columns_vector[output_columns_index].empty())
    {
        ++output_columns_index;
        output_index_in_current_columns = 0;
    }

    // `rows` may exceed the data actually stored (presumably the case for the receiver-based
    // constructor, which takes `rows_` from its caller — confirm). An assert alone would let
    // release builds index past the end of `columns_vector`, so end the stream explicitly.
    if (output_columns_index >= columns_vector.size())
        return {};

    const auto & current_columns = columns_vector[output_columns_index];
    ColumnsWithTypeAndName output_columns;
    for (const auto & elem : current_columns)
        output_columns.push_back({makeColumn(elem), elem.type, elem.name, elem.column_id});

    // All columns of a set are sliced over the same range, so the first one gives the row count.
    const size_t return_rows = output_columns.front().column->size();
    output_rows += return_rows;
    output_index_in_current_columns += return_rows;

    // Current set fully emitted: continue with the next set on the following call.
    if (output_index_in_current_columns == current_columns.front().column->size())
    {
        ++output_columns_index;
        output_index_in_current_columns = 0;
    }
    return Block(output_columns);
}
} // namespace DB
14 changes: 9 additions & 5 deletions dbms/src/DataStreams/MockExchangeReceiverInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,26 @@ class MockExchangeReceiverInputStream : public IProfilingBlockInputStream
/// Input stream that stands in for an exchange receiver: it serves blocks built from
/// in-memory column sets instead of data received from other tasks.
///
/// NOTE(review): this listing comes from a rendered diff, so some declarations appear twice
/// (the by-value constructor, the `Block(columns)` return, the `columns` / `output_index`
/// members and the uninitialized `size_t rows;`). Those look like the removed pre-change
/// lines — confirm against the repository.
{
public:
/// Schema comes from `receiver`; `rows_` is stored as the total row count.
MockExchangeReceiverInputStream(const tipb::ExchangeReceiver & receiver, size_t max_block_size, size_t rows_);
MockExchangeReceiverInputStream(ColumnsWithTypeAndName columns, size_t max_block_size);
/// Serves a single column set.
MockExchangeReceiverInputStream(const ColumnsWithTypeAndName & columns, size_t max_block_size);
/// Serves several column sets one after another.
MockExchangeReceiverInputStream(const std::vector<ColumnsWithTypeAndName> & columns_vector, size_t max_block_size);
/// The header is cloneEmpty() of a Block built from the first column set.
Block getHeader() const override
{
return Block(columns).cloneEmpty();
return Block(columns_vector[0]).cloneEmpty();
}
String getName() const override { return "MockExchangeReceiver"; }
size_t getSourceNum() const { return source_num; }
ColumnsWithTypeAndName columns;
size_t output_index;
// Column sets served by readImpl(), in order.
std::vector<ColumnsWithTypeAndName> columns_vector;
// Rows returned so far, across all column sets.
size_t output_rows = 0;
// Row offset inside the column set that is currently being served.
size_t output_index_in_current_columns = 0;
// Index into columns_vector of the column set that is currently being served.
size_t output_columns_index = 0;
size_t max_block_size;
size_t rows;
// Total rows to return: passed in by the receiver-based constructor, otherwise computed by initTotalRows().
size_t rows = 0;
// Set from receiver.encoded_task_meta_size() by the receiver-based constructor; stays 0 otherwise.
size_t source_num = 0;

protected:
Block readImpl() override;
/// Builds the output slice of one column for the current read.
ColumnPtr makeColumn(ColumnWithTypeAndName elem) const;
/// Sets `rows` to the summed row count of all column sets.
void initTotalRows();
};

} // namespace DB
14 changes: 10 additions & 4 deletions dbms/src/Flash/Coprocessor/MockSourceStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ std::pair<NamesAndTypes, BlockInputStreams> mockSchemaAndStreamsForExchangeRecei
/// build with empty blocks.
size_t stream_count = max_streams;
if (fine_grained_stream_count > 0)
stream_count = fine_grained_stream_count;
stream_count = std::min(fine_grained_stream_count, max_streams);
for (size_t i = 0; i < stream_count; ++i)
mock_streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(exchange_receiver, context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10));
for (const auto & col : mock_streams.back()->getHeader())
Expand All @@ -59,17 +59,23 @@ std::pair<NamesAndTypes, BlockInputStreams> mockSchemaAndStreamsForExchangeRecei
/// build from user input blocks.
if (fine_grained_stream_count > 0)
{
size_t output_stream_count = std::min(fine_grained_stream_count, max_streams);
std::vector<ColumnsWithTypeAndName> columns_with_type_and_name_vector;
columns_with_type_and_name_vector = context.mockStorage()->getFineGrainedExchangeColumnsVector(executor_id, fine_grained_stream_count);
if (columns_with_type_and_name_vector.empty())
{
for (size_t i = 0; i < fine_grained_stream_count; ++i)
for (size_t i = 0; i < output_stream_count; ++i)
mock_streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(exchange_receiver, context.getSettingsRef().max_block_size, context.getSettingsRef().max_block_size / 10));
}
else
{
for (const auto & columns : columns_with_type_and_name_vector)
mock_streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(columns, context.getSettingsRef().max_block_size));
std::vector<std::vector<ColumnsWithTypeAndName>> columns_for_mock_exchange_receiver(output_stream_count);
for (size_t i = 0; i < columns_with_type_and_name_vector.size(); ++i)
{
columns_for_mock_exchange_receiver[i % output_stream_count].push_back(columns_with_type_and_name_vector[i]);
}
for (size_t i = 0; i < output_stream_count; ++i)
mock_streams.push_back(std::make_shared<MockExchangeReceiverInputStream>(columns_for_mock_exchange_receiver[i], context.getSettingsRef().max_block_size));
}
for (const auto & col : mock_streams.back()->getHeader())
schema.emplace_back(col.name, col.type);
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Flash/tests/gtest_join_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ try
size_t common_rows = 20480;
UInt64 max_block_size = 800;
size_t original_max_streams = 20;
size_t original_max_streams_small = 4;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(left_column_infos))
{
ColumnGeneratorOpts opts{common_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name};
Expand Down Expand Up @@ -828,8 +829,8 @@ try
.scan("outer_join_test", left_table_name)
.join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {}, {}, {}, exchange_concurrency)
.build(context);
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams_small));
}
}
/// case 2, right join with right condition
Expand Down Expand Up @@ -864,8 +865,8 @@ try
.scan("outer_join_test", left_table_name)
.join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(exchange_name + ".b"), lit(Field(static_cast<Int64>(1000))))}, {}, {}, exchange_concurrency)
.build(context);
auto result_columns = executeStreams(request, original_max_streams);
ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns);
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams_small));
}
}
}
Expand Down

0 comments on commit 2444393

Please sign in to comment.