Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip expired object while using DBWithTtl #403

Merged
merged 1 commit into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build_tools/check-sources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ fi

git grep -n 'using namespace' -- ':!build_tools' ':!docs' \
':!third-party/folly/folly/lang/Align.h' \
':!third-party/gtest-1.8.1/fused-src/gtest/gtest.h'
':!third-party/gtest-1.8.1/fused-src/gtest/gtest.h' \
':!examples/speedb_with_ttl_example.cc'
ofriedma marked this conversation as resolved.
Show resolved Hide resolved
if [ "$?" != "1" ]; then
echo '^^^^ Do not use "using namespace"'
BAD=1
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ DECLARE_bool(test_cf_consistency);
DECLARE_bool(test_multi_ops_txns);
DECLARE_int32(threads);
DECLARE_int32(ttl);
DECLARE_bool(skip_expired_data);
DECLARE_int32(value_size_mult);
DECLARE_int32(compaction_readahead_size);
DECLARE_bool(enable_pipelined_write);
Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ DEFINE_int32(ttl, -1,
"Carefully specify a large value such that verifications on "
"deleted values don't fail");

DEFINE_bool(skip_expired_data, false, "If true, will skip keys expired by TTL");

DEFINE_int32(value_size_mult, 8,
"Size of value will be this number times rand_int(1,3) bytes");

Expand Down
58 changes: 43 additions & 15 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,12 @@ void StressTest::OperateDb(ThreadState* thread) {
read_opts.async_io = FLAGS_async_io;
read_opts.adaptive_readahead = FLAGS_adaptive_readahead;
read_opts.readahead_size = FLAGS_readahead_size;
if (gflags::GetCommandLineFlagInfoOrDie("ttl").is_default &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does changing the ttl to 0 (non default) mean with respect to the skip_expired_data flag and should it be included in this check and error reporting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add a check for the ttl with skip_expired_data to be greater than 0

FLAGS_skip_expired_data && FLAGS_ttl < 1) {
auto error_msg =
IOStatus::InvalidArgument("skip_expired_data must be set with ttl");
}
read_opts.skip_expired_data = FLAGS_skip_expired_data;
WriteOptions write_opts;
if (FLAGS_rate_limit_auto_wal_flush) {
write_opts.rate_limiter_priority = Env::IO_USER;
Expand Down Expand Up @@ -2492,7 +2498,6 @@ void StressTest::Open(SharedState* shared) {

Status s;

if (FLAGS_ttl == -1) {
std::vector<std::string> existing_column_families;
s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
&existing_column_families); // ignore errors
Expand Down Expand Up @@ -2611,11 +2616,44 @@ void StressTest::Open(SharedState* shared) {
#endif // !ROCKSDB_LITE
{
if (db_preload_finished_.load() && FLAGS_read_only) {
s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
cf_descriptors, &column_families_, &db_);
if (FLAGS_ttl == -1) {
s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
cf_descriptors, &column_families_, &db_);
} else {
DBWithTTL* dbttl;
std::vector<int32_t> ttls;
for (size_t i = 0; i < cf_descriptors.size(); ++i) {
ttls.push_back(FLAGS_ttl);
}
s = DBWithTTL::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &dbttl, ttls, true);
if (!s.ok()) {
fprintf(stderr, "Cannot read only open db with ttl. %s\n",
s.ToString().c_str());
exit(1);
}
db_ = dbttl;
}
} else {
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
if (FLAGS_ttl == -1) {
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &db_);
} else {
std::vector<int32_t> ttls;
for (size_t i = 0; i < cf_descriptors.size(); ++i) {
ttls.push_back(FLAGS_ttl);
}
DBWithTTL* dbttl;

s = DBWithTTL::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
&column_families_, &dbttl, ttls);
if (!s.ok()) {
fprintf(stderr, "Cannot open db with ttl. %s\n",
s.ToString().c_str());
exit(1);
}
db_ = dbttl;
}
}
}

Expand Down Expand Up @@ -2743,16 +2781,6 @@ void StressTest::Open(SharedState* shared) {
exit(1);
#endif // !ROCKSDB_LITE
}
} else {
#ifndef ROCKSDB_LITE
DBWithTTL* db_with_ttl;
s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
db_ = db_with_ttl;
#else
fprintf(stderr, "TTL is not supported in LITE mode\n");
exit(1);
#endif
}

if (FLAGS_preserve_unverified_changes) {
// Up until now, no live file should have become obsolete due to these
Expand Down
4 changes: 3 additions & 1 deletion examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ optimistic_transaction_example
options_file_example
simple_example
transaction_example
speedb_is_awesome_example
rocksdb_backup_restore_example
speedb_is_awesome_example
speedb_with_ttl_example
10 changes: 10 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ add_executable(multi_processes_example
multi_processes_example.cc)
target_link_libraries(multi_processes_example
${ROCKSDB_LIB})

add_executable(speedb_with_ttl_example
speedb_with_ttl_example.cc)
target_link_libraries(speedb_with_ttl_example
${ROCKSDB_LIB})

add_executable(speedb_is_awesome_example
speedb_is_awesome_example.cc)
target_link_libraries(speedb_is_awesome_example
${ROCKSDB_LIB})
10 changes: 8 additions & 2 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ CFLAGS += -Wstrict-prototypes

.PHONY: clean static_lib

all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example
all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example \
transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example speedb_with_ttl_example

simple_example: static_lib simple_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
Expand Down Expand Up @@ -54,11 +55,16 @@ multi_processes_example: static_lib multi_processes_example.cc
speedb_is_awesome_example: static_lib speedb_is_awesome_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

speedb_with_ttl_example: static_lib speedb_with_ttl_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

rocksdb_backup_restore_example: static_lib rocksdb_backup_restore_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

clean:
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example ./speedb_is_awesome_example
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o \
./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example \
./speedb_is_awesome_example ./speedb_with_ttl_example

static_lib:
LIBNAME="$(LIBNAME)" $(MAKE) -C .. static_lib
133 changes: 133 additions & 0 deletions examples/speedb_with_ttl_example.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (C) 2023 Speedb Ltd. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <unistd.h>

#include <iostream>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/db_ttl.h"

using namespace ROCKSDB_NAMESPACE;

ofriedma marked this conversation as resolved.
Show resolved Hide resolved
#if defined(OS_WIN)
std::string kDBPath = "C:\\Windows\\TEMP\\speedb_with_ttl_example";
#else
std::string kDBPath = "/tmp/speedb_with_ttl_example";
#endif
ofriedma marked this conversation as resolved.
Show resolved Hide resolved

int main() {
// Open the storage
DBWithTTL* db = nullptr;
Options options;
// Create the DB if it's not already present
options.create_if_missing = true;
// Configure time to live of the objects
int32_t ttl = 1;
// Keys to insert to the db
std::string key1 = "key_1";
std::string key2 = "key_2";
std::string key3 = "key_3";
// Value for the keys
std::string put_value1 = "1 Speedb is awesome!";
std::string put_value2 = "2 Speedb is awesome!";
std::string put_value3 = "3 Speedb is awesome!";
// Value to fetch from the db
std::string get_value;
ReadOptions ropts = ReadOptions();
// Configure that we will not get keys that have been expired by ttl.
// The default behaviour is to return keys until the compation will delete.
ropts.skip_expired_data = true;
std::vector<rocksdb::Slice> keys = {key1, key2};
std::vector<std::string> values;

Status s = DBWithTTL::Open(options, kDBPath, &db, ttl);
assert(s.ok());

s = db->Put(WriteOptions(), key1, put_value1);
assert(s.ok());
s = db->Put(WriteOptions(), key2, put_value2);
assert(s.ok());
s = db->Get(ropts, key1, &get_value);
assert(s.ok());
std::cout << "The value returned by db Get before expiration is: "
<< std::endl
<< get_value << std::endl
<< std::endl;
std::cout << "The value returned by db MultiGet before expiration are: "
<< std::endl;
auto statuses = db->MultiGet(ropts, keys, &values);
for (const auto& status : statuses) {
assert(status.ok());
}
for (const auto& value : values) {
std::cout << value << std::endl;
}
std::cout << std::endl;
// sleeps more than the ttl to emphasize the expiration of objects
sleep(ttl + 1);

s = db->Get(ropts, key1, &get_value);
ofriedma marked this conversation as resolved.
Show resolved Hide resolved
if (s.IsNotFound()) {
std::cout << "Key has been expired as expected by Get" << std::endl;
}
statuses = db->MultiGet(ropts, keys, &values);
for (const auto& i : statuses) {
if (i.IsNotFound()) {
std::cout << "Key has been expired as expected by MultiGet" << std::endl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpful to cout the key's contents that has been expired

}
}
ofriedma marked this conversation as resolved.
Show resolved Hide resolved
ropts.skip_expired_data = false;
std::cout << "Keys actually stored but expired by MultiGet, without "
"skip_expired_data"
<< std::endl;
statuses = db->MultiGet(ropts, keys, &values);
for (size_t i = 0; i < statuses.size(); ++i) {
if (statuses[i].ok()) {
std::cout << keys[i].ToStringView() << ":" << values[i] << std::endl;
}
}
ropts.skip_expired_data = true;
db->SetTtl(1000);
s = db->Get(ropts, key1, &get_value);
assert(s.ok());
// close DB
s = db->Close();
s = DBWithTTL::Open(options, kDBPath, &db, ttl, true);
sleep(ttl + 1);
s = db->Get(ropts, key1, &get_value);
assert(s.IsNotFound());
std::cout << "Open DB with read_only will not return expired keys "
<< std::endl
<< std::endl;
db->Close();
s = DBWithTTL::Open(options, kDBPath, &db, ttl);
ropts = ReadOptions();
ropts.skip_expired_data = true;
s = db->Put(WriteOptions(), key3, put_value3);
auto it = db->NewIterator(ropts);

assert(s.ok());

it->SeekToFirst();
if (it->Valid()) {
// Because key_1 and key_2 expired this line should print key_3
std::cout << "skip to: " << it->key().ToStringView() << std::endl;
udi-speedb marked this conversation as resolved.
Show resolved Hide resolved
}
delete it;
delete db;
return 0;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although it shows our partial / hybrid support, it may be helpful to also add an iterator that will return the keys after expiration, to make sure the user understands that this is how it's working at the moment.

4 changes: 4 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1731,6 +1731,10 @@ struct ReadOptions {
// Default: true
bool optimize_multiget_for_io;

// If true, DB with TTL will not Get keys that reached their timeout
// Default: false
bool skip_expired_data = false;

ReadOptions();
ReadOptions(bool cksum, bool cache);
};
Expand Down
Loading