diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 2353fe3f41..b27fb2eda0 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -547,56 +547,76 @@ TEST(VersionMap, StorageLogging) { ASSERT_EQ(tomb_keys, 3u); } -std::shared_ptr write_two_versions( - std::shared_ptr store, - std::shared_ptr version_map, - const StreamId& id) { +struct VersionChainOperation { + enum class Type { + WRITE, + TOMBSTONE, + TOMBSTONE_ALL + } type {Type::WRITE}; + + std::optional version_id { std::nullopt }; +}; + +/** + * @param operations write operations with their specified version_id in this order. + */ +std::shared_ptr write_versions( + const std::shared_ptr& store, + const std::shared_ptr& version_map, + const StreamId& id, + const std::vector& operations) { auto entry = version_map->check_reload( store, id, LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}, __FUNCTION__); - auto key1 = atom_key_with_version(id, 0, 0); - version_map->do_write(store, key1, entry); - write_symbol_ref(store, key1, std::nullopt, entry->head_.value()); - auto key2 = atom_key_with_version(id, 1, 1); - version_map->do_write(store, key2, entry); - // We override the symbol ref without a prev_key on purpose. This way we'll only load the version=1 from the ref key - write_symbol_ref(store, key2, std::nullopt, entry->head_.value()); + for (const auto& [type, version_id_opt]: operations) { + switch (type) { + case VersionChainOperation::Type::WRITE: { + auto key = atom_key_with_version(id, *version_id_opt, *version_id_opt); + version_map->do_write(store, key, entry); + write_symbol_ref(store, key, std::nullopt, entry->head_.value()); + break; + } + case VersionChainOperation::Type::TOMBSTONE: { + version_map->write_tombstone(store, *version_id_opt, id, entry); + break; + } + case VersionChainOperation::Type::TOMBSTONE_ALL: { + std::optional key = std::nullopt; + if (version_id_opt.has_value()) { + key = atom_key_builder() + .version_id(*version_id_opt) + .build(id, KeyType::VERSION); + } + version_map->tombstone_from_key_or_all(store, id, key); + break; + } + } + } return entry; } // Produces the following version chain: v0 <- tombstone_all <- v1 <- v2 <- tombstone void write_alternating_deleted_undeleted(std::shared_ptr store, std::shared_ptr version_map, StreamId id) { - auto entry = version_map->check_reload( - store, - id, - LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}, - __FUNCTION__); - - auto key1 = atom_key_with_version(id, 0, 0); - auto key2 = atom_key_with_version(id, 1, 1); - auto key3 = atom_key_with_version(id, 2, 2); - - // Write version 0 - version_map->do_write(store, key1, entry); - write_symbol_ref(store, key1, std::nullopt, entry->head_.value()); - - // Tombstone_all on version 0 - version_map->delete_all_versions(store, id); - - // Write version 1 - version_map->do_write(store, key2, entry); - write_symbol_ref(store, key2, std::nullopt, entry->head_.value()); - - // Write version 2 - version_map->do_write(store, key3, entry); - write_symbol_ref(store, key3, std::nullopt, entry->head_.value()); + using Type = VersionChainOperation::Type; + write_versions(store, version_map, id, { + {Type::WRITE, 0}, + {Type::TOMBSTONE_ALL}, + {Type::WRITE, 1}, + {Type::WRITE, 2}, + {Type::TOMBSTONE, 2} + }); +} - // Tombstone version 2 - version_map->write_tombstone(store, VersionId{2}, id, entry, timestamp{3}); +void write_versions(std::shared_ptr store, std::shared_ptr version_map, StreamId id, int number_of_versions) { + std::vector version_chain; + for (int i = 0; i < number_of_versions; i++) { + version_chain.emplace_back(VersionChainOperation::Type::WRITE, i); + } + write_versions(store, version_map, id, version_chain); } TEST(VersionMap, FollowingVersionChain){ @@ -669,8 +689,14 @@ TEST(VersionMap, FollowingVersionChainWithCaching){ // LATEST should still be cached, but the cached entry now needs to have no undeleted keys check_loads_versions(LoadStrategy{LoadType::LATEST, LoadObjective::INCLUDE_DELETED}, 2, 0); + EXPECT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(-1)})); // FROM_TIME UNDELETED_ONLY should no longer be cached even though we used the same request before because the undeleted key it went to got deleted. So it will load the entire version chain check_loads_versions(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(10)}, 3, 0); + // We have the full version chain loaded, so has_cached_entry should always return true (even when requesting timestamp before earliest version) + EXPECT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(-1)})); + EXPECT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(-1)})); + EXPECT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::ALL, LoadObjective::INCLUDE_DELETED})); + EXPECT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::ALL, LoadObjective::UNDELETED_ONLY})); // We add a new undeleted key auto key4 = atom_key_with_version(id, 3, 5); @@ -692,7 +718,7 @@ TEST(VersionMap, FollowingVersionChainEndEarlyOnTombstoneAll) { auto version_map = std::make_shared(); StreamId id{"test"}; - write_two_versions(store, version_map, id); + write_versions(store, version_map, id, 2); // Deleting should add a TOMBSTONE_ALL which should end searching for undeleted versions early. version_map->delete_all_versions(store, id); @@ -726,13 +752,22 @@ TEST(VersionMap, FollowingVersionChainEndEarlyOnTombstoneAll) { } } -TEST(VersionMap, CacheInvalidation) { +TEST(VersionMap, HasCachedEntry) { ScopedConfig sc("VersionMap.ReloadInterval", std::numeric_limits::max()); - // Set up the version chain v0(tombstone_all) <- v1 <- v2(tombstoned) + // Set up the version chain v0 <- v1(tombstone_all) <- v2 <- v3(tombstoned) auto store = std::make_shared(); auto version_map = std::make_shared(); StreamId id{"test"}; - write_alternating_deleted_undeleted(store, version_map, id); + using Type = VersionChainOperation::Type; + std::vector version_chain = { + {Type::WRITE, 0}, + {Type::WRITE, 1}, + {Type::TOMBSTONE_ALL}, + {Type::WRITE, 2}, + {Type::WRITE, 3}, + {Type::TOMBSTONE, 3} + }; + write_versions(store, version_map, id, version_chain); auto check_caching = [&](LoadStrategy to_load, LoadStrategy to_check_if_cached, bool expected_outcome){ auto clean_version_map = std::make_shared(); @@ -750,48 +785,47 @@ TEST(VersionMap, CacheInvalidation) { } }; - auto load_all_param = LoadStrategy{LoadType::ALL, LoadObjective::INCLUDE_DELETED}; - auto load_all_undeleted_param = LoadStrategy{LoadType::ALL, LoadObjective::UNDELETED_ONLY}; - check_caching(load_all_param, load_all_undeleted_param, true); - check_caching(load_all_undeleted_param, load_all_param, false); - - constexpr auto num_versions = 3u; + constexpr auto num_versions = 4u; std::vector should_load_to_v[num_versions] = { // Different parameters which should all load to v0 std::vector{ LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}, - LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-3)}, + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-4)}, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(0)}, + LoadStrategy{LoadType::ALL, LoadObjective::INCLUDE_DELETED} }, // Different parameters which should all load to v1 std::vector{ LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(1)}, - LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-2)}, + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-3)}, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(1)}, - LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, - static_cast(2)}, // when include_deleted=false FROM_TIME searches for an undeleted version - LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, + LoadStrategy{LoadType::ALL, LoadObjective::UNDELETED_ONLY} }, // Different parameters which should all load to v2 std::vector{ LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(2)}, - LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-1)}, + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-2)}, LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(2)}, + LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, + static_cast(3)}, // when include_deleted=false FROM_TIME searches for an undeleted version + LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, + }, + // Different parameters which should all load to v3 + std::vector{ + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(3)}, + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(-1)}, + LoadStrategy{LoadType::FROM_TIME, LoadObjective::INCLUDE_DELETED, static_cast(3)}, LoadStrategy{LoadType::LATEST, LoadObjective::INCLUDE_DELETED}, } }; for (auto i=0u; i(); StreamId id{"test"}; - write_two_versions(store, version_map, id); + write_versions(store, version_map, id, 2); // Use a clean version_map version_map = std::make_shared(); @@ -841,45 +875,104 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAfterLoad) { TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { using namespace arcticdb; - // Given - symbol with 2 versions - load downto version 0 + // Given - symbol with 3 versions - load downto version 1 or 0 // never time-invalidate the cache so we can test our other cache invalidation logic ScopedConfig sc("VersionMap.ReloadInterval", std::numeric_limits::max()); - auto store = std::make_shared(); - - auto version_map = std::make_shared(); StreamId id{"test"}; - write_two_versions(store, version_map, id); + std::shared_ptr version_map; + std::shared_ptr store; + + auto validate_load_strategy = [&](const LoadStrategy& load_strategy, bool should_be_cached, int expected_cached = -1) { + if (should_be_cached) { + // Store is nullptr as we shouldn't go to storage + auto entry = version_map->check_reload(nullptr, id, load_strategy, __FUNCTION__); + ASSERT_EQ(std::ranges::count_if(entry->keys_, [](const auto& key) { return key.type() == KeyType::TABLE_INDEX;}), expected_cached); + } + else { + ASSERT_FALSE(version_map->has_cached_entry(id, load_strategy)); + } + }; - // Use a clean version_map + for (const auto& load_strategy : {LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(1)}, + LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}}) { + store = std::make_shared(); + version_map = std::make_shared(); + + // Version chain is v0 <- v1 <- v2 + write_versions(store, version_map, id, 3); + + // Use a clean version_map + version_map = std::make_shared(); + const bool is_loaded_to_0 = load_strategy.load_until_version_ == 0; + auto entry = version_map->check_reload( + store, + id, + load_strategy, + __FUNCTION__); + + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}, is_loaded_to_0, is_loaded_to_0 ? 3 : 0); + validate_load_strategy(LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-1)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-2)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-3)}, is_loaded_to_0, is_loaded_to_0 ? 3 : 0); + + // When - we delete version 2 + auto tombstone_key = version_map->write_tombstone(store, VersionId{2}, id, entry); + + // We should not invalidate the cache because the version we loaded to is still undeleted + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, true, is_loaded_to_0 ? 3 : 2); + ASSERT_EQ(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}), is_loaded_to_0); + + // When - we delete all versions without reloading + version_map->write_tombstone_all_key_internal(store, tombstone_key, entry); + + // Tombstone All should not invalidate cache as it deletes everything so all undeleted versions have been loaded. + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, true, is_loaded_to_0 ? 3 : 2); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}, true, is_loaded_to_0 ? 3 : 2); + + // When - we add a new version so that tombstone all isn't the latest + auto key = atom_key_with_version(id, 5, 5); + version_map->do_write(store, key, entry); + write_symbol_ref(store, key, std::nullopt, entry->head_.value()); + + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, is_loaded_to_0 ? 4 : 3); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, is_loaded_to_0 ? 4 : 3); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, true, is_loaded_to_0 ? 4 : 3); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}, true, is_loaded_to_0 ? 4 : 3); + } + + // Given tombstone all isn't the latest version + // v0 <- v1 <- v2 <- Tombstone_all(v1) + store = std::make_shared(); version_map = std::make_shared(); + using Type = VersionChainOperation::Type; + write_versions(store, version_map, id, { + {Type::WRITE, 0}, + {Type::WRITE, 1}, + {Type::WRITE, 2}, + {Type::TOMBSTONE, 1} + }); + version_map = std::make_shared(); auto entry = version_map->check_reload( - store, - id, - LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}, - __FUNCTION__); - - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::DOWNTO, LoadObjective::UNDELETED_ONLY, static_cast(-2)})); - - // When - we delete version 1 - auto tombstone_key = version_map->write_tombstone(store, VersionId{1}, id, entry); - - // We should not invalidate the cache because the version we loaded to is still undeleted - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); - ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); - - // When - we delete all versions without reloading - version_map->write_tombstone_all_key_internal(store, tombstone_key, entry); - - // We should invalidate cached undeleted checks - ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)})); - ASSERT_FALSE(version_map->has_cached_entry(id, LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)})); + store, + id, + LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, + __FUNCTION__); + + validate_load_strategy(LoadStrategy{LoadType::LATEST, LoadObjective::UNDELETED_ONLY}, true, 1); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(2)}, true, 1); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(1)}, false); + validate_load_strategy(LoadStrategy{LoadType::FROM_TIME, LoadObjective::UNDELETED_ONLY, static_cast(0)}, false); + validate_load_strategy(LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(1)}, false); + validate_load_strategy(LoadStrategy{LoadType::DOWNTO, LoadObjective::INCLUDE_DELETED, static_cast(0)}, false); } TEST(VersionMap, CompactionUpdateCache) { @@ -979,8 +1072,8 @@ TEST(VersionMap, TombstoneAllFromEntry) { ASSERT_EQ(version_id, 0); - // With cached entry from the write ops - // Tombstone all should succeed as we are not relying on the ref key + // With cached entry from the write ops + // Tombstone all should succeed as we are not relying on the ref key version_map->tombstone_from_key_or_all(store, id, dummy_key, entry); auto [maybe_prev_cached_entry, deleted_cached_entry] = get_latest_version(store, version_map, id); diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp index 2fc3232055..c9c6bef136 100644 --- a/cpp/arcticdb/version/version_map.hpp +++ b/cpp/arcticdb/version/version_map.hpp @@ -581,7 +581,11 @@ class VersionMapImpl { return false; } - LoadType cached_load_type = entry->load_strategy_.load_type_; + const bool has_loaded_everything = entry->load_progress_.is_earliest_version_loaded; + const bool has_loaded_earliest_undeleted = entry->tombstone_all_.has_value() && entry->load_progress_.oldest_loaded_index_version_ <= entry->tombstone_all_->version_id(); + if (has_loaded_everything || (!requested_load_strategy.should_include_deleted() && has_loaded_earliest_undeleted)) { + return true; + } switch (requested_load_type) { case LoadType::NOT_LOADED: @@ -606,14 +610,9 @@ class VersionMapImpl { return cached_timestamp <= requested_load_strategy.load_from_time_.value(); } case LoadType::ALL: - // We can use cache when it was populated by a ALL call, in which case it is only unsafe to use - // when the cache is of undeleted versions and the request is for all versions - if (cached_load_type==LoadType::ALL){ - return entry->load_strategy_.should_include_deleted() || !requested_load_strategy.should_include_deleted(); - } - return false; + case LoadType::UNKNOWN: default: - util::raise_rte("Unexpected load type in cache {}", cached_load_type); + return false; } } @@ -763,12 +762,10 @@ class VersionMapImpl { const auto clock_unsync_tolerance = ConfigsMap::instance()->get_int("VersionMap.UnsyncTolerance", DEFAULT_CLOCK_UNSYNC_TOLERANCE); entry->last_reload_time_ = Clock::nanos_since_epoch() - clock_unsync_tolerance; - entry->load_strategy_ = LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}; // FUTURE: to make more thread-safe with #368 auto temp = std::make_shared(*entry); load_via_ref_key(store, stream_id, load_strategy, temp); std::swap(*entry, *temp); - entry->load_strategy_ = load_strategy; util::check(entry->keys_.empty() || entry->head_, "Non-empty VersionMapEntry should set head"); if (validate_) diff --git a/cpp/arcticdb/version/version_map_entry.hpp b/cpp/arcticdb/version/version_map_entry.hpp index 944ad007ad..309f0d474c 100644 --- a/cpp/arcticdb/version/version_map_entry.hpp +++ b/cpp/arcticdb/version/version_map_entry.hpp @@ -203,6 +203,7 @@ struct LoadProgress { VersionId oldest_loaded_undeleted_index_version_ = std::numeric_limits::max(); timestamp earliest_loaded_timestamp_ = std::numeric_limits::max(); timestamp earliest_loaded_undeleted_timestamp_ = std::numeric_limits::max(); + bool is_earliest_version_loaded { false }; }; struct VersionMapEntry { @@ -241,7 +242,6 @@ struct VersionMapEntry { tombstone_all_.reset(); keys_.clear(); load_progress_ = LoadProgress{}; - load_strategy_ = LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}; } bool empty() const { @@ -258,14 +258,13 @@ struct VersionMapEntry { swap(left.last_reload_time_, right.last_reload_time_); swap(left.tombstone_all_, right.tombstone_all_); swap(left.head_, right.head_); - swap(left.load_strategy_, right.load_strategy_); swap(left.load_progress_, right.load_progress_); } // Below four functions used to return optional of the tombstone, but copying keys is expensive and only // one function was actually interested in the key, so they now return bool. See get_tombstone(). bool has_individual_tombstone(VersionId version_id) const { - return tombstones_.count(version_id) != 0; + return tombstones_.contains(version_id); } bool is_tombstoned_via_tombstone_all(VersionId version_id) const { @@ -432,7 +431,6 @@ struct VersionMapEntry { } std::optional head_; - LoadStrategy load_strategy_ = LoadStrategy{LoadType::NOT_LOADED, LoadObjective::INCLUDE_DELETED}; timestamp last_reload_time_ = 0; LoadProgress load_progress_; std::deque keys_; diff --git a/cpp/arcticdb/version/version_utils.hpp b/cpp/arcticdb/version/version_utils.hpp index 73e1cc3153..d522fa662b 100644 --- a/cpp/arcticdb/version/version_utils.hpp +++ b/cpp/arcticdb/version/version_utils.hpp @@ -108,6 +108,7 @@ inline std::optional read_segment_with_keys( load_progress.oldest_loaded_undeleted_index_version_ = std::min(load_progress.oldest_loaded_undeleted_index_version_, oldest_loaded_undeleted_index); load_progress.earliest_loaded_timestamp_ = std::min(load_progress.earliest_loaded_timestamp_, earliest_loaded_timestamp); load_progress.earliest_loaded_undeleted_timestamp_ = std::min(load_progress.earliest_loaded_undeleted_timestamp_, earliest_loaded_undeleted_timestamp); + load_progress.is_earliest_version_loaded = !next.has_value(); return next; } diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py index c2c6d3dda8..bedafdb719 100644 --- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py +++ b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py @@ -2706,3 +2706,72 @@ def test_missing_first_version_key_batch(basic_store): vits = lib.batch_read(symbols, as_ofs=write_times) for x in range(num_items): assert_equal(vits[symbols[x]].data, expected[x]) + +@pytest.mark.parametrize("use_caching", [True, False]) +def test_version_chain_cache(basic_store, use_caching): + timeout = sys.maxsize if use_caching else 0 + lib = basic_store + symbol = "test" + # Will write 10 versions + num_of_versions = 10 + dataframes = [sample_dataframe() for _ in range(num_of_versions)] + timestamps = [] + + def assert_correct_dataframe(timestamp_and_version_index, deleted_versions): + # Version + version_index = timestamp_and_version_index + if i in deleted_versions: + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=version_index) + else: + assert_equal(lib.read(symbol, as_of=version_index).data, dataframes[i]) + + # Timestamp + timestamp_index = timestamp_and_version_index + def find_expected_version(first_to_check): + for num in range(first_to_check, -1, -1): + if num not in deleted_versions: + return num + return None + + for timestamp, is_before in [(timestamps[timestamp_index].before, True), (timestamps[timestamp_index].after, False)]: + first_version_to_check = timestamp_index - 1 if is_before else timestamp_index + expected_version_to_find = find_expected_version(first_version_to_check) + if expected_version_to_find is None: + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=timestamp) + else: + assert_frame_equal(lib.read(symbol, as_of=timestamp).data, dataframes[expected_version_to_find]) + + with config_context("VersionMap.ReloadInterval", timeout): + # Write versions and keep track of time before and after writing + for i in range(num_of_versions): + with distinct_timestamps(lib) as timestamp: + lib.write(symbol, dataframes[i]) + timestamps.append(timestamp) + + # Validate the most recent version + assert_equal(lib.read(symbol).data, dataframes[-1]) + + # Check reading specific versions + for i in range(num_of_versions): + assert_correct_dataframe(i, {}) + + # Ensure reading a non-existent version raises an exception + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=pd.Timestamp(0)) + + # Delete specific versions + delete_versions = {1, 3, 7, 9} + for version in delete_versions: + lib.delete_version(symbol, version) + for i in range(num_of_versions): + assert_correct_dataframe(i, delete_versions) + + with pytest.raises(NoSuchVersionException): + lib.read(symbol, as_of=pd.Timestamp(0)) + + # Delete all versions + lib.delete(symbol) + for i in range(num_of_versions): + assert_correct_dataframe(i, set(range(num_of_versions))) \ No newline at end of file