diff --git a/build_tools/check-sources.sh b/build_tools/check-sources.sh index 69a7d8ef9e..4c7fe6e59a 100755 --- a/build_tools/check-sources.sh +++ b/build_tools/check-sources.sh @@ -31,7 +31,8 @@ fi git grep -n 'using namespace' -- ':!build_tools' ':!docs' \ ':!third-party/folly/folly/lang/Align.h' \ - ':!third-party/gtest-1.8.1/fused-src/gtest/gtest.h' + ':!third-party/gtest-1.8.1/fused-src/gtest/gtest.h' \ + ':!examples/speedb_with_ttl_example.cc' if [ "$?" != "1" ]; then echo '^^^^ Do not use "using namespace"' BAD=1 diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index c361720fe2..c3fe8e7b52 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -91,6 +91,7 @@ DECLARE_bool(test_cf_consistency); DECLARE_bool(test_multi_ops_txns); DECLARE_int32(threads); DECLARE_int32(ttl); +DECLARE_bool(skip_expired_data); DECLARE_int32(value_size_mult); DECLARE_int32(compaction_readahead_size); DECLARE_bool(enable_pipelined_write); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 79dd5f0812..0707e0e7e7 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -101,6 +101,8 @@ DEFINE_int32(ttl, -1, "Carefully specify a large value such that verifications on " "deleted values don't fail"); +DEFINE_bool(skip_expired_data, false, "If true, will skip keys expired by TTL"); + DEFINE_int32(value_size_mult, 8, "Size of value will be this number times rand_int(1,3) bytes"); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index c8b7580a39..dc8a978699 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -682,6 +682,12 @@ void StressTest::OperateDb(ThreadState* thread) { read_opts.async_io = FLAGS_async_io; read_opts.adaptive_readahead = FLAGS_adaptive_readahead; read_opts.readahead_size = FLAGS_readahead_size; + if (gflags::GetCommandLineFlagInfoOrDie("ttl").is_default && + FLAGS_skip_expired_data && FLAGS_ttl < 1) { + auto error_msg = + IOStatus::InvalidArgument("skip_expired_data must be set with ttl"); + } + read_opts.skip_expired_data = FLAGS_skip_expired_data; WriteOptions write_opts; if (FLAGS_rate_limit_auto_wal_flush) { write_opts.rate_limiter_priority = Env::IO_USER; @@ -2492,7 +2498,6 @@ void StressTest::Open(SharedState* shared) { Status s; - if (FLAGS_ttl == -1) { std::vector existing_column_families; s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db, &existing_column_families); // ignore errors @@ -2611,11 +2616,44 @@ void StressTest::Open(SharedState* shared) { #endif // !ROCKSDB_LITE { if (db_preload_finished_.load() && FLAGS_read_only) { - s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, - cf_descriptors, &column_families_, &db_); + if (FLAGS_ttl == -1) { + s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, + cf_descriptors, &column_families_, &db_); + } else { + DBWithTTL* dbttl; + std::vector ttls; + for (size_t i = 0; i < cf_descriptors.size(); ++i) { + ttls.push_back(FLAGS_ttl); + } + s = DBWithTTL::Open(DBOptions(options_), FLAGS_db, cf_descriptors, + &column_families_, &dbttl, ttls, true); + if (!s.ok()) { + fprintf(stderr, "Cannot read only open db with ttl. %s\n", + s.ToString().c_str()); + exit(1); + } + db_ = dbttl; + } } else { - s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, - &column_families_, &db_); + if (FLAGS_ttl == -1) { + s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, + &column_families_, &db_); + } else { + std::vector ttls; + for (size_t i = 0; i < cf_descriptors.size(); ++i) { + ttls.push_back(FLAGS_ttl); + } + DBWithTTL* dbttl; + + s = DBWithTTL::Open(DBOptions(options_), FLAGS_db, cf_descriptors, + &column_families_, &dbttl, ttls); + if (!s.ok()) { + fprintf(stderr, "Cannot open db with ttl. %s\n", + s.ToString().c_str()); + exit(1); + } + db_ = dbttl; + } } } @@ -2743,16 +2781,6 @@ void StressTest::Open(SharedState* shared) { exit(1); #endif // !ROCKSDB_LITE } - } else { -#ifndef ROCKSDB_LITE - DBWithTTL* db_with_ttl; - s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); - db_ = db_with_ttl; -#else - fprintf(stderr, "TTL is not supported in LITE mode\n"); - exit(1); -#endif - } if (FLAGS_preserve_unverified_changes) { // Up until now, no live file should have become obsolete due to these diff --git a/examples/.gitignore b/examples/.gitignore index 158f9b50d2..9dac44fc23 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -7,4 +7,6 @@ optimistic_transaction_example options_file_example simple_example transaction_example -speedb_is_awesome_example \ No newline at end of file +rocksdb_backup_restore_example +speedb_is_awesome_example +speedb_with_ttl_example diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 0b93a6d8d2..d761c88247 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -43,3 +43,13 @@ add_executable(multi_processes_example multi_processes_example.cc) target_link_libraries(multi_processes_example ${ROCKSDB_LIB}) + +add_executable(speedb_with_ttl_example + speedb_with_ttl_example.cc) + target_link_libraries(speedb_with_ttl_example + ${ROCKSDB_LIB}) + +add_executable(speedb_is_awesome_example + speedb_is_awesome_example.cc) + target_link_libraries(speedb_is_awesome_example + ${ROCKSDB_LIB}) diff --git a/examples/Makefile b/examples/Makefile index a535e7b589..59e2ca2475 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -19,7 +19,8 @@ CFLAGS += -Wstrict-prototypes .PHONY: clean static_lib -all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example +all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example \ + transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example speedb_with_ttl_example simple_example: static_lib simple_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) @@ -54,11 +55,16 @@ multi_processes_example: static_lib multi_processes_example.cc speedb_is_awesome_example: static_lib speedb_is_awesome_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) +speedb_with_ttl_example: static_lib speedb_with_ttl_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + rocksdb_backup_restore_example: static_lib rocksdb_backup_restore_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) clean: - rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example ./speedb_is_awesome_example + rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o \ + ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example \ + ./speedb_is_awesome_example ./speedb_with_ttl_example static_lib: LIBNAME="$(LIBNAME)" $(MAKE) -C .. static_lib diff --git a/examples/speedb_with_ttl_example.cc b/examples/speedb_with_ttl_example.cc new file mode 100644 index 0000000000..b8fbaad8d4 --- /dev/null +++ b/examples/speedb_with_ttl_example.cc @@ -0,0 +1,133 @@ +// Copyright (C) 2023 Speedb Ltd. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/db_ttl.h" + +using namespace ROCKSDB_NAMESPACE; + +#if defined(OS_WIN) +std::string kDBPath = "C:\\Windows\\TEMP\\speedb_with_ttl_example"; +#else +std::string kDBPath = "/tmp/speedb_with_ttl_example"; +#endif + +int main() { + // Open the storage + DBWithTTL* db = nullptr; + Options options; + // Create the DB if it's not already present + options.create_if_missing = true; + // Configure time to live of the objects + int32_t ttl = 1; + // Keys to insert to the db + std::string key1 = "key_1"; + std::string key2 = "key_2"; + std::string key3 = "key_3"; + // Value for the keys + std::string put_value1 = "1 Speedb is awesome!"; + std::string put_value2 = "2 Speedb is awesome!"; + std::string put_value3 = "3 Speedb is awesome!"; + // Value to fetch from the db + std::string get_value; + ReadOptions ropts = ReadOptions(); + // Configure that we will not get keys that have been expired by ttl. + // The default behaviour is to return keys until the compation will delete. + ropts.skip_expired_data = true; + std::vector keys = {key1, key2}; + std::vector values; + + Status s = DBWithTTL::Open(options, kDBPath, &db, ttl); + assert(s.ok()); + + s = db->Put(WriteOptions(), key1, put_value1); + assert(s.ok()); + s = db->Put(WriteOptions(), key2, put_value2); + assert(s.ok()); + s = db->Get(ropts, key1, &get_value); + assert(s.ok()); + std::cout << "The value returned by db Get before expiration is: " + << std::endl + << get_value << std::endl + << std::endl; + std::cout << "The value returned by db MultiGet before expiration are: " + << std::endl; + auto statuses = db->MultiGet(ropts, keys, &values); + for (const auto& status : statuses) { + assert(status.ok()); + } + for (const auto& value : values) { + std::cout << value << std::endl; + } + std::cout << std::endl; + // sleeps more than the ttl to emphasize the expiration of objects + sleep(ttl + 1); + + s = db->Get(ropts, key1, &get_value); + if (s.IsNotFound()) { + std::cout << "Key has been expired as expected by Get" << std::endl; + } + statuses = db->MultiGet(ropts, keys, &values); + for (const auto& i : statuses) { + if (i.IsNotFound()) { + std::cout << "Key has been expired as expected by MultiGet" << std::endl; + } + } + ropts.skip_expired_data = false; + std::cout << "Keys actually stored but expired by MultiGet, without " + "skip_expired_data" + << std::endl; + statuses = db->MultiGet(ropts, keys, &values); + for (size_t i = 0; i < statuses.size(); ++i) { + if (statuses[i].ok()) { + std::cout << keys[i].ToStringView() << ":" << values[i] << std::endl; + } + } + ropts.skip_expired_data = true; + db->SetTtl(1000); + s = db->Get(ropts, key1, &get_value); + assert(s.ok()); + // close DB + s = db->Close(); + s = DBWithTTL::Open(options, kDBPath, &db, ttl, true); + sleep(ttl + 1); + s = db->Get(ropts, key1, &get_value); + assert(s.IsNotFound()); + std::cout << "Open DB with read_only will not return expired keys " + << std::endl + << std::endl; + db->Close(); + s = DBWithTTL::Open(options, kDBPath, &db, ttl); + ropts = ReadOptions(); + ropts.skip_expired_data = true; + s = db->Put(WriteOptions(), key3, put_value3); + auto it = db->NewIterator(ropts); + + assert(s.ok()); + + it->SeekToFirst(); + if (it->Valid()) { + // Because key_1 and key_2 expired this line should print key_3 + std::cout << "skip to: " << it->key().ToStringView() << std::endl; + } + delete it; + delete db; + return 0; +} \ No newline at end of file diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 1855af721b..57125a3034 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1731,6 +1731,10 @@ struct ReadOptions { // Default: true bool optimize_multiget_for_io; + // If true, DB with TTL will not Get keys that reached their timeout + // Default: false + bool skip_expired_data = false; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 56e7ebca42..b28cf4bed7 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -73,6 +73,7 @@ #ifndef ROCKSDB_LITE #include "rocksdb/utilities/replayer.h" #endif // ROCKSDB_LITE +#include "rocksdb/utilities/db_ttl.h" #include "rocksdb/utilities/sim_cache.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" @@ -1933,6 +1934,9 @@ DEFINE_bool(track_and_verify_wals_in_manifest, ROCKSDB_NAMESPACE::Options().track_and_verify_wals_in_manifest, "If true, enable WAL tracking in the MANIFEST"); +DEFINE_bool(skip_expired_data, false, "If true, will skip keys expired by TTL"); + +DEFINE_int32(ttl, -1, "Opens the db with this ttl value if value is positive"); namespace { // Auxiliary collection of the indices of the DB-s to be used in the next group std::vector db_idxs_to_use; @@ -3744,6 +3748,7 @@ class Benchmark { read_options_.adaptive_readahead = FLAGS_adaptive_readahead; read_options_.async_io = FLAGS_async_io; read_options_.optimize_multiget_for_io = FLAGS_optimize_multiget_for_io; + read_options_.skip_expired_data = FLAGS_skip_expired_data; void (Benchmark::*method)(ThreadState*) = nullptr; void (Benchmark::*post_process_method)() = nullptr; @@ -3753,6 +3758,14 @@ class Benchmark { int num_repeat = 1; int num_warmup = 0; + if (!gflags::GetCommandLineFlagInfoOrDie("ttl").is_default && + FLAGS_ttl < 1) { + ErrorExit("ttl must be positive value"); + } + if (gflags::GetCommandLineFlagInfoOrDie("ttl").is_default && + FLAGS_skip_expired_data) { + ErrorExit("ttl must be set to use skip_expired_data"); + } if (!name.empty() && *name.rbegin() == ']') { auto it = name.find('['); if (it == std::string::npos) { @@ -5248,8 +5261,19 @@ class Benchmark { } #ifndef ROCKSDB_LITE if (FLAGS_readonly) { - s = DB::OpenForReadOnly(options, db_name, column_families, - &db->cfh, &db->db); + if (FLAGS_ttl > 0) { + DBWithTTL* db_with_ttl; + // true means read only + std::vector ttls(column_families.size(), FLAGS_ttl); + s = DBWithTTL::Open(options, db_name, column_families, &db->cfh, + &db_with_ttl, ttls, true); + if (s.ok()) { + db->db = db_with_ttl; + } + } else { + s = DB::OpenForReadOnly(options, db_name, column_families, &db->cfh, + &db->db); + } } else if (FLAGS_optimistic_transaction_db) { s = OptimisticTransactionDB::Open(options, db_name, column_families, &db->cfh, &db->opt_txn_db); @@ -5270,7 +5294,17 @@ class Benchmark { db->db = ptr; } } else { - s = DB::Open(options, db_name, column_families, &db->cfh, &db->db); + if (FLAGS_ttl > 0) { + DBWithTTL* db_with_ttl; + std::vector ttls(column_families.size(), FLAGS_ttl); + s = DBWithTTL::Open(options, db_name, column_families, &db->cfh, + &db_with_ttl, ttls); + if (s.ok()) { + db->db = db_with_ttl; + } + } else { + s = DB::Open(options, db_name, column_families, &db->cfh, &db->db); + } } #else s = DB::Open(options, db_name, column_families, &db->cfh, &db->db); @@ -5281,7 +5315,16 @@ class Benchmark { db->cfh_idx_to_prob = std::move(cfh_idx_to_prob); #ifndef ROCKSDB_LITE } else if (FLAGS_readonly) { - s = DB::OpenForReadOnly(options, db_name, &db->db); + if (FLAGS_ttl > 0) { + DBWithTTL* db_with_ttl; + // true means read only + s = DBWithTTL::Open(options, db_name, &db_with_ttl, FLAGS_ttl, true); + if (s.ok()) { + db->db = db_with_ttl; + } + } else { + s = DB::OpenForReadOnly(options, db_name, &db->db); + } } else if (FLAGS_optimistic_transaction_db) { s = OptimisticTransactionDB::Open(options, db_name, &db->opt_txn_db); if (s.ok()) { @@ -5346,6 +5389,20 @@ class Benchmark { FLAGS_secondary_update_interval, db)); } #endif // ROCKSDB_LITE + } else if (FLAGS_ttl > 0) { + std::vector column_families; + column_families.push_back(ColumnFamilyDescriptor( + kDefaultColumnFamilyName, ColumnFamilyOptions(options))); + DBWithTTL* db_with_ttl; + std::vector ttls(column_families.size(), FLAGS_ttl); + s = DBWithTTL::Open(options, db_name, column_families, &db->cfh, + &db_with_ttl, ttls); + if (s.ok()) { + db->db = db_with_ttl; + db->cfh.resize(1); + db->num_created = 1; + db->num_hot = 1; + } } else { std::vector column_families; column_families.push_back(ColumnFamilyDescriptor( @@ -6346,7 +6403,12 @@ class Benchmark { options.timestamp = &ts; ts_ptr = &ts_ret; } - auto status = db->Get(options, key, &value, ts_ptr); + Status status; + if (user_timestamp_size_ > 0) { + status = db->Get(options, key, &value, ts_ptr); + } else { + status = db->Get(options, key, &value); + } if (status.ok()) { ++found; } else if (!status.IsNotFound()) { @@ -6479,8 +6541,10 @@ class Benchmark { options, cfh, key, pinnable_vals.data(), &get_merge_operands_options, &number_of_operands); } - } else { + } else if (user_timestamp_size_ > 0) { s = db_with_cfh->db->Get(options, cfh, key, &pinnable_val, ts_ptr); + } else { + s = db_with_cfh->db->Get(options, cfh, key, &pinnable_val); } if (s.ok()) { diff --git a/utilities/ttl/db_ttl_impl.cc b/utilities/ttl/db_ttl_impl.cc index 1c2c6daa1f..44b31ae412 100644 --- a/utilities/ttl/db_ttl_impl.cc +++ b/utilities/ttl/db_ttl_impl.cc @@ -433,7 +433,7 @@ Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) { return Status::Corruption("Error: value's length less than timestamp's\n"); } // Checks that TS is not lesser than kMinTimestamp - // Gaurds against corruption & normal database opened incorrectly in ttl mode + // Guards against corruption & normal database opened incorrectly in ttl mode int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength); if (timestamp_value < kMinTimestamp) { return Status::Corruption("Error: Timestamp < ttl feature release time!\n"); @@ -442,6 +442,7 @@ Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) { } // Checks if the string is stale or not according to TTl provided +// Generic IsStale implementation bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, SystemClock* clock) { if (ttl <= 0) { // Data is fresh if TTL is non-positive @@ -456,6 +457,30 @@ bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, return (timestamp_value + ttl) < curtime; } +// IsStale for strict ttl +bool DBWithTTLImpl::IsStaleStrictTtl(const Slice& value, + ColumnFamilyHandle* column_family, + const ReadOptions& options) { + Options opts = GetOptions(column_family); + auto filter = std::static_pointer_cast( + opts.compaction_filter_factory); + int32_t ttl = filter->GetTtl(); + if (ttl <= 0) { + return false; + } + if (options.snapshot == nullptr) { + SystemClock* clock = (opts.env == nullptr) + ? SystemClock::Default().get() + : opts.env->GetSystemClock().get(); + return IsStale(value, ttl, clock); + } else { + int64_t snapshot_time = options.snapshot->GetUnixTime(); + int32_t timestamp_value = + DecodeFixed32(value.data() + value.size() - kTSLength); + return (timestamp_value + ttl) < snapshot_time; + } +} + // Strips the TS from the end of the slice Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) { if (pinnable_val->size() < kTSLength) { @@ -498,6 +523,11 @@ Status DBWithTTLImpl::Get(const ReadOptions& options, if (!st.ok()) { return st; } + if (options.skip_expired_data) { + if (IsStaleStrictTtl(*value, column_family, options)) { + return Status::NotFound(); + } + } return StripTS(value); } @@ -514,7 +544,20 @@ std::vector DBWithTTLImpl::MultiGet( if (!statuses[i].ok()) { continue; } - statuses[i] = StripTS(&(*values)[i]); + // check if the key has been expired if is_stale == true it's expired + // re-check if the key expired for each key requested by the multiget + bool is_stale = false; + if (options.skip_expired_data) { + if (IsStaleStrictTtl((*values)[i], column_family[i], options)) { + statuses[i] = Status::NotFound(); + is_stale = true; + } + } + if (!is_stale) { + statuses[i] = StripTS(&(*values)[i]); + } else { + (*values)[i] = ""; + } } return statuses; } @@ -592,7 +635,40 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) { Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts, ColumnFamilyHandle* column_family) { - return new TtlIterator(db_->NewIterator(opts, column_family)); + Options cfopts = GetOptions(column_family); + auto filter = std::static_pointer_cast( + cfopts.compaction_filter_factory); + int32_t ttl = filter->GetTtl(); + bool skip_expired = opts.skip_expired_data; + int64_t creation_time; + if (opts.snapshot == nullptr) { + auto status = + cfopts.env->GetSystemClock().get()->GetCurrentTime(&creation_time); + if (!status.ok()) { + return NewErrorIterator(status); + } + } else { + creation_time = opts.snapshot->GetUnixTime(); + } + return new TtlIterator(db_->NewIterator(opts, column_family), ttl, + skip_expired, creation_time); +} + +void TtlIterator::HandleExpired(bool move_forward) { + if (!skip_expired_data_) { + return; + } + while (Valid()) { + if ((ttl_timestamp() + ttl_) < creation_time_) { + if (move_forward) { + iter_->Next(); + } else { + iter_->Prev(); + } + } else { + return; + } + } } void DBWithTTLImpl::SetTtl(ColumnFamilyHandle *h, int32_t ttl) { diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index 7c43501a43..218ea077b8 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -86,6 +86,10 @@ class DBWithTTLImpl : public DBWithTTL { static bool IsStale(const Slice& value, int32_t ttl, SystemClock* clock); + // IsStale for strict ttl + bool IsStaleStrictTtl(const Slice& value, ColumnFamilyHandle* column_family, + const ReadOptions& options); + static Status AppendTS(const Slice& val, std::string* val_with_ts, SystemClock* clock); @@ -113,23 +117,52 @@ class DBWithTTLImpl : public DBWithTTL { class TtlIterator : public Iterator { public: - explicit TtlIterator(Iterator* iter) : iter_(iter) { assert(iter_); } + explicit TtlIterator(Iterator* iter, int32_t ttl, bool skip_expired_data, + int64_t creation_time) + : iter_(iter), + ttl_(ttl), + skip_expired_data_(skip_expired_data), + creation_time_(creation_time) + + { + assert(iter_); + } ~TtlIterator() { delete iter_; } bool Valid() const override { return iter_->Valid(); } - void SeekToFirst() override { iter_->SeekToFirst(); } + void SeekToFirst() override { + iter_->SeekToFirst(); + HandleExpired(true); + } - void SeekToLast() override { iter_->SeekToLast(); } + void SeekToLast() override { + iter_->SeekToLast(); + HandleExpired(false); + } - void Seek(const Slice& target) override { iter_->Seek(target); } + void Seek(const Slice& target) override { + iter_->Seek(target); + HandleExpired(true); + } - void SeekForPrev(const Slice& target) override { iter_->SeekForPrev(target); } + void SeekForPrev(const Slice& target) override { + iter_->SeekForPrev(target); + HandleExpired(false); + } + + void Next() override { + iter_->Next(); + HandleExpired(true); + } - void Next() override { iter_->Next(); } + void Prev() override { + iter_->Prev(); + HandleExpired(false); + } - void Prev() override { iter_->Prev(); } + void HandleExpired(bool is_next); Slice key() const override { return iter_->key(); } @@ -150,6 +183,9 @@ class TtlIterator : public Iterator { private: Iterator* iter_; + int32_t ttl_ = 0; + bool skip_expired_data_ = false; + int64_t creation_time_; }; class TtlCompactionFilter : public LayeredCompactionFilterBase { @@ -192,6 +228,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory { void SetTtl(int32_t ttl) { ttl_ = ttl; } + int32_t GetTtl() { return ttl_; } const char* Name() const override { return kClassName(); } static const char* kClassName() { return "TtlCompactionFilterFactory"; } diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index 982051ac77..81b9e0c343 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -400,6 +400,7 @@ class TtlTest : public testing::Test { // Choose carefully so that Put, Gets & Compaction complete in 1 second buffer static const int64_t kSampleSize_ = 100; + static const int32_t ttl_ = 1; std::string dbname_; DBWithTTL* db_ttl_; std::unique_ptr env_; @@ -726,6 +727,467 @@ TEST_F(TtlTest, DeleteRangeTest) { CloseTtl(); } +// This test is a placeholder and disabled as the current ttl compaction deletes +// kv pair although they are part of a snapshot +TEST_F(TtlTest, DISABLED_CompactionTTLDoNotAffectSnapTest) { + OpenTtl(ttl_); + std::string key_1 = "a"; + std::string put_value = "val"; + auto ropts = ReadOptions(); + std::string value; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_1, put_value)); + ropts.snapshot = db_ttl_->GetSnapshot(); + ASSERT_NE(ropts.snapshot, nullptr); + env_->Sleep(ttl_ + 1); + ASSERT_OK(db_ttl_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // TODO prevent from ttl compaction to delete keys referenced by snapshot + // ASSERT_OK(db_ttl_->Get(ropts, key_1, &value)); + db_ttl_->ReleaseSnapshot(ropts.snapshot); + CloseTtl(); +} + +// Test if Merge is updating the timestamp after it has been ran +TEST_F(TtlTest, CompactionTTLConsiderLatestMergeTest) { + Options options; + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + std::string key_1 = "a"; + std::string put_value = "1"; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_)); + auto ropts = ReadOptions(); + std::string value; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_1, put_value)); + env_->Sleep(ttl_ + 1); + ASSERT_OK(db_ttl_->Merge(WriteOptions(), key_1, put_value)); + ASSERT_OK(db_ttl_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(db_ttl_->Get(ropts, key_1, &value)); + ASSERT_TRUE(value.compare(put_value + "," + put_value) == 0); + db_ttl_->ReleaseSnapshot(ropts.snapshot); + CloseTtl(); +} + +// Check that strict ttl is taking into account new updated timestamp by merge +TEST_F(TtlTest, CompactionStrictTTLConsiderLatestMergeTest) { + Options options; + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + std::string key_1 = "a"; + std::string put_value = "1"; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_)); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string value; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_1, put_value)); + env_->Sleep(ttl_ + 1); + ASSERT_OK(db_ttl_->Merge(WriteOptions(), key_1, put_value)); + ASSERT_OK(db_ttl_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(db_ttl_->Get(ropts, key_1, &value)); + ASSERT_TRUE(value.compare(put_value + "," + put_value) == 0); + db_ttl_->ReleaseSnapshot(ropts.snapshot); + CloseTtl(); +} + +// Test if strict ttl skip expired keys +TEST_F(TtlTest, SkipExpiredTtlGetTest) { + OpenTtl(ttl_); + std::string key = "a"; + std::string put_value = "val"; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key, put_value)); + env_->Sleep(ttl_ + 1); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string value; + ASSERT_TRUE(db_ttl_->Get(ropts, key, &value).IsNotFound()); + CloseTtl(); +} + +// Test if strict ttl skip expired keys accessed by iterators seek to first +TEST_F(TtlTest, SkipExpiredTtlIterFirstTest) { + OpenTtl(ttl_); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string key_1 = "a"; + std::string key_2 = "b"; + std::string put_value = "val"; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_1, put_value)); + env_->Sleep(ttl_ + 1); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_2, put_value)); + auto itr = db_ttl_->NewIterator(ropts); + std::string value; + itr->SeekToFirst(); + ASSERT_TRUE(itr->Valid()); + ASSERT_TRUE(itr->key().ToString().compare(key_2) == 0); + delete itr; + CloseTtl(); +} + +// Test if strict ttl skip expired keys accessed by iterators seek to last +TEST_F(TtlTest, SkipExpiredTtlIterLastTest) { + OpenTtl(ttl_); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string key_1 = "a"; + std::string key_2 = "b"; + std::string put_value = "val"; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_1, put_value)); + env_->Sleep(ttl_ + 1); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_2, put_value)); + auto itr = db_ttl_->NewIterator(ropts); + std::string value; + itr->SeekToLast(); + ASSERT_TRUE(itr->Valid()); + ASSERT_TRUE(itr->key().ToString().compare(key_2) == 0); + delete itr; + CloseTtl(); +} + +// Test if strict ttl skip expired keys accessed by iterators seek next +TEST_F(TtlTest, SkipExpiredTtlIterNextTest) { + OpenTtl(ttl_); + std::string key_1 = "a"; + std::string key_2 = "b"; + std::string key_3 = "c"; + std::string key_4 = "d"; + std::string put_value = "val"; + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_4, put_value)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_2, put_value)); + env_->Sleep(ttl_ + 1); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_1, put_value)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_3, put_value)); + auto itr = db_ttl_->NewIterator(ropts); + std::string value; + itr->SeekToFirst(); + ASSERT_TRUE(itr->Valid()); + itr->Next(); + ASSERT_TRUE(itr->Valid()); + ASSERT_TRUE(itr->key().ToString().compare(key_3) == 0); + delete itr; + CloseTtl(); +} + +// Test if strict ttl skip expired keys accessed by iterators seek prev +TEST_F(TtlTest, SkipExpiredTtlIterPrevTest) { + OpenTtl(ttl_); + std::string key_1 = "a"; + std::string key_2 = "b"; + std::string key_3 = "c"; + std::string key_4 = "d"; + std::string put_value = "val"; + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_4, put_value)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_2, put_value)); + env_->Sleep(ttl_ + 1); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_1, put_value)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_3, put_value)); + auto itr = db_ttl_->NewIterator(ropts); + std::string value; + itr->SeekToLast(); + ASSERT_TRUE(itr->Valid()); + itr->Prev(); + ASSERT_TRUE(itr->Valid()); + ASSERT_TRUE(itr->key().ToString().compare(key_1) == 0); + delete itr; + CloseTtl(); +} + +// Test if strict ttl skip expired keys accessed by iterators seek +TEST_F(TtlTest, SkipExpiredTtlIterSeekTest) { + OpenTtl(ttl_); + std::string key_1 = "a"; + std::string key_2 = "b"; + std::string key_3 = "c"; + std::string key_4 = "d"; + std::string put_value = "val"; + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_4, put_value)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_2, put_value)); + env_->Sleep(ttl_ + 1); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_1, put_value)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_3, put_value)); + auto itr = db_ttl_->NewIterator(ropts); + std::string value; + itr->Seek("b"); + ASSERT_TRUE(itr->Valid()); + ASSERT_TRUE(itr->key().ToString().compare(key_3) == 0); + delete itr; + CloseTtl(); +} + +// Test if strict ttl skip expired keys accessed by iterators seek prev +TEST_F(TtlTest, SkipExpiredTtlIterSeekPrevTest) { + OpenTtl(ttl_); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string key_1 = "a"; + std::string key_2 = "b"; + std::string key_3 = "c"; + std::string key_4 = "d"; + std::string put_value = "val"; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_4, put_value)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_2, put_value)); + env_->Sleep(ttl_ + 1); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_1, put_value)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_3, put_value)); + auto itr = db_ttl_->NewIterator(ropts); + std::string value; + itr->SeekForPrev(key_2); + ASSERT_TRUE(itr->Valid()); + ASSERT_TRUE(itr->key().ToString().compare(key_1) == 0); + delete itr; + CloseTtl(); +} + +// Test if strict ttl skip expired keys when multiget is being used +TEST_F(TtlTest, SkipExpiredTtlGetMultiTest) { + OpenTtl(1); + std::string key = "a"; + std::string put_value = "val"; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key, put_value)); + env_->Sleep(4); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::vector values; + ASSERT_TRUE(db_ttl_->MultiGet(ropts, {key}, &values)[0].IsNotFound()); + CloseTtl(); +} + +// Test if strict ttl returns non expired items +TEST_F(TtlTest, GetNotExpiredTtlGetTest) { + OpenTtl(ttl_ + 1); + std::string key = "a"; + std::string put_value = "val"; + ASSERT_OK(db_ttl_->Put(WriteOptions(), key, put_value)); + env_->Sleep(ttl_); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string value; + ASSERT_OK(db_ttl_->Get(ropts, "a", &value)); + CloseTtl(); +} + +// Test if strict ttl skip expired as read only +TEST_F(TtlTest, SkipExpiredReadOnlyTtlMultiGetTest) { + Options options; + options.create_if_missing = true; + options.env = env_.get(); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string key_1 = "a"; + std::string key_2 = "b"; + std::string put_value = "val"; + std::vector values; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_1, put_value)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key_2, put_value)); + db_ttl_->Close(); + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_, true)); + env_->Sleep(ttl_ + 1); + auto statuses = db_ttl_->MultiGet(ropts, {key_1, key_2}, &values); + for (auto& status : statuses) { + ASSERT_TRUE(status.IsNotFound()); + } + CloseTtl(); +} + +// Test if strict ttl does not skip unexpired as read only +TEST_F(TtlTest, GetNotExpiredReadOnlyTtlGetTest) { + Options options; + options.create_if_missing = true; + options.env = env_.get(); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string value; + std::string key = "a"; + std::string put_value = "val"; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key, put_value)); + db_ttl_->Close(); + // open ttl as read only + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_, true)); + env_->Sleep(ttl_ + 1); + ASSERT_TRUE(db_ttl_->Get(ropts, key, &value).IsNotFound()); + CloseTtl(); +} + +// Test if the expiration time is based on snapshot creation and not the current +// time (should not skip here) +TEST_F(TtlTest, GetFromSnapshotTtlGetTest) { + Options options; + options.create_if_missing = true; + options.env = env_.get(); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string value; + std::string key = "a"; + std::string put_value = "val"; + const Snapshot* snap; + int ttl = 2; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key, put_value)); + snap = db_ttl_->GetSnapshot(); + ropts.snapshot = snap; + env_->Sleep(ttl + 1); + ASSERT_TRUE(db_ttl_->Get(ropts, "a", &value).ok()); + db_ttl_->ReleaseSnapshot(snap); + CloseTtl(); +} + +// Test if the expiration time is based on snapshot creation and not the current +// time (should skip here) +TEST_F(TtlTest, ExpireSnapshotTtlGetTest) { + Options options; + options.create_if_missing = true; + options.env = env_.get(); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string value; + std::string key = "a"; + std::string put_value = "val"; + const Snapshot* snap; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key, put_value)); + env_->Sleep(ttl_ + 1); + snap = db_ttl_->GetSnapshot(); + ropts.snapshot = snap; + ASSERT_TRUE(db_ttl_->Get(ropts, "a", &value).IsNotFound()); + db_ttl_->ReleaseSnapshot(snap); + CloseTtl(); +} + +// Test if the expiration time is based on iterator creation and not the current +// time (should not skip here) +TEST_F(TtlTest, GetFromIteratorTtlGetTest) { + Options options; + options.create_if_missing = true; + options.env = env_.get(); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string key = "a"; + std::string put_value = "val"; + std::string value; + Iterator* iter; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key, put_value)); + iter = db_ttl_->NewIterator(ropts); + env_->Sleep(ttl_ + 1); + ASSERT_NE(iter, nullptr); + iter->Seek(key); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(iter->value().ToString().compare(put_value) == 0); + delete iter; + CloseTtl(); +} + +// Test if the expiration time is based on iterator creation and not the current +// time (should skip here) +TEST_F(TtlTest, ExpireIteratorTtlGetTest) { + Options options; + options.create_if_missing = true; + options.env = env_.get(); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string value; + Iterator* iter; + std::string key = "a"; + std::string put_value = "val"; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key, put_value)); + env_->Sleep(ttl_ + 1); + iter = db_ttl_->NewIterator(ropts); + iter->Seek(key); + ASSERT_FALSE(iter->Valid()); + delete iter; + CloseTtl(); +} + +// Test if the expiration time is based on snapshot creation and not the +// iterator creation (should not skip here) +TEST_F(TtlTest, GetFromSnapshotIteratorTtlGetTest) { + Options options; + options.create_if_missing = true; + options.env = env_.get(); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string value; + const Snapshot* snap; + std::string key = "a"; + std::string put_value = "val"; + Iterator* iter; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key, put_value)); + snap = db_ttl_->GetSnapshot(); + ropts.snapshot = snap; + env_->Sleep(ttl_ + 1); + iter = db_ttl_->NewIterator(ropts); + iter->Seek(key); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(iter->value().ToString().compare(put_value) == 0); + delete iter; + db_ttl_->ReleaseSnapshot(snap); + CloseTtl(); +} + +// Test if the expiration time is based on snapshot creation and not the +// iterator creation (should skip here) +TEST_F(TtlTest, ExpireIteratorFromSnapshotTtlGetTest) { + Options options; + options.create_if_missing = true; + options.env = env_.get(); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string value; + const Snapshot* snap; + std::string key = "a"; + std::string put_value = "val"; + Iterator* iter; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_, ttl_)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), key, put_value)); + env_->Sleep(ttl_ + 1); + snap = db_ttl_->GetSnapshot(); + ropts.snapshot = snap; + iter = db_ttl_->NewIterator(ropts); + iter->Seek(key); + ASSERT_FALSE(iter->Valid()); + delete iter; + db_ttl_->ReleaseSnapshot(snap); + CloseTtl(); +} + +// Test strict ttl with multiple CFs +TEST_F(TtlTest, SkipExpiredColumnFamiliesTest) { + Options options; + options.create_if_missing = true; + options.env = env_.get(); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string key = "a"; + std::string put_value = "val"; + std::string value; + std::vector handles; + ASSERT_OK(DBWithTTL::Open(options, dbname_, &db_ttl_)); + ColumnFamilyHandle* first_handle; + ColumnFamilyHandle* second_handle; + ASSERT_OK(db_ttl_->CreateColumnFamilyWithTtl(options, "ttl_column_family_1", + &first_handle, ttl_)); + handles.push_back(first_handle); + ASSERT_OK(db_ttl_->CreateColumnFamilyWithTtl(options, "ttl_column_family_2", + &second_handle, 0)); + handles.push_back(second_handle); + ASSERT_OK(db_ttl_->Put(WriteOptions(), handles[0], key, put_value)); + ASSERT_OK(db_ttl_->Put(WriteOptions(), handles[1], key, put_value)); + env_->Sleep(ttl_ + 1); + ASSERT_TRUE(db_ttl_->Get(ropts, handles[0], key, &value).IsNotFound()); + ASSERT_OK(db_ttl_->Get(ropts, handles[1], key, &value)); + for (auto& h : handles) { + delete h; + h = nullptr; + } +} + class DummyFilter : public CompactionFilter { public: bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,