From 3ddffa1e85dd20739e3a8e0e3998173da2518424 Mon Sep 17 00:00:00 2001 From: Or Friedmann Date: Tue, 14 Feb 2023 11:46:37 +0200 Subject: [PATCH] Skip expired object when using DBWithTtl Currently even if objects have been expired we still return them until compaction delete those objects. This commit adds a skip_expired_data to the ReadOptions, if this option is true, it Get and MultiGet will not return objects that have been expired by the configured ttl. Currently this feature does not include support of the flag for Merge operation or for iterator. --- examples/.gitignore | 4 +- examples/CMakeLists.txt | 12 +++++ examples/Makefile | 7 ++- examples/speedb_with_ttl_example.cc | 70 +++++++++++++++++++++++++++++ include/rocksdb/options.h | 8 ++++ options/options.cc | 6 ++- options/options_settable_test.cc | 2 + utilities/ttl/db_ttl_impl.cc | 33 +++++++++++++- utilities/ttl/db_ttl_impl.h | 4 +- utilities/ttl/ttl_test.cc | 22 +++++++++ 10 files changed, 161 insertions(+), 7 deletions(-) create mode 100644 examples/speedb_with_ttl_example.cc diff --git a/examples/.gitignore b/examples/.gitignore index 158f9b50d2..51a6b19ab8 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -7,4 +7,6 @@ optimistic_transaction_example options_file_example simple_example transaction_example -speedb_is_awesome_example \ No newline at end of file +speedb_is_awesome_example +speedb_with_ttl_example +rocksdb_backup_restore_example \ No newline at end of file diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 0b93a6d8d2..d468e2e806 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -43,3 +43,15 @@ add_executable(multi_processes_example multi_processes_example.cc) target_link_libraries(multi_processes_example ${ROCKSDB_LIB}) + +add_executable(speedb_with_ttl_example + EXCLUDE_FROM_ALL + speedb_with_ttl_example.cc) + target_link_libraries(speedb_with_ttl_example) + $({ROCKSDB_LIB}) + +add_executable(speedb_is_awesome_examle + EXCLUDE_FROM_ALL + speedb_is_awesome_examle.cc) + target_link_libraries(speedb_is_awesome_examle) + $({ROCKSDB_LIB}) \ No newline at end of file diff --git a/examples/Makefile b/examples/Makefile index a535e7b589..0925f39933 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -19,7 +19,7 @@ CFLAGS += -Wstrict-prototypes .PHONY: clean static_lib -all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example +all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example speedb_with_ttl_example simple_example: static_lib simple_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) @@ -54,11 +54,14 @@ multi_processes_example: static_lib multi_processes_example.cc speedb_is_awesome_example: static_lib speedb_is_awesome_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) +speedb_with_ttl_example: static_lib speedb_with_ttl_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + rocksdb_backup_restore_example: static_lib rocksdb_backup_restore_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) clean: - rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example ./speedb_is_awesome_example + rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example ./speedb_is_awesome_example ./speedb_with_ttl_example static_lib: LIBNAME="$(LIBNAME)" $(MAKE) -C .. static_lib diff --git a/examples/speedb_with_ttl_example.cc b/examples/speedb_with_ttl_example.cc new file mode 100644 index 0000000000..1d665773be --- /dev/null +++ b/examples/speedb_with_ttl_example.cc @@ -0,0 +1,70 @@ +// Copyright (C) 2023 Speedb Ltd. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rocksdb/db.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/utilities/db_ttl.h" + +using namespace ROCKSDB_NAMESPACE; + +#if defined(OS_WIN) +std::string kDBPath = "C:\\Windows\\TEMP\\speedb_is_awesome_example"; +#else +std::string kDBPath = "/tmp/speedb_is_awesome_example"; +#endif + +int main() { + // Open the storage + DBWithTTL* db = nullptr; + Options options; + // create the DB if it's not already present + options.create_if_missing = true; + Status s = DBWithTTL::Open(options, kDBPath, &db, 1); + assert(s.ok()); + + // append new entry + std::string key1 = "key_1"; + std::string key2 = "key_2"; + std::string put_value = "Speedb is awesome!"; + s = db->Put(WriteOptions(), key1, put_value); + s = db->Put(WriteOptions(), key2, put_value); + assert(s.ok()); + sleep(2); + // retrieve entry + std::string get_value; + ReadOptions ropts = ReadOptions(); + ropts.skip_expired_data = true; + s = db->Get(ropts, key1, &get_value); + if (s.IsNotFound()) { + std::cout << "Key has been expired as expected" << std::endl; + } + std::vector keys = {key1, key2}; + std::vector values; + auto statuses = db->MultiGet(ropts, keys, &values); + for (const auto& i : statuses) { + if (s.IsNotFound()) { + std::cout << "Key has been expired as expected by MultiGet" << std::endl; + } + } + // close DB + s = db->Close(); + delete db; + assert(s.ok()); + return 0; +} \ No newline at end of file diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 34fa1b0da7..da0e854224 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -334,6 +334,10 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions { // Default: nullptr std::shared_ptr sst_partitioner_factory = nullptr; + // When open DB with TTL it will be the time to live of each object + // Default: 0 + int32_t ttl_skip_expired_data = 0; + // Create ColumnFamilyOptions with default values for all fields ColumnFamilyOptions(); // Create ColumnFamilyOptions from Options @@ -1721,6 +1725,10 @@ struct ReadOptions { // Default: true bool optimize_multiget_for_io; + // If true, DB with TTL will not Get keys that reached their timeout + // Default: false + bool skip_expired_data; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/options/options.cc b/options/options.cc index ea1364416e..3c4155b891 100644 --- a/options/options.cc +++ b/options/options.cc @@ -697,7 +697,8 @@ ReadOptions::ReadOptions() value_size_soft_limit(std::numeric_limits::max()), adaptive_readahead(false), async_io(false), - optimize_multiget_for_io(true) {} + optimize_multiget_for_io(true), + skip_expired_data(false) {} ReadOptions::ReadOptions(bool cksum, bool cache) : snapshot(nullptr), @@ -723,6 +724,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache) value_size_soft_limit(std::numeric_limits::max()), adaptive_readahead(false), async_io(false), - optimize_multiget_for_io(true) {} + optimize_multiget_for_io(true), + skip_expired_data(false) {} } // namespace ROCKSDB_NAMESPACE diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index d15f6fd8e9..d4afc901c6 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -428,6 +428,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { sizeof(std::shared_ptr)}, {offsetof(struct ColumnFamilyOptions, sst_partitioner_factory), sizeof(std::shared_ptr)}, + {offsetof(struct ColumnFamilyOptions, ttl_skip_expired_data), + sizeof(int32_t)}, }; char* options_ptr = new char[sizeof(ColumnFamilyOptions)]; diff --git a/utilities/ttl/db_ttl_impl.cc b/utilities/ttl/db_ttl_impl.cc index 1c2c6daa1f..947c0b1981 100644 --- a/utilities/ttl/db_ttl_impl.cc +++ b/utilities/ttl/db_ttl_impl.cc @@ -6,6 +6,8 @@ #include "utilities/ttl/db_ttl_impl.h" +#include + #include "db/write_batch_internal.h" #include "file/filename.h" #include "logging/logging.h" @@ -170,6 +172,8 @@ void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, options->merge_operator.reset( new TtlMergeOperator(options->merge_operator, clock)); } + + options->ttl_skip_expired_data = ttl; } static std::unordered_map ttl_type_info = { @@ -498,6 +502,16 @@ Status DBWithTTLImpl::Get(const ReadOptions& options, if (!st.ok()) { return st; } + if (options.skip_expired_data) { + Options opts = GetOptions(column_family); + int32_t ttl = opts.ttl_skip_expired_data; + SystemClock* clock = (opts.env == nullptr) + ? SystemClock::Default().get() + : opts.env->GetSystemClock().get(); + if (IsStale(*value, ttl, clock)) { + return Status::NotFound(); + } + } return StripTS(value); } @@ -506,6 +520,7 @@ std::vector DBWithTTLImpl::MultiGet( const std::vector& column_family, const std::vector& keys, std::vector* values) { auto statuses = db_->MultiGet(options, column_family, keys, values); + bool is_stale = false; for (size_t i = 0; i < keys.size(); ++i) { if (!statuses[i].ok()) { continue; @@ -514,7 +529,23 @@ std::vector DBWithTTLImpl::MultiGet( if (!statuses[i].ok()) { continue; } - statuses[i] = StripTS(&(*values)[i]); + if (options.skip_expired_data) { + is_stale = false; + Options opts = GetOptions(column_family[i]); + int32_t ttl = opts.ttl_skip_expired_data; + SystemClock* clock = (opts.env == nullptr) + ? SystemClock::Default().get() + : opts.env->GetSystemClock().get(); + if (IsStale((*values)[i], ttl, clock)) { + statuses[i] = Status::NotFound(); + is_stale = true; + } + } + if (!is_stale) { + statuses[i] = StripTS(&(*values)[i]); + } else { + (*values)[i] = ""; + } } return statuses; } diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index 7c43501a43..72669555c5 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -95,7 +95,7 @@ class DBWithTTLImpl : public DBWithTTL { static Status StripTS(PinnableSlice* str); - static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp + static const uint8_t kTSLength = sizeof(int32_t); // size of timestamp static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8 @@ -201,6 +201,8 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory { const Customizable* Inner() const override { return user_comp_filter_factory_.get(); } + int32_t GetTtl() { return ttl_; } + SystemClock* GetClock() { return clock_; } private: int32_t ttl_; diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index 982051ac77..897b444411 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -726,6 +726,28 @@ TEST_F(TtlTest, DeleteRangeTest) { CloseTtl(); } +TEST_F(TtlTest, SkipExpiredTtlGetTest) { + OpenTtl(1); + ASSERT_OK(db_ttl_->Put(WriteOptions(), "a", "val")); + env_->Sleep(2); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::string value; + ASSERT_TRUE(db_ttl_->Get(ropts, "a", &value).IsNotFound()); + CloseTtl(); +} +TEST_F(TtlTest, SkipExpiredTtlGetMultiTest) { + OpenTtl(1); + ASSERT_OK(db_ttl_->Put(WriteOptions(), "a", "val")); + env_->Sleep(4); + auto ropts = ReadOptions(); + ropts.skip_expired_data = true; + std::vector values; + ASSERT_TRUE(db_ttl_->MultiGet(ropts, {"a"}, &values)[0].IsNotFound()); + ASSERT_TRUE(values[0].empty()); + CloseTtl(); +} + class DummyFilter : public CompactionFilter { public: bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,