Skip to content

Commit

Permalink
Merge branch 'main' into c_api_statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Jul 7, 2023
2 parents 5e0f901 + baf37a0 commit 8549da0
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 83 deletions.
2 changes: 1 addition & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* Add `WriteBatch::Release()` that releases the batch's serialized data to the caller.

### Public API Changes
* Add parameter `deletion_ratio` to C API `rocksdb_options_add_compact_on_deletion_collector_factory`.
* Add C API `rocksdb_options_add_compact_on_deletion_collector_factory_del_ratio`.
* change the FileSystem::use_async_io() API to SupportedOps API in order to extend it to various operations supported by underlying FileSystem. Right now it contains FSSupportedOps::kAsyncIO and FSSupportedOps::kFSBuffer. More details about FSSupportedOps in filesystem.h
* Add new tickers: `rocksdb.error.handler.bg.error.count`, `rocksdb.error.handler.bg.io.error.count`, `rocksdb.error.handler.bg.retryable.io.error.count` to replace the misspelled ones: `rocksdb.error.handler.bg.errro.count`, `rocksdb.error.handler.bg.io.errro.count`, `rocksdb.error.handler.bg.retryable.io.errro.count` ('error' instead of 'errro'). Users should switch to use the new tickers before 9.0 release as the misspelled old tickers will be completely removed then.
* Overload the API CreateColumnFamilyWithImport() to support creating ColumnFamily by importing multiple ColumnFamilies It requires that CFs should not overlap in user key range.
Expand Down
8 changes: 8 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3966,6 +3966,14 @@ void rocksdb_options_set_row_cache(rocksdb_options_t* opt,
}

void rocksdb_options_add_compact_on_deletion_collector_factory(
rocksdb_options_t* opt, size_t window_size, size_t num_dels_trigger) {
std::shared_ptr<ROCKSDB_NAMESPACE::TablePropertiesCollectorFactory>
compact_on_del =
NewCompactOnDeletionCollectorFactory(window_size, num_dels_trigger);
opt->rep.table_properties_collector_factories.emplace_back(compact_on_del);
}

void rocksdb_options_add_compact_on_deletion_collector_factory_del_ratio(
rocksdb_options_t* opt, size_t window_size, size_t num_dels_trigger,
double deletion_ratio) {
std::shared_ptr<ROCKSDB_NAMESPACE::TablePropertiesCollectorFactory>
Expand Down
4 changes: 3 additions & 1 deletion db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,9 @@ int main(int argc, char** argv) {
rocksdb_compactoptions_set_exclusive_manual_compaction(coptions, 1);

rocksdb_options_add_compact_on_deletion_collector_factory(options, 10000,
10001, 0.0);
10001);
rocksdb_options_add_compact_on_deletion_collector_factory_del_ratio(
options, 10000, 10001, 0.0);

StartPhase("destroy");
rocksdb_destroy_db(options, dbname, &err);
Expand Down
20 changes: 8 additions & 12 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,8 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,

const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
if (result.ttl == kDefaultTtl) {
if (is_block_based_table &&
result.compaction_style != kCompactionStyleFIFO) {
if (is_block_based_table) {
// For FIFO, max_open_files is checked in ValidateOptions().
result.ttl = kAdjustedTtl;
} else {
result.ttl = 0;
Expand All @@ -403,16 +403,12 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
}
} else {
// result.compaction_style == kCompactionStyleFIFO
if (result.ttl == 0) {
if (is_block_based_table) {
if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
}
result.ttl = result.periodic_compaction_seconds;
}
} else if (result.periodic_compaction_seconds != 0) {
result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
if (result.periodic_compaction_seconds != kDefaultPeriodicCompSecs &&
result.periodic_compaction_seconds > 0) {
ROCKS_LOG_WARN(
db_options.info_log.get(),
"periodic_compaction_seconds does not support FIFO compaction. You"
"may want to set option TTL instead.");
}
}

Expand Down
15 changes: 8 additions & 7 deletions db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -880,9 +880,13 @@ TEST_F(DBOptionsTest, SanitizeFIFOPeriodicCompaction) {
Options options;
options.compaction_style = kCompactionStyleFIFO;
options.env = CurrentOptions().env;
// Default value allows RocksDB to set ttl to 30 days.
ASSERT_EQ(30 * 24 * 60 * 60, dbfull()->GetOptions().ttl);

// Disable
options.ttl = 0;
Reopen(options);
ASSERT_EQ(30 * 24 * 60 * 60, dbfull()->GetOptions().ttl);
ASSERT_EQ(0, dbfull()->GetOptions().ttl);

options.ttl = 100;
Reopen(options);
Expand All @@ -892,15 +896,12 @@ TEST_F(DBOptionsTest, SanitizeFIFOPeriodicCompaction) {
Reopen(options);
ASSERT_EQ(100 * 24 * 60 * 60, dbfull()->GetOptions().ttl);

options.ttl = 200;
options.periodic_compaction_seconds = 300;
Reopen(options);
ASSERT_EQ(200, dbfull()->GetOptions().ttl);

// periodic_compaction_seconds should have no effect
// on FIFO compaction.
options.ttl = 500;
options.periodic_compaction_seconds = 300;
Reopen(options);
ASSERT_EQ(300, dbfull()->GetOptions().ttl);
ASSERT_EQ(500, dbfull()->GetOptions().ttl);
}

TEST_F(DBOptionsTest, SetFIFOCompactionOptions) {
Expand Down
132 changes: 95 additions & 37 deletions db/db_wal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,25 @@ class DBWALTestWithTimestamp
DBWALTestWithTimestamp()
: DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {}

void SetUp() override {
persist_udt_ = test::ShouldPersistUDT(GetParam());
DBBasicTestWithTimestampBase::SetUp();
}

Status CreateAndReopenWithCFWithTs(const std::vector<std::string>& cfs,
const Options& options) {
const Options& options,
bool avoid_flush_during_recovery = false) {
CreateColumnFamilies(cfs, options);
return ReopenColumnFamiliesWithTs(cfs, options);
return ReopenColumnFamiliesWithTs(cfs, options,
avoid_flush_during_recovery);
}

Status ReopenColumnFamiliesWithTs(const std::vector<std::string>& cfs,
Options ts_options) {
Options ts_options,
bool avoid_flush_during_recovery = false) {
Options default_options = CurrentOptions();
default_options.create_if_missing = false;
default_options.avoid_flush_during_recovery = avoid_flush_during_recovery;
ts_options.create_if_missing = false;

std::vector<Options> cf_options(cfs.size(), ts_options);
Expand All @@ -345,97 +354,146 @@ class DBWALTestWithTimestamp
}

void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key,
const std::string& expected_value) {
const std::string& expected_value,
const std::string& expected_ts) {
std::string actual_value;
ASSERT_OK(db_->Get(read_opts, handles_[cf], key, &actual_value));
std::string actual_ts;
ASSERT_OK(
db_->Get(read_opts, handles_[cf], key, &actual_value, &actual_ts));
ASSERT_EQ(expected_value, actual_value);
ASSERT_EQ(expected_ts, actual_ts);
}

protected:
bool persist_udt_;
};

TEST_F(DBWALTestWithTimestamp, Recover) {
TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
// Set up the option that enables user defined timestmp size.
std::string ts = Timestamp(1, 0);
const size_t kTimestampSize = ts.size();
std::string ts1 = Timestamp(1, 0);
const size_t kTimestampSize = ts1.size();
TestComparator test_cmp(kTimestampSize);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
// Test that user-defined timestamps are recovered from WAL regardless of
// the value of this flag because UDTs are saved in WAL nonetheless.
// We however need to explicitly disable flush during recovery by setting
// `avoid_flush_during_recovery=true` so that we can avoid timestamps getting
// stripped when the `persist_user_defined_timestamps` flag is false, so that
// all written timestamps are available for testing user-defined time travel
// read.
ts_options.persist_user_defined_timestamps = persist_udt_;
bool avoid_flush_during_recovery = true;

ReadOptions read_opts;
Slice ts_slice = ts;
read_opts.timestamp = &ts_slice;
do {
ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options));
ASSERT_OK(Put(1, "foo", ts, "v1"));
ASSERT_OK(Put(1, "baz", ts, "v5"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options));
CheckGet(read_opts, 1, "foo", "v1");
CheckGet(read_opts, 1, "baz", "v5");
ASSERT_OK(Put(1, "bar", ts, "v2"));
ASSERT_OK(Put(1, "foo", ts, "v3"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options));
CheckGet(read_opts, 1, "foo", "v3");
ASSERT_OK(Put(1, "foo", ts, "v4"));
CheckGet(read_opts, 1, "foo", "v4");
CheckGet(read_opts, 1, "bar", "v2");
CheckGet(read_opts, 1, "baz", "v5");
Slice ts_slice = ts1;
read_opts.timestamp = &ts_slice;
ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
ASSERT_OK(Put(1, "foo", ts1, "v1"));
ASSERT_OK(Put(1, "baz", ts1, "v5"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
// Do a timestamped read with ts1 after second reopen.
CheckGet(read_opts, 1, "foo", "v1", ts1);
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Write more value versions for key "foo" and "bar" before and after second
// reopen.
std::string ts2 = Timestamp(2, 0);
ASSERT_OK(Put(1, "bar", ts2, "v2"));
ASSERT_OK(Put(1, "foo", ts2, "v3"));

ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
std::string ts3 = Timestamp(3, 0);
ASSERT_OK(Put(1, "foo", ts3, "v4"));

// Do a timestamped read with ts1 after third reopen.
CheckGet(read_opts, 1, "foo", "v1", ts1);
std::string value;
ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound());
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Do a timestamped read with ts2 after third reopen.
ts_slice = ts2;
CheckGet(read_opts, 1, "foo", "v3", ts2);
CheckGet(read_opts, 1, "bar", "v2", ts2);
CheckGet(read_opts, 1, "baz", "v5", ts1);

// Do a timestamped read with ts3 after third reopen.
ts_slice = ts3;
CheckGet(read_opts, 1, "foo", "v4", ts3);
CheckGet(read_opts, 1, "bar", "v2", ts2);
CheckGet(read_opts, 1, "baz", "v5", ts1);
} while (ChangeWalOptions());
}

TEST_F(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
TEST_P(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
// Set up the option that enables user defined timestmp size.
std::string ts = Timestamp(1, 0);
const size_t kTimestampSize = ts.size();
TestComparator test_cmp(kTimestampSize);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
ts_options.persist_user_defined_timestamps = persist_udt_;

ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options));
ASSERT_OK(Put(1, "foo", ts, "v1"));
ASSERT_OK(Put(1, "baz", ts, "v5"));

// In real use cases, switching to a different user comparator is prohibited
// by a sanity check during DB open that does a user comparator name
// comparison. This test mocked and bypassed that sanity check because the
// before and after user comparator are both named "TestComparator". This is
// to test the user-defined timestamp recovery logic for WAL files have
// the intended consistency check.
// `HandleWriteBatchTimestampSizeDifference` in udt_util.h has more details.
TestComparator diff_test_cmp(kTimestampSize + 1);
ts_options.comparator = &diff_test_cmp;
ASSERT_TRUE(
ReopenColumnFamiliesWithTs({"pikachu"}, ts_options).IsInvalidArgument());
}

TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
// Set up the option that enables user defined timestmp size.
// Set up the option that enables user defined timestamp size.
std::string min_ts = Timestamp(0, 0);
std::string write_ts = Timestamp(1, 0);
const size_t kTimestampSize = write_ts.size();
TestComparator test_cmp(kTimestampSize);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
bool persist_udt = test::ShouldPersistUDT(GetParam());
ts_options.persist_user_defined_timestamps = persist_udt;
ts_options.persist_user_defined_timestamps = persist_udt_;

std::string smallest_ukey_without_ts = "baz";
std::string largest_ukey_without_ts = "foo";

ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options));
// No flush, no sst files, because of no data.
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1"));
ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5"));

// Very small write buffer size to force flush memtables recovered from WAL.
ts_options.write_buffer_size = 16;
ts_options.arena_block_size = 16;
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
// Memtable recovered from WAL flushed because `avoid_flush_during_recovery`
// defaults to false, created one L0 file.
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1U);

std::vector<std::vector<FileMetaData>> level_to_files;
dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files);
ASSERT_GT(level_to_files.size(), 1);
// L0 only has one SST file.
ASSERT_EQ(level_to_files[0].size(), 1);
auto meta = level_to_files[0][0];
if (persist_udt) {
if (persist_udt_) {
ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key());
} else {
Expand All @@ -446,7 +504,7 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {

// Param 0: test mode for the user-defined timestamp feature
INSTANTIATE_TEST_CASE_P(
RecoverAndFlush, DBWALTestWithTimestamp,
DBWALTestWithTimestamp, DBWALTestWithTimestamp,
::testing::Values(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal));
Expand Down
6 changes: 2 additions & 4 deletions db/file_indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,15 @@ void FileIndexer::GetNextLevelIndex(const size_t level, const size_t file_index,
} else if (cmp_smallest == 0) {
*left_bound = index.smallest_lb;
*right_bound = index.smallest_rb;
} else if (cmp_smallest > 0 && cmp_largest < 0) {
} else if (cmp_largest < 0) {
*left_bound = index.smallest_lb;
*right_bound = index.largest_rb;
} else if (cmp_largest == 0) {
*left_bound = index.largest_lb;
*right_bound = index.largest_rb;
} else if (cmp_largest > 0) {
} else {
*left_bound = index.largest_lb;
*right_bound = level_rb_[level + 1];
} else {
assert(false);
}

assert(*left_bound >= 0);
Expand Down
2 changes: 1 addition & 1 deletion db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
earliest_seqno_.load(std::memory_order_relaxed);
while (
(cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
!first_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
!earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
}
}
if (type == kTypeRangeDeletion) {
Expand Down
Loading

0 comments on commit 8549da0

Please sign in to comment.