Skip to content

Commit

Permalink
[fix](move-memtable) fix commit may fail due to duplicated reports
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Mar 18, 2024
1 parent fbb4e21 commit 84a29b9
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 1 deletion.
11 changes: 11 additions & 0 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,17 @@ class LoadStreamStub {

friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub);

#ifdef BE_TEST
inline void add_success_tablet(int64_t tablet_id) {
std::lock_guard<bthread::Mutex> lock(_success_tablets_mutex);
_success_tablets.push_back(tablet_id);
}
inline void add_failed_tablet(int64_t tablet_id, Status reason) {
std::lock_guard<bthread::Mutex> lock(_failed_tablets_mutex);
_failed_tablets[tablet_id] = reason;
}
#endif

private:
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ Status VTabletWriterV2::_generate_commit_info(
std::unordered_map<int64_t, int> failed_tablets;
std::unordered_map<int64_t, Status> failed_reason;
for (const auto& [node_id, streams] : streams_for_node) {
std::unordered_set<int64_t> known_tablets;
for (const auto& stream : streams->streams()) {
std::unordered_set<int64_t> known_tablets;
for (auto [tablet_id, reason] : stream->failed_tablets()) {
if (known_tablets.contains(tablet_id)) {
continue;
Expand Down
220 changes: 220 additions & 0 deletions be/test/vec/sink/vtablet_writer_v2_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/sink/writer/vtablet_writer_v2.h"

#include <gtest/gtest.h>

#include "vec/sink/load_stream_stub.h"
#include "vec/sink/load_stream_stub_pool.h"

namespace doris {

class TestVTabletWriterV2 : public ::testing::Test {
public:
TestVTabletWriterV2() {}
~TestVTabletWriterV2() {}
static void SetUpTestSuite() {}

static void TearDownTestSuite() {}
};

const int64_t src_id = 1000;

static void add_stream(std::unordered_map<int64_t, std::shared_ptr<LoadStreams>>& streams_for_node,
int64_t node_id, std::vector<int64_t> success_tablets,
std::unordered_map<int64_t, Status> failed_tablets) {
PUniqueId load_id;
if (!streams_for_node.contains(node_id)) {
streams_for_node[node_id] = std::make_shared<LoadStreams>(load_id, node_id, 1, nullptr);
}
auto stub = std::make_unique<LoadStreamStub>(load_id, src_id, 1);
for (const auto& tablet_id : success_tablets) {
stub->add_success_tablet(tablet_id);
}
for (const auto& [tablet_id, reason] : failed_tablets) {
stub->add_failed_tablet(tablet_id, reason);
}
streams_for_node[node_id]->streams().push_back(std::move(stub));
}

TEST_F(TestVTabletWriterV2, one_replica) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 1;
add_stream(streams_for_node, 1001, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 2);
}

TEST_F(TestVTabletWriterV2, one_replica_fail) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 1;
add_stream(streams_for_node, 1001, {1}, {{2, Status::InternalError("test")}});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
ASSERT_EQ(st, Status::InternalError("test"));
}

TEST_F(TestVTabletWriterV2, two_replica) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 2;
add_stream(streams_for_node, 1001, {1, 2}, {});
add_stream(streams_for_node, 1002, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}

TEST_F(TestVTabletWriterV2, two_replica_fail) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 2;
add_stream(streams_for_node, 1001, {1}, {{2, Status::InternalError("test")}});
add_stream(streams_for_node, 1002, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
ASSERT_EQ(st, Status::InternalError("test"));
}

TEST_F(TestVTabletWriterV2, normal) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 3;
add_stream(streams_for_node, 1001, {1, 2}, {});
add_stream(streams_for_node, 1002, {1, 2}, {});
add_stream(streams_for_node, 1003, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 6);
}

TEST_F(TestVTabletWriterV2, miss_one) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 3;
add_stream(streams_for_node, 1001, {1, 2}, {});
add_stream(streams_for_node, 1002, {1}, {});
add_stream(streams_for_node, 1003, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}

TEST_F(TestVTabletWriterV2, miss_two) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 3;
add_stream(streams_for_node, 1001, {1, 2}, {});
add_stream(streams_for_node, 1002, {1}, {});
add_stream(streams_for_node, 1003, {1}, {});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
// BE will report commit info, and FE should detect tablet 2 is under-replicated
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}

TEST_F(TestVTabletWriterV2, fail_one) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 3;
add_stream(streams_for_node, 1001, {1, 2}, {});
add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(streams_for_node, 1003, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}

TEST_F(TestVTabletWriterV2, fail_one_duplicate) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 3;
add_stream(streams_for_node, 1001, {1, 2}, {});
add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(streams_for_node, 1003, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
// Duplicate tablets from same node should be ignored
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}

TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_same_node) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 3;
add_stream(streams_for_node, 1001, {1, 2}, {});
add_stream(streams_for_node, 1002, {},
{{1, Status::InternalError("test")}, {2, Status::InternalError("test")}});
add_stream(streams_for_node, 1003, {1, 2}, {});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}

TEST_F(TestVTabletWriterV2, fail_two_diff_tablet_diff_node) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 3;
add_stream(streams_for_node, 1001, {1, 2}, {});
add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(streams_for_node, 1003, {2}, {{1, Status::InternalError("test")}});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}

TEST_F(TestVTabletWriterV2, fail_two_same_tablet) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 3;
add_stream(streams_for_node, 1001, {1, 2}, {});
add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(streams_for_node, 1003, {1}, {{2, Status::InternalError("test")}});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
// BE should detect and abort commit if majority of replicas failed
ASSERT_EQ(st, Status::InternalError("test"));
}

TEST_F(TestVTabletWriterV2, fail_two_miss_one_same_tablet) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> streams_for_node;
const int num_replicas = 3;
add_stream(streams_for_node, 1001, {1}, {});
add_stream(streams_for_node, 1002, {1}, {{2, Status::InternalError("test")}});
add_stream(streams_for_node, 1003, {1}, {{2, Status::InternalError("test")}});
auto st = vectorized::VTabletWriterV2::_generate_commit_info(tablet_commit_infos,
streams_for_node, num_replicas);
// BE should detect and abort commit if majority of replicas failed
ASSERT_EQ(st, Status::InternalError("test"));
}

} // namespace doris

0 comments on commit 84a29b9

Please sign in to comment.