Skip to content

Commit

Permalink
Merge branch 'master' into columnfamilies
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcanadi committed Jan 29, 2014
2 parents 4bf2535 + 5d2c628 commit c1071ed
Show file tree
Hide file tree
Showing 8 changed files with 574 additions and 134 deletions.
27 changes: 22 additions & 5 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3540,8 +3540,8 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
// Pardon the long line but I think it is easier to read this way.
snprintf(buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall Stall-cnt\n"
"--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n"
"------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < current->NumberLevels(); level++) {
Expand All @@ -3561,9 +3561,21 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
total_bytes_read += bytes_read;
total_bytes_written += stats_[level].bytes_written;

uint64_t stalls = level == 0 ?
(stall_level0_slowdown_count_ +
stall_level0_num_files_count_ +
stall_memtable_compaction_count_) :
stall_leveln_slowdown_count_[level];

double stall_us = level == 0 ?
(stall_level0_slowdown_ +
stall_level0_num_files_ +
stall_memtable_compaction_) :
stall_leveln_slowdown_[level];

snprintf(
buf, sizeof(buf),
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n",
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f %9lu\n",
level,
files,
current->NumLevelBytes(level) / 1048576.0,
Expand All @@ -3585,8 +3597,13 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
stats_[level].files_out_levelnp1,
stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1,
stats_[level].count,
stall_leveln_slowdown_[level] / 1000000.0,
(unsigned long) stall_leveln_slowdown_count_[level]);
(int) ((double) stats_[level].micros /
1000.0 /
(stats_[level].count + 1)),
(double) stall_us / 1000.0 / (stalls + 1),
stall_us / 1000000.0,
(unsigned long) stalls);

total_slowdown += stall_leveln_slowdown_[level];
total_slowdown_count += stall_leveln_slowdown_count_[level];
value->append(buf);
Expand Down
70 changes: 68 additions & 2 deletions db/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Reader::Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter,
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
read_error_(false),
eof_offset_(0),
last_record_offset_(0),
end_of_buffer_offset_(0),
initial_offset_(initial_offset) {
Expand Down Expand Up @@ -170,6 +172,69 @@ uint64_t Reader::LastRecordOffset() {
return last_record_offset_;
}

void Reader::UnmarkEOF() {
if (read_error_) {
return;
}

eof_ = false;

if (eof_offset_ == 0) {
return;
}

// If the EOF was in the middle of a block (a partial block was read) we have
// to read the rest of the block as ReadPhysicalRecord can only read full
// blocks and expects the file position indicator to be aligned to the start
// of a block.
//
// consumed_bytes + buffer_size() + remaining == kBlockSize

size_t consumed_bytes = eof_offset_ - buffer_.size();
size_t remaining = kBlockSize - eof_offset_;

// backing_store_ is used to concatenate what is left in buffer_ and
// the remainder of the block. If buffer_ already uses backing_store_,
// we just append the new data.
if (buffer_.data() != backing_store_ + consumed_bytes) {
// Buffer_ does not use backing_store_ for storage.
// Copy what is left in buffer_ to backing_store.
memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
}

Slice read_buffer;
Status status = file_->Read(remaining, &read_buffer,
backing_store_ + eof_offset_);

size_t added = read_buffer.size();
end_of_buffer_offset_ += added;

if (!status.ok()) {
if (added > 0) {
ReportDrop(added, status);
}

read_error_ = true;
return;
}

if (read_buffer.data() != backing_store_ + eof_offset_) {
// Read did not write to backing_store_
memmove(backing_store_ + eof_offset_, read_buffer.data(),
read_buffer.size());
}

buffer_ = Slice(backing_store_ + consumed_bytes,
eof_offset_ + added - consumed_bytes);

if (added < remaining) {
eof_ = true;
eof_offset_ += added;
} else {
eof_offset_ = 0;
}
}

void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
}
Expand All @@ -184,18 +249,19 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
if (buffer_.size() < (size_t)kHeaderSize) {
if (!eof_) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
read_error_ = true;
return kEof;
} else if (buffer_.size() < (size_t)kBlockSize) {
eof_ = true;
eof_offset_ = buffer_.size();
}
continue;
} else if (buffer_.size() == 0) {
Expand Down
12 changes: 9 additions & 3 deletions db/log_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ class Reader {

// when we know more data has been written to the file. we can use this
// function to force the reader to look again in the file.
void UnmarkEOF() {
eof_ = false;
}
// Also aligns the file position indicator to the start of the next block
// by reading the rest of the data from the EOF position to the end of the
// block that was partially read.
void UnmarkEOF();

SequentialFile* file() { return file_.get(); }

Expand All @@ -82,6 +83,11 @@ class Reader {
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool read_error_; // Error occurred while reading from file

// Offset of the file position indicator within the last block when an
// EOF was detected.
size_t eof_offset_;

// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
Expand Down
Loading

0 comments on commit c1071ed

Please sign in to comment.