Skip to content

Commit

Permalink
Make timing-dependent tests more reliable (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
qc00 committed Aug 2, 2023
1 parent d3e3249 commit 7ce532f
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 94 deletions.
6 changes: 6 additions & 0 deletions cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,12 @@ class LocalVersionedEngine : public VersionedEngine {
void _test_set_store(std::shared_ptr<Store> store);
std::shared_ptr<VersionMap> _test_get_version_map();

/** Get the time used by the Store (e.g. that would be used in the AtomKey).
For testing purposes only. */
entity::timestamp get_store_current_timestamp_for_tests() {
return store()->current_timestamp();
}

protected:
VersionedItem compact_incomplete_dynamic(
const StreamId& stream_id,
Expand Down
5 changes: 4 additions & 1 deletion cpp/arcticdb/version/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,13 +651,16 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
.def("latest_timestamp",
&PythonVersionStore::latest_timestamp,
"Returns latest timestamp of a symbol")
.def("_get_store_current_timestamp",
&PythonVersionStore::get_store_current_timestamp_for_tests,
"For testing purposes only")
;

py::class_<ManualClockVersionStore, PythonVersionStore>(version, "ManualClockVersionStore")
.def(py::init<const std::shared_ptr<storage::Library>&>())
.def_property_static("time",
[]() { return util::ManualClock::time_.load(); },
[](entity::timestamp ts) { util::ManualClock::time_ = ts; })
[](const py::class_<ManualClockVersionStore>&, entity::timestamp ts) { util::ManualClock::time_ = ts; })
;

py::class_<LocalVersionedEngine>(version, "VersionedEngine")
Expand Down
31 changes: 31 additions & 0 deletions python/arcticdb/util/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
import pytest
import string
import random
import time
import attr
from six import PY3
from copy import deepcopy
from functools import wraps
from packaging import version

from arcticdb.config import Defaults
from arcticdb.log import configure, logger_by_name
from arcticdb.version_store import NativeVersionStore
from arcticdb.version_store._custom_normalizers import CustomNormalizer
from arcticc.pb2.descriptors_pb2 import NormalizationMetadata
from arcticc.pb2.logger_pb2 import LoggerConfig, LoggersConfig
Expand Down Expand Up @@ -490,3 +493,31 @@ def regularize_dataframe(df):
output = output.reset_index(drop=True)
output = output.astype("float", errors="ignore")
return output


@attr.s(slots=True, auto_attribs=True)
class BeforeAfterTimestamp:
before: pd.Timestamp
after: Optional[pd.Timestamp]


@contextmanager
def distinct_timestamps(lib: NativeVersionStore):
"""Ensures the timestamp used by ArcticDB operations before, during and leaving the context are all different.
Yields
------
BeforeAfterTimestamp
"""
get_ts = lib.version_store._get_store_current_timestamp
before = get_ts()
while get_ts() == before:
time.sleep(0.000001) # 1us - The worst resolution in our clock implementations
out = BeforeAfterTimestamp(pd.Timestamp(before, unit="ns"), None)
try:
yield out
finally:
right_after = get_ts()
while get_ts() == right_after:
time.sleep(0.000001)
out.after = pd.Timestamp(get_ts(), unit="ns")
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
StreamDescriptorMismatch,
UserInputException,
)
from arcticdb_ext.storage import NoDataFoundException
from arcticdb.flattener import Flattener
from arcticdb.version_store import NativeVersionStore
from arcticdb.version_store._custom_normalizers import CustomNormalizer, register_normalizer
Expand All @@ -45,10 +44,11 @@
assert_frame_equal,
assert_series_equal,
config_context,
distinct_timestamps,
)
from arcticdb_ext.tools import AZURE_SUPPORT
from tests.util.date import DateRange
from arcticdb_ext import set_config_int, unset_config_int


if sys.platform == "linux":
SMOKE_TEST_VERSION_STORES = [
Expand Down Expand Up @@ -785,17 +785,17 @@ def test_is_pickled_by_snapshot(lmdb_version_store):
def test_is_pickled_by_timestamp(lmdb_version_store):
symbol = "test"
will_be_pickled = [1, 2, 3]
lmdb_version_store.write(symbol, will_be_pickled)
time_after_first_write = pd.Timestamp.utcnow()
time.sleep(0.1)
with distinct_timestamps(lmdb_version_store) as first_write_timestamps:
lmdb_version_store.write(symbol, will_be_pickled)

not_pickled = pd.DataFrame({"a": np.arange(3)})
lmdb_version_store.write(symbol, not_pickled)
with distinct_timestamps(lmdb_version_store):
lmdb_version_store.write(symbol, not_pickled)

with pytest.raises(NoDataFoundException):
lmdb_version_store.read(symbol, pd.Timestamp(0))
assert lmdb_version_store.is_symbol_pickled(symbol) is False
assert lmdb_version_store.is_symbol_pickled(symbol, time_after_first_write) is True
assert lmdb_version_store.is_symbol_pickled(symbol, first_write_timestamps.after) is True
assert lmdb_version_store.is_symbol_pickled(symbol, pd.Timestamp(np.iinfo(np.int64).max)) is False


Expand Down Expand Up @@ -878,17 +878,13 @@ def test_list_versions_with_snapshots(lmdb_version_store):


def test_read_ts(lmdb_version_store):
lmdb_version_store.write("a", 1) # v0
time.sleep(0.001) # In case utcnow() has a lower precision and returning a timestamp before the write (#496)
time_after_first_write = pd.Timestamp.utcnow()

assert lmdb_version_store.read("a", as_of=time_after_first_write).version == 0
time.sleep(0.1)
lmdb_version_store.write("a", 2) # v1
time.sleep(0.11)

lmdb_version_store.write("a", 3) # v2
time.sleep(0.1)
with distinct_timestamps(lmdb_version_store) as first_write_timestamps:
lmdb_version_store.write("a", 1) # v0
assert lmdb_version_store.read("a", as_of=first_write_timestamps.after).version == 0
with distinct_timestamps(lmdb_version_store):
lmdb_version_store.write("a", 2) # v1
with distinct_timestamps(lmdb_version_store):
lmdb_version_store.write("a", 3) # v2
lmdb_version_store.write("a", 4) # v3
lmdb_version_store.snapshot("snap3")
versions = lmdb_version_store.list_versions()
Expand All @@ -912,7 +908,7 @@ def test_read_ts(lmdb_version_store):
assert vitem.version == 3
assert vitem.data == 4

vitem = lmdb_version_store.read("a", as_of=time_after_first_write)
vitem = lmdb_version_store.read("a", as_of=first_write_timestamps.after)
assert vitem.version == 0
assert vitem.data == 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
import time
import numpy as np
from pandas import DataFrame, Timestamp
import pytest
import sys

from arcticdb.version_store import NativeVersionStore, VersionedItem
from arcticdb.exceptions import ArcticNativeNotYetImplemented
from arcticdb_ext.storage import NoDataFoundException
from arcticdb.util.test import assert_frame_equal
from arcticdb.util.test import assert_frame_equal, distinct_timestamps


# In the following lines, the naming convention is
Expand Down Expand Up @@ -85,22 +83,21 @@ def test_read_metadata_by_timestamp(lmdb_version_store):
symbol = "test_symbol"

metadata_v0 = {"something": 1}
lmdb_version_store.write(symbol, 1, metadata=metadata_v0) # v0
time_after_first_write = Timestamp.utcnow()
time.sleep(0.1)
with distinct_timestamps(lmdb_version_store) as first_write_timestamps:
lmdb_version_store.write(symbol, 1, metadata=metadata_v0) # v0

with pytest.raises(NoDataFoundException):
lmdb_version_store.read(symbol, as_of=Timestamp(0))

assert lmdb_version_store.read_metadata(symbol, as_of=time_after_first_write).metadata == metadata_v0
assert lmdb_version_store.read_metadata(symbol, as_of=first_write_timestamps.after).metadata == metadata_v0

metadata_v1 = {"something more": 2}
lmdb_version_store.write(symbol, 2, metadata=metadata_v1) # v1
time.sleep(0.11)
with distinct_timestamps(lmdb_version_store):
lmdb_version_store.write(symbol, 2, metadata=metadata_v1) # v1

metadata_v2 = {"something else": 3}
lmdb_version_store.write(symbol, 3, metadata=metadata_v2) # v2
time.sleep(0.1)
with distinct_timestamps(lmdb_version_store):
lmdb_version_store.write(symbol, 3, metadata=metadata_v2) # v2

metadata_v3 = {"nothing": 4}
lmdb_version_store.write(symbol, 4, metadata=metadata_v3) # v3
Expand All @@ -109,7 +106,7 @@ def test_read_metadata_by_timestamp(lmdb_version_store):
assert len(versions) == 4
sorted_versions_for_a = sorted([v for v in versions if v["symbol"] == symbol], key=lambda x: x["version"])

assert lmdb_version_store.read_metadata(symbol, as_of=time_after_first_write).metadata == metadata_v0
assert lmdb_version_store.read_metadata(symbol, as_of=first_write_timestamps.after).metadata == metadata_v0

ts_for_v1 = sorted_versions_for_a[1]["date"]
assert lmdb_version_store.read_metadata(symbol, as_of=ts_for_v1).metadata == metadata_v1
Expand Down
50 changes: 14 additions & 36 deletions python/tests/integration/arcticdb/version_store/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
"""
import pytest
import numpy as np

from arcticdb_ext.exceptions import InternalException
from arcticdb.util.test import distinct_timestamps


def test_basic_snapshot_flow(lmdb_version_store):
Expand Down Expand Up @@ -237,55 +239,31 @@ def test_pruned_symbol_in_symbol_read_version(lmdb_version_store_tombstone_and_p
assert lib.read("a", as_of="snap").data == 1


import pandas as pd


def test_read_symbol_with_ts_in_snapshot(lmdb_version_store, sym):
lib = lmdb_version_store
lib.write(sym, 0)
lib.write(sym, 1)
time_after_second_write = pd.Timestamp.utcnow()
lib.snapshot("snap")
# After this write only version 1 exists via the snapshot
lib.write(sym, 2, prune_previous_version=True)
time_after_third_write = pd.Timestamp.utcnow()

assert lib.read(sym).data == 2
versions = lib.list_versions()
assert len(versions) == 2 # deleted for version 1

assert lib.read(sym, as_of=1).data == 1
assert lib.read(sym, as_of=time_after_second_write).version == 1
assert lib.read(sym, as_of=time_after_second_write).data == 1

lib.snapshot("snap1")
lib.delete_version(sym, 2)
assert lib.read(sym, as_of=2).data == 2 # still in snapshot
assert lib.read(sym, as_of=time_after_third_write).version == 2


def test_read_symbol_with_ts_in_snapshot_with_pruning(lmdb_version_store_tombstone_and_pruning, sym):
lib = lmdb_version_store_tombstone_and_pruning
@pytest.mark.parametrize(
"store", ["lmdb_version_store_v1", "lmdb_version_store_v2", "lmdb_version_store_tombstone_and_pruning"]
)
def test_read_symbol_with_ts_in_snapshot(store, request, sym):
lib = request.getfixturevalue(store)
lib.write(sym, 0)
lib.write(sym, 1)
time_after_second_write = pd.Timestamp.utcnow()
with distinct_timestamps(lib) as second_write_timestamps:
lib.write(sym, 1)
lib.snapshot("snap")
# After this write only version 1 exists via the snapshot
lib.write(sym, 2, prune_previous_version=True)
time_after_third_write = pd.Timestamp.utcnow()
with distinct_timestamps(lib) as third_write_timestamps:
lib.write(sym, 2, prune_previous_version=True)

assert lib.read(sym).data == 2
versions = lib.list_versions()
assert len(versions) == 2 # deleted for version 1

assert lib.read(sym, as_of=1).data == 1
assert lib.read(sym, as_of=time_after_second_write).version == 1
assert lib.read(sym, as_of=time_after_second_write).data == 1
assert lib.read(sym, as_of=second_write_timestamps.after).version == 1
assert lib.read(sym, as_of=second_write_timestamps.after).data == 1

lib.snapshot("snap1")
lib.delete_version(sym, 2)
assert lib.read(sym, as_of=2).data == 2 # still in snapshot
assert lib.read(sym, as_of=time_after_third_write).version == 2
assert lib.read(sym, as_of=third_write_timestamps.after).version == 2


def test_snapshot_random_versions_to_fail(lmdb_version_store_tombstone_and_pruning, sym):
Expand Down
39 changes: 13 additions & 26 deletions python/tests/unit/arcticdb/version_store/test_deletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import numpy as np
import pandas as pd
import pytest
import time

from arcticdb.util.test import assert_frame_equal
from arcticdb_ext.exceptions import InternalException
from arcticdb_ext.storage import KeyType, NoDataFoundException
from arcticdb_ext.version_store import ManualClockVersionStore
from arcticdb.version_store._normalization import NPDDataFrame
from arcticdb.util.test import sample_dataframe

Expand Down Expand Up @@ -56,53 +56,40 @@ def test_delete_version_with_update(version_store_factory, pos, sym):
assert_frame_equal(lmdb_version_store.read(symbol, 0).data, original_df)


@pytest.mark.skip(reason="Flaky test, needs investigation.")
def test_delete_by_timestamp(lmdb_version_store, sym):
symbol = sym
time_initial = pd.Timestamp.utcnow()
time.sleep(0.1)
now = lmdb_version_store.version_store.get_store_current_timestamp_for_tests()
lmdb_version_store.version_store = ManualClockVersionStore(lmdb_version_store._library)
minute_in_ns = 60 * int(1e9)

ManualClockVersionStore.time = now - 5 * minute_in_ns
lmdb_version_store.write(symbol, 1) # v0
time_after_v0 = pd.Timestamp.utcnow()
time.sleep(0.1)

ManualClockVersionStore.time = now - 4 * minute_in_ns
lmdb_version_store.write(symbol, 2) # v1
time_after_v1 = pd.Timestamp.utcnow()
time.sleep(0.1)

ManualClockVersionStore.time = now - 3 * minute_in_ns
lmdb_version_store.write(symbol, 3) # v2
time_after_v2 = pd.Timestamp.utcnow()
time.sleep(0.1)

ManualClockVersionStore.time = now - 2 * minute_in_ns
lmdb_version_store.write(symbol, 4) # v3
time_after_v3 = pd.Timestamp.utcnow()
time.sleep(0.1)

ManualClockVersionStore.time = now - 1 * minute_in_ns
lmdb_version_store.write(symbol, 5) # v4
time_after_v4 = pd.Timestamp.utcnow()

lmdb_version_store._prune_previous_versions(
symbol, keep_mins=(pd.Timestamp.utcnow() - time_initial).microseconds / 60.0 / 1000000
)
lmdb_version_store._prune_previous_versions(symbol, keep_mins=5.5)
assert len(lmdb_version_store.list_versions(symbol)) == 5

lmdb_version_store._prune_previous_versions(
symbol, keep_mins=(pd.Timestamp.utcnow() - time_after_v0).microseconds / 60.0 / 1000000
)
lmdb_version_store._prune_previous_versions(symbol, keep_mins=4.5)
assert len(lmdb_version_store.list_versions(symbol)) == 4

lmdb_version_store._prune_previous_versions(
symbol, keep_mins=(pd.Timestamp.utcnow() - time_after_v3).microseconds / 60.0 / 1000000, keep_version=2
)
lmdb_version_store._prune_previous_versions(symbol, keep_mins=1.5, keep_version=2)
assert len(lmdb_version_store.list_versions(symbol)) == 2

lmdb_version_store._prune_previous_versions(
symbol, keep_mins=(pd.Timestamp.utcnow() - time_after_v4).microseconds / 60.0 / 1000000
)
lmdb_version_store._prune_previous_versions(symbol, keep_mins=1.5)
assert len(lmdb_version_store.list_versions(symbol)) == 1


@pytest.mark.skip
def test_clear_lmdb(lmdb_version_store, sym):
symbol = sym
lmdb_version_store.version_store.clear()
Expand Down

0 comments on commit 7ce532f

Please sign in to comment.