Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Bugfix 1173: Correctly apply sortedness checks when calling update with date_range argument #1238

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ void sorted_data_check_update(InputTensorFrame& frame, index::IndexSegmentReader
"When calling update, the input data must be a time series.");
bool input_data_is_sorted = frame.desc.get_sorted() == SortedValue::ASCENDING ||
frame.desc.get_sorted() == SortedValue::UNKNOWN;
// If changing this error message, the corresponding message in _normalization.py::restrict_data_to_date_range_only should also be updated
sorting::check<ErrorCode::E_UNSORTED_DATA>(
input_data_is_sorted,
"When calling update, the input data must be sorted.");
Expand Down
18 changes: 16 additions & 2 deletions python/arcticdb/version_store/_normalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from arcticc.pb2.storage_pb2 import VersionStoreConfig
from mmap import mmap
from collections import Counter
from arcticdb.exceptions import ArcticNativeException, ArcticDbNotYetImplemented, NormalizationException
from arcticdb.exceptions import ArcticNativeException, ArcticDbNotYetImplemented, NormalizationException, SortingException
from arcticdb.supported_types import DateRangeInput, time_types as supported_time_types
from arcticdb.util._versions import IS_PANDAS_TWO, IS_PANDAS_ZERO
from arcticdb.version_store.read_result import ReadResult
Expand Down Expand Up @@ -889,7 +889,10 @@ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_col

sort_status = _SortedValue.UNKNOWN
index = item.index
if hasattr(index, "is_monotonic_increasing"):
# Treat empty indexes as ascending so that all operations are valid
if index.empty:
sort_status = _SortedValue.ASCENDING
elif isinstance(index, (pd.DatetimeIndex, pd.PeriodIndex)):
if index.is_monotonic_increasing:
sort_status = _SortedValue.ASCENDING
elif index.is_monotonic_decreasing:
Expand Down Expand Up @@ -1351,6 +1354,17 @@ def _strip_tz(s, e):
if hasattr(data, "loc"):
if not data.index.get_level_values(0).tz:
start, end = _strip_tz(start, end)
if not data.index.is_monotonic_increasing:
# data.loc[...] scans forward through the index until hitting a value >= pd.to_datetime(end)
# If the input data is unsorted this produces non-intuitive results
# The copy below in data.loc[...] will recalculate is_monotonic_<in|de>creasing
# Therefore if data.loc[...] is sorted, but data is not the update will be allowed with unexpected results
# See https://github.com/man-group/ArcticDB/issues/1173 for more details
# We could set data.is_monotonic_<in|de>creasing to the values on input to this function after calling
# data.loc[...] and let version_core.cpp::sorted_data_check_update handle this, but that will be confusing
# as the frame input to sorted_data_check_update WILL be sorted. Instead, we fail early here, at the cost
# of duplicating exception messages.
raise SortingException("E_UNSORTED_DATA When calling update, the input data must be sorted.")
data = data.loc[pd.to_datetime(start) : pd.to_datetime(end)]
else: # non-Pandas, try to slice it anyway
if not getattr(data, "timezone", None):
Expand Down
2 changes: 2 additions & 0 deletions python/arcticdb/version_store/_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2602,6 +2602,7 @@ def get_info(self, symbol: str, version: Optional[VersionQueryInput] = None) ->
- normalization_metadata,
- type, `str`
- date_range, `tuple`
- sorted, `str`
"""
version_query = self._get_version_query(version)
read_options = _PythonVersionStoreReadOptions()
Expand Down Expand Up @@ -2637,6 +2638,7 @@ def batch_get_info(
- normalization_metadata,
- type, `str`
- date_range, `tuple`
- sorted, `str`
"""
throw_on_error = True
return self._batch_read_descriptor(symbols, as_ofs, throw_on_error)
Expand Down
13 changes: 13 additions & 0 deletions python/arcticdb/version_store/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ class SymbolDescription(NamedTuple):
date_range : Tuple[datetime.datetime, datetime.datetime]
The times in UTC that data for this symbol spans. If the data is not timeseries indexed then this value will be
``(datetime.datetime(1970, 1, 1), datetime.datetime(1970, 1, 1))``.
sorted : str
One of "ASCENDING", "DESCENDING", "UNSORTED", or "UNKNOWN":
ASCENDING - The data has a timestamp index, and is sorted in ascending order. Guarantees that operations such as
append, update, and read with date_range work as expected.
DESCENDING - The data has a timestamp index, and is sorted in descending order. Update and read with date_range
will not work.
UNSORTED - The data has a timestamp index, and is not sorted. Can only be created by calling write, write_batch,
append, or append_batch with validate_index set to False. Update and read with date_range will not
work.
UNKNOWN - Either the data does not have a timestamp index, or the data does have a timestamp index, but was
written by a client that predates this information being stored.
"""

columns: Tuple[NameWithDType]
Expand All @@ -134,6 +145,7 @@ class SymbolDescription(NamedTuple):
row_count: int
last_update_time: datetime64
date_range: Tuple[datetime.datetime, datetime.datetime]
sorted: str


class WritePayload:
Expand Down Expand Up @@ -1538,6 +1550,7 @@ def _info_to_desc(info: Dict[str, Any]) -> SymbolDescription:
last_update_time=last_update_time,
index_type=info["index_type"],
date_range=date_range,
sorted=info["sorted"],
)

def get_description(self, symbol: str, as_of: Optional[AsOf] = None) -> SymbolDescription:
Expand Down
2 changes: 2 additions & 0 deletions python/tests/integration/arcticdb/test_arctic.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,8 @@ def test_get_description(arctic_library):
assert original_info.row_count == 4
assert info.last_update_time > original_info.last_update_time
assert info.last_update_time.tz == pytz.UTC
assert original_info.sorted == "ASCENDING"
assert info.sorted == "ASCENDING"


def test_tail(arctic_library):
Expand Down
8 changes: 7 additions & 1 deletion python/tests/integration/arcticdb/test_arctic_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ def test_get_description_batch_missing_keys(arctic_library):
assert batch[2].index[0] == ["named_index"]
assert batch[2].index_type == "index"
assert batch[2].row_count == 3
assert batch[2].sorted == "ASCENDING"


def test_get_description_batch_symbol_doesnt_exist(arctic_library):
Expand All @@ -980,6 +981,7 @@ def test_get_description_batch_symbol_doesnt_exist(arctic_library):
assert batch[0].index[0] == ["named_index"]
assert batch[0].index_type == "index"
assert batch[0].row_count == 4
assert batch[0].sorted == "ASCENDING"

assert isinstance(batch[1], DataError)
assert batch[1].symbol == "s2"
Expand Down Expand Up @@ -1016,6 +1018,7 @@ def test_get_description_batch_version_doesnt_exist(arctic_library):
assert batch[0].index[0] == ["named_index"]
assert batch[0].index_type == "index"
assert batch[0].row_count == 4
assert batch[0].sorted == "ASCENDING"

assert isinstance(batch[1], DataError)
assert batch[1].symbol == "s1"
Expand Down Expand Up @@ -1157,6 +1160,8 @@ def test_get_description_batch(arctic_library):
assert info.row_count == 6
assert original_info.row_count == 4
assert info.last_update_time > original_info.last_update_time
assert original_info.sorted == "ASCENDING"
assert info.sorted == "ASCENDING"


def test_get_description_batch_multiple_versions(arctic_library):
Expand Down Expand Up @@ -1245,7 +1250,8 @@ def test_get_description_batch_multiple_versions(arctic_library):
assert info.row_count == 6
assert original_info.row_count == 4
assert info.last_update_time > original_info.last_update_time

assert original_info.sorted == "ASCENDING"
assert info.sorted == "ASCENDING"

def test_read_description_batch_high_amount(arctic_library):
lib = arctic_library
Expand Down
2 changes: 1 addition & 1 deletion python/tests/unit/arcticdb/version_store/test_append.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def test_append_not_sorted_range_index_non_exception(lmdb_version_store):

lmdb_version_store.write(symbol, df)
info = lmdb_version_store.get_info(symbol)
assert info["sorted"] == "ASCENDING"
assert info["sorted"] == "UNKNOWN"

num_rows = 20
dtidx = pd.RangeIndex(num_initial_rows, num_initial_rows + num_rows, 1)
Expand Down
136 changes: 44 additions & 92 deletions python/tests/unit/arcticdb/version_store/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,98 +453,50 @@ def _create_product_candles_df(arr):
assert_frame_equal(after_arctic, before_arctic)


def test_update_input_not_sorted_exception(lmdb_version_store):
    """Updating a sorted symbol with unsorted input data must raise SortingException."""
    symbol = "bad_update"
    row_count = 20

    # Existing data: monotonically increasing timestamp index, recorded as ASCENDING.
    ascending_index = pd.date_range(pd.Timestamp("2019-01-01"), periods=row_count)
    stored_df = pd.DataFrame({"c": np.arange(row_count, dtype=np.int64)}, index=ascending_index)
    assert stored_df.index.is_monotonic_increasing
    lmdb_version_store.write(symbol, stored_df)
    assert lmdb_version_store.get_info(symbol)["sorted"] == "ASCENDING"

    # Update data: rolling the index by 3 breaks monotonicity.
    shuffled_index = np.roll(pd.date_range(pd.Timestamp("2020-01-01"), periods=row_count), 3)
    unsorted_df = pd.DataFrame({"c": np.arange(row_count, dtype=np.int64)}, index=shuffled_index)
    assert not unsorted_df.index.is_monotonic_increasing

    with pytest.raises(SortingException):
        lmdb_version_store.update(symbol, unsorted_df)


def test_update_existing_not_sorted_exception(lmdb_version_store):
    """Updating a symbol whose stored data is UNSORTED must raise SortingException,
    even when the update payload itself is sorted."""
    symbol = "bad_update"
    row_count = 20

    # Existing data: rolled index is non-monotonic, so the symbol is recorded UNSORTED.
    shuffled_index = np.roll(pd.date_range(pd.Timestamp("2019-01-01"), periods=row_count), 3)
    stored_df = pd.DataFrame({"c": np.arange(row_count, dtype=np.int64)}, index=shuffled_index)
    assert not stored_df.index.is_monotonic_increasing
    lmdb_version_store.write(symbol, stored_df)
    assert lmdb_version_store.get_info(symbol)["sorted"] == "UNSORTED"

    # Update data: perfectly sorted — the failure comes from the stored data's state.
    ascending_index = pd.date_range(pd.Timestamp("2020-01-01"), periods=row_count)
    sorted_df = pd.DataFrame({"c": np.arange(row_count, dtype=np.int64)}, index=ascending_index)
    assert sorted_df.index.is_monotonic_increasing

    with pytest.raises(SortingException):
        lmdb_version_store.update(symbol, sorted_df)


def test_update_input_descending_exception(lmdb_version_store):
    """Updating a sorted symbol with descending input data must raise SortingException."""
    symbol = "bad_update"
    row_count = 20

    # Existing data: ascending timestamp index, recorded as ASCENDING.
    ascending_index = pd.date_range(pd.Timestamp("2019-01-01"), periods=row_count)
    stored_df = pd.DataFrame({"c": np.arange(row_count, dtype=np.int64)}, index=ascending_index)
    assert stored_df.index.is_monotonic_increasing
    lmdb_version_store.write(symbol, stored_df)
    assert lmdb_version_store.get_info(symbol)["sorted"] == "ASCENDING"

    # Update data: same timestamps in reverse order — strictly decreasing.
    descending_index = pd.date_range(pd.Timestamp("2020-01-01"), periods=row_count)[::-1]
    descending_df = pd.DataFrame({"c": np.arange(row_count, dtype=np.int64)}, index=descending_index)
    assert not descending_df.index.is_monotonic_increasing
    assert descending_df.index.is_monotonic_decreasing

    with pytest.raises(SortingException):
        lmdb_version_store.update(symbol, descending_df)


def test_update_existing_descending_exception(lmdb_version_store):
    """Updating a symbol whose stored data is DESCENDING must raise SortingException,
    even when the update payload itself is ascending."""
    symbol = "bad_update"
    row_count = 20

    # Existing data: reversed timestamps, so the symbol is recorded DESCENDING.
    descending_index = pd.date_range(pd.Timestamp("2019-01-01"), periods=row_count)[::-1]
    stored_df = pd.DataFrame({"c": np.arange(row_count, dtype=np.int64)}, index=descending_index)
    assert not stored_df.index.is_monotonic_increasing
    assert stored_df.index.is_monotonic_decreasing
    lmdb_version_store.write(symbol, stored_df)
    assert lmdb_version_store.get_info(symbol)["sorted"] == "DESCENDING"

    # Update data: ascending — the failure comes from the stored data's state.
    ascending_index = pd.date_range(pd.Timestamp("2020-01-01"), periods=row_count)
    sorted_df = pd.DataFrame({"c": np.arange(row_count, dtype=np.int64)}, index=ascending_index)
    assert sorted_df.index.is_monotonic_increasing

    with pytest.raises(SortingException):
        lmdb_version_store.update(symbol, sorted_df)
@pytest.mark.parametrize("existing_df_sortedness", ("ASCENDING", "DESCENDING", "UNSORTED"))
@pytest.mark.parametrize("update_df_sortedness", ("ASCENDING", "DESCENDING", "UNSORTED"))
@pytest.mark.parametrize("date_range_arg_provided", (True, False))
def test_update_sortedness_checks(
    lmdb_version_store,
    existing_df_sortedness,
    update_df_sortedness,
    date_range_arg_provided,
):
    """update() must succeed only when both the stored data and the update payload
    are ASCENDING; every other combination raises SortingException, regardless of
    whether a date_range argument is supplied."""
    lib = lmdb_version_store
    symbol = "test_update_sortedness_checks"
    num_rows = 10
    values = np.arange(num_rows)
    forward_index = pd.date_range("2024-01-15", periods=num_rows)
    # One frame per sortedness state, all holding the same column values.
    frames = {
        "ASCENDING": pd.DataFrame({"col": values}, index=forward_index),
        "DESCENDING": pd.DataFrame({"col": values}, index=pd.DatetimeIndex(reversed(forward_index))),
        "UNSORTED": pd.DataFrame({"col": values}, index=pd.DatetimeIndex(np.roll(forward_index, num_rows // 2))),
    }

    date_range = (pd.Timestamp("2024-01-13"), pd.Timestamp("2024-01-17")) if date_range_arg_provided else None

    lib.write(symbol, frames[existing_df_sortedness])
    assert lib.get_info(symbol)["sorted"] == existing_df_sortedness

    update_df = frames[update_df_sortedness]

    if existing_df_sortedness == update_df_sortedness == "ASCENDING":
        # The only permitted combination: sorted onto sorted.
        lib.update(symbol, update_df, date_range=date_range)
        assert lib.get_info(symbol)["sorted"] == "ASCENDING"
    else:
        with pytest.raises(SortingException):
            lib.update(symbol, update_df, date_range=date_range)


def test_update_not_sorted_input_multi_index_exception(lmdb_version_store):
Expand Down
35 changes: 24 additions & 11 deletions python/tests/unit/arcticdb/version_store/test_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pandas as pd
import pytest
from arcticdb.exceptions import SortingException, NormalizationException
from arcticdb.util._versions import IS_PANDAS_TWO
from pandas import MultiIndex


Expand All @@ -30,7 +31,7 @@ def test_write_ascending_sorted_dataframe(lmdb_version_store):
lmdb_version_store.write(symbol, df)
assert df.index.is_monotonic_increasing == True
info = lmdb_version_store.get_info(symbol)
assert info["sorted"] == "ASCENDING"
assert info["sorted"] == "UNKNOWN"


def test_write_descending_sorted_dataframe(lmdb_version_store):
Expand All @@ -44,7 +45,7 @@ def test_write_descending_sorted_dataframe(lmdb_version_store):
lmdb_version_store.write(symbol, df)
assert df.index.is_monotonic_decreasing == True
info = lmdb_version_store.get_info(symbol)
assert info["sorted"] == "DESCENDING"
assert info["sorted"] == "UNKNOWN"


def test_write_unsorted_sorted_dataframe(lmdb_version_store):
Expand All @@ -59,7 +60,7 @@ def test_write_unsorted_sorted_dataframe(lmdb_version_store):
assert df.index.is_monotonic_decreasing == False
assert df.index.is_monotonic_increasing == False
info = lmdb_version_store.get_info(symbol)
assert info["sorted"] == "UNSORTED"
assert info["sorted"] == "UNKNOWN"


def test_write_unknown_sorted_dataframe(lmdb_version_store):
Expand Down Expand Up @@ -87,9 +88,9 @@ def test_write_not_sorted_non_validate_index(lmdb_version_store):
num_initial_rows = 20
num_rows = 20
initial_timestamp = pd.Timestamp("2020-01-01")
dtidx = np.roll(pd.date_range(initial_timestamp, periods=num_initial_rows), 3)
dtidx = np.roll(pd.date_range(initial_timestamp, periods=num_initial_rows), 0)
df = pd.DataFrame({"c": np.arange(0, num_rows, dtype=np.int64)}, index=dtidx)
assert df.index.is_monotonic_increasing == False
# assert df.index.is_monotonic_increasing == False

lmdb_version_store.write(symbol, df)

Expand All @@ -113,10 +114,22 @@ def test_write_not_sorted_multi_index_exception(lmdb_version_store):
lmdb_version_store.write(symbol, df, validate_index=True)


def test_write_not_sorted_range_index_exception(lmdb_version_store):
symbol = "bad_write"
@pytest.mark.parametrize("index_type", ["range", "int64"])
@pytest.mark.parametrize("is_sorted", [True, False])
@pytest.mark.parametrize("validate_index", [True, False])
def test_write_non_timestamp_index(lmdb_version_store, index_type, is_sorted, validate_index):
    """Writing data with a non-timestamp (range or int64) index always records the
    symbol's sortedness as UNKNOWN, and never raises — even for unsorted data with
    validate_index=True, because sortedness checks only apply to timestamp indexes.

    Fixes relative to the previous version: the parameter formerly named ``sorted``
    shadowed the builtin; the ``if``/``elif`` chain could leave ``idx`` unbound if a
    new index_type were added; and the index length was hard-coded as ``range(20)``
    instead of using ``num_rows``.
    """
    lib = lmdb_version_store
    symbol = "test_write_range_index"
    num_rows = 20
    # Rolling by a non-zero amount breaks monotonicity; rolling by 0 is a no-op.
    shift = 0 if is_sorted else 3
    if index_type == "range":
        idx = np.roll(pd.RangeIndex(0, num_rows, 1), shift)
    else:  # index_type == "int64"
        # pd.Int64Index was removed in pandas 2.x, hence the version switch.
        base = pd.Index(range(num_rows), dtype=np.int64) if IS_PANDAS_TWO else pd.Int64Index(range(num_rows))
        idx = np.roll(base, shift)
    df = pd.DataFrame({"c": np.arange(0, num_rows, dtype=np.int64)}, index=idx)
    assert df.index.is_monotonic_increasing == is_sorted
    lib.write(symbol, df, validate_index=validate_index)
    info = lib.get_info(symbol)
    assert info["sorted"] == "UNKNOWN"


Loading