From 84a29b90d9cfd8a7eae2ba3e3d5a722f767ac01c Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Mon, 18 Mar 2024 17:20:17 +0800 Subject: [PATCH] [fix](move-memtable) fix commit may fail due to duplicated reports --- be/src/vec/sink/load_stream_stub.h | 11 + be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +- be/test/vec/sink/vtablet_writer_v2_test.cpp | 220 +++++++++++++++++++ 3 files changed, 232 insertions(+), 1 deletion(-) create mode 100644 be/test/vec/sink/vtablet_writer_v2_test.cpp diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index f20b0e6ea3d7c5d..c57d40d7c7a48a4 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -194,6 +194,17 @@ class LoadStreamStub { friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub); +#ifdef BE_TEST + inline void add_success_tablet(int64_t tablet_id) { + std::lock_guard lock(_success_tablets_mutex); + _success_tablets.push_back(tablet_id); + } + inline void add_failed_tablet(int64_t tablet_id, Status reason) { + std::lock_guard lock(_failed_tablets_mutex); + _failed_tablets[tablet_id] = reason; + } +#endif + private: Status _encode_and_send(PStreamHeader& header, std::span data = {}); Status _send_with_buffer(butil::IOBuf& buf, bool sync = false); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index c3e4b4e2a266d6e..a738556e6729bc1 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -598,8 +598,8 @@ Status VTabletWriterV2::_generate_commit_info( std::unordered_map failed_tablets; std::unordered_map failed_reason; for (const auto& [node_id, streams] : streams_for_node) { + std::unordered_set known_tablets; for (const auto& stream : streams->streams()) { - std::unordered_set known_tablets; for (auto [tablet_id, reason] : stream->failed_tablets()) { if (known_tablets.contains(tablet_id)) { continue; diff --git a/be/test/vec/sink/vtablet_writer_v2_test.cpp b/be/test/vec/sink/vtablet_writer_v2_test.cpp new file mode 100644 index 000000000000000..281401be2e449aa --- /dev/null +++ b/be/test/vec/sink/vtablet_writer_v2_test.cpp @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/writer/vtablet_writer_v2.h" + +#include + +#include "vec/sink/load_stream_stub.h" +#include "vec/sink/load_stream_stub_pool.h" + +namespace doris { + +class TestVTabletWriterV2 : public ::testing::Test { +public: + TestVTabletWriterV2() {} + ~TestVTabletWriterV2() {} + static void SetUpTestSuite() {} + + static void TearDownTestSuite() {} +}; + +const int64_t src_id = 1000; + +static void add_stream(std::unordered_map>& streams_for_node, + int64_t node_id, std::vector success_tablets, + std::unordered_map failed_tablets) { + PUniqueId load_id; + if (!streams_for_node.contains(node_id)) { + streams_for_node[node_id] = std::make_shared(load_id, node_id, 1, nullptr); + } + auto stub = std::make_unique(load_id, src_id, 1); + for (const auto& tablet_id : success_tablets) { + stub->add_success_tablet(tablet_id); + } + for (const auto& [tablet_id, reason] : failed_tablets) { + stub->add_failed_tablet(tablet_id, reason); + } + streams_for_node[node_id]->streams().push_back(std::move(stub)); +} + +TEST_F(TestVTabletWriterV2, one_replica) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 1; + add_stream(streams_for_node, 1001, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 2); +} + +TEST_F(TestVTabletWriterV2, one_replica_fail) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 1; + add_stream(streams_for_node, 1001, {1}, {{2, Status::InternalError("test")}}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + ASSERT_EQ(st, Status::InternalError("test")); +} + +TEST_F(TestVTabletWriterV2, two_replica) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 2; + add_stream(streams_for_node, 1001, {1, 2}, {}); + add_stream(streams_for_node, 1002, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 4); +} + +TEST_F(TestVTabletWriterV2, two_replica_fail) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 2; + add_stream(streams_for_node, 1001, {1}, {{2, Status::InternalError("test")}}); + add_stream(streams_for_node, 1002, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + ASSERT_EQ(st, Status::InternalError("test")); +} + +TEST_F(TestVTabletWriterV2, normal) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 3; + add_stream(streams_for_node, 1001, {1, 2}, {}); + add_stream(streams_for_node, 1002, {1, 2}, {}); + add_stream(streams_for_node, 1003, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 6); +} + +TEST_F(TestVTabletWriterV2, miss_one) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 3; + add_stream(streams_for_node, 1001, {1, 2}, {}); + add_stream(streams_for_node, 1002, {1}, {}); + add_stream(streams_for_node, 1003, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 5); +} + +TEST_F(TestVTabletWriterV2, miss_two) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 3; + add_stream(streams_for_node, 1001, {1, 2}, {}); + add_stream(streams_for_node, 1002, {1}, {}); + add_stream(streams_for_node, 1003, {1}, {}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + // BE will report commit info, and FE should detect tablet 2 is under-replicated + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 4); +} + +TEST_F(TestVTabletWriterV2, fail_one) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 3; + add_stream(streams_for_node, 1001, {1, 2}, {}); + add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(streams_for_node, 1003, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 5); +} + +TEST_F(TestVTabletWriterV2, fail_one_duplicate) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 3; + add_stream(streams_for_node, 1001, {1, 2}, {}); + add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(streams_for_node, 1003, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + // Duplicate tablets from same node should be ignored + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 5); +} + +TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_same_node) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 3; + add_stream(streams_for_node, 1001, {1, 2}, {}); + add_stream(streams_for_node, 1002, {}, + {{1, Status::InternalError("test")}, {2, Status::InternalError("test")}}); + add_stream(streams_for_node, 1003, {1, 2}, {}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 4); +} + +TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_diff_node) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 3; + add_stream(streams_for_node, 1001, {1, 2}, {}); + add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(streams_for_node, 1003, {2}, {{1, Status::InternalError("test")}}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(tablet_commit_infos.size(), 4); +} + +TEST_F(TestVTabletWriterV2, fail_two_same_tablet) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 3; + add_stream(streams_for_node, 1001, {1, 2}, {}); + add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(streams_for_node, 1003, {1}, {{2, Status::InternalError("test")}}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + // BE should detect and abort commit if majority of replicas failed + ASSERT_EQ(st, Status::InternalError("test")); +} + +TEST_F(TestVTabletWriterV2, fail_two_miss_one_same_tablet) { + std::vector tablet_commit_infos; + std::unordered_map> streams_for_node; + const int num_replicas = 3; + add_stream(streams_for_node, 1001, {1}, {}); + add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}}); + add_stream(streams_for_node, 1003, {1}, {{2, Status::InternalError("test")}}); + auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos, + streams_for_node, num_replicas); + // BE should detect and abort commit if majority of replicas failed + ASSERT_EQ(st, Status::InternalError("test")); +} + +} // namespace doris