Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
wankai committed Sep 3, 2014
2 parents dff2b1a + 9b58c73 commit 5d25a46
Show file tree
Hide file tree
Showing 47 changed files with 796 additions and 483 deletions.
6 changes: 6 additions & 0 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,15 @@ TEST(ColumnFamilyTest, WriteBatchFailure) {
Open();
CreateColumnFamiliesAndReopen({"one", "two"});
WriteBatch batch;
batch.Put(handles_[0], Slice("existing"), Slice("column-family"));
batch.Put(handles_[1], Slice("non-existing"), Slice("column-family"));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
DropColumnFamilies({1});
WriteOptions woptions_ignore_missing_cf;
woptions_ignore_missing_cf.ignore_missing_column_families = true;
batch.Put(handles_[0], Slice("still here"), Slice("column-family"));
ASSERT_OK(db_->Write(woptions_ignore_missing_cf, &batch));
ASSERT_EQ("column-family", Get(0, "still here"));
Status s = db_->Write(WriteOptions(), &batch);
ASSERT_TRUE(s.IsInvalidArgument());
Close();
Expand Down
142 changes: 93 additions & 49 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
return result;
}

namespace {

Status SanitizeDBOptionsByCFOptions(
DBOptions* db_opts,
const DBOptions* db_opts,
const std::vector<ColumnFamilyDescriptor>& column_families) {
Status s;
for (auto cf : column_families) {
Expand All @@ -303,7 +305,6 @@ Status SanitizeDBOptionsByCFOptions(
return Status::OK();
}

namespace {
CompressionType GetCompressionFlush(const Options& options) {
// Compressing memtable flushes might not help unless the sequential load
// optimization is used for leveled compaction. Otherwise the CPU and
Expand Down Expand Up @@ -631,7 +632,7 @@ bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first,
} else if (first.file_name < second.file_name) {
return false;
} else {
return (first.path_id > first.path_id);
return (first.path_id > second.path_id);
}
}
}; // namespace
Expand Down Expand Up @@ -1301,14 +1302,20 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
WriteBatch batch;
while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);

// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the
// insert. We don't want to fail the whole write batch in that case -- we
// just ignore the update. That's why we set ignore missing column families
// to true
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), true, log_number);
&batch, column_family_memtables_.get(),
true /* ignore missing column families */, log_number);

MaybeIgnoreError(&status);
if (!status.ok()) {
Expand Down Expand Up @@ -1677,6 +1684,13 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
}
LogFlush(options_.info_log);

{
MutexLock l(&mutex_);
// an automatic compaction that has been scheduled might have been
// preempted by the manual compactions. Need to schedule it back.
MaybeScheduleFlushOrCompaction();
}

return s;
}

Expand Down Expand Up @@ -1864,18 +1878,15 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
bg_cv_.Wait();
} else {
manual_compaction_ = &manual;
MaybeScheduleFlushOrCompaction();
assert(bg_compaction_scheduled_ == 0);
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
}
}

assert(!manual.in_progress);
assert(bg_manual_only_ > 0);
--bg_manual_only_;
if (bg_manual_only_ == 0) {
// an automatic compaction should have been scheduled might have be
// preempted by the manual compactions. Need to schedule it back.
MaybeScheduleFlushOrCompaction();
}
return manual.status;
}

Expand Down Expand Up @@ -1963,11 +1974,11 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {

// Schedule BGWorkCompaction if there's a compaction pending (or a memtable
// flush, but the HIGH pool is not enabled)
// Do it only if max_background_compactions hasn't been reached and, in case
// bg_manual_only_ > 0, if it's a manual compaction.
if ((manual_compaction_ || is_compaction_needed ||
(is_flush_pending && options_.max_background_flushes == 0)) &&
(!bg_manual_only_ || manual_compaction_)) {
// Do it only if max_background_compactions hasn't been reached and
// bg_manual_only_ == 0
if (!bg_manual_only_ &&
(is_compaction_needed ||
(is_flush_pending && options_.max_background_flushes == 0))) {
if (bg_compaction_scheduled_ < options_.max_background_compactions) {
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
Expand All @@ -1979,7 +1990,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
}

void DBImpl::RecordFlushIOStats() {
RecordTick(stats_, FLUSH_WRITE_BYTES, iostats_context.bytes_written);
RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
IOSTATS_RESET(bytes_written);
}

Expand Down Expand Up @@ -2194,6 +2205,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (is_manual) {
// another thread cannot pick up the same work
manual_compaction_->in_progress = true;
} else if (manual_compaction_ != nullptr) {
// there should be no automatic compactions running when manual compaction
// is running
return Status::OK();
}

// FLUSH preempts compaction
Expand Down Expand Up @@ -2313,7 +2328,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,

if (status.ok()) {
// Done
} else if (shutting_down_.Acquire_Load()) {
} else if (status.IsShutdownInProgress()) {
// Ignore compaction errors found during shutting down
} else {
Log(InfoLogLevel::WARN_LEVEL, options_.info_log, "Compaction error: %s",
Expand Down Expand Up @@ -2573,6 +2588,10 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
if (options_.max_background_flushes > 0) {
// flush thread will take care of this
return 0;
}
if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
Expand Down Expand Up @@ -2626,9 +2645,29 @@ Status DBImpl::ProcessKeyValueCompaction(
compaction_filter = compaction_filter_from_factory.get();
}

int64_t key_drop_user = 0;
int64_t key_drop_newer_entry = 0;
int64_t key_drop_obsolete = 0;
int64_t loop_cnt = 0;
while (input->Valid() && !shutting_down_.Acquire_Load() &&
!cfd->IsDropped()) {
RecordCompactionIOStats();
if (++loop_cnt > 1000) {
if (key_drop_user > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
key_drop_user = 0;
}
if (key_drop_newer_entry > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
key_drop_newer_entry);
key_drop_newer_entry = 0;
}
if (key_drop_obsolete > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
key_drop_obsolete = 0;
}
RecordCompactionIOStats();
loop_cnt = 0;
}
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
Expand Down Expand Up @@ -2709,7 +2748,7 @@ Status DBImpl::ProcessKeyValueCompaction(
ParseInternalKey(key, &ikey);
// no value associated with delete
value.clear();
RecordTick(stats_, COMPACTION_KEY_DROP_USER);
++key_drop_user;
} else if (value_changed) {
value = compaction_filter_value;
}
Expand All @@ -2733,7 +2772,7 @@ Status DBImpl::ProcessKeyValueCompaction(
// TODO: why not > ?
assert(last_sequence_for_key >= ikey.sequence);
drop = true; // (A)
RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY);
++key_drop_newer_entry;
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= earliest_snapshot &&
compact->compaction->KeyNotExistsBeyondOutputLevel(ikey.user_key)) {
Expand All @@ -2745,7 +2784,7 @@ Status DBImpl::ProcessKeyValueCompaction(
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE);
++key_drop_obsolete;
} else if (ikey.type == kTypeMerge) {
if (!merge.HasOperator()) {
LogToBuffer(log_buffer, "Options::merge_operator is null.");
Expand Down Expand Up @@ -2892,7 +2931,15 @@ Status DBImpl::ProcessKeyValueCompaction(
input->Next();
}
}

if (key_drop_user > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
}
if (key_drop_newer_entry > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, key_drop_newer_entry);
}
if (key_drop_obsolete > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
}
RecordCompactionIOStats();

return status;
Expand Down Expand Up @@ -3367,7 +3414,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) {
StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_AUTO(get_snapshot_time);
PERF_TIMER_GUARD(get_snapshot_time);

auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
Expand All @@ -3391,27 +3438,27 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
PERF_TIMER_STOP(get_snapshot_time);

if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->options())) {
// Done
RecordTick(stats_, MEMTABLE_HIT);
} else if (sv->imm->Get(lkey, value, &s, merge_context, *cfd->options())) {
// Done
RecordTick(stats_, MEMTABLE_HIT);
} else {
PERF_TIMER_START(get_from_output_files_time);

PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(options, lkey, value, &s, &merge_context, value_found);
PERF_TIMER_STOP(get_from_output_files_time);
RecordTick(stats_, MEMTABLE_MISS);
}

PERF_TIMER_START(get_post_process_time);
{
PERF_TIMER_GUARD(get_post_process_time);

ReturnAndCleanupSuperVersion(cfd, sv);
ReturnAndCleanupSuperVersion(cfd, sv);

RecordTick(stats_, NUMBER_KEYS_READ);
RecordTick(stats_, BYTES_READ, value->size());
PERF_TIMER_STOP(get_post_process_time);
RecordTick(stats_, NUMBER_KEYS_READ);
RecordTick(stats_, BYTES_READ, value->size());
}
return s;
}

Expand All @@ -3421,7 +3468,7 @@ std::vector<Status> DBImpl::MultiGet(
const std::vector<Slice>& keys, std::vector<std::string>* values) {

StopWatch sw(env_, stats_, DB_MULTIGET);
PERF_TIMER_AUTO(get_snapshot_time);
PERF_TIMER_GUARD(get_snapshot_time);

SequenceNumber snapshot;

Expand Down Expand Up @@ -3497,7 +3544,7 @@ std::vector<Status> DBImpl::MultiGet(
}

// Post processing (decrement reference counts and record statistics)
PERF_TIMER_START(get_post_process_time);
PERF_TIMER_GUARD(get_post_process_time);
autovector<SuperVersion*> superversions_to_delete;

// TODO(icanadi) do we need lock here or just around Cleanup()?
Expand Down Expand Up @@ -3870,7 +3917,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!");
}
PERF_TIMER_AUTO(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_pre_and_post_process_time);
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
Expand Down Expand Up @@ -4003,7 +4050,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {

uint64_t log_size = 0;
if (!options.disableWAL) {
PERF_TIMER_START(write_wal_time);
PERF_TIMER_GUARD(write_wal_time);
Slice log_entry = WriteBatchInternal::Contents(updates);
status = log_->AddRecord(log_entry);
total_log_size_ += log_entry.size();
Expand All @@ -4021,13 +4068,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
status = log_->file()->Sync();
}
}
PERF_TIMER_STOP(write_wal_time);
}
if (status.ok()) {
PERF_TIMER_START(write_memtable_time);
PERF_TIMER_GUARD(write_memtable_time);

status = WriteBatchInternal::InsertInto(
updates, column_family_memtables_.get(), false, 0, this, false);
updates, column_family_memtables_.get(),
options.ignore_missing_column_families, 0, this, false);
// A non-OK status here indicates iteration failure (either in-memory
// writebatch corruption (very bad), or the client specified invalid
// column family). This will later on trigger bg_error_.
Expand All @@ -4036,8 +4083,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// into the memtable would result in a state that some write ops might
// have succeeded in memtable but Status reports error for all writes.

PERF_TIMER_STOP(write_memtable_time);

SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
}
PERF_TIMER_START(write_pre_and_post_process_time);
Expand Down Expand Up @@ -4071,7 +4116,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
RecordTick(stats_, WRITE_TIMEDOUT);
}

PERF_TIMER_STOP(write_pre_and_post_process_time);
return status;
}

Expand Down Expand Up @@ -4759,11 +4803,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
std::vector<ColumnFamilyHandle*> handles;
Status s = SanitizeDBOptionsByCFOptions(&db_options, column_families);
if (!s.ok()) {
return s;
}
s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
if (s.ok()) {
assert(handles.size() == 1);
// i can delete the handle since DBImpl is always holding a reference to
Expand All @@ -4776,6 +4816,10 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
Status s = SanitizeDBOptionsByCFOptions(&db_options, column_families);
if (!s.ok()) {
return s;
}
if (db_options.db_paths.size() > 1) {
for (auto& cfd : column_families) {
if (cfd.options.compaction_style != kCompactionStyleUniversal) {
Expand All @@ -4801,7 +4845,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}

DBImpl* impl = new DBImpl(db_options, dbname);
Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir);
s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir);
if (s.ok()) {
for (auto db_path : impl->options_.db_paths) {
s = impl->env_->CreateDirIfMissing(db_path.path);
Expand Down
Loading

0 comments on commit 5d25a46

Please sign in to comment.