Skip to content

Commit

Permalink
TTL: Skip expired object when using DBWithTtl (#403)
Browse files Browse the repository at this point in the history
Currently even if objects have been expired we still return them until
compaction delete those objects.

This commit adds a skip_expired_data to the ReadOptions, if this option
is true, it Get, MultiGet and iterator will not return objects that have been
expired by the configured ttl.
Currently this feature does not include support for Merge operations.
For Read Only and iterator pinned data the skip_expired_data will be
enforced and expired keys will not be returned.

Resolves: #394, #417

Pull Request: #403
  • Loading branch information
ofriedma authored and udi-speedb committed Dec 4, 2023
1 parent 559acc5 commit 1a4190d
Show file tree
Hide file tree
Showing 13 changed files with 862 additions and 37 deletions.
3 changes: 2 additions & 1 deletion build_tools/check-sources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ fi

git grep -n 'using namespace' -- ':!build_tools' ':!docs' \
':!third-party/folly/folly/lang/Align.h' \
':!third-party/gtest-1.8.1/fused-src/gtest/gtest.h'
':!third-party/gtest-1.8.1/fused-src/gtest/gtest.h' \
':!examples/speedb_with_ttl_example.cc'
if [ "$?" != "1" ]; then
echo '^^^^ Do not use "using namespace"'
BAD=1
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ DECLARE_bool(test_cf_consistency);
DECLARE_bool(test_multi_ops_txns);
DECLARE_int32(threads);
DECLARE_int32(ttl);
DECLARE_bool(skip_expired_data);
DECLARE_int32(value_size_mult);
DECLARE_int32(compaction_readahead_size);
DECLARE_bool(enable_pipelined_write);
Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ DEFINE_int32(ttl, -1,
"Carefully specify a large value such that verifications on "
"deleted values don't fail");

DEFINE_bool(skip_expired_data, false, "If true, will skip keys expired by TTL");

DEFINE_int32(value_size_mult, 8,
"Size of value will be this number times rand_int(1,3) bytes");

Expand Down
60 changes: 43 additions & 17 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,12 @@ void StressTest::OperateDb(ThreadState* thread) {
read_opts.adaptive_readahead = FLAGS_adaptive_readahead;
read_opts.readahead_size = FLAGS_readahead_size;
read_opts.auto_readahead_size = FLAGS_auto_readahead_size;
if (gflags::GetCommandLineFlagInfoOrDie("ttl").is_default &&
FLAGS_skip_expired_data && FLAGS_ttl < 1) {
auto error_msg =
IOStatus::InvalidArgument("skip_expired_data must be set with ttl");
}
read_opts.skip_expired_data = FLAGS_skip_expired_data;
WriteOptions write_opts;
if (FLAGS_rate_limit_auto_wal_flush) {
write_opts.rate_limiter_priority = Env::IO_USER;
Expand Down Expand Up @@ -2721,7 +2727,6 @@ void StressTest::Open(SharedState* shared, bool reopen) {

Status s;

if (FLAGS_ttl == -1) {
std::vector<std::string> existing_column_families;
s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
&existing_column_families); // ignore errors
Expand Down Expand Up @@ -2840,11 +2845,44 @@ void StressTest::Open(SharedState* shared, bool reopen) {
}
} else {
if (db_preload_finished_.load() && FLAGS_read_only) {
s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
cf_descriptors, &column_families_, &db_);
if (FLAGS_ttl == -1) {
s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
cf_descriptors, &column_families_, &db_);
} else {
DBWithTTL* dbttl;
std::vector<int32_t> ttls;
for (size_t i = 0; i < cf_descriptors.size(); ++i) {
ttls.push_back(FLAGS_ttl);
}
s = DBWithTTL::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &dbttl, ttls, true);
if (!s.ok()) {
fprintf(stderr, "Cannot read only open db with ttl. %s\n",
s.ToString().c_str());
exit(1);
}
db_ = dbttl;
}
} else {
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
if (FLAGS_ttl == -1) {
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
} else {
std::vector<int32_t> ttls;
for (size_t i = 0; i < cf_descriptors.size(); ++i) {
ttls.push_back(FLAGS_ttl);
}
DBWithTTL* dbttl;

s = DBWithTTL::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &dbttl, ttls);
if (!s.ok()) {
fprintf(stderr, "Cannot open db with ttl. %s\n",
s.ToString().c_str());
exit(1);
}
db_ = dbttl;
}
}
}

Expand Down Expand Up @@ -2982,18 +3020,6 @@ void StressTest::Open(SharedState* shared, bool reopen) {
assert(s.ok());
assert(cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
}
} else {
DBWithTTL* db_with_ttl;
s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
db_ = db_with_ttl;
assert(options_.disable_auto_compactions);
if (s.ok()) {
s = db_->DisableFileDeletions();
}
if (s.ok()) {
s = db_->EnableAutoCompaction(column_families_);
}
}

if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
Expand Down
4 changes: 3 additions & 1 deletion examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ options_file_example
rocksdb_backup_restore_example
simple_example
transaction_example
speedb_is_awesome_example
rocksdb_backup_restore_example
speedb_is_awesome_example
speedb_with_ttl_example
10 changes: 10 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ add_executable(multi_processes_example
multi_processes_example.cc)
target_link_libraries(multi_processes_example
${ROCKSDB_LIB})

add_executable(speedb_with_ttl_example
speedb_with_ttl_example.cc)
target_link_libraries(speedb_with_ttl_example
${ROCKSDB_LIB})

add_executable(speedb_is_awesome_example
speedb_is_awesome_example.cc)
target_link_libraries(speedb_is_awesome_example
${ROCKSDB_LIB})
10 changes: 8 additions & 2 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ CFLAGS += -Wstrict-prototypes

.PHONY: clean static_lib

all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example
all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example \
transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example speedb_with_ttl_example

simple_example: static_lib simple_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
Expand Down Expand Up @@ -54,11 +55,16 @@ multi_processes_example: static_lib multi_processes_example.cc
speedb_is_awesome_example: static_lib speedb_is_awesome_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

speedb_with_ttl_example: static_lib speedb_with_ttl_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

rocksdb_backup_restore_example: static_lib rocksdb_backup_restore_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

clean:
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example ./speedb_is_awesome_example
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o \
./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example \
./speedb_is_awesome_example ./speedb_with_ttl_example

static_lib:
LIBNAME="$(LIBNAME)" $(MAKE) -C .. static_lib
133 changes: 133 additions & 0 deletions examples/speedb_with_ttl_example.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (C) 2023 Speedb Ltd. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <unistd.h>

#include <iostream>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/db_ttl.h"

using namespace ROCKSDB_NAMESPACE;

#if defined(OS_WIN)
std::string kDBPath = "C:\\Windows\\TEMP\\speedb_with_ttl_example";
#else
std::string kDBPath = "/tmp/speedb_with_ttl_example";
#endif

int main() {
// Open the storage
DBWithTTL* db = nullptr;
Options options;
// Create the DB if it's not already present
options.create_if_missing = true;
// Configure time to live of the objects
int32_t ttl = 1;
// Keys to insert to the db
std::string key1 = "key_1";
std::string key2 = "key_2";
std::string key3 = "key_3";
// Value for the keys
std::string put_value1 = "1 Speedb is awesome!";
std::string put_value2 = "2 Speedb is awesome!";
std::string put_value3 = "3 Speedb is awesome!";
// Value to fetch from the db
std::string get_value;
ReadOptions ropts = ReadOptions();
// Configure that we will not get keys that have been expired by ttl.
// The default behaviour is to return keys until the compation will delete.
ropts.skip_expired_data = true;
std::vector<rocksdb::Slice> keys = {key1, key2};
std::vector<std::string> values;

Status s = DBWithTTL::Open(options, kDBPath, &db, ttl);
assert(s.ok());

s = db->Put(WriteOptions(), key1, put_value1);
assert(s.ok());
s = db->Put(WriteOptions(), key2, put_value2);
assert(s.ok());
s = db->Get(ropts, key1, &get_value);
assert(s.ok());
std::cout << "The value returned by db Get before expiration is: "
<< std::endl
<< get_value << std::endl
<< std::endl;
std::cout << "The value returned by db MultiGet before expiration are: "
<< std::endl;
auto statuses = db->MultiGet(ropts, keys, &values);
for (const auto& status : statuses) {
assert(status.ok());
}
for (const auto& value : values) {
std::cout << value << std::endl;
}
std::cout << std::endl;
// sleeps more than the ttl to emphasize the expiration of objects
sleep(ttl + 1);

s = db->Get(ropts, key1, &get_value);
if (s.IsNotFound()) {
std::cout << "Key has been expired as expected by Get" << std::endl;
}
statuses = db->MultiGet(ropts, keys, &values);
for (const auto& i : statuses) {
if (i.IsNotFound()) {
std::cout << "Key has been expired as expected by MultiGet" << std::endl;
}
}
ropts.skip_expired_data = false;
std::cout << "Keys actually stored but expired by MultiGet, without "
"skip_expired_data"
<< std::endl;
statuses = db->MultiGet(ropts, keys, &values);
for (size_t i = 0; i < statuses.size(); ++i) {
if (statuses[i].ok()) {
std::cout << keys[i].ToStringView() << ":" << values[i] << std::endl;
}
}
ropts.skip_expired_data = true;
db->SetTtl(1000);
s = db->Get(ropts, key1, &get_value);
assert(s.ok());
// close DB
s = db->Close();
s = DBWithTTL::Open(options, kDBPath, &db, ttl, true);
sleep(ttl + 1);
s = db->Get(ropts, key1, &get_value);
assert(s.IsNotFound());
std::cout << "Open DB with read_only will not return expired keys "
<< std::endl
<< std::endl;
db->Close();
s = DBWithTTL::Open(options, kDBPath, &db, ttl);
ropts = ReadOptions();
ropts.skip_expired_data = true;
s = db->Put(WriteOptions(), key3, put_value3);
auto it = db->NewIterator(ropts);

assert(s.ok());

it->SeekToFirst();
if (it->Valid()) {
// Because key_1 and key_2 expired this line should print key_3
std::cout << "skip to: " << it->key().ToStringView() << std::endl;
}
delete it;
delete db;
return 0;
}
4 changes: 4 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1802,6 +1802,10 @@ struct ReadOptions {
ReadOptions() {}
ReadOptions(bool _verify_checksums, bool _fill_cache);
explicit ReadOptions(Env::IOActivity _io_activity);

// If true, DB with TTL will not Get keys that reached their timeout
// Default: false
bool skip_expired_data = false;
};

// Options that control write operations
Expand Down
Loading

0 comments on commit 1a4190d

Please sign in to comment.