Skip to content

Commit

Permalink
Bugfix 1173: Correctly apply sortedness checks when calling update with date_range argument
Browse files Browse the repository at this point in the history
  • Loading branch information
alexowens90 committed Jan 16, 2024
1 parent bac4f54 commit 22498dd
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 107 deletions.
1 change: 1 addition & 0 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ void sorted_data_check_update(InputTensorFrame& frame, index::IndexSegmentReader
"When calling update, the input data must be a time series.");
bool input_data_is_sorted = frame.desc.get_sorted() == SortedValue::ASCENDING ||
frame.desc.get_sorted() == SortedValue::UNKNOWN;
// If changing this error message, the corresponding message in _normalization.py::restrict_data_to_date_range_only should also be updated
sorting::check<ErrorCode::E_UNSORTED_DATA>(
input_data_is_sorted,
"When calling update, the input data must be sorted.");
Expand Down
18 changes: 16 additions & 2 deletions python/arcticdb/version_store/_normalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from arcticc.pb2.storage_pb2 import VersionStoreConfig
from mmap import mmap
from collections import Counter
from arcticdb.exceptions import ArcticNativeException, ArcticDbNotYetImplemented, NormalizationException
from arcticdb.exceptions import ArcticNativeException, ArcticDbNotYetImplemented, NormalizationException, SortingException
from arcticdb.supported_types import DateRangeInput, time_types as supported_time_types
from arcticdb.util._versions import IS_PANDAS_TWO, IS_PANDAS_ZERO
from arcticdb.version_store.read_result import ReadResult
Expand Down Expand Up @@ -889,7 +889,10 @@ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_col

sort_status = _SortedValue.UNKNOWN
index = item.index
if hasattr(index, "is_monotonic_increasing"):
# Treat empty indexes as ascending so that all operations are valid
if index.empty:
sort_status = _SortedValue.ASCENDING
elif isinstance(index, (pd.DatetimeIndex, pd.PeriodIndex)):
if index.is_monotonic_increasing:
sort_status = _SortedValue.ASCENDING
elif index.is_monotonic_decreasing:
Expand Down Expand Up @@ -1351,6 +1354,17 @@ def _strip_tz(s, e):
if hasattr(data, "loc"):
if not data.index.get_level_values(0).tz:
start, end = _strip_tz(start, end)
if not data.index.is_monotonic_increasing:
# data.loc[...] scans forward through the index until hitting a value >= pd.to_datetime(end)
# If the input data is unsorted this produces non-intuitive results
# The copy below in data.loc[...] will recalculate is_monotonic_<in|de>creasing
# Therefore if data.loc[...] is sorted, but data is not the update will be allowed with unexpected results
# See https://github.com/man-group/ArcticDB/issues/1173 for more details
# We could set data.is_monotonic_<in|de>creasing to the values on input to this function after calling
# data.loc[...] and let version_core.cpp::sorted_data_check_update handle this, but that will be confusing
# as the frame input to sorted_data_check_update WILL be sorted. Instead, we fail early here, at the cost
# of duplicating exception messages.
raise SortingException("E_UNSORTED_DATA When calling update, the input data must be sorted.")
data = data.loc[pd.to_datetime(start) : pd.to_datetime(end)]
else: # non-Pandas, try to slice it anyway
if not getattr(data, "timezone", None):
Expand Down
2 changes: 2 additions & 0 deletions python/arcticdb/version_store/_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2602,6 +2602,7 @@ def get_info(self, symbol: str, version: Optional[VersionQueryInput] = None) ->
- normalization_metadata,
- type, `str`
- date_range, `tuple`
- sorted, `str`
"""
version_query = self._get_version_query(version)
read_options = _PythonVersionStoreReadOptions()
Expand Down Expand Up @@ -2637,6 +2638,7 @@ def batch_get_info(
- normalization_metadata,
- type, `str`
- date_range, `tuple`
- sorted, `str`
"""
throw_on_error = True
return self._batch_read_descriptor(symbols, as_ofs, throw_on_error)
Expand Down
13 changes: 13 additions & 0 deletions python/arcticdb/version_store/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ class SymbolDescription(NamedTuple):
date_range : Tuple[datetime.datetime, datetime.datetime]
The times in UTC that data for this symbol spans. If the data is not timeseries indexed then this value will be
``(datetime.datetime(1970, 1, 1), datetime.datetime(1970, 1, 1))``.
sorted : str
One of "ASCENDING", "DESCENDING", "UNSORTED", or "UNKNOWN":
ASCENDING - The data has a timestamp index, and is sorted in ascending order. Guarantees that operations such as
append, update, and read with date_range work as expected.
DESCENDING - The data has a timestamp index, and is sorted in descending order. Update and read with date_range
will not work.
UNSORTED - The data has a timestamp index, and is not sorted. Can only be created by calling write, write_batch,
append, or append_batch with validate_index set to False. Update and read with date_range will not
work.
UNKNOWN - Either the data does not have a timestamp index, or the data does have a timestamp index, but was
written by a client that predates this information being stored.
"""

columns: Tuple[NameWithDType]
Expand All @@ -134,6 +145,7 @@ class SymbolDescription(NamedTuple):
row_count: int
last_update_time: datetime64
date_range: Tuple[datetime.datetime, datetime.datetime]
sorted: str


class WritePayload:
Expand Down Expand Up @@ -1538,6 +1550,7 @@ def _info_to_desc(info: Dict[str, Any]) -> SymbolDescription:
last_update_time=last_update_time,
index_type=info["index_type"],
date_range=date_range,
sorted=info["sorted"],
)

def get_description(self, symbol: str, as_of: Optional[AsOf] = None) -> SymbolDescription:
Expand Down
2 changes: 2 additions & 0 deletions python/tests/integration/arcticdb/test_arctic.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,8 @@ def test_get_description(arctic_library):
assert original_info.row_count == 4
assert info.last_update_time > original_info.last_update_time
assert info.last_update_time.tz == pytz.UTC
assert original_info.sorted == "ASCENDING"
assert info.sorted == "ASCENDING"


def test_tail(arctic_library):
Expand Down
8 changes: 7 additions & 1 deletion python/tests/integration/arcticdb/test_arctic_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ def test_get_description_batch_missing_keys(arctic_library):
assert batch[2].index[0] == ["named_index"]
assert batch[2].index_type == "index"
assert batch[2].row_count == 3
assert batch[2].sorted == "ASCENDING"


def test_get_description_batch_symbol_doesnt_exist(arctic_library):
Expand All @@ -980,6 +981,7 @@ def test_get_description_batch_symbol_doesnt_exist(arctic_library):
assert batch[0].index[0] == ["named_index"]
assert batch[0].index_type == "index"
assert batch[0].row_count == 4
assert batch[0].sorted == "ASCENDING"

assert isinstance(batch[1], DataError)
assert batch[1].symbol == "s2"
Expand Down Expand Up @@ -1016,6 +1018,7 @@ def test_get_description_batch_version_doesnt_exist(arctic_library):
assert batch[0].index[0] == ["named_index"]
assert batch[0].index_type == "index"
assert batch[0].row_count == 4
assert batch[0].sorted == "ASCENDING"

assert isinstance(batch[1], DataError)
assert batch[1].symbol == "s1"
Expand Down Expand Up @@ -1157,6 +1160,8 @@ def test_get_description_batch(arctic_library):
assert info.row_count == 6
assert original_info.row_count == 4
assert info.last_update_time > original_info.last_update_time
assert original_info.sorted == "ASCENDING"
assert info.sorted == "ASCENDING"


def test_get_description_batch_multiple_versions(arctic_library):
Expand Down Expand Up @@ -1245,7 +1250,8 @@ def test_get_description_batch_multiple_versions(arctic_library):
assert info.row_count == 6
assert original_info.row_count == 4
assert info.last_update_time > original_info.last_update_time

assert original_info.sorted == "ASCENDING"
assert info.sorted == "ASCENDING"

def test_read_description_batch_high_amount(arctic_library):
lib = arctic_library
Expand Down
2 changes: 1 addition & 1 deletion python/tests/unit/arcticdb/version_store/test_append.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def test_append_not_sorted_range_index_non_exception(lmdb_version_store):

lmdb_version_store.write(symbol, df)
info = lmdb_version_store.get_info(symbol)
assert info["sorted"] == "ASCENDING"
assert info["sorted"] == "UNKNOWN"

num_rows = 20
dtidx = pd.RangeIndex(num_initial_rows, num_initial_rows + num_rows, 1)
Expand Down
136 changes: 44 additions & 92 deletions python/tests/unit/arcticdb/version_store/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,98 +453,50 @@ def _create_product_candles_df(arr):
assert_frame_equal(after_arctic, before_arctic)


def test_update_input_not_sorted_exception(lmdb_version_store):
    """update() with an unsorted input timeseries must raise SortingException."""
    sym = "bad_update"
    row_count = 20

    # Seed the symbol with an ascending frame so the stored sortedness is ASCENDING.
    base_idx = pd.date_range(pd.Timestamp("2019-01-01"), periods=row_count)
    base_df = pd.DataFrame({"c": np.arange(0, row_count, dtype=np.int64)}, index=base_idx)
    assert base_df.index.is_monotonic_increasing == True

    lmdb_version_store.write(sym, base_df)
    assert lmdb_version_store.get_info(sym)["sorted"] == "ASCENDING"

    # Rolling the ascending index produces a non-monotonic index.
    rolled_idx = np.roll(pd.date_range(pd.Timestamp("2020-01-01"), periods=row_count), 3)
    update_df = pd.DataFrame({"c": np.arange(0, row_count, dtype=np.int64)}, index=rolled_idx)
    assert update_df.index.is_monotonic_increasing == False

    with pytest.raises(SortingException):
        lmdb_version_store.update(sym, update_df)


def test_update_existing_not_sorted_exception(lmdb_version_store):
    """update() on a symbol whose stored data is unsorted must raise SortingException,
    even when the update frame itself is sorted."""
    sym = "bad_update"
    row_count = 20

    # Store an unsorted frame: rolling the ascending index breaks monotonicity.
    rolled_idx = np.roll(pd.date_range(pd.Timestamp("2019-01-01"), periods=row_count), 3)
    base_df = pd.DataFrame({"c": np.arange(0, row_count, dtype=np.int64)}, index=rolled_idx)
    assert base_df.index.is_monotonic_increasing == False

    lmdb_version_store.write(sym, base_df)
    assert lmdb_version_store.get_info(sym)["sorted"] == "UNSORTED"

    # The update frame is sorted, but the stored data is not, so the update is rejected.
    sorted_idx = pd.date_range(pd.Timestamp("2020-01-01"), periods=row_count)
    update_df = pd.DataFrame({"c": np.arange(0, row_count, dtype=np.int64)}, index=sorted_idx)
    assert update_df.index.is_monotonic_increasing == True

    with pytest.raises(SortingException):
        lmdb_version_store.update(sym, update_df)


def test_update_input_descending_exception(lmdb_version_store):
    """update() with a descending-sorted input frame must raise SortingException."""
    sym = "bad_update"
    row_count = 20

    # Seed the symbol with ascending data.
    base_idx = pd.date_range(pd.Timestamp("2019-01-01"), periods=row_count)
    base_df = pd.DataFrame({"c": np.arange(0, row_count, dtype=np.int64)}, index=base_idx)
    assert base_df.index.is_monotonic_increasing == True

    lmdb_version_store.write(sym, base_df)
    assert lmdb_version_store.get_info(sym)["sorted"] == "ASCENDING"

    # Reverse the ascending range to produce a descending index for the update.
    descending_idx = reversed(pd.date_range(pd.Timestamp("2020-01-01"), periods=row_count))
    update_df = pd.DataFrame({"c": np.arange(0, row_count, dtype=np.int64)}, index=descending_idx)
    assert update_df.index.is_monotonic_increasing == False
    assert update_df.index.is_monotonic_decreasing == True

    with pytest.raises(SortingException):
        lmdb_version_store.update(sym, update_df)


def test_update_existing_descending_exception(lmdb_version_store):
    """update() on a symbol stored in descending order must raise SortingException,
    even when the update frame is ascending."""
    sym = "bad_update"
    row_count = 20

    # Store a descending frame by reversing an ascending date range.
    descending_idx = reversed(pd.date_range(pd.Timestamp("2019-01-01"), periods=row_count))
    base_df = pd.DataFrame({"c": np.arange(0, row_count, dtype=np.int64)}, index=descending_idx)
    assert base_df.index.is_monotonic_increasing == False
    assert base_df.index.is_monotonic_decreasing == True

    lmdb_version_store.write(sym, base_df)
    assert lmdb_version_store.get_info(sym)["sorted"] == "DESCENDING"

    # An ascending update frame still cannot be applied over descending stored data.
    ascending_idx = pd.date_range(pd.Timestamp("2020-01-01"), periods=row_count)
    update_df = pd.DataFrame({"c": np.arange(0, row_count, dtype=np.int64)}, index=ascending_idx)
    assert update_df.index.is_monotonic_increasing == True

    with pytest.raises(SortingException):
        lmdb_version_store.update(sym, update_df)
@pytest.mark.parametrize("existing_df_sortedness", ("ASCENDING", "DESCENDING", "UNSORTED"))
@pytest.mark.parametrize("update_df_sortedness", ("ASCENDING", "DESCENDING", "UNSORTED"))
@pytest.mark.parametrize("date_range_arg_provided", (True, False))
def test_update_sortedness_checks(
    lmdb_version_store,
    existing_df_sortedness,
    update_df_sortedness,
    date_range_arg_provided,
):
    """update() must succeed only when both the stored data and the update data are
    sorted ASCENDING; every other combination must raise SortingException, whether or
    not a date_range argument is supplied (regression test for issue #1173)."""
    lib = lmdb_version_store
    symbol = "test_update_sortedness_checks"
    num_rows = 10
    data = np.arange(num_rows)
    ascending_idx = pd.date_range("2024-01-15", periods=num_rows)
    # Map each sortedness label to a frame whose index has exactly that property,
    # replacing the duplicated if/elif selection chains with a single lookup.
    frames = {
        "ASCENDING": pd.DataFrame({"col": data}, index=ascending_idx),
        "DESCENDING": pd.DataFrame({"col": data}, index=pd.DatetimeIndex(reversed(ascending_idx))),
        "UNSORTED": pd.DataFrame({"col": data}, index=pd.DatetimeIndex(np.roll(ascending_idx, num_rows // 2))),
    }

    # A range straddling the data, or None to exercise the no-date_range path.
    date_range = (pd.Timestamp("2024-01-13"), pd.Timestamp("2024-01-17")) if date_range_arg_provided else None

    lib.write(symbol, frames[existing_df_sortedness])
    assert lib.get_info(symbol)["sorted"] == existing_df_sortedness

    update_df = frames[update_df_sortedness]
    if existing_df_sortedness == "ASCENDING" and update_df_sortedness == "ASCENDING":
        # The only permitted combination: sorted data updated with sorted data.
        lib.update(symbol, update_df, date_range=date_range)
        assert lib.get_info(symbol)["sorted"] == "ASCENDING"
    else:
        with pytest.raises(SortingException):
            lib.update(symbol, update_df, date_range=date_range)


def test_update_not_sorted_input_multi_index_exception(lmdb_version_store):
Expand Down
35 changes: 24 additions & 11 deletions python/tests/unit/arcticdb/version_store/test_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pandas as pd
import pytest
from arcticdb.exceptions import SortingException, NormalizationException
from arcticdb.util._versions import IS_PANDAS_TWO
from pandas import MultiIndex


Expand All @@ -30,7 +31,7 @@ def test_write_ascending_sorted_dataframe(lmdb_version_store):
lmdb_version_store.write(symbol, df)
assert df.index.is_monotonic_increasing == True
info = lmdb_version_store.get_info(symbol)
assert info["sorted"] == "ASCENDING"
assert info["sorted"] == "UNKNOWN"


def test_write_descending_sorted_dataframe(lmdb_version_store):
Expand All @@ -44,7 +45,7 @@ def test_write_descending_sorted_dataframe(lmdb_version_store):
lmdb_version_store.write(symbol, df)
assert df.index.is_monotonic_decreasing == True
info = lmdb_version_store.get_info(symbol)
assert info["sorted"] == "DESCENDING"
assert info["sorted"] == "UNKNOWN"


def test_write_unsorted_sorted_dataframe(lmdb_version_store):
Expand All @@ -59,7 +60,7 @@ def test_write_unsorted_sorted_dataframe(lmdb_version_store):
assert df.index.is_monotonic_decreasing == False
assert df.index.is_monotonic_increasing == False
info = lmdb_version_store.get_info(symbol)
assert info["sorted"] == "UNSORTED"
assert info["sorted"] == "UNKNOWN"


def test_write_unknown_sorted_dataframe(lmdb_version_store):
Expand Down Expand Up @@ -87,9 +88,9 @@ def test_write_not_sorted_non_validate_index(lmdb_version_store):
num_initial_rows = 20
num_rows = 20
initial_timestamp = pd.Timestamp("2020-01-01")
dtidx = np.roll(pd.date_range(initial_timestamp, periods=num_initial_rows), 3)
dtidx = np.roll(pd.date_range(initial_timestamp, periods=num_initial_rows), 0)
df = pd.DataFrame({"c": np.arange(0, num_rows, dtype=np.int64)}, index=dtidx)
assert df.index.is_monotonic_increasing == False
# assert df.index.is_monotonic_increasing == False

lmdb_version_store.write(symbol, df)

Expand All @@ -113,10 +114,22 @@ def test_write_not_sorted_multi_index_exception(lmdb_version_store):
lmdb_version_store.write(symbol, df, validate_index=True)


def test_write_not_sorted_range_index_exception(lmdb_version_store):
symbol = "bad_write"
@pytest.mark.parametrize("index_type", ["range", "int64"])
@pytest.mark.parametrize("is_sorted", [True, False])
@pytest.mark.parametrize("validate_index", [True, False])
def test_write_non_timestamp_index(lmdb_version_store, index_type, is_sorted, validate_index):
    """Writing a frame with a non-timestamp (range or int64) index always records
    sortedness as UNKNOWN, regardless of actual index order or validate_index.

    Fixes: the parametrized argument was named ``sorted``, shadowing the builtin;
    the int64 index length was hard-coded as 20 instead of ``num_rows``; and ``idx``
    could be left unbound if the parametrization were ever extended."""
    lib = lmdb_version_store
    symbol = "test_write_non_timestamp_index"
    num_rows = 20
    # A roll of 0 leaves the index monotonic; a non-zero roll makes it unsorted.
    shift = 0 if is_sorted else 3
    if index_type == "range":
        idx = np.roll(pd.RangeIndex(0, num_rows, 1), shift)
    elif index_type == "int64":
        # pd.Int64Index was removed in pandas 2.x, so pick the constructor by version.
        idx = np.roll(
            pd.Index(range(num_rows), dtype=np.int64) if IS_PANDAS_TWO else pd.Int64Index(range(num_rows)),
            shift,
        )
    else:
        raise ValueError(f"Unexpected index_type: {index_type}")
    df = pd.DataFrame({"c": np.arange(0, num_rows, dtype=np.int64)}, index=idx)
    assert df.index.is_monotonic_increasing == is_sorted
    lib.write(symbol, df, validate_index=validate_index)
    assert lib.get_info(symbol)["sorted"] == "UNKNOWN"


0 comments on commit 22498dd

Please sign in to comment.