Skip to content

Commit

Permalink
LogAndApply to take ColumnFamilyData
Browse files Browse the repository at this point in the history
Summary: This removes the default implementation of LogAndApply that applied the changed to the default column family by default. It is mostly simple reformatting.

Test Plan: make check

Reviewers: dhruba, kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15465
  • Loading branch information
igorcanadi committed Jan 27, 2014
1 parent eb05560 commit 511b03a
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 31 deletions.
26 changes: 15 additions & 11 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// Since we already recovered log_number, we want all logs
// with numbers `<= log_number` (includes this one) to be ignored
edit.SetLogNumber(log_number + 1);
status = versions_->LogAndApply(&edit, &mutex_);
status = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
}

return status;
Expand Down Expand Up @@ -1204,8 +1204,9 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,

// Replace immutable memtable with the generated Table
s = default_cfd_->imm.InstallMemtableFlushResults(
mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number,
pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get());
default_cfd_, mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_, &deletion_state.memtables_to_free,
db_directory_.get());

if (s.ok()) {
InstallSuperVersion(default_cfd_, deletion_state);
Expand Down Expand Up @@ -1333,7 +1334,8 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
Log(options_.info_log, "Apply version edit:\n%s",
edit.DebugString().data());

status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
status = versions_->LogAndApply(default_cfd_, &edit, &mutex_,
db_directory_.get());
superversion_to_free = InstallSuperVersion(default_cfd_, new_superversion);
new_superversion = nullptr;

Expand Down Expand Up @@ -1906,7 +1908,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get());
status = versions_->LogAndApply(default_cfd_, c->edit(), &mutex_,
db_directory_.get());
InstallSuperVersion(default_cfd_, deletion_state);

Version::LevelSummaryStorage tmp;
Expand Down Expand Up @@ -2155,8 +2158,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact->compaction->output_level(), out.number, out.file_size,
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_,
db_directory_.get());
return versions_->LogAndApply(default_cfd_, compact->compaction->edit(),
&mutex_, db_directory_.get());
}

//
Expand Down Expand Up @@ -2949,7 +2952,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
edit.AddColumnFamily(column_family_name);
handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
edit.SetColumnFamily(handle->id);
Status s = versions_->LogAndApply(&edit, &mutex_);
Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
if (s.ok()) {
// add to internal data structures
versions_->CreateColumnFamily(options, &edit);
Expand All @@ -2968,7 +2971,7 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
VersionEdit edit;
edit.DropColumnFamily();
edit.SetColumnFamily(column_family.id);
Status s = versions_->LogAndApply(&edit, &mutex_);
Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
if (s.ok()) {
// remove from internal data structures
versions_->DropColumnFamily(&edit);
Expand Down Expand Up @@ -3830,7 +3833,8 @@ Status DBImpl::DeleteFile(std::string name) {
}
}
edit.DeleteFile(level, number);
status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
status = versions_->LogAndApply(default_cfd_, &edit, &mutex_,
db_directory_.get());
if (status.ok()) {
InstallSuperVersion(default_cfd_, deletion_state);
}
Expand Down Expand Up @@ -3977,7 +3981,7 @@ Status DB::OpenWithColumnFamilies(
edit.SetLogNumber(new_log_number);
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile)));
s = impl->versions_->LogAndApply(&edit, &impl->mutex_,
s = impl->versions_->LogAndApply(impl->default_cfd_, &edit, &impl->mutex_,
impl->db_directory_.get());
}
if (s.ok()) {
Expand Down
5 changes: 3 additions & 2 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5046,14 +5046,15 @@ void BM_LogAndApply(int iters, int num_base_files) {
std::vector<ColumnFamilyDescriptor> dummy;
dummy.push_back(ColumnFamilyDescriptor());
ASSERT_OK(vset.Recover(dummy));
auto default_cfd = vset.GetColumnFamilySet()->GetDefault();
VersionEdit vbase;
uint64_t fnum = 1;
for (int i = 0; i < num_base_files; i++) {
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vbase.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1);
}
ASSERT_OK(vset.LogAndApply(&vbase, &mu));
ASSERT_OK(vset.LogAndApply(default_cfd, &vbase, &mu));

uint64_t start_micros = env->NowMicros();

Expand All @@ -5063,7 +5064,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vedit.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1);
vset.LogAndApply(&vedit, &mu);
vset.LogAndApply(default_cfd, &vedit, &mu);
}
uint64_t stop_micros = env->NowMicros();
unsigned int us = stop_micros - start_micros;
Expand Down
6 changes: 3 additions & 3 deletions db/memtablelist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ void MemTableList::PickMemtablesToFlush(std::vector<MemTable*>* ret) {

// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
const std::vector<MemTable*>& mems, VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
ColumnFamilyData* cfd, const std::vector<MemTable*>& mems, VersionSet* vset,
Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number,
std::set<uint64_t>& pending_outputs, std::vector<MemTable*>* to_delete,
Directory* db_directory) {
mu->AssertHeld();
Expand Down Expand Up @@ -177,7 +177,7 @@ Status MemTableList::InstallMemtableFlushResults(
(unsigned long)m->file_number_);

// this can release and reacquire the mutex.
s = vset->LogAndApply(&m->edit_, mu, db_directory);
s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory);

// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
Expand Down
14 changes: 7 additions & 7 deletions db/memtablelist.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <string>
#include <list>
#include <vector>
#include <set>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/iterator.h"
Expand All @@ -17,6 +18,7 @@

namespace rocksdb {

class ColumnFamilyData;
class InternalKeyComparator;
class Mutex;

Expand Down Expand Up @@ -90,13 +92,11 @@ class MemTableList {
void PickMemtablesToFlush(std::vector<MemTable*>* mems);

// Commit a successful flush in the manifest file
Status InstallMemtableFlushResults(const std::vector<MemTable*>& m,
VersionSet* vset, Status flushStatus,
port::Mutex* mu, Logger* info_log,
uint64_t file_number,
std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete,
Directory* db_directory);
Status InstallMemtableFlushResults(
ColumnFamilyData* cfd, const std::vector<MemTable*>& m, VersionSet* vset,
Status flushStatus, port::Mutex* mu, Logger* info_log,
uint64_t file_number, std::set<uint64_t>& pending_outputs,
std::vector<MemTable*>* to_delete, Directory* db_directory);

// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().
Expand Down
3 changes: 2 additions & 1 deletion db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1973,7 +1973,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
VersionEdit ve;
port::Mutex dummy_mutex;
MutexLock l(&dummy_mutex);
return versions.LogAndApply(&ve, &dummy_mutex, nullptr, true);
return versions.LogAndApply(versions.GetColumnFamilySet()->GetDefault(), &ve,
&dummy_mutex, nullptr, true);
}

Status VersionSet::DumpManifest(Options& options, std::string& dscname,
Expand Down
7 changes: 0 additions & 7 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,6 @@ class VersionSet {
port::Mutex* mu, Directory* db_directory = nullptr,
bool new_descriptor_log = false);

Status LogAndApply(VersionEdit* edit, port::Mutex* mu,
Directory* db_directory = nullptr,
bool new_descriptor_log = false) {
return LogAndApply(column_family_set_->GetDefault(), edit, mu, db_directory,
new_descriptor_log);
}

// Recover the last saved descriptor from persistent storage.
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families);

Expand Down

0 comments on commit 511b03a

Please sign in to comment.