Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sys-4729] ignore table handler loading error when apply replication log record #275

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 80 additions & 12 deletions cloud/replication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,46 @@ class Listener : public ReplicationLogListener {
State state_{OPEN};
};


class FollowerEnv;
// RandomAccessFile wrapper that FollowerEnv::NewRandomAccessFile hands out in
// place of the file it opened. Read() asks the associated FollowerEnv whether
// random reads should currently fail: if so it returns a simulated IOError,
// otherwise it forwards the call to the wrapped file.
// NOTE(review): only Read() is overridden here; every other RandomAccessFile
// method keeps the base-class behavior instead of being forwarded to the
// wrapped file -- confirm that is acceptable for the tests using it.
class FollowerRandomAccessFile: public RandomAccessFile {
public:
// `target` is the real file to wrap (ownership is transferred);
// `env` is the FollowerEnv queried on every Read().
FollowerRandomAccessFile(std::unique_ptr<RandomAccessFile> target,
FollowerEnv* env);

// Returns Status::IOError("simulated error") while the env requests read
// failures; otherwise returns whatever the wrapped file's Read() returns.
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
private:
// Wrapped file that performs the real reads.
std::unique_ptr<RandomAccessFile> target_;
// Raw pointer to the env that created this file; never deleted in the code
// shown here, so it presumably must outlive this object.
FollowerEnv* env_;
};

class FollowerEnv : public EnvWrapper {
public:
FollowerEnv(std::string leader_path)
: EnvWrapper(Env::Default()), leader_path_(std::move(leader_path)) {}
FollowerEnv(std::string leader_path, Env* base_env)
: EnvWrapper(base_env), leader_path_(std::move(leader_path)) {}

Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) override {
return EnvWrapper::NewRandomAccessFile(mapFilename(fname), result, options);
auto s = EnvWrapper::NewRandomAccessFile(mapFilename(fname), result, options);
if (!s.ok()) {
return s;
}

result->reset(new FollowerRandomAccessFile(std::move(*result), this));
return s;
};

// Reports the size of the file that `fname` resolves to after mapFilename()
// has been applied, delegating the actual lookup to the wrapped env.
Status GetFileSize(const std::string& fname, uint64_t* file_size) override {
  const std::string mapped_name = mapFilename(fname);
  return EnvWrapper::GetFileSize(mapped_name, file_size);
}

// True while random-access reads issued through this env should fail with a
// simulated error (checked by FollowerRandomAccessFile::Read).
bool shouldFailRandomRead() const { return simulate_random_read_failure_; }
// Turns the simulated random-read failure on (`simulate` == true) or off.
// NOTE(review): the flag is a plain bool; this presumably relies on reads and
// toggles happening on the same thread -- confirm.
void setSimulateRandomReadFailure(bool simulate) {
simulate_random_read_failure_ = simulate;
}

private:
std::string mapFilename(const std::string& fname) {
if (IsSstFile(fname)) {
Expand All @@ -85,8 +110,25 @@ class FollowerEnv : public EnvWrapper {
}

std::string leader_path_;

// If true, return io error for random access read
bool simulate_random_read_failure_ = false;
};

FollowerRandomAccessFile::FollowerRandomAccessFile(
std::unique_ptr<RandomAccessFile> target, FollowerEnv* env)
: target_(std::move(target)), env_(env) {}


// Serves a positional read from the wrapped file. While the env is simulating
// random-read failures the wrapped file is not touched and an IOError is
// returned instead.
Status FollowerRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
                                      char* scratch) const {
  const bool inject_failure = env_->shouldFailRandomRead();
  if (!inject_failure) {
    return target_->Read(offset, n, result, scratch);
  }
  return Status::IOError("simulated error");
}

int getPersistedSequence(DB* db) {
std::string out;
auto s = db->GetPersistedReplicationSequence(&out);
Expand Down Expand Up @@ -138,7 +180,8 @@ Status countWalFiles(Env* env, const std::string& path, size_t* out) {
class ReplicationTest : public testing::Test {
public:
ReplicationTest()
: test_dir_(test::TmpDir()), follower_env_(test_dir_ + "/leader") {
: test_dir_(test::TmpDir()),
follower_env_(leaderPath(), Env::Default()) {
auto base_env = Env::Default();
DestroyDir(base_env, test_dir_ + "/leader");
DestroyDir(base_env, test_dir_ + "/follower");
Expand Down Expand Up @@ -194,7 +237,7 @@ class ReplicationTest : public testing::Test {
}

DB* openFollower() {
return openFollower(leaderOptions());
return openFollower(followerOptions());
}
DB* openFollower(Options options);

Expand All @@ -204,6 +247,7 @@ class ReplicationTest : public testing::Test {
}

Options leaderOptions() const;
Options followerOptions();

// catching up follower for `num_records`. If `num_records` not specified,
// it will catch up until end of log
Expand Down Expand Up @@ -293,6 +337,9 @@ class ReplicationTest : public testing::Test {
// Overwrites the fixture's followerSequence_ with `new_seq`.
// NOTE(review): how followerSequence_ is consumed is outside this view.
void resetFollowerSequence(int new_seq) {
followerSequence_ = new_seq;
}
// Gives tests access to the fixture's FollowerEnv, e.g. to toggle its
// simulated random-read failure.
FollowerEnv& followerEnv() {
return follower_env_;
}
private:
std::string test_dir_;
FollowerEnv follower_env_;
Expand Down Expand Up @@ -321,6 +368,16 @@ Options ReplicationTest::leaderOptions() const {
return options;
}

// Builds the options used to open the follower DB: the leader's options,
// routed through follower_env_, with automatic compactions and automatic
// flushes disabled and a much smaller write buffer.
Options ReplicationTest::followerOptions() {
  auto options = leaderOptions();
  options.env = &follower_env_;
  options.disable_auto_compactions = true;
  // write buffer size of follower is much smaller than leader to help verify
  // that disable_auto_flush works as expected
  options.write_buffer_size = 10 << 10;
  // The small write buffer above only exercises disable_auto_flush if the
  // option is actually switched on for the follower.
  options.disable_auto_flush = true;
  return options;
}

DB* ReplicationTest::openLeader(Options options) {
bool firstOpen = log_records_.empty();
auto dbname = test_dir_ + "/leader";
Expand Down Expand Up @@ -386,13 +443,6 @@ DB* ReplicationTest::openFollower(Options options) {
cf_names.push_back(kDefaultColumnFamilyName);
}

options.env = &follower_env_;
options.disable_auto_compactions = true;
// write buffer size of follower is much smaller than leader to help verify
// that disable_auto_flush works as expected
options.write_buffer_size = 10 << 10;
options.disable_auto_flush = true;

std::vector<ColumnFamilyDescriptor> column_families;
for (auto& name : cf_names) {
column_families.emplace_back(name, options);
Expand Down Expand Up @@ -1201,6 +1251,24 @@ TEST_F(ReplicationTest, DeleteRange) {
verifyEqual();
}

// Simulate footer load failure when follower applies manifest write.
// Random reads through the follower env are made to fail while the follower
// applies the remaining replication record; catch-up is still expected to
// apply it, and a Get on the follower fails only while the failure is
// being simulated.
TEST_F(ReplicationTest, IgnoreFooterLoadFailure) {
auto leader = openLeader();
auto follower = openFollower();
ASSERT_OK(leader->Put(wo(), "k1", "v1"));
ASSERT_OK(leader->Flush({}));
// memtable write and memtable switch
EXPECT_EQ(catchUpFollower(2), 2);
// From here on every FollowerRandomAccessFile::Read returns an IOError.
followerEnv().setSimulateRandomReadFailure(true);
// Exactly one record is left (presumably the manifest write for the flush);
// it must be applied even though table reads fail.
EXPECT_EQ(catchUpFollower(), 1);
std::string v;
// While reads fail, the lookup surfaces the simulated IOError.
EXPECT_TRUE(follower->Get({}, "k1", &v).IsIOError());

// Once reads work again, the same lookup succeeds and returns the value.
followerEnv().setSimulateRandomReadFailure(false);
EXPECT_OK(follower->Get({}, "k1", &v));
EXPECT_EQ(v, "v1");
}

} // namespace ROCKSDB_NAMESPACE

// A black-box test for the cloud wrapper around rocksdb
Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1429,8 +1429,9 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
}
s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists,
&mutex_, directories_.GetDbDir(),
false /* new_descriptor_log */,
&*cf_options);
false /* new_descriptor_log */, &*cf_options,
{} /* manifest_wcbs */,
true /* ignore_table_load_error */);
if (!s.ok()) {
break;
}
Expand Down
17 changes: 13 additions & 4 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4829,7 +4829,8 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Status VersionSet::ProcessManifestWrites(
std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
FSDirectory* dir_contains_current_file, bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options) {
const ColumnFamilyOptions* new_cf_options,
bool ignore_table_load_error) {
mu->AssertHeld();
assert(!writers.empty());
ManifestWriter& first_writer = writers.front();
Expand Down Expand Up @@ -5111,9 +5112,15 @@ Status VersionSet::ProcessManifestWrites(
mutable_cf_options_ptrs[i]->prefix_extractor,
MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]));
if (!s.ok()) {
if (db_options_->paranoid_checks) {
// ignore_table_load_error has higher precedence than paranoid_checks
if (!ignore_table_load_error && db_options_->paranoid_checks) {
break;
}
if (ignore_table_load_error) {
ROCKS_LOG_INFO(db_options_->info_log,
"Ignore LoadTableHandler error: %s",
s.ToString().c_str());
}
s = Status::OK();
}
}
Expand Down Expand Up @@ -5451,7 +5458,8 @@ Status VersionSet::LogAndApply(
const autovector<autovector<VersionEdit*>>& edit_lists,
InstrumentedMutex* mu, FSDirectory* dir_contains_current_file,
bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options,
const std::vector<std::function<void(const Status&)>>& manifest_wcbs) {
const std::vector<std::function<void(const Status&)>>& manifest_wcbs,
bool ignore_table_load_error) {
mu->AssertHeld();
int num_edits = 0;
for (const auto& elist : edit_lists) {
Expand Down Expand Up @@ -5525,7 +5533,8 @@ Status VersionSet::LogAndApply(
return Status::ColumnFamilyDropped();
}
return ProcessManifestWrites(writers, mu, dir_contains_current_file,
new_descriptor_log, new_cf_options);
new_descriptor_log, new_cf_options,
ignore_table_load_error);
}

void VersionSet::LogAndApplyCFHelper(VersionEdit* edit,
Expand Down
12 changes: 9 additions & 3 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,9 @@ class VersionSet {
// The across-multi-cf batch version. If edit_lists contain more than
// 1 version edits, caller must ensure that no edit in the []list is column
// family manipulation.
//
// `ignore_table_load_error`: if true, a failure to load the table handlers of
// newly added files is logged and ignored, even when paranoid_checks is true
// (this flag takes precedence over paranoid_checks).
virtual Status LogAndApply(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
Expand All @@ -1164,7 +1167,8 @@ class VersionSet {
bool new_descriptor_log = false,
const ColumnFamilyOptions* new_cf_options = nullptr,
const std::vector<std::function<void(const Status&)>>& manifest_wcbs =
{});
{},
bool ignore_table_load_error = false);

static Status GetCurrentManifestPath(const std::string& dbname,
FileSystem* fs,
Expand Down Expand Up @@ -1603,7 +1607,8 @@ class VersionSet {
InstrumentedMutex* mu,
FSDirectory* dir_contains_current_file,
bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options);
const ColumnFamilyOptions* new_cf_options,
bool ignore_table_load_error);

void LogAndApplyCFHelper(VersionEdit* edit,
SequenceNumber* max_last_sequence);
Expand Down Expand Up @@ -1665,7 +1670,8 @@ class ReactiveVersionSet : public VersionSet {
const autovector<autovector<VersionEdit*>>& /*edit_lists*/,
InstrumentedMutex* /*mu*/, FSDirectory* /*dir_contains_current_file*/,
bool /*new_descriptor_log*/, const ColumnFamilyOptions* /*new_cf_option*/,
const std::vector<std::function<void(const Status&)>>& /*manifest_wcbs*/)
const std::vector<std::function<void(const Status&)>>& /*manifest_wcbs*/,
bool /* ignore_table_load_error */)
override {
return Status::NotSupported("not supported in reactive mode");
}
Expand Down
Loading