Skip to content

Commit

Permalink
Merge pull request facebook#25 from petermattis/pmattis/ingest-extern…
Browse files Browse the repository at this point in the history
…a-file-deadlock

Allow file-ingest-triggered flush to skip waiting for write-stall clear
  • Loading branch information
petermattis authored Feb 12, 2019
2 parents dc30625 + 7bb890c commit c905cbc
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 1 deletion.
77 changes: 77 additions & 0 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4005,6 +4005,83 @@ TEST_F(DBCompactionTest, PartialManualCompaction) {
dbfull()->CompactRange(cro, nullptr, nullptr);
}

// FixFileIngestionCompactionDeadlock verifies that compaction and file
// ingestion do not deadlock when a write stall has been triggered by the
// number of L0 files reaching level0_stop_writes_trigger.
//
// Outline:
//   1. Write an external SST file holding a single key (Key(99)).
//   2. Load a sync-point dependency so that "BackgroundCallCompaction:0" is
//      only passed after
//      "DBImpl::IngestExternalFile:AfterIncIngestFileCounter" has been hit.
//   3. Fill L0 with level0_stop_writes_trigger files, leaving one key in the
//      memtable that overlaps the external file.
//   4. Ingest the external file from a separate thread; the ingestion has to
//      flush the overlapping memtable and that flush must not wait for the
//      write stall to clear.
TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
  const int kNumKeysPerFile = 100;
  Options options = CurrentOptions();

  // Generate an external SST file containing a single key, i.e. 99
  std::string sst_files_dir = dbname_ + "/sst_files/";
  test::DestroyDir(env_, sst_files_dir);
  ASSERT_OK(env_->CreateDir(sst_files_dir));
  SstFileWriter sst_writer(EnvOptions(), options);
  const std::string sst_file_path = sst_files_dir + "test.sst";
  ASSERT_OK(sst_writer.Open(sst_file_path));
  ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
  ASSERT_OK(sst_writer.Finish());

  // Start from a clean sync-point state, then order the background compaction
  // after the point where ingestion has incremented its running counter.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->LoadDependency({
      {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
       "BackgroundCallCompaction:0"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  options.write_buffer_size = 110 << 10;  // 110KB
  // Compaction is triggered at the same L0 file count that stops writes.
  options.level0_file_num_compaction_trigger =
      options.level0_stop_writes_trigger;
  options.max_subcompactions = max_subcompactions_;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);
  Random rnd(301);

  // Generate level0_stop_writes_trigger L0 files to trigger write stop
  for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
    for (int j = 0; j != kNumKeysPerFile; ++j) {
      ASSERT_OK(Put(Key(j), RandomString(&rnd, 990)));
    }
    if (0 == i) {
      // When we reach here, the memtables have kNumKeysPerFile keys. Note that
      // flush is not yet triggered. We need to write an extra key so that the
      // write path will call PreprocessWrite and cause the previous key-value
      // pairs to be flushed. After that, there will be the newest key in the
      // memtable, and a bunch of L0 files. Since there is already one key in
      // the memtable, then for i = 1, 2, ..., we do not have to write this
      // extra key to trigger flush.
      ASSERT_OK(Put("", ""));
    }
    // Do not ignore a failed flush: the L0 file count check below would
    // otherwise fail with a less helpful message.
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
  }
  // When we reach this point, there will be level0_stop_writes_trigger L0
  // files and one extra key (99) in memory, which overlaps with the external
  // SST file. Write stall triggers, and can be cleared only after compaction
  // reduces the number of L0 files.

  // Compaction will also be triggered since we have reached the threshold for
  // auto compaction. Note that compaction may begin after the following file
  // ingestion thread and waits for ingestion to finish.

  // Thread to ingest file with overlapping key range with the current
  // memtable. Consequently ingestion will trigger a flush. The flush MUST
  // proceed without waiting for the write stall condition to clear, otherwise
  // deadlock can happen.
  port::Thread ingestion_thr([&]() {
    IngestExternalFileOptions ifo;
    Status s = db_->IngestExternalFile({sst_file_path}, ifo);
    ASSERT_OK(s);
  });

  // Wait for the ingestion to finish, then for the pending compaction.
  ingestion_thr.join();
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Turn sync-point processing back off so the dependency loaded above does
  // not stay active for whatever test runs next in this process.
  SyncPoint::GetInstance()->DisableProcessing();
  Close();
}

#endif // !defined(ROCKSDB_LITE)
} // namespace rocksdb

Expand Down
5 changes: 4 additions & 1 deletion db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3059,6 +3059,7 @@ Status DBImpl::IngestExternalFile(
}

num_running_ingest_file_++;
TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");

// We cannot ingest a file into a dropped CF
if (cfd->IsDropped()) {
Expand All @@ -3074,7 +3075,9 @@ Status DBImpl::IngestExternalFile(
&need_flush);
if (status.ok() && need_flush) {
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions(),
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
true /* writes_stopped */);
mutex_.Lock();
Expand Down

0 comments on commit c905cbc

Please sign in to comment.