Skip to content

Commit

Permalink
[CF] Handle failure in WriteBatch::Handler
Browse files Browse the repository at this point in the history
Summary:
* Add ColumnFamilyHandle::GetID() function. Client needs to know column family's ID to be able to construct WriteBatch
* Handle WriteBatch::Handler failure gracefully. Since WriteBatch is not a very smart function (it takes raw CF id), client can add data to WriteBatch for column family that doesn't exist. In that case, we need to gracefully return failure status from DB::Write(). To do that, I added a return Status to WriteBatch functions PutCF, DeleteCF and MergeCF.

Test Plan: Added test to column_family_test

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16323
  • Loading branch information
igorcanadi committed Feb 26, 2014
1 parent 944ff67 commit 8b7ab99
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 48 deletions.
1 change: 0 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
executed in high priority thread pool.

## Unreleased (will be relased in 2.8)
## Unreleased

### Public API changes

Expand Down
2 changes: 2 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
}
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

namespace {
// Fix user-supplied options to be reasonable
template <class T, class V>
Expand Down
2 changes: 2 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
virtual ~ColumnFamilyHandleImpl();
virtual ColumnFamilyData* cfd() const { return cfd_; }

virtual uint32_t GetID() const override;

private:
ColumnFamilyData* cfd_;
DBImpl* db_;
Expand Down
11 changes: 11 additions & 0 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,17 @@ TEST(ColumnFamilyTest, DropTest) {
}
}

TEST(ColumnFamilyTest, WriteBatchFailure) {
Open();
WriteBatch batch;
batch.Put(1, Slice("non-existing"), Slice("column-family"));
Status s = db_->Write(WriteOptions(), &batch);
ASSERT_TRUE(s.IsInvalidArgument());
CreateColumnFamilies({"one"});
ASSERT_OK(db_->Write(WriteOptions(), &batch));
Close();
}

TEST(ColumnFamilyTest, ReadWrite) {
ASSERT_OK(Open({"default"}));
CreateColumnFamilies({"one", "two"});
Expand Down
15 changes: 7 additions & 8 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3304,11 +3304,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer);

if (!status.ok()) {
// Panic for in-memory corruptions
// Iteration failed (either in-memory writebatch corruption (very
// bad), or the client specified invalid column family). Return
// failure.
// Note that existing logic was not sound. Any partial failure writing
// into the memtable would result in a state that some write ops might
// have succeeded in memtable but Status reports error for all writes.
throw std::runtime_error("In memory WriteBatch corruption!");
return status;
}
SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
last_sequence);
Expand Down Expand Up @@ -3822,24 +3824,21 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
// 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
// and we allocate 11 extra bytes for key length, as well as value length.
WriteBatch batch(key.size() + value.size() + 24);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
batch.Put(cfh->cfd()->GetID(), key, value);
batch.Put(column_family->GetID(), key, value);
return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) {
WriteBatch batch;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
batch.Delete(cfh->cfd()->GetID(), key);
batch.Delete(column_family->GetID(), key);
return Write(opt, &batch);
}

Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
WriteBatch batch;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
batch.Merge(cfh->cfd()->GetID(), key, value);
batch.Merge(column_family->GetID(), key, value);
return Write(opt, &batch);
}

Expand Down
9 changes: 6 additions & 3 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4804,19 +4804,22 @@ TEST(DBTest, TransactionLogIteratorBlobs) {
auto res = OpenTransactionLogIter(0)->GetBatch();
struct Handler : public WriteBatch::Handler {
std::string seen;
virtual void PutCF(uint32_t cf, const Slice& key, const Slice& value) {
virtual Status PutCF(uint32_t cf, const Slice& key, const Slice& value) {
seen += "Put(" + std::to_string(cf) + ", " + key.ToString() + ", " +
std::to_string(value.size()) + ")";
return Status::OK();
}
virtual void MergeCF(uint32_t cf, const Slice& key, const Slice& value) {
virtual Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) {
seen += "Merge(" + std::to_string(cf) + ", " + key.ToString() + ", " +
std::to_string(value.size()) + ")";
return Status::OK();
}
virtual void LogData(const Slice& blob) {
seen += "LogData(" + blob.ToString() + ")";
}
virtual void DeleteCF(uint32_t cf, const Slice& key) {
virtual Status DeleteCF(uint32_t cf, const Slice& key) {
seen += "Delete(" + std::to_string(cf) + ", " + key.ToString() + ")";
return Status::OK();
}
} handler;
res.writeBatchPtr->Iterate(&handler);
Expand Down
61 changes: 39 additions & 22 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ Status WriteBatch::Iterate(Handler* handler) const {
input.remove_prefix(kHeader);
Slice key, value, blob;
int found = 0;
while (!input.empty() && handler->Continue()) {
Status s;
while (s.ok() && !input.empty() && handler->Continue()) {
char tag = input[0];
input.remove_prefix(1);
uint32_t column_family = 0; // default
Expand All @@ -98,11 +99,11 @@ Status WriteBatch::Iterate(Handler* handler) const {
if (!GetVarint32(&input, &column_family)) {
return Status::Corruption("bad WriteBatch Put");
}
// intentional fallthrough
// intentional fallthrough
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->PutCF(column_family, key, value);
s = handler->PutCF(column_family, key, value);
found++;
} else {
return Status::Corruption("bad WriteBatch Put");
Expand All @@ -112,10 +113,10 @@ Status WriteBatch::Iterate(Handler* handler) const {
if (!GetVarint32(&input, &column_family)) {
return Status::Corruption("bad WriteBatch Delete");
}
// intentional fallthrough
// intentional fallthrough
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->DeleteCF(column_family, key);
s = handler->DeleteCF(column_family, key);
found++;
} else {
return Status::Corruption("bad WriteBatch Delete");
Expand All @@ -125,11 +126,11 @@ Status WriteBatch::Iterate(Handler* handler) const {
if (!GetVarint32(&input, &column_family)) {
return Status::Corruption("bad WriteBatch Merge");
}
// intentional fallthrough
// intentional fallthrough
case kTypeMerge:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->MergeCF(column_family, key, value);
s = handler->MergeCF(column_family, key, value);
found++;
} else {
return Status::Corruption("bad WriteBatch Merge");
Expand All @@ -146,7 +147,10 @@ Status WriteBatch::Iterate(Handler* handler) const {
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
if (!s.ok()) {
return s;
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
Expand Down Expand Up @@ -251,13 +255,15 @@ class MemTableInserter : public WriteBatch::Handler {
return log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber();
}

virtual void PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
bool found = cf_mems_->Seek(column_family_id);
// TODO(icanadi) if found = false somehow return the error to caller
// Will need to change public API to do this
if (!found || IgnoreUpdate()) {
return;
if (!found) {
return Status::InvalidArgument(
"Invalid column family specified in write batch");
}
if (IgnoreUpdate()) {
return Status::OK();
}
MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
Expand Down Expand Up @@ -304,13 +310,18 @@ class MemTableInserter : public WriteBatch::Handler {
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
sequence_++;
return Status::OK();
}

virtual void MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
bool found = cf_mems_->Seek(column_family_id);
if (!found || IgnoreUpdate()) {
return;
if (!found) {
return Status::InvalidArgument(
"Invalid column family specified in write batch");
}
if (IgnoreUpdate()) {
return Status::OK();
}
MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
Expand Down Expand Up @@ -372,12 +383,17 @@ class MemTableInserter : public WriteBatch::Handler {
}

sequence_++;
return Status::OK();
}

virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
bool found = cf_mems_->Seek(column_family_id);
if (!found || IgnoreUpdate()) {
return;
if (!found) {
return Status::InvalidArgument(
"Invalid column family specified in write batch");
}
if (IgnoreUpdate()) {
return Status::OK();
}
MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
Expand All @@ -393,11 +409,12 @@ class MemTableInserter : public WriteBatch::Handler {
}
if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
RecordTick(options->statistics.get(), NUMBER_FILTERED_DELETES);
return;
return Status::OK();
}
}
mem->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
return Status::OK();
}
};
} // namespace
Expand Down
21 changes: 12 additions & 9 deletions db/write_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,34 +145,37 @@ TEST(WriteBatchTest, Append) {
namespace {
struct TestHandler : public WriteBatch::Handler {
std::string seen;
virtual void PutCF(uint32_t column_family_id, const Slice& key,
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
seen += "Put(" + key.ToString() + ", " + value.ToString() + ")";
} else {
seen += "PutCF(" + std::to_string(column_family_id) + ", " +
key.ToString() + ", " + value.ToString() + ")";
}
return Status::OK();
}
virtual void MergeCF(uint32_t column_family_id, const Slice& key,
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
seen += "Merge(" + key.ToString() + ", " + value.ToString() + ")";
} else {
seen += "MergeCF(" + std::to_string(column_family_id) + ", " +
key.ToString() + ", " + value.ToString() + ")";
}
return Status::OK();
}
virtual void LogData(const Slice& blob) {
seen += "LogData(" + blob.ToString() + ")";
}
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
if (column_family_id == 0) {
seen += "Delete(" + key.ToString() + ")";
} else {
seen += "DeleteCF(" + std::to_string(column_family_id) + ", " +
key.ToString() + ")";
}
return Status::OK();
}
};
}
Expand Down Expand Up @@ -212,23 +215,23 @@ TEST(WriteBatchTest, Continue) {

struct Handler : public TestHandler {
int num_seen = 0;
virtual void PutCF(uint32_t column_family_id, const Slice& key,
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
++num_seen;
TestHandler::PutCF(column_family_id, key, value);
return TestHandler::PutCF(column_family_id, key, value);
}
virtual void MergeCF(uint32_t column_family_id, const Slice& key,
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
++num_seen;
TestHandler::MergeCF(column_family_id, key, value);
return TestHandler::MergeCF(column_family_id, key, value);
}
virtual void LogData(const Slice& blob) {
++num_seen;
TestHandler::LogData(blob);
}
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
++num_seen;
TestHandler::DeleteCF(column_family_id, key);
return TestHandler::DeleteCF(column_family_id, key);
}
virtual bool Continue() override {
return num_seen < 3;
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ using std::unique_ptr;
class ColumnFamilyHandle {
public:
virtual ~ColumnFamilyHandle() {}

virtual uint32_t GetID() const = 0;
};
extern const std::string default_column_family_name;

Expand Down
22 changes: 17 additions & 5 deletions include/rocksdb/write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,29 +88,41 @@ class WriteBatch {
// default implementation will just call Put without column family for
// backwards compatibility. If the column family is not default,
// the function is noop
virtual void PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
// Put() historically doesn't return status. We didn't want to be
// backwards incompatible so we didn't change the return status
// (this is a public API). We do an ordinary get and return Status::OK()
Put(key, value);
return Status::OK();
}
return Status::InvalidArgument(
"non-default column family and PutCF not implemented");
}
virtual void Put(const Slice& key, const Slice& value);
// Merge and LogData are not pure virtual. Otherwise, we would break
// existing clients of Handler on a source code level. The default
// implementation of Merge simply throws a runtime exception.
virtual void MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
Merge(key, value);
return Status::OK();
}
return Status::InvalidArgument(
"non-default column family and MergeCF not implemented");
}
virtual void Merge(const Slice& key, const Slice& value);
// The default implementation of LogData does nothing.
virtual void LogData(const Slice& blob);
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
if (column_family_id == 0) {
Delete(key);
return Status::OK();
}
return Status::InvalidArgument(
"non-default column family and DeleteCF not implemented");
}
virtual void Delete(const Slice& key);
// Continue is called by WriteBatch::Iterate. If it returns false,
Expand Down

0 comments on commit 8b7ab99

Please sign in to comment.