Skip to content

Commit

Permalink
Rebase to latest master after resolving conflicts
Browse files Browse the repository at this point in the history
Summary:
The main conflict is with the commit 7daae51, which refactors flush
request processing.

Test Plan:

Reviewers:

Subscribers:

Tasks:

Tags:
  • Loading branch information
Anand Ananthabhotla committed Aug 29, 2018
1 parent feea658 commit a59b8b2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 29 deletions.
59 changes: 33 additions & 26 deletions db/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -967,15 +967,16 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
return s;
}


Status DBImpl::FlushAllCFs(FlushReason flush_reason) {
Status ret;
Status s;
WriteContext context;
WriteThread::Writer w;

mutex_.AssertHeld();
write_thread_.EnterUnbatched(&w, &mutex_);

std::vector<std::pair<ColumnFamilyData*, uint64_t>> cfd_id_pairs;
FlushRequest flush_req;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() &&
cached_recoverable_state_empty_.load()) {
Expand All @@ -984,44 +985,48 @@ Status DBImpl::FlushAllCFs(FlushReason flush_reason) {
}

// SwitchMemtable() will release and reacquire mutex during execution
Status s;
s = SwitchMemtable(cfd, &context);
if (!s.ok()) {
ret = s;
break;
}

cfd->imm()->FlushRequested();

// schedule flush
SchedulePendingFlush(cfd, flush_reason);
flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
}

// schedule flush
if (s.ok() && !flush_req.empty()) {
SchedulePendingFlush(flush_req, flush_reason);
MaybeScheduleFlushOrCompaction();
cfd_id_pairs.emplace_back(
std::make_pair(cfd, cfd->imm()->GetLatestMemTableID()));
}

write_thread_.ExitUnbatched(&w);

for (auto& pair : cfd_id_pairs) {
auto cfd = pair.first;
auto flush_memtable_id = pair.second;
while (cfd->imm()->NumNotFlushed() > 0 &&
cfd->imm()->GetEarliestMemTableID() <= flush_memtable_id) {
if (!error_handler_.GetRecoveryError().ok()) {
break;
}
if (cfd->IsDropped()) {
// FlushJob cannot flush a dropped CF, if we did not break here
// we will loop forever since cfd->imm()->NumNotFlushed() will never
// drop to zero
continue;
if (s.ok()) {
for (auto& flush : flush_req) {
auto cfd = flush.first;
auto flush_memtable_id = flush.second;
while (cfd->imm()->NumNotFlushed() > 0 &&
cfd->imm()->GetEarliestMemTableID() <= flush_memtable_id) {
if (!error_handler_.GetRecoveryError().ok()) {
break;
}
if (cfd->IsDropped()) {
// FlushJob cannot flush a dropped CF, if we did not break here
// we will loop forever since cfd->imm()->NumNotFlushed() will never
// drop to zero
continue;
}
cfd->Ref();
bg_cv_.Wait();
cfd->Unref();
}
cfd->Ref();
bg_cv_.Wait();
cfd->Unref();
}
}
return ret;

flush_req.clear();
return s;
}

Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
Expand Down Expand Up @@ -1533,13 +1538,15 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
}
status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer);
// All the CFDs in the FlushReq must have the same flush reason, so just
// grab the first one
*reason = bg_flush_args[0].cfd_->GetFlushReason();
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
if (cfd->Unref()) {
delete cfd;
arg.cfd_ = nullptr;
}
*reason = cfd->GetFlushReason();
}
}
return status;
Expand Down
15 changes: 12 additions & 3 deletions util/sst_file_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<Logger> logger,
max_trash_db_ratio, bytes_max_delete_chunk),
cv_(&mu_),
closing_(false),
bg_thread_(nullptr),
reserved_disk_buffer_(0),
free_space_trigger_(0),
cur_instance_(nullptr) {
bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this));
}

SstFileManagerImpl::~SstFileManagerImpl() {
Expand Down Expand Up @@ -300,7 +300,11 @@ void SstFileManagerImpl::ClearError() {
// seconds
int64_t wait_until = env_->NowMicros() + 5000000;
cv_.TimedWait(wait_until);
} else {
}

// Check again for error_handler_list_ empty, as a DB instance shutdown
// could have removed it from the queue while we were in timed wait
if (error_handler_list_.empty()) {
ROCKS_LOG_INFO(logger_, "Clearing error\n");
bg_err_ = Status::OK();
return;
Expand Down Expand Up @@ -330,11 +334,16 @@ void SstFileManagerImpl::StartErrorRecovery(ErrorHandler* handler,
// and recover from this condition
if (error_handler_list_.empty()) {
error_handler_list_.push_back(handler);
// Release lock before calling join. Its ok to do so because
// error_handler_list_ is now non-empty, so no other invocation of this
// function will execute this piece of code
mu_.Unlock();
if (bg_thread_) {
bg_thread_->join();
}
// Start a new thread. The previous one may have exited.
// Start a new thread. The previous one would have exited.
bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this));
mu_.Lock();
} else {
// Check if this DB instance is already in the list
for (auto iter = error_handler_list_.begin();
Expand Down

0 comments on commit a59b8b2

Please sign in to comment.