Skip to content

Commit

Permalink
Fix OnFlushCompleted fired before flush result write to MANIFEST (#5908)
Browse files Browse the repository at this point in the history
Summary:
When there are concurrent flush job on the same CF, `OnFlushCompleted` can be called before the flush result being install to LSM. Fixing the issue by passing `FlushJobInfo` through `MemTable`, and the thread who commit the flush result can fetch the `FlushJobInfo` and fire `OnFlushCompleted` on behave of the thread actually writing the SST.

Fix #5892
Pull Request resolved: #5908

Test Plan: Add new test. The test will fail without the fix.

Differential Revision: D17916144

Pulled By: riversand963

fbshipit-source-id: e18df67d9533b5baee52ae3605026cdeb05cbe10
  • Loading branch information
Yi Wu authored and facebook-github-bot committed Oct 16, 2019
1 parent 2c9e9f2 commit 1f9d7c0
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 62 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strong results when reseek happens with a different iterator upper bound.
* Fix a bug causing a crash during ingest external file when background compaction cause severe error (file not found).
* Fix a bug when partitioned filters and prefix search are used in conjunction, ::SeekForPrev could return invalid for an existing prefix. ::SeekForPrev might be called by the user, or internally on ::Prev, or within ::Seek if the return value involves Delete or a Merge operand.
* Fix OnFlushCompleted fired before flush result persisted in MANIFEST when there's concurrent flush job. The bug exists since OnFlushCompleted was introduced in rocksdb 3.8.
### New Features
* Introduced DBOptions::max_write_batch_group_size_bytes to configure maximum limit on number of bytes that are written in a single batch of WAL or memtable write. It is followed when the leader write size is larger than 1/8 of this limit.
* VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting.
Expand Down
96 changes: 96 additions & 0 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <atomic>

#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/fault_injection_test_env.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"

namespace rocksdb {

Expand Down Expand Up @@ -323,6 +329,96 @@ TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
SyncPoint::GetInstance()->DisableProcessing();
}

#ifndef ROCKSDB_LITE
TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
class TestListener : public EventListener {
public:
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
// There's only one key in each flush.
ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
ASSERT_NE(0, info.smallest_seqno);
if (info.smallest_seqno == seq1) {
// First flush completed
ASSERT_FALSE(completed1);
completed1 = true;
CheckFlushResultCommitted(db, seq1);
} else {
// Second flush completed
ASSERT_FALSE(completed2);
completed2 = true;
ASSERT_EQ(info.smallest_seqno, seq2);
CheckFlushResultCommitted(db, seq2);
}
}

void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
InstrumentedMutex* mutex = db_impl->mutex();
mutex->Lock();
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())
->cfd();
ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
mutex->Unlock();
}

std::atomic<SequenceNumber> seq1{0};
std::atomic<SequenceNumber> seq2{0};
std::atomic<bool> completed1{false};
std::atomic<bool> completed2{false};
};
std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();

SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::FlushMemTable:AfterScheduleFlush",
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
{"DBImpl::FlushMemTableToOutputFile:Finish",
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table", [&listener](void* arg) {
// Wait for the second flush finished, out of mutex.
auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
TEST_SYNC_POINT(
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
"WaitSecond");
}
});

Options options = CurrentOptions();
options.create_if_missing = true;
options.listeners.push_back(listener);
// Setting max_flush_jobs = max_background_jobs / 4 = 2.
options.max_background_jobs = 8;
// Allow 2 immutable memtables.
options.max_write_buffer_number = 3;
Reopen(options);
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "v"));
listener->seq1 = db_->GetLatestSequenceNumber();
// t1 will wait for the second flush complete before committing flush result.
auto t1 = port::Thread([&]() {
// flush_opts.wait = true
ASSERT_OK(db_->Flush(FlushOptions()));
});
// Wait for first flush scheduled.
TEST_SYNC_POINT(
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
// The second flush will exit early without commit its result. The work
// is delegated to the first flush.
ASSERT_OK(Put("bar", "v"));
listener->seq2 = db_->GetLatestSequenceNumber();
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db_->Flush(flush_opts));
t1.join();
ASSERT_TRUE(listener->completed1);
ASSERT_TRUE(listener->completed2);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif // !ROCKSDB_LITE

TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
Options options = CurrentOptions();
options.create_if_missing = true;
Expand Down
8 changes: 4 additions & 4 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1005,11 +1005,11 @@ class DBImpl : public DB {

void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
int job_id);

void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
void NotifyOnFlushCompleted(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info);

void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
const Status& st,
Expand Down
77 changes: 33 additions & 44 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ Status DBImpl::FlushMemTableToOutputFile(

#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
flush_job.GetTableProperties());
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif // ROCKSDB_LITE

Status s;
Expand Down Expand Up @@ -213,8 +212,8 @@ Status DBImpl::FlushMemTableToOutputFile(
if (s.ok()) {
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
job_context->job_id, flush_job.GetTableProperties());
NotifyOnFlushCompleted(cfd, mutable_cf_options,
flush_job.GetCommittedFlushJobsInfo());
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm) {
Expand All @@ -233,6 +232,7 @@ Status DBImpl::FlushMemTableToOutputFile(
}
#endif // ROCKSDB_LITE
}
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
return s;
}

Expand Down Expand Up @@ -303,7 +303,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(

autovector<Directory*> distinct_output_dirs;
autovector<std::string> distinct_output_dir_paths;
std::vector<FlushJob> jobs;
std::vector<std::unique_ptr<FlushJob>> jobs;
std::vector<MutableCFOptions> all_mutable_cf_options;
int num_cfs = static_cast<int>(cfds.size());
all_mutable_cf_options.reserve(num_cfs);
Expand All @@ -330,16 +330,16 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
jobs.emplace_back(
jobs.emplace_back(new FlushJob(
dbname_, cfd, immutable_db_options_, mutable_cf_options,
max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_,
&shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */,
thread_pri);
jobs.back().PickMemTable();
thread_pri));
jobs.back()->PickMemTable();
}

std::vector<FileMetaData> file_meta(num_cfs);
Expand All @@ -351,7 +351,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
job_context->job_id, jobs[i].GetTableProperties());
job_context->job_id);
}
#endif /* !ROCKSDB_LITE */

Expand All @@ -373,7 +373,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// TODO (yanqin): parallelize jobs with threads.
for (int i = 1; i != num_cfs; ++i) {
exec_status[i].second =
jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]);
jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
exec_status[i].first = true;
}
if (num_cfs > 1) {
Expand All @@ -382,8 +382,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
TEST_SYNC_POINT(
"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
}
assert(exec_status.size() > 0);
assert(!file_meta.empty());
exec_status[0].second =
jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]);
jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
exec_status[0].first = true;

Status error_status;
Expand Down Expand Up @@ -424,7 +426,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
auto wait_to_install_func = [&]() {
bool ready = true;
for (size_t i = 0; i != cfds.size(); ++i) {
const auto& mems = jobs[i].GetMemTables();
const auto& mems = jobs[i]->GetMemTables();
if (cfds[i]->IsDropped()) {
// If the column family is dropped, then do not wait.
continue;
Expand Down Expand Up @@ -465,7 +467,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta;
for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i].GetMemTables();
const auto& mems = jobs[i]->GetMemTables();
if (!cfds[i]->IsDropped() && !mems.empty()) {
tmp_cfds.emplace_back(cfds[i]);
mems_list.emplace_back(&mems);
Expand Down Expand Up @@ -501,12 +503,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
#ifndef ROCKSDB_LITE
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
for (int i = 0; i != num_cfs; ++i) {
if (cfds[i]->IsDropped()) {
continue;
}
NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i],
job_context->job_id, jobs[i].GetTableProperties());
NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
jobs[i]->GetCommittedFlushJobsInfo());
if (sfm) {
std::string file_path = MakeTableFileName(
cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
Expand All @@ -530,12 +533,12 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// unref the versions.
for (int i = 0; i != num_cfs; ++i) {
if (!exec_status[i].first) {
jobs[i].Cancel();
jobs[i]->Cancel();
}
}
for (int i = 0; i != num_cfs; ++i) {
if (exec_status[i].first && exec_status[i].second.ok()) {
auto& mems = jobs[i].GetMemTables();
auto& mems = jobs[i]->GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems,
file_meta[i].fd.GetNumber());
}
Expand All @@ -549,7 +552,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(

void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
int job_id) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.size() == 0U) {
return;
Expand Down Expand Up @@ -580,7 +583,6 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushBegin(this, info);
Expand All @@ -594,15 +596,14 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
(void)file_meta;
(void)mutable_cf_options;
(void)job_id;
(void)prop;
#endif // ROCKSDB_LITE
}

void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
void DBImpl::NotifyOnFlushCompleted(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
#ifndef ROCKSDB_LITE
assert(flush_jobs_info != nullptr);
if (immutable_db_options_.listeners.size() == 0U) {
return;
}
Expand All @@ -619,34 +620,22 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
// release lock while notifying events
mutex_.Unlock();
{
FlushJobInfo info;
info.cf_id = cfd->GetID();
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushCompleted(this, info);
for (auto& info : *flush_jobs_info) {
info->triggered_writes_slowdown = triggered_writes_slowdown;
info->triggered_writes_stop = triggered_writes_stop;
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushCompleted(this, *info);
}
}
flush_jobs_info->clear();
}
mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
#else
(void)cfd;
(void)file_meta;
(void)mutable_cf_options;
(void)job_id;
(void)prop;
(void)flush_jobs_info;
#endif // ROCKSDB_LITE
}

Expand Down
Loading

0 comments on commit 1f9d7c0

Please sign in to comment.