diff --git a/dbms/src/DataStreams/NonJoinedBlockInputStream.cpp b/dbms/src/DataStreams/NonJoinedBlockInputStream.cpp index 821519b4a30..98513d86b50 100644 --- a/dbms/src/DataStreams/NonJoinedBlockInputStream.cpp +++ b/dbms/src/DataStreams/NonJoinedBlockInputStream.cpp @@ -30,7 +30,7 @@ struct AdderNonJoined; template struct AdderNonJoined { - static size_t add(const Mapped & mapped, size_t key_num, size_t num_columns_left, MutableColumns & columns_left, size_t num_columns_right, MutableColumns & columns_right) + static size_t add(const Mapped & mapped, size_t key_num, size_t num_columns_left, MutableColumns & columns_left, size_t num_columns_right, MutableColumns & columns_right, const void *&, const size_t) { for (size_t j = 0; j < num_columns_left; ++j) /// should fill the key column with key columns from right block @@ -48,10 +48,13 @@ struct AdderNonJoined template struct AdderNonJoined { - static size_t add(const Mapped & mapped, size_t key_num, size_t num_columns_left, MutableColumns & columns_left, size_t num_columns_right, MutableColumns & columns_right) + static size_t add(const Mapped & mapped, size_t key_num, size_t num_columns_left, MutableColumns & columns_left, size_t num_columns_right, MutableColumns & columns_right, const void *& next_element_in_row_list, const size_t max_row_added) { size_t rows_added = 0; - for (auto current = &static_cast(mapped); current != nullptr; current = current->next) + auto current = &static_cast(mapped); + if unlikely (next_element_in_row_list != nullptr) + current = reinterpret_cast(next_element_in_row_list); + for (; rows_added < max_row_added && current != nullptr; current = current->next) { for (size_t j = 0; j < num_columns_left; ++j) /// should fill the key column with key columns from right block @@ -62,8 +65,9 @@ struct AdderNonJoined for (size_t j = 0; j < num_columns_right; ++j) columns_right[j]->insertFrom(*current->block->getByPosition(key_num + j).column.get(), current->row_num); - rows_added++; + ++rows_added; } + next_element_in_row_list = current; return rows_added; } }; @@ -216,7 +220,7 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map, /// first add rows that is not in the hash table while (current_not_mapped_row != nullptr) { - rows_added++; + ++rows_added; for (size_t j = 0; j < num_columns_left; ++j) /// should fill the key column with key columns from right block /// but we don't care about the key column now so just insert a default value is ok. @@ -249,7 +253,7 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map, auto it = reinterpret_cast(position.get()); auto end = map.getSegmentTable(current_segment).end(); - for (; *it != end || current_segment + step < map.getSegmentSize(); ++(*it)) + for (; *it != end || current_segment + step < map.getSegmentSize();) { if (*it == end) { @@ -268,15 +272,25 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map, break; } if ((*it)->getMapped().getUsed()) + { + ++(*it); continue; + } - rows_added += AdderNonJoined::add((*it)->getMapped(), key_num, num_columns_left, mutable_columns_left, num_columns_right, mutable_columns_right); - - if (rows_added >= max_block_size) + rows_added += AdderNonJoined::add((*it)->getMapped(), key_num, num_columns_left, mutable_columns_left, num_columns_right, mutable_columns_right, next_element_in_row_list, max_block_size - rows_added); + assert(rows_added <= max_block_size); + if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any) { ++(*it); - break; } + else if (next_element_in_row_list == nullptr) + { + /// next_element_in_row_list == nullptr means current row_list is done, so move the iterator + ++(*it); + } + + if (rows_added == max_block_size) + break; } return rows_added; } diff --git a/dbms/src/DataStreams/NonJoinedBlockInputStream.h b/dbms/src/DataStreams/NonJoinedBlockInputStream.h index bb254437c84..fea7f1bc1c2 100644 --- a/dbms/src/DataStreams/NonJoinedBlockInputStream.h +++ b/dbms/src/DataStreams/NonJoinedBlockInputStream.h @@ -55,6 +55,7 @@ class NonJoinedBlockInputStream : public IProfilingBlockInputStream MutableColumns columns_right; std::unique_ptr> position; /// type erasure + const void * next_element_in_row_list = nullptr; size_t current_segment = 0; Join::RowRefList * current_not_mapped_row = nullptr; diff --git a/dbms/src/Flash/tests/gtest_join_executor.cpp b/dbms/src/Flash/tests/gtest_join_executor.cpp index 977e9356f95..30d8e9e9bd7 100644 --- a/dbms/src/Flash/tests/gtest_join_executor.cpp +++ b/dbms/src/Flash/tests/gtest_join_executor.cpp @@ -745,6 +745,32 @@ try ASSERT_EQ(expect[i][j], blocks[j].rows()); } } + // test non joined data + context.addMockTable("split_test", "t3", {{"a", TiDB::TP::TypeLong}}, {toVec("a", {2, 2, 2, 2, 2})}); + context.addMockTable("split_test", "t4", {{"a", TiDB::TP::TypeLong}}, {toVec("a", {1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3})}); + request = context + .scan("split_test", "t3") + .join(context.scan("split_test", "t4"), tipb::JoinType::TypeRightOuterJoin, {col("a")}) + .build(context); + + expect = {{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, + {2, 2, 2, 2, 2, 2, 2, 2, 2, 2}, + {7, 7, 6}, + {20}, + {20}, + {20}, + {20}, + {20}}; + for (size_t i = 0; i < block_sizes.size(); ++i) + { + context.context.setSetting("max_block_size", Field(static_cast(block_sizes[i]))); + auto blocks = getExecuteStreamsReturnBlocks(request); + ASSERT_EQ(expect[i].size(), blocks.size()); + for (size_t j = 0; j < blocks.size(); ++j) + { + ASSERT_EQ(expect[i][j], blocks[j].rows()); + } + } } CATCH @@ -815,36 +841,34 @@ try /// case 1, right join without right condition auto request = context - .scan("outer_join_test", left_table_names[0]) - .join(context.scan("outer_join_test", right_table_names[0]), tipb::JoinType::TypeRightOuterJoin, {col("a")}) + .scan("outer_join_test", right_table_names[0]) + .join(context.scan("outer_join_test", left_table_names[0]), tipb::JoinType::TypeLeftOuterJoin, {col("a")}) + .project({fmt::format("{}.a", left_table_names[0]), fmt::format("{}.b", left_table_names[0]), fmt::format("{}.a", right_table_names[0]), fmt::format("{}.b", right_table_names[0])}) .build(context); context.context.setSetting("max_block_size", Field(static_cast(max_block_size))); - /// use 1 build concurrency join 1 probe concurrency as the reference + /// use right_table left join left_table as the reference auto ref_columns = executeStreams(request, original_max_streams); /// case 1.1 table scan join table scan - for (size_t left_index = 0; left_index < left_table_names.size(); ++left_index) + for (auto & left_table_name : left_table_names) { - for (size_t right_index = 0; right_index < right_table_names.size(); ++right_index) + for (auto & right_table_name : right_table_names) { - if (left_index == 0 && right_index == 0) - continue; request = context - .scan("outer_join_test", left_table_names[left_index]) - .join(context.scan("outer_join_test", right_table_names[right_index]), tipb::JoinType::TypeRightOuterJoin, {col("a")}) + .scan("outer_join_test", left_table_name) + .join(context.scan("outer_join_test", right_table_name), tipb::JoinType::TypeRightOuterJoin, {col("a")}) .build(context); auto result_columns = executeStreams(request, original_max_streams); ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns); } } /// case 1.2 table scan join fine grained exchange receiver - for (size_t left_index = 0; left_index < left_table_names.size(); ++left_index) + for (auto & left_table_name : left_table_names) { - for (size_t right_index = 0; right_index < right_exchange_receiver_concurrency.size(); ++right_index) + for (size_t exchange_concurrency : right_exchange_receiver_concurrency) { - size_t exchange_concurrency = right_exchange_receiver_concurrency[right_index]; request = context - .scan("outer_join_test", left_table_names[left_index]) + .scan("outer_join_test", left_table_name) .join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {}, {}, {}, exchange_concurrency) .build(context); auto result_columns = executeStreams(request, original_max_streams); @@ -853,36 +877,34 @@ try } /// case 2, right join with right condition request = context - .scan("outer_join_test", left_table_names[0]) - .join(context.scan("outer_join_test", right_table_names[0]), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(right_table_names[0] + ".b"), lit(Field(static_cast(1000))))}, {}, {}, 0) + .scan("outer_join_test", right_table_names[0]) + .join(context.scan("outer_join_test", left_table_names[0]), tipb::JoinType::TypeLeftOuterJoin, {col("a")}, {gt(col(right_table_names[0] + ".b"), lit(Field(static_cast(1000))))}, {}, {}, {}, 0) + .project({fmt::format("{}.a", left_table_names[0]), fmt::format("{}.b", left_table_names[0]), fmt::format("{}.a", right_table_names[0]), fmt::format("{}.b", right_table_names[0])}) .build(context); context.context.setSetting("max_block_size", Field(static_cast(max_block_size))); - /// use 1 build concurrency join 1 probe concurrency as the reference + /// use right_table left join left_table as the reference ref_columns = executeStreams(request, original_max_streams); /// case 2.1 table scan join table scan - for (size_t left_index = 0; left_index < left_table_names.size(); ++left_index) + for (auto & left_table_name : left_table_names) { - for (size_t right_index = 0; right_index < right_table_names.size(); ++right_index) + for (auto & right_table_name : right_table_names) { - if (left_index == 0 && right_index == 0) - continue; request = context - .scan("outer_join_test", left_table_names[left_index]) - .join(context.scan("outer_join_test", right_table_names[right_index]), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(right_table_names[right_index] + ".b"), lit(Field(static_cast(1000))))}, {}, {}, 0) + .scan("outer_join_test", left_table_name) + .join(context.scan("outer_join_test", right_table_name), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(right_table_name + ".b"), lit(Field(static_cast(1000))))}, {}, {}, 0) .build(context); auto result_columns = executeStreams(request, original_max_streams); ASSERT_COLUMNS_EQ_UR(ref_columns, result_columns); } } /// case 2.2 table scan join fine grained exchange receiver - for (size_t left_index = 0; left_index < left_table_names.size(); ++left_index) + for (auto & left_table_name : left_table_names) { - for (size_t right_index = 0; right_index < right_exchange_receiver_concurrency.size(); ++right_index) + for (size_t exchange_concurrency : right_exchange_receiver_concurrency) { - size_t exchange_concurrency = right_exchange_receiver_concurrency[right_index]; String exchange_name = fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency); request = context - .scan("outer_join_test", left_table_names[left_index]) + .scan("outer_join_test", left_table_name) .join(context.receive(fmt::format("right_exchange_receiver_{}_concurrency", exchange_concurrency), exchange_concurrency), tipb::JoinType::TypeRightOuterJoin, {col("a")}, {}, {gt(col(exchange_name + ".b"), lit(Field(static_cast(1000))))}, {}, {}, exchange_concurrency) .build(context); auto result_columns = executeStreams(request, original_max_streams);