Skip to content

Commit

Permalink
Add unit tests for fine-grained join & agg (#6445)
Browse files Browse the repository at this point in the history
ref #6157
  • Loading branch information
yibin87 authored Dec 23, 2022
1 parent c361d3a commit 957d2d4
Show file tree
Hide file tree
Showing 10 changed files with 411 additions and 26 deletions.
9 changes: 6 additions & 3 deletions dbms/src/Debug/MockExecutor/AggregationBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ bool AggregationBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t c
{
tipb_executor->set_tp(tipb::ExecType::TypeAggregation);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
auto * agg = tipb_executor->mutable_aggregation();
buildAggExpr(agg, collator_id, context);
buildGroupBy(agg, collator_id, context);
Expand Down Expand Up @@ -80,7 +81,8 @@ void AggregationBinder::toMPPSubPlan(size_t & executor_index, const DAGPropertie
false,
std::move(agg_exprs),
std::move(gby_exprs),
false);
false,
fine_grained_shuffle_stream_count);
partial_agg->children.push_back(children[0]);
std::vector<size_t> partition_keys;
size_t agg_func_num = partial_agg->agg_exprs.size();
Expand Down Expand Up @@ -206,7 +208,7 @@ void AggregationBinder::buildAggFunc(tipb::Expr * agg_func, const ASTFunction *
agg_func->set_aggfuncmode(tipb::AggFunctionMode::Partial1Mode);
}

ExecutorBinderPtr compileAggregation(ExecutorBinderPtr input, size_t & executor_index, ASTPtr agg_funcs, ASTPtr group_by_exprs)
ExecutorBinderPtr compileAggregation(ExecutorBinderPtr input, size_t & executor_index, ASTPtr agg_funcs, ASTPtr group_by_exprs, uint64_t fine_grained_shuffle_stream_count)
{
std::vector<ASTPtr> agg_exprs;
std::vector<ASTPtr> gby_exprs;
Expand Down Expand Up @@ -276,7 +278,8 @@ ExecutorBinderPtr compileAggregation(ExecutorBinderPtr input, size_t & executor_
need_append_project,
std::move(agg_exprs),
std::move(gby_exprs),
true);
true,
fine_grained_shuffle_stream_count);
aggregation->children.push_back(input);
return aggregation;
}
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Debug/MockExecutor/AggregationBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ class ExchangeReceiverBinder;
class AggregationBinder : public ExecutorBinder
{
public:
AggregationBinder(size_t & index_, const DAGSchema & output_schema_, bool has_uniq_raw_res_, bool need_append_project_, ASTs && agg_exprs_, ASTs && gby_exprs_, bool is_final_mode_)
AggregationBinder(size_t & index_, const DAGSchema & output_schema_, bool has_uniq_raw_res_, bool need_append_project_, ASTs && agg_exprs_, ASTs && gby_exprs_, bool is_final_mode_, uint64_t fine_grained_shuffle_stream_count_)
: ExecutorBinder(index_, "aggregation_" + std::to_string(index_), output_schema_)
, has_uniq_raw_res(has_uniq_raw_res_)
, need_append_project(need_append_project_)
, agg_exprs(std::move(agg_exprs_))
, gby_exprs(std::move(gby_exprs_))
, is_final_mode(is_final_mode_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{}

bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;
Expand All @@ -53,13 +54,14 @@ class AggregationBinder : public ExecutorBinder
std::vector<ASTPtr> gby_exprs;
bool is_final_mode;
DAGSchema output_schema_for_partial_agg;
uint64_t fine_grained_shuffle_stream_count;

private:
void buildGroupBy(tipb::Aggregation * agg, int32_t collator_id, const Context & context) const;
void buildAggExpr(tipb::Aggregation * agg, int32_t collator_id, const Context & context) const;
void buildAggFunc(tipb::Expr * agg_func, const ASTFunction * func, int32_t collator_id) const;
};

ExecutorBinderPtr compileAggregation(ExecutorBinderPtr input, size_t & executor_index, ASTPtr agg_funcs, ASTPtr group_by_exprs);
ExecutorBinderPtr compileAggregation(ExecutorBinderPtr input, size_t & executor_index, ASTPtr agg_funcs, ASTPtr group_by_exprs, uint64_t fine_grained_shuffle_stream_count = 0);

} // namespace DB::mock
6 changes: 4 additions & 2 deletions dbms/src/Debug/MockExecutor/JoinBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ bool JoinBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator
{
tipb_executor->set_tp(tipb::ExecType::TypeJoin);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);

tipb::Join * join = tipb_executor->mutable_join();

Expand Down Expand Up @@ -288,14 +289,15 @@ ExecutorBinderPtr compileJoin(size_t & executor_index,
const ASTs & left_conds,
const ASTs & right_conds,
const ASTs & other_conds,
const ASTs & other_eq_conds_from_in)
const ASTs & other_eq_conds_from_in,
uint64_t fine_grained_shuffle_stream_count)
{
DAGSchema output_schema;

buildLeftSideJoinSchema(output_schema, left->output_schema, tp);
buildRightSideJoinSchema(output_schema, right->output_schema, tp);

auto join = std::make_shared<mock::JoinBinder>(executor_index, output_schema, tp, join_cols, left_conds, right_conds, other_conds, other_eq_conds_from_in);
auto join = std::make_shared<mock::JoinBinder>(executor_index, output_schema, tp, join_cols, left_conds, right_conds, other_conds, other_eq_conds_from_in, fine_grained_shuffle_stream_count);
join->children.push_back(left);
join->children.push_back(right);

Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Debug/MockExecutor/JoinBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ class ExchangeReceiverBinder;
class JoinBinder : public ExecutorBinder
{
public:
JoinBinder(size_t & index_, const DAGSchema & output_schema_, tipb::JoinType tp_, const ASTs & join_cols_, const ASTs & l_conds, const ASTs & r_conds, const ASTs & o_conds, const ASTs & o_eq_conds)
JoinBinder(size_t & index_, const DAGSchema & output_schema_, tipb::JoinType tp_, const ASTs & join_cols_, const ASTs & l_conds, const ASTs & r_conds, const ASTs & o_conds, const ASTs & o_eq_conds, uint64_t fine_grained_shuffle_stream_count_)
: ExecutorBinder(index_, "Join_" + std::to_string(index_), output_schema_)
, tp(tp_)
, join_cols(join_cols_)
, left_conds(l_conds)
, right_conds(r_conds)
, other_conds(o_conds)
, other_eq_conds_from_in(o_eq_conds)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{
if (!(join_cols.size() + left_conds.size() + right_conds.size() + other_conds.size() + other_eq_conds_from_in.size()))
throw Exception("No join condition found.");
Expand All @@ -57,9 +58,10 @@ class JoinBinder : public ExecutorBinder
const ASTs right_conds{};
const ASTs other_conds{};
const ASTs other_eq_conds_from_in{};
uint64_t fine_grained_shuffle_stream_count;
};
// compileJoin constructs a mocked Join executor node, note that all conditional expression params can be default
ExecutorBinderPtr compileJoin(size_t & executor_index, ExecutorBinderPtr left, ExecutorBinderPtr right, tipb::JoinType tp, const ASTs & join_cols, const ASTs & left_conds = {}, const ASTs & right_conds = {}, const ASTs & other_conds = {}, const ASTs & other_eq_conds_from_in = {});
ExecutorBinderPtr compileJoin(size_t & executor_index, ExecutorBinderPtr left, ExecutorBinderPtr right, tipb::JoinType tp, const ASTs & join_cols, const ASTs & left_conds = {}, const ASTs & right_conds = {}, const ASTs & other_conds = {}, const ASTs & other_eq_conds_from_in = {}, uint64_t fine_grained_shuffle_stream_count = 0);


/// Note: this api is only used by legacy test framework for compatibility purpose, which will be depracated soon,
Expand Down
172 changes: 172 additions & 0 deletions dbms/src/Flash/tests/gtest_compute_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <TestUtils/MPPTaskTestUtils.h>

namespace DB
Expand All @@ -34,6 +35,24 @@ class ComputeServerRunner : public DB::tests::MPPTaskTestUtils
{{"s1", TiDB::TP::TypeLong}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}},
{toNullableVec<Int32>("s1", {1, {}, 10000000, 10000000}), toNullableVec<String>("s2", {"apple", {}, "banana", "test"}), toNullableVec<String>("s3", {"apple", {}, "banana", "test"})});

/// agg table with 200 rows
std::vector<std::optional<TypeTraits<int>::FieldType>> agg_s1(200);
std::vector<std::optional<String>> agg_s2(200);
std::vector<std::optional<String>> agg_s3(200);
for (size_t i = 0; i < 200; ++i)
{
if (i % 30 != 0)
{
agg_s1[i] = i % 20;
agg_s2[i] = {fmt::format("val_{}", i % 10)};
agg_s3[i] = {fmt::format("val_{}", i)};
}
}
context.addMockTable(
{"test_db", "test_table_2"},
{{"s1", TiDB::TP::TypeLong}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}},
{toNullableVec<Int32>("s1", agg_s1), toNullableVec<String>("s2", agg_s2), toNullableVec<String>("s3", agg_s3)});

/// for join
context.addMockTable(
{"test_db", "l_table"},
Expand All @@ -43,9 +62,46 @@ class ComputeServerRunner : public DB::tests::MPPTaskTestUtils
{"test_db", "r_table"},
{{"s", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}},
{toNullableVec<String>("s", {"banana", {}, "banana"}), toNullableVec<String>("join_c", {"apple", {}, "banana"})});

/// join left table with 200 rows
std::vector<std::optional<TypeTraits<int>::FieldType>> join_s1(200);
std::vector<std::optional<String>> join_s2(200);
std::vector<std::optional<String>> join_s3(200);
for (size_t i = 0; i < 200; ++i)
{
if (i % 20 != 0)
{
agg_s1[i] = i % 5;
agg_s2[i] = {fmt::format("val_{}", i % 6)};
agg_s3[i] = {fmt::format("val_{}", i)};
}
}
context.addMockTable(
{"test_db", "l_table_2"},
{{"s1", TiDB::TP::TypeLong}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}},
{toNullableVec<Int32>("s1", agg_s1), toNullableVec<String>("s2", agg_s2), toNullableVec<String>("s3", agg_s3)});

/// join right table with 100 rows
std::vector<std::optional<TypeTraits<int>::FieldType>> join_r_s1(100);
std::vector<std::optional<String>> join_r_s2(100);
std::vector<std::optional<String>> join_r_s3(100);
for (size_t i = 0; i < 100; ++i)
{
if (i % 20 != 0)
{
join_r_s1[i] = i % 6;
join_r_s2[i] = {fmt::format("val_{}", i % 7)};
join_r_s3[i] = {fmt::format("val_{}", i)};
}
}
context.addMockTable(
{"test_db", "r_table_2"},
{{"s1", TiDB::TP::TypeLong}, {"s2", TiDB::TP::TypeString}, {"s3", TiDB::TP::TypeString}},
{toNullableVec<Int32>("s1", join_r_s1), toNullableVec<String>("s2", join_r_s2), toNullableVec<String>("s3", join_r_s3)});
}
};


TEST_F(ComputeServerRunner, runAggTasks)
try
{
Expand Down Expand Up @@ -445,5 +501,121 @@ try
}
}
CATCH

/// For FineGrainedShuffleJoin/Agg test usage, update internal exchange senders/receivers flag
/// Allow select,agg,join,tableScan,exchangeSender,exchangeReceiver,projection executors only
void setFineGrainedShuffleForExchange(tipb::Executor & root)
{
tipb::Executor * current = &root;
while (current)
{
switch (current->tp())
{
case tipb::ExecType::TypeSelection:
current = const_cast<tipb::Executor *>(&current->selection().child());
break;
case tipb::ExecType::TypeAggregation:
current = const_cast<tipb::Executor *>(&current->aggregation().child());
break;
case tipb::ExecType::TypeProjection:
current = const_cast<tipb::Executor *>(&current->projection().child());
break;
case tipb::ExecType::TypeJoin:
{
/// update build side path
JoinInterpreterHelper::TiFlashJoin tiflash_join{current->join()};
current = const_cast<tipb::Executor *>(&current->join().children()[tiflash_join.build_side_index]);
break;
}
case tipb::ExecType::TypeExchangeSender:
if (current->exchange_sender().tp() == tipb::Hash)
current->set_fine_grained_shuffle_stream_count(8);
current = const_cast<tipb::Executor *>(&current->exchange_sender().child());
break;
case tipb::ExecType::TypeExchangeReceiver:
current->set_fine_grained_shuffle_stream_count(8);
current = nullptr;
break;
case tipb::ExecType::TypeTableScan:
current = nullptr;
break;
default:
throw TiFlashException("Should not reach here", Errors::Coprocessor::Internal);
}
}
}

TEST_F(ComputeServerRunner, runFineGrainedShuffleJoinTest)
try
{
startServers(3);
constexpr size_t join_type_num = 7;
constexpr tipb::JoinType join_types[join_type_num] = {
tipb::JoinType::TypeInnerJoin,
tipb::JoinType::TypeLeftOuterJoin,
tipb::JoinType::TypeRightOuterJoin,
tipb::JoinType::TypeSemiJoin,
tipb::JoinType::TypeAntiSemiJoin,
tipb::JoinType::TypeLeftOuterSemiJoin,
tipb::JoinType::TypeAntiLeftOuterSemiJoin,
};
// fine-grained shuffle is enabled.
constexpr uint64_t enable = 8;
constexpr uint64_t disable = 0;

for (auto join_type : join_types)
{
std::cout << "JoinType: " << static_cast<int>(join_type) << std::endl;
auto properties = DB::tests::getDAGPropertiesForTest(serverNum());
auto request = context
.scan("test_db", "l_table_2")
.join(context.scan("test_db", "r_table_2"), join_type, {col("s1"), col("s2")}, disable)
.project({col("l_table_2.s1"), col("l_table_2.s2"), col("l_table_2.s3")});
const auto expected_cols = buildAndExecuteMPPTasks(request);

auto request2 = context
.scan("test_db", "l_table_2")
.join(context.scan("test_db", "r_table_2"), join_type, {col("s1"), col("s2")}, enable)
.project({col("l_table_2.s1"), col("l_table_2.s2"), col("l_table_2.s3")});
auto tasks = request2.buildMPPTasks(context, properties);
for (auto & task : tasks)
{
setFineGrainedShuffleForExchange(const_cast<tipb::Executor &>(task.dag_request->root_executor()));
}
const auto actual_cols = executeMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap());
ASSERT_COLUMNS_EQ_UR(expected_cols, actual_cols);
}
}
CATCH

TEST_F(ComputeServerRunner, runFineGrainedShuffleAggTest)
try
{
startServers(3);
// fine-grained shuffle is enabled.
constexpr uint64_t enable = 8;
constexpr uint64_t disable = 0;
{
auto properties = DB::tests::getDAGPropertiesForTest(serverNum());
auto request = context
.scan("test_db", "test_table_2")
.aggregation({Max(col("s3"))}, {col("s1"), col("s2")}, disable);
const auto expected_cols = buildAndExecuteMPPTasks(request);

auto request2 = context
.scan("test_db", "test_table_2")
.aggregation({Max(col("s3"))}, {col("s1"), col("s2")}, enable);
auto tasks = request2.buildMPPTasks(context, properties);
for (auto & task : tasks)
{
setFineGrainedShuffleForExchange(const_cast<tipb::Executor &>(task.dag_request->root_executor()));
}

const auto actual_cols = executeMPPTasks(tasks, properties, MockComputeServerManager::instance().getServerConfigMap());
ASSERT_COLUMNS_EQ_UR(expected_cols, actual_cols);
}
}
CATCH

} // namespace tests
} // namespace DB
Loading

0 comments on commit 957d2d4

Please sign in to comment.