Skip to content

Commit

Permalink
Support range deletion tombstones in IngestExternalFile SSTs (faceboo…
Browse files Browse the repository at this point in the history
…k#3778)

Summary:
Fixes facebook#3391.

This change adds a `DeleteRange` method to `SstFileWriter` and adds
support for ingesting SSTs with range deletion tombstones. This is
important for applications that need to atomically ingest SSTs while
clearing out any existing keys in a given key range.
Pull Request resolved: facebook#3778

Differential Revision: D8821836

Pulled By: anand1976

fbshipit-source-id: ca7786c1947ff129afa703dab011d524c7883844
  • Loading branch information
nvanbenschoten authored and rcane committed Sep 13, 2018
1 parent 22797e1 commit 28d4017
Show file tree
Hide file tree
Showing 19 changed files with 431 additions and 129 deletions.
2 changes: 1 addition & 1 deletion USERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Learn more about those use cases in a Tech Talk by Ankit Gupta and Naveen Somasu
Yahoo is using RocksDB as a storage engine for their biggest distributed data store Sherpa. Learn more about it here: http://yahooeng.tumblr.com/post/120730204806/sherpa-scales-new-heights

## CockroachDB
CockroachDB is an open-source geo-replicated transactional database (still in development). They are using RocksDB as their storage engine. Check out their github: https://github.com/cockroachdb/cockroach
CockroachDB is an open-source geo-replicated transactional database. They are using RocksDB as their storage engine. Check out their github: https://github.com/cockroachdb/cockroach

## DNANexus
DNANexus is using RocksDB to speed up processing of genomics data.
Expand Down
16 changes: 5 additions & 11 deletions db/db_range_del_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ class DBRangeDelTest : public DBTestBase {
}
};

const int kRangeDelSkipConfigs =
// Plain tables do not support range deletions.
DBRangeDelTest::kSkipPlainTable |
// MmapReads disables the iterator pinning that RangeDelAggregator requires.
DBRangeDelTest::kSkipMmapReads;

// PlainTableFactory and NumTableFilesAtLevel() are not supported in
// ROCKSDB_LITE
#ifndef ROCKSDB_LITE
Expand All @@ -39,17 +33,17 @@ TEST_F(DBRangeDelTest, NonBlockBasedTableNotSupported) {
for (auto config : {kPlainTableAllBytesPrefix, /* kWalDirAndMmapReads */}) {
option_config_ = config;
DestroyAndReopen(CurrentOptions());
ASSERT_TRUE(
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1", "dr1")
.IsNotSupported());
ASSERT_TRUE(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
"dr1", "dr1")
.IsNotSupported());
}
}

TEST_F(DBRangeDelTest, FlushOutputHasOnlyRangeTombstones) {
do {
DestroyAndReopen(CurrentOptions());
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "dr1",
"dr2"));
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
"dr1", "dr2"));
ASSERT_OK(db_->Flush(FlushOptions()));
ASSERT_EQ(1, NumTableFilesAtLevel(0));
} while (ChangeOptions(kRangeDelSkipConfigs));
Expand Down
7 changes: 7 additions & 0 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,13 @@ class DBTestBase : public testing::Test {
kSkipMmapReads = 256,
};

const int kRangeDelSkipConfigs =
// Plain tables do not support range deletions.
kSkipPlainTable |
// MmapReads disables the iterator pinning that RangeDelAggregator
// requires.
kSkipMmapReads;

explicit DBTestBase(const std::string path);

~DBTestBase();
Expand Down
160 changes: 124 additions & 36 deletions db/external_sst_file_basic_test.cc

Large diffs are not rendered by default.

62 changes: 46 additions & 16 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Status ExternalSstFileIngestionJob::Prepare(
}

for (IngestedFileInfo& f : files_to_ingest_) {
if (f.num_entries == 0) {
if (f.num_entries == 0 && f.num_range_deletions == 0) {
return Status::InvalidArgument("File contain no entries");
}

Expand Down Expand Up @@ -358,6 +358,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
}
// Get number of entries in table
file_to_ingest->num_entries = props->num_entries;
file_to_ingest->num_range_deletions = props->num_range_deletions;

ParsedInternalKey key;
ReadOptions ro;
Expand All @@ -369,26 +370,55 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
ro.fill_cache = false;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get()));
std::unique_ptr<InternalIterator> range_del_iter(
table_reader->NewRangeTombstoneIterator(ro));

// Get first (smallest) key from file
// Get first (smallest) and last (largest) key from file.
bool bounds_set = false;
iter->SeekToFirst();
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
}
if (key.sequence != 0) {
return Status::Corruption("external file have non zero sequence number");
}
file_to_ingest->smallest_user_key = key.user_key.ToString();
if (iter->Valid()) {
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
}
if (key.sequence != 0) {
return Status::Corruption("external file have non zero sequence number");
}
file_to_ingest->smallest_user_key = key.user_key.ToString();

iter->SeekToLast();
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
}
if (key.sequence != 0) {
return Status::Corruption("external file have non zero sequence number");
}
file_to_ingest->largest_user_key = key.user_key.ToString();

// Get last (largest) key from file
iter->SeekToLast();
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
bounds_set = true;
}
if (key.sequence != 0) {
return Status::Corruption("external file have non zero sequence number");

// We may need to adjust these key bounds, depending on whether any range
// deletion tombstones extend past them.
const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
if (range_del_iter != nullptr) {
for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
range_del_iter->Next()) {
if (!ParseInternalKey(range_del_iter->key(), &key)) {
return Status::Corruption("external file have corrupted keys");
}
RangeTombstone tombstone(key, range_del_iter->value());

if (!bounds_set || ucmp->Compare(tombstone.start_key_,
file_to_ingest->smallest_user_key) < 0) {
file_to_ingest->smallest_user_key = tombstone.start_key_.ToString();
}
if (!bounds_set || ucmp->Compare(tombstone.end_key_,
file_to_ingest->largest_user_key) > 0) {
file_to_ingest->largest_user_key = tombstone.end_key_.ToString();
}
bounds_set = true;
}
}
file_to_ingest->largest_user_key = key.user_key.ToString();

file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);

Expand Down
2 changes: 2 additions & 0 deletions db/external_sst_file_ingestion_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ struct IngestedFileInfo {
uint64_t file_size;
// total number of keys in external file
uint64_t num_entries;
// total number of range deletions in external file
uint64_t num_range_deletions;
// Id of column family this file shoule be ingested into
uint32_t cf_id;
// TableProperties read from external file
Expand Down
120 changes: 117 additions & 3 deletions db/external_sst_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(0));
ASSERT_EQ(file1_info.largest_key, Key(99));
ASSERT_EQ(file1_info.num_range_del_entries, 0);
ASSERT_EQ(file1_info.smallest_range_del_key, "");
ASSERT_EQ(file1_info.largest_range_del_key, "");
// sst_file_writer already finished, cannot add this value
s = sst_file_writer.Put(Key(100), "bad_val");
ASSERT_FALSE(s.ok()) << s.ToString();
Expand Down Expand Up @@ -290,6 +293,58 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_EQ(file5_info.smallest_key, Key(400));
ASSERT_EQ(file5_info.largest_key, Key(499));

// file6.sst (delete 400 => 500)
std::string file6 = sst_files_dir_ + "file6.sst";
ASSERT_OK(sst_file_writer.Open(file6));
sst_file_writer.DeleteRange(Key(400), Key(500));
ExternalSstFileInfo file6_info;
s = sst_file_writer.Finish(&file6_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file6_info.file_path, file6);
ASSERT_EQ(file6_info.num_entries, 0);
ASSERT_EQ(file6_info.smallest_key, "");
ASSERT_EQ(file6_info.largest_key, "");
ASSERT_EQ(file6_info.num_range_del_entries, 1);
ASSERT_EQ(file6_info.smallest_range_del_key, Key(400));
ASSERT_EQ(file6_info.largest_range_del_key, Key(500));

// file7.sst (delete 500 => 570, put 520 => 599 divisible by 2)
std::string file7 = sst_files_dir_ + "file7.sst";
ASSERT_OK(sst_file_writer.Open(file7));
sst_file_writer.DeleteRange(Key(500), Key(550));
for (int k = 520; k < 560; k += 2) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
sst_file_writer.DeleteRange(Key(525), Key(575));
for (int k = 560; k < 600; k += 2) {
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file7_info;
s = sst_file_writer.Finish(&file7_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file7_info.file_path, file7);
ASSERT_EQ(file7_info.num_entries, 40);
ASSERT_EQ(file7_info.smallest_key, Key(520));
ASSERT_EQ(file7_info.largest_key, Key(598));
ASSERT_EQ(file7_info.num_range_del_entries, 2);
ASSERT_EQ(file7_info.smallest_range_del_key, Key(500));
ASSERT_EQ(file7_info.largest_range_del_key, Key(575));

// file8.sst (delete 600 => 700)
std::string file8 = sst_files_dir_ + "file8.sst";
ASSERT_OK(sst_file_writer.Open(file8));
sst_file_writer.DeleteRange(Key(600), Key(700));
ExternalSstFileInfo file8_info;
s = sst_file_writer.Finish(&file8_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file8_info.file_path, file8);
ASSERT_EQ(file8_info.num_entries, 0);
ASSERT_EQ(file8_info.smallest_key, "");
ASSERT_EQ(file8_info.largest_key, "");
ASSERT_EQ(file8_info.num_range_del_entries, 1);
ASSERT_EQ(file8_info.smallest_range_del_key, Key(600));
ASSERT_EQ(file8_info.largest_range_del_key, Key(700));

// Cannot create an empty sst file
std::string file_empty = sst_files_dir_ + "file_empty.sst";
ExternalSstFileInfo file_empty_info;
Expand Down Expand Up @@ -336,6 +391,16 @@ TEST_F(ExternalSSTFileTest, Basic) {
// Key range of file5 (400 => 499) dont overlap with any keys in DB
ASSERT_OK(DeprecatedAddFile({file5}));

// This file has overlapping values with the existing data
s = DeprecatedAddFile({file6});
ASSERT_FALSE(s.ok()) << s.ToString();

// Key range of file7 (500 => 598) dont overlap with any keys in DB
ASSERT_OK(DeprecatedAddFile({file7}));

// Key range of file7 (600 => 700) dont overlap with any keys in DB
ASSERT_OK(DeprecatedAddFile({file8}));

// Make sure values are correct before and after flush/compaction
for (int i = 0; i < 2; i++) {
for (int k = 0; k < 200; k++) {
Expand All @@ -349,6 +414,13 @@ TEST_F(ExternalSSTFileTest, Basic) {
std::string value = Key(k) + "_val";
ASSERT_EQ(Get(Key(k)), value);
}
for (int k = 500; k < 600; k++) {
std::string value = Key(k) + "_val";
if (k < 520 || k % 2 == 1) {
value = "NOT_FOUND";
}
ASSERT_EQ(Get(Key(k)), value);
}
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
Expand Down Expand Up @@ -377,7 +449,8 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_EQ(Get(Key(k)), value);
}
DestroyAndRecreateExternalSSTFilesDir();
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
kRangeDelSkipConfigs));
}
class SstFileWriterCollector : public TablePropertiesCollector {
public:
Expand Down Expand Up @@ -518,17 +591,57 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_EQ(file5_info.smallest_key, Key(200));
ASSERT_EQ(file5_info.largest_key, Key(299));

// file6.sst (delete 0 => 100)
std::string file6 = sst_files_dir_ + "file6.sst";
ASSERT_OK(sst_file_writer.Open(file6));
ASSERT_OK(sst_file_writer.DeleteRange(Key(0), Key(75)));
ASSERT_OK(sst_file_writer.DeleteRange(Key(25), Key(100)));
ExternalSstFileInfo file6_info;
s = sst_file_writer.Finish(&file6_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file6_info.file_path, file6);
ASSERT_EQ(file6_info.num_entries, 0);
ASSERT_EQ(file6_info.smallest_key, "");
ASSERT_EQ(file6_info.largest_key, "");
ASSERT_EQ(file6_info.num_range_del_entries, 2);
ASSERT_EQ(file6_info.smallest_range_del_key, Key(0));
ASSERT_EQ(file6_info.largest_range_del_key, Key(100));

// file7.sst (delete 100 => 200)
std::string file7 = sst_files_dir_ + "file7.sst";
ASSERT_OK(sst_file_writer.Open(file7));
ASSERT_OK(sst_file_writer.DeleteRange(Key(100), Key(200)));
ExternalSstFileInfo file7_info;
s = sst_file_writer.Finish(&file7_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file7_info.file_path, file7);
ASSERT_EQ(file7_info.num_entries, 0);
ASSERT_EQ(file7_info.smallest_key, "");
ASSERT_EQ(file7_info.largest_key, "");
ASSERT_EQ(file7_info.num_range_del_entries, 1);
ASSERT_EQ(file7_info.smallest_range_del_key, Key(100));
ASSERT_EQ(file7_info.largest_range_del_key, Key(200));

// list 1 has internal key range conflict
std::vector<std::string> file_list0({file1, file2});
std::vector<std::string> file_list1({file3, file2, file1});
std::vector<std::string> file_list2({file5});
std::vector<std::string> file_list3({file3, file4});
std::vector<std::string> file_list4({file5, file7});
std::vector<std::string> file_list5({file6, file7});

DestroyAndReopen(options);

// This list of files have key ranges are overlapping with each other
// These lists of files have key ranges that overlap with each other
s = DeprecatedAddFile(file_list1);
ASSERT_FALSE(s.ok()) << s.ToString();
// Both of the following overlap on the end key of a range deletion
// tombstone. This is a limitation because these tombstones have exclusive
// end keys that should not count as overlapping with other keys.
s = DeprecatedAddFile(file_list4);
ASSERT_FALSE(s.ok()) << s.ToString();
s = DeprecatedAddFile(file_list5);
ASSERT_FALSE(s.ok()) << s.ToString();

// Add files using file path list
s = DeprecatedAddFile(file_list0);
Expand Down Expand Up @@ -619,7 +732,8 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_EQ(Get(Key(k)), value);
}
DestroyAndRecreateExternalSSTFilesDir();
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction |
kRangeDelSkipConfigs));
}

TEST_F(ExternalSSTFileTest, AddListAtomicity) {
Expand Down
27 changes: 21 additions & 6 deletions db/range_del_aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,6 @@ Status RangeDelAggregator::AddTombstones(
input->SeekToFirst();
bool first_iter = true;
while (input->Valid()) {
// The tombstone map holds slices into the iterator's memory. This assert
// ensures pinning the iterator also pins the keys/values.
assert(input->IsKeyPinned() && input->IsValuePinned());

if (first_iter) {
if (rep_ == nullptr) {
InitRep({upper_bound_});
Expand All @@ -448,10 +444,29 @@ Status RangeDelAggregator::AddTombstones(
first_iter = false;
}
ParsedInternalKey parsed_key;
if (!ParseInternalKey(input->key(), &parsed_key)) {
bool parsed;
if (input->IsKeyPinned()) {
parsed = ParseInternalKey(input->key(), &parsed_key);
} else {
// The tombstone map holds slices into the iterator's memory. Make a
// copy of the key if it is not pinned.
rep_->pinned_slices_.emplace_back(input->key().data(),
input->key().size());
parsed = ParseInternalKey(rep_->pinned_slices_.back(), &parsed_key);
}
if (!parsed) {
return Status::Corruption("Unable to parse range tombstone InternalKey");
}
RangeTombstone tombstone(parsed_key, input->value());
RangeTombstone tombstone;
if (input->IsValuePinned()) {
tombstone = RangeTombstone(parsed_key, input->value());
} else {
// The tombstone map holds slices into the iterator's memory. Make a
// copy of the value if it is not pinned.
rep_->pinned_slices_.emplace_back(input->value().data(),
input->value().size());
tombstone = RangeTombstone(parsed_key, rep_->pinned_slices_.back());
}
// Truncate the tombstone to the range [smallest, largest].
if (smallest != nullptr) {
if (icmp_.user_comparator()->Compare(
Expand Down
Loading

0 comments on commit 28d4017

Please sign in to comment.