Skip to content

Commit

Permalink
fix : potential bug scheduling unnecessary thread
Browse files Browse the repository at this point in the history
this fix is due to facebook#6104.
unscheduled_flushes_ was not handled well in the current version (v6.1.2.)
This leads flush_scheduled_ flags to be always full

author : Junhan
  • Loading branch information
junhanLee95 committed Jun 29, 2023
1 parent bade249 commit fa826d9
Showing 1 changed file with 24 additions and 6 deletions.
30 changes: 24 additions & 6 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1966,6 +1966,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
fta->thread_pri_ = Env::Priority::HIGH;
env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
&DBImpl::UnscheduleFlushCallback);
--unscheduled_flushes_;
}

// special case -- if high-pri (flush) thread pool is empty, then schedule
Expand All @@ -1981,6 +1982,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
fta->thread_pri_ = Env::Priority::LOW;
env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
&DBImpl::UnscheduleFlushCallback);
unscheduled_flushes_;
}
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
Expand Down Expand Up @@ -2164,16 +2166,32 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(

void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
FlushReason flush_reason) {
mutex_.AssertHeld();
if (flush_req.empty()) {
return;
}
for (auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
cfd->Ref();
cfd->SetFlushReason(flush_reason);
if (!immutable_db_options_.atomic_flush) {
// For the non-atomic flush case, we never schedule multiple column
// families in the same flush request.
assert(flush_req.size() == 1);
ColumnFamilyData* cfd = flush_req[0].first;
assert(cfd);
if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
cfd->Ref();
cfd->set_queued_for_flush(true);
cfd->SetFlushReason(flush_reason);
++unscheduled_flushes_
flush_queue_.push_back(flush_req);
}
} else {
for (auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
cfd->Ref();
cfd->SetFlushReason(flush_reason);
}
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
}
unscheduled_flushes_ += static_cast<int>(flush_req.size());
flush_queue_.push_back(flush_req);
}

void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
Expand Down

0 comments on commit fa826d9

Please sign in to comment.