Skip to content

Commit

Permalink
refactor some Pegasus modifications (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 authored Apr 19, 2019
1 parent e17dfe6 commit cd7551c
Show file tree
Hide file tree
Showing 14 changed files with 32 additions and 50 deletions.
4 changes: 2 additions & 2 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1080,12 +1080,12 @@ void ColumnFamilySet::SetPegasusDataVersion(uint32_t version) {
pegasus_data_version_ = version;
}

uint64_t ColumnFamilySet::GetLastManualCompactFinishTime() {
uint64_t ColumnFamilySet::GetLastManualCompactFinishTime() const {
return last_manual_compact_finish_time_;
}

void ColumnFamilySet::SetLastManualCompactFinishTime(uint64_t ms) {
last_manual_compact_finish_time_ = ms;
last_manual_compact_finish_time_ = ms;
}

// under a DB mutex AND write thread
Expand Down
2 changes: 1 addition & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ class ColumnFamilySet {
size_t NumberOfColumnFamilies() const;
uint32_t GetPegasusDataVersion() const;
void SetPegasusDataVersion(uint32_t version);
uint64_t GetLastManualCompactFinishTime();
uint64_t GetLastManualCompactFinishTime() const;
void SetLastManualCompactFinishTime(uint64_t ms);
ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
Version* dummy_version,
Expand Down
2 changes: 1 addition & 1 deletion db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
Status DBImpl::GetLiveFilesQuick(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
SequenceNumber* last_sequence,
uint64_t* last_decree) {
uint64_t* last_decree) const {
// ATTENTION(laiyingchun): only used for Pegasus.
assert(pegasus_data_);
*manifest_file_size = 0;
Expand Down
12 changes: 6 additions & 6 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
return versions_->LastSequence();
}

uint64_t DBImpl::GetLastFlushedDecree() {
uint64_t DBImpl::GetLastFlushedDecree() const {
SequenceNumber seq;
uint64_t d;

Expand All @@ -771,11 +771,11 @@ uint32_t DBImpl::GetPegasusDataVersion() const {
return version;
}

uint64_t DBImpl::GetLastManualCompactFinishTime() {
mutex_.Lock();
uint64_t ms = versions_->GetColumnFamilySet()->GetLastManualCompactFinishTime();
mutex_.Unlock();
return ms;
uint64_t DBImpl::GetLastManualCompactFinishTime() const {
mutex_.Lock();
uint64_t ms = versions_->GetColumnFamilySet()->GetLastManualCompactFinishTime();
mutex_.Unlock();
return ms;
}

SequenceNumber DBImpl::IncAndFetchSequenceNumber() {
Expand Down
6 changes: 3 additions & 3 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ class DBImpl : public DB {
bool HasActiveSnapshotInRange(SequenceNumber lower_bound,
SequenceNumber upper_bound);

virtual uint64_t GetLastFlushedDecree() override;
virtual uint64_t GetLastFlushedDecree() const override;

virtual uint32_t GetPegasusDataVersion() const override;

virtual uint64_t GetLastManualCompactFinishTime() override;
virtual uint64_t GetLastManualCompactFinishTime() const override;

#ifndef ROCKSDB_LITE
using DB::ResetStats;
Expand All @@ -250,7 +250,7 @@ class DBImpl : public DB {
virtual Status GetLiveFilesQuick(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
SequenceNumber* last_sequence,
uint64_t* last_decree) override;
uint64_t* last_decree) const override;
virtual Status GetSortedWalFiles(VectorLogPtr& files) override;

virtual Status GetUpdatesSince(
Expand Down
6 changes: 2 additions & 4 deletions db/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
size_t seq_inc = seq_per_batch_ ? write_group.size : total_count;

const bool concurrent_update = concurrent_prepare_;

// Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully
Expand Down Expand Up @@ -278,9 +277,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
}
assert(last_sequence != kMaxSequenceNumber);
const SequenceNumber current_sequence = write_options.given_sequence_number == 0 ?
(last_sequence + 1) : write_options.given_sequence_number;
last_sequence = current_sequence + seq_inc - 1;
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += seq_inc;

if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);
Expand Down
2 changes: 1 addition & 1 deletion db/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ class MemTable {
return oldest_key_time_.load(std::memory_order_relaxed);
}

void GetLastSeqDecree(SequenceNumber* sequence, uint64_t* decree) {
void GetLastSeqDecree(SequenceNumber* sequence, uint64_t* decree) const {
*sequence = last_sequence_;
*decree = last_decree_;
}
Expand Down
6 changes: 1 addition & 5 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,7 @@ class VersionEdit {
return last_sequence_;
}

bool HasLastFlushSeqDecree() const {
return has_last_flush_seq_decree_;
}

void GetLastFlushSeqDecree(SequenceNumber* sequence, uint64_t* decree) {
void GetLastFlushSeqDecree(SequenceNumber* sequence, uint64_t* decree) const {
*sequence = last_flush_sequence_;
*decree = last_flush_decree_;
}
Expand Down
8 changes: 4 additions & 4 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2478,7 +2478,7 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
SequenceNumber seq;
uint64_t d;
current->GetLastFlushSeqDecree(&seq, &d);
v->UpdateLastFlushSeqDecree(seq, d);
v->UpdateLastFlushSeqDecreeIfNeeded(seq, d);
}
current->Unref();
}
Expand Down Expand Up @@ -2725,7 +2725,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
SequenceNumber seq;
uint64_t d;
e->GetLastFlushSeqDecree(&seq, &d);
v->UpdateLastFlushSeqDecree(seq, d);
v->UpdateLastFlushSeqDecreeIfNeeded(seq, d);
}
}
if (max_log_number_in_batch != 0) {
Expand Down Expand Up @@ -3153,7 +3153,7 @@ Status VersionSet::Recover(
if (db_options_->pegasus_data) {
// update last flush sequence/decree
auto &p = last_flush_seq_decree_map[cfd->GetID()];
v->UpdateLastFlushSeqDecree(p.first, p.second);
v->UpdateLastFlushSeqDecreeIfNeeded(p.first, p.second);
}

// Install recovered version
Expand Down Expand Up @@ -3547,7 +3547,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
builder->SaveTo(v->storage_info());

auto& p = last_flush_seq_decree_map[cfd->GetID()];
v->UpdateLastFlushSeqDecree(p.first, p.second);
v->UpdateLastFlushSeqDecreeIfNeeded(p.first, p.second);

v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);

Expand Down
6 changes: 3 additions & 3 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -617,12 +617,12 @@ class Version {

void GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta);

void GetLastFlushSeqDecree(SequenceNumber* sequence, uint64_t* decree) {
void GetLastFlushSeqDecree(SequenceNumber* sequence, uint64_t* decree) const {
*sequence = last_flush_sequence_;
*decree = last_flush_decree_;
}

void UpdateLastFlushSeqDecree(SequenceNumber sequence, uint64_t decree) {
void UpdateLastFlushSeqDecreeIfNeeded(SequenceNumber sequence, uint64_t decree) {
if (sequence > last_flush_sequence_) {
assert(decree >= last_flush_decree_);
last_flush_sequence_ = sequence;
Expand Down Expand Up @@ -795,7 +795,7 @@ class VersionSet {
}

// Return the last flush sequence number of default column family.
uint64_t LastFlushSequence() {
uint64_t LastFlushSequence() const {
assert(db_options_->pegasus_data);
assert(column_family_set_->NumberOfColumnFamilies() == 1u);
SequenceNumber seq;
Expand Down
6 changes: 3 additions & 3 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -884,12 +884,12 @@ class DB {
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) = 0;

// The last flushed decree.
virtual uint64_t GetLastFlushedDecree() { return 0; }
virtual uint64_t GetLastFlushedDecree() const { return 0; }

// The Pegasus data version.
virtual uint32_t GetPegasusDataVersion() const { return 0; }

virtual uint64_t GetLastManualCompactFinishTime() { return 0; }
virtual uint64_t GetLastManualCompactFinishTime() const { return 0; }

#ifndef ROCKSDB_LITE

Expand Down Expand Up @@ -933,7 +933,7 @@ class DB {
virtual Status GetLiveFilesQuick(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
SequenceNumber* last_sequence,
uint64_t* last_decree) { return Status::NotSupported(); }
uint64_t* last_decree) const { return Status::NotSupported(); }

// Retrieve the sorted list of all wal files with earliest file first
virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0;
Expand Down
9 changes: 0 additions & 9 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1157,14 +1157,6 @@ struct WriteOptions {
// Default: false
bool low_pri;

// Sequence number is usually controlled by the db itself as 1,2,3, ...
// however, in cases where the upper frameworks (e.g., replication), the sequence
// number is given and the underlying db should use this given sequence number directly
// instead of generating one by itself.
//
// Default: 0 (rocksdb should generate the number by itself in this case)
SequenceNumber given_sequence_number;

// Decree is an value affiliated to the write.
uint64_t given_decree;

Expand All @@ -1174,7 +1166,6 @@ struct WriteOptions {
ignore_missing_column_families(false),
no_slowdown(false),
low_pri(false),
given_sequence_number(0),
given_decree(0) {}
};

Expand Down
3 changes: 0 additions & 3 deletions include/rocksdb/write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,6 @@ class WriteBatch : public WriteBatchBase {
protected:
std::string rep_; // See comment in write_batch.cc for the format of rep_

bool use_shared_sequence_number_; // when seq is given and shared by multiple write ops,
// see more comments for WriteOptions

// Intentionally copyable
};

Expand Down
10 changes: 5 additions & 5 deletions utilities/checkpoint/checkpoint_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,10 @@ Status CheckpointImpl::CreateCustomCheckpoint(
}

struct LogReporter : public log::Reader::Reporter {
Status* status;
Status* status = nullptr;

virtual void Corruption(size_t bytes, const Status& s) override {
if (this->status->ok()) *this->status = s;
void Corruption(size_t bytes, const Status& s) override {
if (this->status && this->status->ok()) *this->status = s;
}
};

Expand Down Expand Up @@ -484,8 +484,8 @@ Status CheckpointImpl::CreateCheckpointQuick(const std::string& checkpoint_dir,
db_->EnableFileDeletions(false);

if (s.ok()) {
// modify menifest file to set correct last_seq in VersionEdit, because
// the last_seq recorded in menifest may be greater than the real value
// modify manifest file to set correct last_seq in VersionEdit, because
// the last_seq recorded in manifest may be greater than the real value
assert(!manifest_file_path.empty());
s = ModifyManifestFileLastSeq(db_->GetEnv(), db_->GetOptions(),
manifest_file_path, last_sequence);
Expand Down

0 comments on commit cd7551c

Please sign in to comment.