From 0b23fbf3f37aee044c17f96a74d74c77f237d061 Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Tue, 23 Apr 2024 15:56:17 +0100 Subject: [PATCH 1/8] Minimal failing test --- .../arcticdb/version_store/test_deletion.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/python/tests/integration/arcticdb/version_store/test_deletion.py b/python/tests/integration/arcticdb/version_store/test_deletion.py index 284de3e1e7..e48bf17eab 100644 --- a/python/tests/integration/arcticdb/version_store/test_deletion.py +++ b/python/tests/integration/arcticdb/version_store/test_deletion.py @@ -568,6 +568,25 @@ def test_delete_date_range_remove_everything(version_store_factory, map_timeout) assert_frame_equal(vit.data, df) +def test_delete_date_range_get_info(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_delete_date_range_get_info" + df = pd.DataFrame({"col": np.arange(5)}, index=pd.date_range(pd.Timestamp("2024-01-01"), periods=5)) + lib.write(sym, df) + assert df.index[0] == lib.get_info(sym)["date_range"][0] + assert df.index[-1] == lib.get_info(sym)["date_range"][1] + + lib.delete(sym, (pd.Timestamp("2024-01-04"), pd.Timestamp("2024-01-05"))) + received = lib.read(sym).data + assert_frame_equal(df.iloc[:3], received) + assert received.index[-1] == lib.get_info(sym)["date_range"][1] + + lib.delete(sym, (pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02"))) + received = lib.read(sym).data + assert_frame_equal(df.iloc[2:3], received) + assert received.index[0] == lib.get_info(sym)["date_range"][0] + + def test_delete_read_from_timestamp(basic_store): sym = "test_from_timestamp_with_delete" lib = basic_store From 194b3f12d1376cb73d3592f548a43d73c1271482 Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Tue, 23 Apr 2024 16:46:36 +0100 Subject: [PATCH 2/8] Fix bug --- cpp/arcticdb/pipeline/write_frame.cpp | 34 +++---------------- .../arcticdb/version_store/test_deletion.py | 11 +++--- 2 files changed, 10 insertions(+), 35 deletions(-) diff --git a/cpp/arcticdb/pipeline/write_frame.cpp b/cpp/arcticdb/pipeline/write_frame.cpp index cf7af8031b..1e1e42b86a 100644 --- a/cpp/arcticdb/pipeline/write_frame.cpp +++ b/cpp/arcticdb/pipeline/write_frame.cpp @@ -269,33 +269,6 @@ static RowRange partial_rewrite_row_range( } } -/// @brief Find the index range of affected rows during a partial rewrite (on update) -/// Similar to partial_rewrite_row_range the segment is affected either at the beginning -/// or at the end. -static IndexRange partial_rewrite_index_range( - const IndexRange& segment_range, - const IndexRange& update_range, - AffectedSegmentPart affected_part -) { - if (affected_part == AffectedSegmentPart::START) { - util::check( - segment_range.start_ < update_range.start_, - "Unexpected index range in after: {} !< {}", - segment_range.start_, - update_range.start_ - ); - return {segment_range.start_, update_range.start_}; - } else { - util::check( - segment_range.end_ > update_range.end_, - "Unexpected non-intersection of update indices: {} !> {}", - segment_range.end_, - update_range.end_ - ); - return {segment_range.end_, update_range.end_}; - } -} - std::optional rewrite_partial_segment( const SliceAndKey& existing, IndexRange index_range, @@ -307,13 +280,14 @@ std::optional rewrite_partial_segment( const IndexRange& existing_range = key.index_range(); auto kv = store->read(key).get(); const SegmentInMemory& segment = kv.second; - const IndexRange affected_index_range = partial_rewrite_index_range(existing_range, index_range, affected_part); const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part); const int64_t num_rows = affected_row_range.end() - affected_row_range.start(); if (num_rows <= 0) { return std::nullopt; } SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true); + const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output); + const IndexValue end_ts = std::get(TimeseriesIndex::end_value_for_segment(output)) + 1; FrameSlice new_slice{ std::make_shared(output.descriptor()), existing.slice_.col_range, @@ -324,8 +298,8 @@ std::optional rewrite_partial_segment( key.type(), version_id, key.id(), - affected_index_range.start_, - affected_index_range.end_, + start_ts, + end_ts, std::move(output) ); return SliceAndKey{std::move(new_slice), std::get(std::move(fut_key).get())}; diff --git a/python/tests/integration/arcticdb/version_store/test_deletion.py b/python/tests/integration/arcticdb/version_store/test_deletion.py index e48bf17eab..e6fb79e6b0 100644 --- a/python/tests/integration/arcticdb/version_store/test_deletion.py +++ b/python/tests/integration/arcticdb/version_store/test_deletion.py @@ -571,17 +571,18 @@ def test_delete_date_range_remove_everything(version_store_factory, map_timeout) def test_delete_date_range_get_info(lmdb_version_store_v1): lib = lmdb_version_store_v1 sym = "test_delete_date_range_get_info" - df = pd.DataFrame({"col": np.arange(5)}, index=pd.date_range(pd.Timestamp("2024-01-01"), periods=5)) + df = pd.DataFrame({"col": np.arange(5)}, index=pd.date_range(pd.Timestamp(1000), freq="us", periods=5)) lib.write(sym, df) - assert df.index[0] == lib.get_info(sym)["date_range"][0] - assert df.index[-1] == lib.get_info(sym)["date_range"][1] + date_range = lib.get_info(sym)["date_range"] + assert df.index[0] == date_range[0] + assert df.index[-1] == date_range[1] - lib.delete(sym, (pd.Timestamp("2024-01-04"), pd.Timestamp("2024-01-05"))) + lib.delete(sym, (pd.Timestamp(4000), pd.Timestamp(5000))) received = lib.read(sym).data assert_frame_equal(df.iloc[:3], received) assert received.index[-1] == lib.get_info(sym)["date_range"][1] - lib.delete(sym, (pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02"))) + lib.delete(sym, (pd.Timestamp(1000), pd.Timestamp(2000))) received = lib.read(sym).data assert_frame_equal(df.iloc[2:3], received) assert received.index[0] == lib.get_info(sym)["date_range"][0] From c246ff6e1a714e611fc821bc42a361a303577ccc Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Tue, 23 Apr 2024 16:50:15 +0100 Subject: [PATCH 3/8] Fix off by one error --- cpp/arcticdb/version/local_versioned_engine.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 244b7c8fae..1d9de02bbd 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -409,6 +409,9 @@ folly::Future LocalVersionedEngine::get_descriptor( util::raise_rte("Unsupported index type {}", seg.column(position_t(index::Fields::end_index)).type()); } }); + if (end_index.has_value()) { + --(*end_index); + } return DescriptorItem{std::move(to_atom(key_seg)), std::move(start_index), std::move(end_index), std::move(timeseries_descriptor)}; }); } From eed9f2d527a189c3e2f73c7539a4743d2bbd5fc4 Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Tue, 23 Apr 2024 16:54:42 +0100 Subject: [PATCH 4/8] Add column slicing to test --- .../arcticdb/version_store/test_deletion.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/python/tests/integration/arcticdb/version_store/test_deletion.py b/python/tests/integration/arcticdb/version_store/test_deletion.py index e6fb79e6b0..977bf858f1 100644 --- a/python/tests/integration/arcticdb/version_store/test_deletion.py +++ b/python/tests/integration/arcticdb/version_store/test_deletion.py @@ -568,10 +568,15 @@ def test_delete_date_range_remove_everything(version_store_factory, map_timeout) assert_frame_equal(vit.data, df) -def test_delete_date_range_get_info(lmdb_version_store_v1): - lib = lmdb_version_store_v1 +def test_delete_date_range_get_info(lmdb_version_store_tiny_segment): + lib = lmdb_version_store_tiny_segment sym = "test_delete_date_range_get_info" - df = pd.DataFrame({"col": np.arange(5)}, index=pd.date_range(pd.Timestamp(1000), freq="us", periods=5)) + data = { + "col_0": [0, 1, 2, 3, 4], + "col_1": [5, 6, 7, 8, 9], + "col_2": [10, 11, 12, 13, 14], + } + df = pd.DataFrame(data, index=pd.date_range(pd.Timestamp(1000), freq="us", periods=5)) lib.write(sym, df) date_range = lib.get_info(sym)["date_range"] assert df.index[0] == date_range[0] From 19e50f0f90862b98cbdfd52902be9ca5c372a869 Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Tue, 23 Apr 2024 17:33:10 +0100 Subject: [PATCH 5/8] Remove some unnecessary looping --- .../version/local_versioned_engine.cpp | 20 ++----------------- 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 1d9de02bbd..862ac4c9da 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -373,15 +373,7 @@ folly::Future LocalVersionedEngine::get_descriptor( return std::nullopt; } else if constexpr (is_numeric_type(ColumnTagType::data_type)) { std::optional start_index; - auto column_data = seg.column(position_t(index::Fields::start_index)).data(); - while (auto block = column_data.template next()) { - auto ptr = reinterpret_cast(block.value().data()); - const auto row_count = block.value().row_count(); - for (auto i = 0u; i < row_count; ++i) { - auto value = *ptr++; - start_index = start_index.has_value() ? std::min(*start_index, value) : value; - } - } + start_index = seg.column(position_t(index::Fields::start_index)).template scalar_at(0); return start_index; } else { util::raise_rte("Unsupported index type {}", seg.column(position_t(index::Fields::start_index)).type()); @@ -395,15 +387,7 @@ folly::Future LocalVersionedEngine::get_descriptor( return std::nullopt; } else if constexpr (is_numeric_type(ColumnTagType::data_type)) { std::optional end_index; - auto column_data = seg.column(position_t(index::Fields::end_index)).data(); - while (auto block = column_data.template next()) { - auto ptr = reinterpret_cast(block.value().data()); - const auto row_count = block.value().row_count(); - for (auto i = 0u; i < row_count; ++i) { - auto value = *ptr++; - end_index = end_index.has_value() ? std::max(*end_index, value) : value; - } - } + end_index = seg.column(position_t(index::Fields::end_index)).template scalar_at(seg.row_count() - 1); return end_index; } else { util::raise_rte("Unsupported index type {}", seg.column(position_t(index::Fields::end_index)).type()); From 09446eb1afe9a94d65c45634b6fa8eba3d89befd Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Wed, 24 Apr 2024 10:29:26 +0100 Subject: [PATCH 6/8] Simplify DescriptorItem and get_descriptor --- cpp/arcticdb/entity/descriptor_item.hpp | 20 +++---- .../version/local_versioned_engine.cpp | 57 ++++++++----------- cpp/arcticdb/version/python_bindings.cpp | 13 +---- .../integration/arcticdb/test_arctic_batch.py | 4 +- python/tests/util/mark.py | 4 +- 5 files changed, 42 insertions(+), 56 deletions(-) diff --git a/cpp/arcticdb/entity/descriptor_item.hpp b/cpp/arcticdb/entity/descriptor_item.hpp index bcf81f27aa..38aa4b2a5d 100644 --- a/cpp/arcticdb/entity/descriptor_item.hpp +++ b/cpp/arcticdb/entity/descriptor_item.hpp @@ -16,27 +16,27 @@ namespace arcticdb { struct DescriptorItem { DescriptorItem( entity::AtomKey &&key, - std::optional start_index, - std::optional end_index, - std::optional timeseries_descriptor) : + std::optional start_index, + std::optional end_index, + std::optional&& timeseries_descriptor) : key_(std::move(key)), start_index_(start_index), end_index_(end_index), - timeseries_descriptor_(timeseries_descriptor) { + timeseries_descriptor_(std::move(timeseries_descriptor)) { } DescriptorItem() = delete; entity::AtomKey key_; - std::optional start_index_; - std::optional end_index_; - std::optional timeseries_descriptor_; + std::optional start_index_; + std::optional end_index_; + std::optional timeseries_descriptor_; std::string symbol() const { return fmt::format("{}", key_.id()); } uint64_t version() const { return key_.version_id(); } timestamp creation_ts() const { return key_.creation_ts(); } - std::optional start_index() const { return start_index_; } - std::optional end_index() const { return end_index_; } - std::optional timeseries_descriptor() const { return timeseries_descriptor_; } + std::optional start_index() const { return start_index_; } + std::optional end_index() const { return end_index_; } + std::optional timeseries_descriptor() const { return timeseries_descriptor_; } }; } \ No newline at end of file diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 862ac4c9da..18061e84e4 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -363,40 +363,33 @@ folly::Future LocalVersionedEngine::get_descriptor( const auto key = std::move(k); return store()->read(key) .thenValue([](auto&& key_seg_pair) -> DescriptorItem { - auto key_seg = std::move(std::get<0>(key_seg_pair)); - auto seg = std::move(std::get<1>(key_seg_pair)); - auto timeseries_descriptor = seg.has_metadata() ? std::make_optional(*seg.metadata()) : std::nullopt; - auto start_index = seg.column(position_t(index::Fields::start_index)).type().visit_tag([&](auto column_desc_tag) -> std::optional { - using ColumnDescriptorType = std::decay_t; - using ColumnTagType = typename ColumnDescriptorType::DataTypeTag; - if (seg.row_count() == 0) { - return std::nullopt; - } else if constexpr (is_numeric_type(ColumnTagType::data_type)) { - std::optional start_index; - start_index = seg.column(position_t(index::Fields::start_index)).template scalar_at(0); - return start_index; - } else { - util::raise_rte("Unsupported index type {}", seg.column(position_t(index::Fields::start_index)).type()); - } - }); + auto key = to_atom(std::move(key_seg_pair.first)); + auto seg = std::move(key_seg_pair.second); + auto tsd = std::make_optional(); + if (seg.has_metadata()) { + seg.metadata()->UnpackTo(&(*tsd)); + } + std::optional start_index; + std::optional end_index; + if (seg.row_count() > 0) { + const auto& start_index_column = seg.column(position_t(index::Fields::start_index)); + details::visit_type(start_index_column.type().data_type(), [&start_index_column, &start_index](auto column_desc_tag) { + using type_info = ScalarTypeInfo; + if constexpr (is_time_type(type_info::data_type)) { + start_index = start_index_column.template scalar_at(0); + } + }); - auto end_index = seg.column(position_t(index::Fields::end_index)).type().visit_tag([&](auto column_desc_tag) -> std::optional { - using ColumnDescriptorType = std::decay_t; - using ColumnTagType = typename ColumnDescriptorType::DataTypeTag; - if (seg.row_count() == 0) { - return std::nullopt; - } else if constexpr (is_numeric_type(ColumnTagType::data_type)) { - std::optional end_index; - end_index = seg.column(position_t(index::Fields::end_index)).template scalar_at(seg.row_count() - 1); - return end_index; - } else { - util::raise_rte("Unsupported index type {}", seg.column(position_t(index::Fields::end_index)).type()); - } - }); - if (end_index.has_value()) { - --(*end_index); + const auto& end_index_column = seg.column(position_t(index::Fields::end_index)); + details::visit_type(end_index_column.type().data_type(), [&end_index_column, &end_index, row_count=seg.row_count()](auto column_desc_tag) { + using type_info = ScalarTypeInfo; + if constexpr (is_time_type(type_info::data_type)) { + // -1 as the end timestamp in the data keys is one nanosecond greater than the last value in the index column + end_index = *end_index_column.template scalar_at(row_count - 1) - 1; + } + }); } - return DescriptorItem{std::move(to_atom(key_seg)), std::move(start_index), std::move(end_index), std::move(timeseries_descriptor)}; + return DescriptorItem{std::move(key), start_index, end_index, std::move(tsd)}; }); } diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index 5bf1420ee7..a75911105a 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -200,16 +200,9 @@ void register_bindings(py::module &version, py::exceptionUnpackTo(&tsd); - pyobj = python_util::pb_to_python(tsd); - } else { - pyobj = pybind11::none(); - } - return pyobj; + // FUTURE: Use std::optional monadic operations in C++23 + auto opt_tsd = self.timeseries_descriptor(); + return opt_tsd.has_value() ? python_util::pb_to_python(*opt_tsd) : pybind11::none(); }); py::class_>(version, "FrameSlice") diff --git a/python/tests/integration/arcticdb/test_arctic_batch.py b/python/tests/integration/arcticdb/test_arctic_batch.py index 2152d1ee9b..dae9fd24cc 100644 --- a/python/tests/integration/arcticdb/test_arctic_batch.py +++ b/python/tests/integration/arcticdb/test_arctic_batch.py @@ -1316,8 +1316,8 @@ def test_read_description_batch_empty_nat(arctic_library): requests = [ReadInfoRequest("sym_" + str(sym)) for sym in range(num_symbols)] results_list = lib.get_description_batch(requests) for sym in range(num_symbols): - assert np.isnat(results_list[sym].date_range[0]) == True - assert np.isnat(results_list[sym].date_range[1]) == True + assert np.isnat(results_list[sym].date_range[0]) + assert np.isnat(results_list[sym].date_range[1]) def test_read_batch_mixed_with_snapshots(arctic_library): diff --git a/python/tests/util/mark.py b/python/tests/util/mark.py index 4b2da35e3e..20d6b43cff 100644 --- a/python/tests/util/mark.py +++ b/python/tests/util/mark.py @@ -32,11 +32,11 @@ # This is to avoid the risk of the name becoming out of sync with the actual condition. SLOW_TESTS_MARK = pytest.mark.skipif(FAST_TESTS_ONLY, reason="Skipping test as it takes a long time to run") -AZURE_TESTS_MARK = pytest.mark.skipif(FAST_TESTS_ONLY or MACOS_CONDA_BUILD, reason=_MACOS_CONDA_BUILD_SKIP_REASON) +AZURE_TESTS_MARK = pytest.mark.skipif(True or FAST_TESTS_ONLY or MACOS_CONDA_BUILD, reason=_MACOS_CONDA_BUILD_SKIP_REASON) """Mark to skip all Azure tests when MACOS_CONDA_BUILD or ARCTICDB_FAST_TESTS_ONLY is set.""" MONGO_TESTS_MARK = pytest.mark.skipif( - FAST_TESTS_ONLY or sys.platform != "linux", + True or FAST_TESTS_ONLY or sys.platform != "linux", reason="Skipping mongo tests under ARCTICDB_FAST_TESTS_ONLY", ) """Mark on tests using the mongo storage fixtures. Currently skips if ARCTICDB_FAST_TESTS_ONLY.""" From d1acb142aea39e51ef90b62123de365c09472546 Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Wed, 24 Apr 2024 10:32:20 +0100 Subject: [PATCH 7/8] Tidy up --- cpp/arcticdb/pipeline/write_frame.cpp | 1 + python/tests/util/mark.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/arcticdb/pipeline/write_frame.cpp b/cpp/arcticdb/pipeline/write_frame.cpp index 1e1e42b86a..95516f9e61 100644 --- a/cpp/arcticdb/pipeline/write_frame.cpp +++ b/cpp/arcticdb/pipeline/write_frame.cpp @@ -287,6 +287,7 @@ std::optional rewrite_partial_segment( } SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true); const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output); + // +1 as in the key we store one nanosecond greater than the last index value in the segment const IndexValue end_ts = std::get(TimeseriesIndex::end_value_for_segment(output)) + 1; FrameSlice new_slice{ std::make_shared(output.descriptor()), diff --git a/python/tests/util/mark.py b/python/tests/util/mark.py index 20d6b43cff..4b2da35e3e 100644 --- a/python/tests/util/mark.py +++ b/python/tests/util/mark.py @@ -32,11 +32,11 @@ # This is to avoid the risk of the name becoming out of sync with the actual condition. SLOW_TESTS_MARK = pytest.mark.skipif(FAST_TESTS_ONLY, reason="Skipping test as it takes a long time to run") -AZURE_TESTS_MARK = pytest.mark.skipif(True or FAST_TESTS_ONLY or MACOS_CONDA_BUILD, reason=_MACOS_CONDA_BUILD_SKIP_REASON) +AZURE_TESTS_MARK = pytest.mark.skipif(FAST_TESTS_ONLY or MACOS_CONDA_BUILD, reason=_MACOS_CONDA_BUILD_SKIP_REASON) """Mark to skip all Azure tests when MACOS_CONDA_BUILD or ARCTICDB_FAST_TESTS_ONLY is set.""" MONGO_TESTS_MARK = pytest.mark.skipif( - True or FAST_TESTS_ONLY or sys.platform != "linux", + FAST_TESTS_ONLY or sys.platform != "linux", reason="Skipping mongo tests under ARCTICDB_FAST_TESTS_ONLY", ) """Mark on tests using the mongo storage fixtures. Currently skips if ARCTICDB_FAST_TESTS_ONLY.""" From daf0ebb7369ccbe69a4118e76270be5b40b92b49 Mon Sep 17 00:00:00 2001 From: Alex Owens Date: Wed, 24 Apr 2024 10:58:11 +0100 Subject: [PATCH 8/8] Remove unused variable --- cpp/arcticdb/pipeline/write_frame.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/arcticdb/pipeline/write_frame.cpp b/cpp/arcticdb/pipeline/write_frame.cpp index 95516f9e61..6b70204823 100644 --- a/cpp/arcticdb/pipeline/write_frame.cpp +++ b/cpp/arcticdb/pipeline/write_frame.cpp @@ -277,7 +277,6 @@ std::optional rewrite_partial_segment( const std::shared_ptr& store ) { const auto& key = existing.key(); - const IndexRange& existing_range = key.index_range(); auto kv = store->read(key).get(); const SegmentInMemory& segment = kv.second; const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);