diff --git a/db/version_set.cc b/db/version_set.cc
index 15c356d1a0..5dd79e23a5 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1346,9 +1346,87 @@ Compaction* VersionSet::PickCompaction() {
   return c;
 }
 
+// Finds the largest key in a vector of files. Returns true if files is not empty.
+bool FindLargestKey(const InternalKeyComparator & icmp, const std::vector<FileMetaData*> & files, InternalKey *largestKey) {
+  if (files.empty()) {
+    return false;
+  }
+  *largestKey = files[0]->largest;
+  for (size_t i = 1; i < files.size(); ++i) {
+    FileMetaData* f = files[i];
+    if (icmp.Compare(f->largest, *largestKey) > 0) {
+      *largestKey = f->largest;
+    }
+  }
+  return true;
+}
+
+// Finds the minimum file b2=(l2, u2) in levelFiles for which l2 > u1 and user_key(l2) = user_key(u1); NULL if none.
+FileMetaData* FindSmallestBoundaryFile(const InternalKeyComparator & icmp,
+                                       const std::vector<FileMetaData*> & levelFiles,
+                                       const InternalKey & largestKey) {
+  const Comparator* user_cmp = icmp.user_comparator();
+  FileMetaData* smallestBoundaryFile = NULL;
+  for (size_t i = 0; i < levelFiles.size(); ++i) {
+    FileMetaData* f = levelFiles[i];
+    if (icmp.Compare(f->smallest, largestKey) > 0 &&
+        user_cmp->Compare(f->smallest.user_key(), largestKey.user_key()) == 0) {
+      if (smallestBoundaryFile == NULL ||
+          icmp.Compare(f->smallest, smallestBoundaryFile->smallest) < 0) {
+        smallestBoundaryFile = f;
+      }
+    }
+  }
+  return smallestBoundaryFile;
+}
+
+// If there are two blocks, b1=(l1, u1) and b2=(l2, u2) and
+// user_key(u1) = user_key(l2), and if we compact b1 but not
+// b2 then a subsequent get operation will yield an incorrect
+// result because it will return the record from b2 in level
+// i rather than from b1 because it searches level by level
+// for records matching the supplied user key.
+//
+// This function extracts the largest file b1 from compactionFiles
+// and then searches for a b2 in levelFiles for which user_key(u1) =
+// user_key(l2). If it finds such a file b2 (known as a boundary file)
+// it adds it to compactionFiles and then searches again using this
+// new upper bound.
+//
+// parameters:
+//   in     levelFiles:      list of files to search for boundary files
+//   in/out compactionFiles: list of files to extend by adding boundary files
+void AddBoundaryInputs(const InternalKeyComparator& icmp,
+                       const std::vector<FileMetaData*>& levelFiles,
+                       std::vector<FileMetaData*>* compactionFiles) {
+  InternalKey largestKey;
+
+  // Find largestKey in compactionFiles; return immediately if compactionFiles
+  // is empty.
+  if (!FindLargestKey(icmp, *compactionFiles, &largestKey)) {
+    return;
+  }
+
+  bool continueSearching = true;
+  while (continueSearching) {
+    FileMetaData* smallestBoundaryFile =
+        FindSmallestBoundaryFile(icmp, levelFiles, largestKey);
+
+    // If a boundary file was found advance largestKey, otherwise we're done.
+    if (smallestBoundaryFile != NULL) {
+      compactionFiles->push_back(smallestBoundaryFile);
+      largestKey = smallestBoundaryFile->largest;
+    } else {
+      continueSearching = false;
+    }
+  }
+}
+
 void VersionSet::SetupOtherInputs(Compaction* c) {
   const int level = c->level();
   InternalKey smallest, largest;
+
+  AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
   GetRange(c->inputs_[0], &smallest, &largest);
 
   current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
@@ -1362,6 +1440,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
   if (!c->inputs_[1].empty()) {
     std::vector<FileMetaData*> expanded0;
     current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
+    AddBoundaryInputs(icmp_, current_->files_[level], &expanded0);
     const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
     const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
     const int64_t expanded0_size = TotalFileSize(expanded0);
diff --git a/db/version_set_test.cc b/db/version_set_test.cc
index 501e34d133..7708f15aaa 100644
--- a/db/version_set_test.cc
+++ b/db/version_set_test.cc
@@ -172,7 +172,159 @@ TEST(FindFileTest, OverlappingFiles) {
   ASSERT_TRUE(Overlaps("600", "700"));
 }
 
-}  // namespace leveldb
+void AddBoundaryInputs(const InternalKeyComparator &icmp,
+                       const std::vector<FileMetaData *> &levelFiles,
+                       std::vector<FileMetaData *> *compactionFiles);
+
+// Test fixture: holds the level/compaction file lists passed to
+// AddBoundaryInputs and deletes every FileMetaData it created on teardown.
+class AddBoundaryInputsTest {
+ public:
+  std::vector<FileMetaData *> levelFiles_;
+  std::vector<FileMetaData *> compactionFiles_;
+  std::vector<FileMetaData *> allFiles_;
+  InternalKeyComparator icmp_;
+
+  AddBoundaryInputsTest() : icmp_(BytewiseComparator()){};
+
+  ~AddBoundaryInputsTest() {
+    for (size_t i = 0; i < allFiles_.size(); ++i) {
+      delete allFiles_[i];
+    }
+    allFiles_.clear();
+  };
+
+  // Allocates a FileMetaData with the given number and key range and records
+  // it in allFiles_ so that the destructor frees it.
+  FileMetaData *CreateFileMetaData(uint64_t number, InternalKey smallest,
+                                   InternalKey largest) {
+    FileMetaData *f = new FileMetaData();
+    f->number = number;
+    f->smallest = smallest;
+    f->largest = largest;
+    allFiles_.push_back(f);
+    return f;
+  }
+};
+
+TEST(AddBoundaryInputsTest, TestEmptyFileSets) {
+  AddBoundaryInputs(icmp_, levelFiles_, &compactionFiles_);
+  ASSERT_TRUE(compactionFiles_.empty());
+  ASSERT_TRUE(levelFiles_.empty());
+}
+
+TEST(AddBoundaryInputsTest, TestEmptyLevelFiles) {
+  FileMetaData *f1 =
+      CreateFileMetaData(1, InternalKey("100", 2, kTypeValue),
+                         InternalKey(InternalKey("100", 1, kTypeValue)));
+  compactionFiles_.push_back(f1);
+
+  AddBoundaryInputs(icmp_, levelFiles_, &compactionFiles_);
+  ASSERT_EQ(1, compactionFiles_.size());
+  ASSERT_EQ(f1, compactionFiles_[0]);
+  ASSERT_TRUE(levelFiles_.empty());
+}
+
+TEST(AddBoundaryInputsTest, TestEmptyCompactionFiles) {
+  FileMetaData *f1 =
+      CreateFileMetaData(1, InternalKey("100", 2, kTypeValue),
+                         InternalKey(InternalKey("100", 1, kTypeValue)));
+  levelFiles_.push_back(f1);
+
+  AddBoundaryInputs(icmp_, levelFiles_, &compactionFiles_);
+  ASSERT_TRUE(compactionFiles_.empty());
+  ASSERT_EQ(1, levelFiles_.size());
+  ASSERT_EQ(f1, levelFiles_[0]);
+}
+
+TEST(AddBoundaryInputsTest, TestNoBoundaryFiles) {
+  FileMetaData *f1 =
+      CreateFileMetaData(1, InternalKey("100", 2, kTypeValue),
+                         InternalKey(InternalKey("100", 1, kTypeValue)));
+  FileMetaData *f2 =
+      CreateFileMetaData(1, InternalKey("200", 2, kTypeValue),
+                         InternalKey(InternalKey("200", 1, kTypeValue)));
+  FileMetaData *f3 =
+      CreateFileMetaData(1, InternalKey("300", 2, kTypeValue),
+                         InternalKey(InternalKey("300", 1, kTypeValue)));
+
+  levelFiles_.push_back(f3);
+  levelFiles_.push_back(f2);
+  levelFiles_.push_back(f1);
+  compactionFiles_.push_back(f2);
+  compactionFiles_.push_back(f3);
+
+  AddBoundaryInputs(icmp_, levelFiles_, &compactionFiles_);
+  ASSERT_EQ(2, compactionFiles_.size());
+}
+
+TEST(AddBoundaryInputsTest, TestOneBoundaryFiles) {
+  FileMetaData *f1 =
+      CreateFileMetaData(1, InternalKey("100", 3, kTypeValue),
+                         InternalKey(InternalKey("100", 2, kTypeValue)));
+  FileMetaData *f2 =
+      CreateFileMetaData(1, InternalKey("100", 1, kTypeValue),
+                         InternalKey(InternalKey("200", 3, kTypeValue)));
+  FileMetaData *f3 =
+      CreateFileMetaData(1, InternalKey("300", 2, kTypeValue),
+                         InternalKey(InternalKey("300", 1, kTypeValue)));
+
+  levelFiles_.push_back(f3);
+  levelFiles_.push_back(f2);
+  levelFiles_.push_back(f1);
+  compactionFiles_.push_back(f1);
+
+  AddBoundaryInputs(icmp_, levelFiles_, &compactionFiles_);
+  ASSERT_EQ(2, compactionFiles_.size());
+  ASSERT_EQ(f1, compactionFiles_[0]);
+  ASSERT_EQ(f2, compactionFiles_[1]);
+}
+
+TEST(AddBoundaryInputsTest, TestTwoBoundaryFiles) {
+  FileMetaData *f1 =
+      CreateFileMetaData(1, InternalKey("100", 6, kTypeValue),
+                         InternalKey(InternalKey("100", 5, kTypeValue)));
+  FileMetaData *f2 =
+      CreateFileMetaData(1, InternalKey("100", 2, kTypeValue),
+                         InternalKey(InternalKey("300", 1, kTypeValue)));
+  FileMetaData *f3 =
+      CreateFileMetaData(1, InternalKey("100", 4, kTypeValue),
+                         InternalKey(InternalKey("100", 3, kTypeValue)));
+
+  levelFiles_.push_back(f2);
+  levelFiles_.push_back(f3);
+  levelFiles_.push_back(f1);
+  compactionFiles_.push_back(f1);
+
+  AddBoundaryInputs(icmp_, levelFiles_, &compactionFiles_);
+  ASSERT_EQ(3, compactionFiles_.size());
+  ASSERT_EQ(f1, compactionFiles_[0]);
+  ASSERT_EQ(f3, compactionFiles_[1]);
+  ASSERT_EQ(f2, compactionFiles_[2]);
+}
+
+// f1 (the compaction input) is a distinct object from f2 (a level file) with an
+// identical key range; boundary files f4 and f3 are expected to be found anyway.
+TEST(AddBoundaryInputsTest, TestDisjoinFilePointers) {
+  FileMetaData *f1 = CreateFileMetaData(1, InternalKey("100", 6, kTypeValue), InternalKey(InternalKey("100", 5, kTypeValue)));
+  FileMetaData *f2 = CreateFileMetaData(1, InternalKey("100", 6, kTypeValue), InternalKey(InternalKey("100", 5, kTypeValue)));
+  FileMetaData *f3 = CreateFileMetaData(1, InternalKey("100", 2, kTypeValue), InternalKey(InternalKey("300", 1, kTypeValue)));
+  FileMetaData *f4 = CreateFileMetaData(1, InternalKey("100", 4, kTypeValue), InternalKey(InternalKey("100", 3, kTypeValue)));
+
+  levelFiles_.push_back(f2);
+  levelFiles_.push_back(f3);
+  levelFiles_.push_back(f4);
+
+  compactionFiles_.push_back(f1);
+
+  AddBoundaryInputs(icmp_, levelFiles_, &compactionFiles_);
+  ASSERT_EQ(3, compactionFiles_.size());
+  ASSERT_EQ(f1, compactionFiles_[0]);
+  ASSERT_EQ(f4, compactionFiles_[1]);
+  ASSERT_EQ(f3, compactionFiles_[2]);
+}
+
+}  // namespace leveldb
 
 int main(int argc, char** argv) {
   return leveldb::test::RunAllTests();
diff --git a/issues/issue320_test.cc b/issues/issue320_test.cc
new file mode 100644
index 0000000000..619ddb553d
--- /dev/null
+++ b/issues/issue320_test.cc
@@ -0,0 +1,139 @@
+// Copyright (c) 2019 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <iostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <leveldb/db.h>
+#include <leveldb/write_batch.h>
+#include <util/testharness.h>
+
+using namespace std;
+
+namespace leveldb {
+
+namespace {
+
+unsigned int random(unsigned int max) {
+  return std::rand() % max;
+}
+
+string newString(int32_t index) {
+  const unsigned int len = 1024;
+  char bytes[len];
+  unsigned int i = 0;
+  while (i < 8) {
+    bytes[i] = 'a' + ((index >> (4 * i)) & 0xf);
+    ++i;
+  }
+  while (i < sizeof(bytes)) {
+    bytes[i] = 'a' + random(26);
+    ++i;
+  }
+  return string(bytes, sizeof(bytes));
+}
+
+}  // namespace
+
+class Issue320 { };
+
+TEST(Issue320, Test) {
+  std::srand(0);
+
+  bool delete_before_put = false;
+  bool keep_snapshots = true;
+
+  vector<pair<string, string>*> test_map(10000, nullptr);
+  vector<Snapshot const*> snapshots(100, nullptr);
+
+  DB* db;
+  Options options;
+  options.create_if_missing = true;
+
+  std::string dbpath = test::TmpDir() + "/leveldb_issue320_test";
+  ASSERT_OK(DB::Open(options, dbpath, &db));
+
+  unsigned int target_size = 10000;
+  unsigned int num_items = 0;
+  unsigned long count = 0;
+  // Value read back from the DB; compared against the in-memory test_map entry.
+  string old_value;
+
+  WriteOptions writeOptions;
+  ReadOptions readOptions;
+  while (count < 200000) {
+    if ((++count % 1000) == 0) {
+      cout << "count: " << count << endl;
+    }
+
+    unsigned int index = random(test_map.size());
+    WriteBatch batch;
+
+    if (test_map[index] == nullptr) {
+      num_items++;
+      test_map[index] =
+        new pair<string, string>(newString(index), newString(index));
+      batch.Put(test_map[index]->first, test_map[index]->second);
+    } else {
+      ASSERT_OK(db->Get(readOptions, test_map[index]->first, &old_value));
+      if (old_value != test_map[index]->second) {
+        cout << "ERROR incorrect value returned by Get" << endl;
+        cout << " count=" << count << endl;
+        cout << " old value=" << old_value << endl;
+        cout << " test_map[index]->second=" << test_map[index]->second << endl;
+        cout << " test_map[index]->first=" << test_map[index]->first << endl;
+        cout << " index=" << index << endl;
+        ASSERT_EQ(old_value, test_map[index]->second);
+      }
+
+      if (num_items >= target_size && random(100) > 30) {
+        batch.Delete(test_map[index]->first);
+        delete test_map[index];
+        test_map[index] = nullptr;
+        --num_items;
+      } else {
+        test_map[index]->second = newString(index);
+        if (delete_before_put) batch.Delete(test_map[index]->first);
+        batch.Put(test_map[index]->first, test_map[index]->second);
+      }
+    }
+
+    ASSERT_OK(db->Write(writeOptions, &batch));
+
+    if (keep_snapshots && random(10) == 0) {
+      unsigned int i = random(snapshots.size());
+      if (snapshots[i] != nullptr) {
+        db->ReleaseSnapshot(snapshots[i]);
+      }
+      snapshots[i] = db->GetSnapshot();
+    }
+  }
+
+  for (Snapshot const* snapshot : snapshots) {
+    if (snapshot) {
+      db->ReleaseSnapshot(snapshot);
+    }
+  }
+
+  for (size_t i = 0; i < test_map.size(); ++i) {
+    if (test_map[i] != nullptr) {
+      delete test_map[i];
+      test_map[i] = nullptr;
+    }
+  }
+
+  delete db;
+  DestroyDB(dbpath, options);
+}
+
+}  // namespace leveldb
+
+int main(int argc, char** argv) {
+  return leveldb::test::RunAllTests();
+}