Skip to content

Commit

Permalink
Allow a different clock to be set from Python
Browse files Browse the repository at this point in the history
to trigger the bug in #496
  • Loading branch information
qc00 authored and Joshua Loo committed Aug 9, 2023
1 parent b2a93bc commit 1d72362
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 14 deletions.
11 changes: 11 additions & 0 deletions cpp/arcticdb/util/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ struct LinearClock {
}
};

struct ManualClock {
inline static std::atomic<entity::timestamp> time_{0};

static entity::timestamp nanos_since_epoch() {
return LinearClock::time_.load();
}
static entity::timestamp coarse_nanos_since_epoch() {
return LinearClock::time_.load();
}
};


} // namespace arcticdb::util

Expand Down
11 changes: 7 additions & 4 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
#include <arcticdb/python/gil_lock.hpp>

namespace arcticdb::version_store {

template<class ClockType>
LocalVersionedEngine::LocalVersionedEngine(
const std::shared_ptr<storage::Library>& library,
const std::optional<std::string>& license_key ARCTICDB_UNUSED) :
store_(std::make_shared<async::AsyncStore<util::SysClock>>(library, codec::default_lz4_codec(), encoding_version(library->config()))),
const ClockType&) :
store_(std::make_shared<async::AsyncStore<ClockType>>(library, codec::default_lz4_codec(), encoding_version(library->config()))),
symbol_list_(std::make_shared<SymbolList>(version_map_)){
configure(library->config());
ARCTICDB_RUNTIME_DEBUG(log::version(), "Created versioned engine at {} for library path {} with config {}", uintptr_t(this),
Expand All @@ -44,6 +44,9 @@ LocalVersionedEngine::LocalVersionedEngine(
}
}

template LocalVersionedEngine::LocalVersionedEngine(const std::shared_ptr<storage::Library>& library, const util::SysClock&);
template LocalVersionedEngine::LocalVersionedEngine(const std::shared_ptr<storage::Library>& library, const util::ManualClock&);

folly::Future<folly::Unit> LocalVersionedEngine::delete_unreferenced_pruned_indexes(
const std::vector<AtomKey> &pruned_indexes,
const AtomKey& key_to_keep
Expand Down Expand Up @@ -834,7 +837,7 @@ folly::Future<folly::Unit> LocalVersionedEngine::delete_trees_responsibly(
log::version().debug("Forbidden: {} total of data keys", data_keys_not_to_be_deleted.size());
storage::RemoveOpts remove_opts;
remove_opts.ignores_missing_key_ = true;

std::vector<entity::VariantKey> vks_column_stats;
std::transform(keys_to_delete->begin(),
keys_to_delete->end(),
Expand Down
4 changes: 3 additions & 1 deletion cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ struct IndexKeyAndUpdateInfo{
class LocalVersionedEngine : public VersionedEngine {

public:
template<class ClockType = util::SysClock>
explicit LocalVersionedEngine(
const std::shared_ptr<storage::Library>& library,
const std::optional<std::string>& license_key = std::nullopt);
const ClockType& = util::SysClock{} // Only used to allow the template variable to be inferred
);

virtual ~LocalVersionedEngine() = default;

Expand Down
11 changes: 10 additions & 1 deletion cpp/arcticdb/version/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,9 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
.def_readwrite("row_filter",&UpdateQuery::row_filter);

py::class_<PythonVersionStore>(version, "PythonVersionStore")
.def(py::init<std::shared_ptr<storage::Library>, std::optional<std::string>>(),
.def(py::init([](const std::shared_ptr<storage::Library>& library, std::optional<std::string>) {
return PythonVersionStore(library);
}),
py::arg("library"),
py::arg("license_key") = std::nullopt)
.def("write_partitioned_dataframe",
Expand Down Expand Up @@ -685,6 +687,13 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
"Returns latest timestamp of a symbol")
;

py::class_<ManualClockVersionStore, PythonVersionStore>(version, "ManualClockVersionStore")
.def(py::init<const std::shared_ptr<storage::Library>&>())
.def_property_static("time",
[]() { return util::ManualClock::time_.load(); },
[](entity::timestamp ts) { util::ManualClock::time_ = ts; })
;

py::class_<LocalVersionedEngine>(version, "VersionedEngine")
.def(py::init<std::shared_ptr<storage::Library>>())
.def("read_versioned_dataframe",
Expand Down
7 changes: 6 additions & 1 deletion cpp/arcticdb/version/symbol_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,13 @@ bool SymbolList::can_update_symbol_list(const std::shared_ptr<Store>& store,
});

seg.end_row();
store->write_sync(KeyType::SYMBOL_LIST, 0, StreamId{action}, IndexValue{symbol}, IndexValue{symbol},
try {
store->write_sync(KeyType::SYMBOL_LIST, 0, StreamId{ action }, IndexValue{ symbol }, IndexValue{ symbol },
std::move(seg));
} catch (const DuplicateKeyException& e [[unused]]) {
// Both version and content hash are fixed, so collision is possible
ARCTICDB_DEBUG(log::storage(), "Symbol list DuplicateKeyException: {}", e.what());
}
}

SymbolList::CollectionType SymbolList::load_from_version_keys(const std::shared_ptr<Store>& store) {
Expand Down
5 changes: 2 additions & 3 deletions cpp/arcticdb/version/version_store_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ using namespace arcticdb::entity;
namespace as = arcticdb::stream;
using namespace arcticdb::storage;

PythonVersionStore::PythonVersionStore(const std::shared_ptr<storage::Library>& library, const std::optional<std::string>& license_key) :
LocalVersionedEngine(library, license_key) {
}
template PythonVersionStore::PythonVersionStore(const std::shared_ptr<storage::Library>& library, const util::SysClock& ct);
template PythonVersionStore::PythonVersionStore(const std::shared_ptr<storage::Library>& library, const util::ManualClock& ct);

VersionedItem PythonVersionStore::write_dataframe_specific_version(
const StreamId& stream_id,
Expand Down
12 changes: 9 additions & 3 deletions cpp/arcticdb/version/version_store_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ namespace as = arcticdb::stream;
class PythonVersionStore : public LocalVersionedEngine {

public:
explicit PythonVersionStore(
const std::shared_ptr<storage::Library>& library,
const std::optional<std::string>& license_key = std::nullopt);
template<class ClockType = util::SysClock>
explicit PythonVersionStore(const std::shared_ptr<storage::Library>& library, const ClockType& ct = util::SysClock{}) :
LocalVersionedEngine(library, ct) {
}

VersionedItem write_dataframe_specific_version(
const StreamId& stream_id,
Expand Down Expand Up @@ -339,6 +340,11 @@ class PythonVersionStore : public LocalVersionedEngine {
void delete_snapshot_sync(const SnapshotId& snap_name, const VariantKey& snap_key);
};

struct ManualClockVersionStore : PythonVersionStore {
ManualClockVersionStore(const std::shared_ptr<storage::Library>& library) :
PythonVersionStore(library, util::ManualClock{}) {}
};

inline std::vector<std::variant<ReadResult, DataError>> frame_to_read_result(std::vector<ReadVersionOutput>&& keys_frame_and_descriptors) {
std::vector<std::variant<ReadResult, DataError>> read_results;
read_results.reserve(keys_frame_and_descriptors.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from arcticdb.version_store._store import UNSUPPORTED_S3_CHARS, MAX_SYMBOL_SIZE, VersionedItem
from arcticdb_ext.exceptions import _ArcticLegacyCompatibilityException
from arcticdb_ext.storage import KeyType, NoDataFoundException
from arcticdb_ext.version_store import NoSuchVersionException, StreamDescriptorMismatch
from arcticdb_ext.version_store import NoSuchVersionException, StreamDescriptorMismatch, ManualClockVersionStore
from arcticc.pb2.descriptors_pb2 import NormalizationMetadata # Importing from arcticdb dynamically loads arcticc.pb2
from arcticdb.util.test import (
sample_dataframe,
Expand Down Expand Up @@ -1736,6 +1736,8 @@ def test_dynamic_schema_similar_index_column_dataframe_multiple_col(lmdb_version

def test_restore_version(version_store_factory):
lmdb_version_store = version_store_factory(col_per_group=2, row_per_segment=2)
# Triggers bug https://github.com/man-group/ArcticDB/issues/469 by freezing time
lmdb_version_store.version_store = ManualClockVersionStore(lmdb_version_store._library)
symbol = "test_restore_version"
df1 = get_sample_dataframe(20, 4)
df1.index = pd.DatetimeIndex([pd.Timestamp.now()] * len(df1))
Expand Down

0 comments on commit 1d72362

Please sign in to comment.