Skip to content

Commit

Permalink
Skip expired object when using DBWithTtl
Browse files Browse the repository at this point in the history
Currently even if objects have been expired we still return them until
compaction delete those objects.

This commit adds a skip_expired_data to the ReadOptions, if this option
is true, it Get and MultiGet will not return objects that have been
expired by the configured ttl.
Currently this feature does not include support of the flag for Merge
operation or for iterator.
  • Loading branch information
Or Friedmann committed Feb 16, 2023
1 parent ea5b771 commit 3ddffa1
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 7 deletions.
4 changes: 3 additions & 1 deletion examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ optimistic_transaction_example
options_file_example
simple_example
transaction_example
speedb_is_awesome_example
speedb_is_awesome_example
speedb_with_ttl_example
rocksdb_backup_restore_example
12 changes: 12 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,15 @@ add_executable(multi_processes_example
multi_processes_example.cc)
target_link_libraries(multi_processes_example
${ROCKSDB_LIB})

add_executable(speedb_with_ttl_example
EXCLUDE_FROM_ALL
speedb_with_ttl_example.cc)
target_link_libraries(speedb_with_ttl_example)
$({ROCKSDB_LIB})

add_executable(speedb_is_awesome_examle
EXCLUDE_FROM_ALL
speedb_is_awesome_examle.cc)
target_link_libraries(speedb_is_awesome_examle)
$({ROCKSDB_LIB})
7 changes: 5 additions & 2 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ CFLAGS += -Wstrict-prototypes

.PHONY: clean static_lib

all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example
all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example speedb_with_ttl_example

simple_example: static_lib simple_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
Expand Down Expand Up @@ -54,11 +54,14 @@ multi_processes_example: static_lib multi_processes_example.cc
speedb_is_awesome_example: static_lib speedb_is_awesome_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

speedb_with_ttl_example: static_lib speedb_with_ttl_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

rocksdb_backup_restore_example: static_lib rocksdb_backup_restore_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

clean:
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example ./speedb_is_awesome_example
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example ./speedb_is_awesome_example ./speedb_with_ttl_example

static_lib:
LIBNAME="$(LIBNAME)" $(MAKE) -C .. static_lib
70 changes: 70 additions & 0 deletions examples/speedb_with_ttl_example.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (C) 2023 Speedb Ltd. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <unistd.h>

#include <iostream>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/db_ttl.h"

using namespace ROCKSDB_NAMESPACE;

#if defined(OS_WIN)
std::string kDBPath = "C:\\Windows\\TEMP\\speedb_is_awesome_example";
#else
std::string kDBPath = "/tmp/speedb_is_awesome_example";
#endif

int main() {
// Open the storage
DBWithTTL* db = nullptr;
Options options;
// create the DB if it's not already present
options.create_if_missing = true;
Status s = DBWithTTL::Open(options, kDBPath, &db, 1);
assert(s.ok());

// append new entry
std::string key1 = "key_1";
std::string key2 = "key_2";
std::string put_value = "Speedb is awesome!";
s = db->Put(WriteOptions(), key1, put_value);
s = db->Put(WriteOptions(), key2, put_value);
assert(s.ok());
sleep(2);
// retrieve entry
std::string get_value;
ReadOptions ropts = ReadOptions();
ropts.skip_expired_data = true;
s = db->Get(ropts, key1, &get_value);
if (s.IsNotFound()) {
std::cout << "Key has been expired as expected" << std::endl;
}
std::vector<rocksdb::Slice> keys = {key1, key2};
std::vector<std::string> values;
auto statuses = db->MultiGet(ropts, keys, &values);
for (const auto& i : statuses) {
if (s.IsNotFound()) {
std::cout << "Key has been expired as expected by MultiGet" << std::endl;
}
}
// close DB
s = db->Close();
delete db;
assert(s.ok());
return 0;
}
8 changes: 8 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Default: nullptr
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory = nullptr;

// When open DB with TTL it will be the time to live of each object
// Default: 0
int32_t ttl_skip_expired_data = 0;

// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options
Expand Down Expand Up @@ -1721,6 +1725,10 @@ struct ReadOptions {
// Default: true
bool optimize_multiget_for_io;

// If true, DB with TTL will not Get keys that reached their timeout
// Default: false
bool skip_expired_data;

ReadOptions();
ReadOptions(bool cksum, bool cache);
};
Expand Down
6 changes: 4 additions & 2 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,8 @@ ReadOptions::ReadOptions()
value_size_soft_limit(std::numeric_limits<uint64_t>::max()),
adaptive_readahead(false),
async_io(false),
optimize_multiget_for_io(true) {}
optimize_multiget_for_io(true),
skip_expired_data(false) {}

ReadOptions::ReadOptions(bool cksum, bool cache)
: snapshot(nullptr),
Expand All @@ -723,6 +724,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
value_size_soft_limit(std::numeric_limits<uint64_t>::max()),
adaptive_readahead(false),
async_io(false),
optimize_multiget_for_io(true) {}
optimize_multiget_for_io(true),
skip_expired_data(false) {}

} // namespace ROCKSDB_NAMESPACE
2 changes: 2 additions & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
sizeof(std::shared_ptr<ConcurrentTaskLimiter>)},
{offsetof(struct ColumnFamilyOptions, sst_partitioner_factory),
sizeof(std::shared_ptr<SstPartitionerFactory>)},
{offsetof(struct ColumnFamilyOptions, ttl_skip_expired_data),
sizeof(int32_t)},
};

char* options_ptr = new char[sizeof(ColumnFamilyOptions)];
Expand Down
33 changes: 32 additions & 1 deletion utilities/ttl/db_ttl_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "utilities/ttl/db_ttl_impl.h"

#include <iostream>

#include "db/write_batch_internal.h"
#include "file/filename.h"
#include "logging/logging.h"
Expand Down Expand Up @@ -170,6 +172,8 @@ void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
options->merge_operator.reset(
new TtlMergeOperator(options->merge_operator, clock));
}

options->ttl_skip_expired_data = ttl;
}

static std::unordered_map<std::string, OptionTypeInfo> ttl_type_info = {
Expand Down Expand Up @@ -498,6 +502,16 @@ Status DBWithTTLImpl::Get(const ReadOptions& options,
if (!st.ok()) {
return st;
}
if (options.skip_expired_data) {
Options opts = GetOptions(column_family);
int32_t ttl = opts.ttl_skip_expired_data;
SystemClock* clock = (opts.env == nullptr)
? SystemClock::Default().get()
: opts.env->GetSystemClock().get();
if (IsStale(*value, ttl, clock)) {
return Status::NotFound();
}
}
return StripTS(value);
}

Expand All @@ -506,6 +520,7 @@ std::vector<Status> DBWithTTLImpl::MultiGet(
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
auto statuses = db_->MultiGet(options, column_family, keys, values);
bool is_stale = false;
for (size_t i = 0; i < keys.size(); ++i) {
if (!statuses[i].ok()) {
continue;
Expand All @@ -514,7 +529,23 @@ std::vector<Status> DBWithTTLImpl::MultiGet(
if (!statuses[i].ok()) {
continue;
}
statuses[i] = StripTS(&(*values)[i]);
if (options.skip_expired_data) {
is_stale = false;
Options opts = GetOptions(column_family[i]);
int32_t ttl = opts.ttl_skip_expired_data;
SystemClock* clock = (opts.env == nullptr)
? SystemClock::Default().get()
: opts.env->GetSystemClock().get();
if (IsStale((*values)[i], ttl, clock)) {
statuses[i] = Status::NotFound();
is_stale = true;
}
}
if (!is_stale) {
statuses[i] = StripTS(&(*values)[i]);
} else {
(*values)[i] = "";
}
}
return statuses;
}
Expand Down
4 changes: 3 additions & 1 deletion utilities/ttl/db_ttl_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class DBWithTTLImpl : public DBWithTTL {

static Status StripTS(PinnableSlice* str);

static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp
static const uint8_t kTSLength = sizeof(int32_t); // size of timestamp

static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8

Expand Down Expand Up @@ -201,6 +201,8 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
const Customizable* Inner() const override {
return user_comp_filter_factory_.get();
}
int32_t GetTtl() { return ttl_; }
SystemClock* GetClock() { return clock_; }

private:
int32_t ttl_;
Expand Down
22 changes: 22 additions & 0 deletions utilities/ttl/ttl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,28 @@ TEST_F(TtlTest, DeleteRangeTest) {
CloseTtl();
}

TEST_F(TtlTest, SkipExpiredTtlGetTest) {
OpenTtl(1);
ASSERT_OK(db_ttl_->Put(WriteOptions(), "a", "val"));
env_->Sleep(2);
auto ropts = ReadOptions();
ropts.skip_expired_data = true;
std::string value;
ASSERT_TRUE(db_ttl_->Get(ropts, "a", &value).IsNotFound());
CloseTtl();
}
TEST_F(TtlTest, SkipExpiredTtlGetMultiTest) {
OpenTtl(1);
ASSERT_OK(db_ttl_->Put(WriteOptions(), "a", "val"));
env_->Sleep(4);
auto ropts = ReadOptions();
ropts.skip_expired_data = true;
std::vector<std::string> values;
ASSERT_TRUE(db_ttl_->MultiGet(ropts, {"a"}, &values)[0].IsNotFound());
ASSERT_TRUE(values[0].empty());
CloseTtl();
}

class DummyFilter : public CompactionFilter {
public:
bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
Expand Down

0 comments on commit 3ddffa1

Please sign in to comment.