Skip to content

Commit

Permalink
enable cf uses separete write buffer manager (#343)
Browse files Browse the repository at this point in the history
Signed-off-by: Spade A <u6748471@anu.edu.au>
  • Loading branch information
SpadeA-Tang authored Aug 29, 2023
1 parent fe76937 commit 6121b2d
Show file tree
Hide file tree
Showing 12 changed files with 336 additions and 11 deletions.
5 changes: 4 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1672,8 +1672,11 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
auto* write_buffer_manager = options.cf_write_buffer_manager != nullptr
? options.cf_write_buffer_manager.get()
: write_buffer_manager_;
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
id, name, dummy_versions, table_cache_, write_buffer_manager, options,
*db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
db_session_id_);
column_families_.insert({name, id});
Expand Down
20 changes: 19 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,11 @@ Status DBImpl::CloseHelper() {
delete txn_entry.second;
}

// We can only access cf_based_write_buffer_manager_ before versions_.reset(),
// after which all cf write buffer managers will be freed.
for (auto m : cf_based_write_buffer_manager_) {
m->UnregisterDB(this);
}
// versions need to be destroyed before table_cache since it can hold
// references to table_cache.
versions_.reset();
Expand Down Expand Up @@ -2832,7 +2837,20 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
if (s.ok()) {
NewThreadStatusCfInfo(
static_cast_with_check<ColumnFamilyHandleImpl>(*handle)->cfd());
if (write_buffer_manager_ != nullptr) {
if (cf_options.cf_write_buffer_manager != nullptr) {
auto* write_buffer_manager = cf_options.cf_write_buffer_manager.get();
bool exist = false;
for (auto m : cf_based_write_buffer_manager_) {
if (m == write_buffer_manager) {
exist = true;
}
}
if (!exist) {
return Status::NotSupported(
"New cf write buffer manager is not supported after Open");
}
write_buffer_manager->RegisterColumnFamily(this, *handle);
} else if (write_buffer_manager_ != nullptr) {
write_buffer_manager_->RegisterColumnFamily(this, *handle);
}
}
Expand Down
4 changes: 4 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2288,6 +2288,10 @@ class DBImpl : public DB {
Directories directories_;

WriteBufferManager* write_buffer_manager_;
// For simplicity, CF based write buffer manager does not support stall the
// write.
// Note: It's only modifed in Open, so mutex is not needed.
autovector<WriteBufferManager*> cf_based_write_buffer_manager_;

WriteThread write_thread_;
WriteBatch tmp_batch_;
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
cfh->GetName().c_str());
Status s;
TEST_SYNC_POINT_CALLBACK("DBImpl::Flush:ScheduleFlushReq", column_family);
if (immutable_db_options_.atomic_flush) {
s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
FlushReason::kManualFlush);
Expand Down
46 changes: 38 additions & 8 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1625,6 +1625,22 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}

DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
for (auto cf : column_families) {
if (cf.options.cf_write_buffer_manager != nullptr) {
auto* write_buffer_manager = cf.options.cf_write_buffer_manager.get();
bool already_exist = false;
for (auto m : impl->cf_based_write_buffer_manager_) {
if (m == write_buffer_manager) {
already_exist = true;
break;
}
}
if (!already_exist) {
impl->cf_based_write_buffer_manager_.push_back(write_buffer_manager);
}
}
}

s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
if (s.ok()) {
std::vector<std::string> paths;
Expand Down Expand Up @@ -1896,20 +1912,34 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
if (s.ok()) {
impl->StartPeriodicWorkScheduler();

// Newly created handles are already registered during
// `CreateColumnFamily`. We must clear them all to avoid duplicate
// registration.
if (impl->write_buffer_manager_) {
// Newly created handles are already registered during
// `CreateColumnFamily`. We must clear them all to avoid duplicate
// registration.
impl->write_buffer_manager_->UnregisterDB(impl);
for (auto* cf : *handles) {
}
for (auto m : impl->cf_based_write_buffer_manager_) {
m->UnregisterDB(impl);
}

for (size_t i = 0; i < (*handles).size(); ++i) {
auto cf_opt = column_families[i].options;

auto* cf = (*handles)[i];
std::string cf_name = cf->GetName();
auto* write_buffer_manager = cf_opt.cf_write_buffer_manager != nullptr
? cf_opt.cf_write_buffer_manager.get()
: impl->write_buffer_manager_;
if (write_buffer_manager) {
if (cf->GetName() == kDefaultColumnFamilyName) {
impl->write_buffer_manager_->RegisterColumnFamily(
impl, impl->default_cf_handle_);
write_buffer_manager->RegisterColumnFamily(impl,
impl->default_cf_handle_);
} else if (cf->GetName() == kPersistentStatsColumnFamilyName) {
impl->write_buffer_manager_->RegisterColumnFamily(
write_buffer_manager->RegisterColumnFamily(
impl, impl->persist_stats_cf_handle_);
} else {
impl->write_buffer_manager_->RegisterColumnFamily(impl, cf);
write_buffer_manager->RegisterColumnFamily(impl, cf);
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
write_buffer_manager_->MaybeFlush(this);
}
for (auto write_buffer_manager : cf_based_write_buffer_manager_) {
if (UNLIKELY(status.ok() && write_buffer_manager->ShouldFlush())) {
write_buffer_manager->MaybeFlush(this);
}
}

if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) {
InstrumentedMutexLock l(&mutex_);
Expand Down Expand Up @@ -2278,6 +2283,9 @@ size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
if (immutable_db_options_.write_buffer_manager) {
size_t buffer_size =
immutable_db_options_.write_buffer_manager->flush_size();
for (auto manager : cf_based_write_buffer_manager_) {
buffer_size += manager->flush_size();
}
if (buffer_size > 0) {
bsize = std::min<size_t>(bsize, buffer_size);
}
Expand Down
2 changes: 1 addition & 1 deletion db/db_merge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DBMergeTest : public testing::Test {
std::to_string(cf_id), ColumnFamilyOptions(options_)));
}
}
return std::move(column_families);
return column_families;
}

std::string GenDBPath(uint32_t db_id) {
Expand Down
16 changes: 16 additions & 0 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,22 @@ void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
}

void DBTestBase::OpenWithCFWriteBufferManager(
const std::vector<std::string>& cfs,
const std::vector<std::shared_ptr<WriteBufferManager>> wbms,
const Options& options) {
CreateColumnFamilies(cfs, options);
std::vector<std::string> cfs_plus_default = cfs;
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
std::vector<Options> cf_options;
for (size_t i = 0; i < wbms.size(); ++i) {
auto o = options;
o.cf_write_buffer_manager = wbms[i];
cf_options.push_back(o);
}
ReopenWithColumnFamilies(cfs_plus_default, cf_options);
}

void DBTestBase::SetTimeElapseOnlySleepOnReopen(DBOptions* options) {
time_elapse_only_sleep_on_reopen_ = true;

Expand Down
5 changes: 5 additions & 0 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,11 @@ class DBTestBase : public testing::Test {
Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const Options& options);

void OpenWithCFWriteBufferManager(
const std::vector<std::string>& cfs,
const std::vector<std::shared_ptr<WriteBufferManager>> wbms,
const Options& options);

void Reopen(const Options& options);

void Close();
Expand Down
Loading

0 comments on commit 6121b2d

Please sign in to comment.