diff --git a/cpp/arcticdb/entity/descriptor_item.hpp b/cpp/arcticdb/entity/descriptor_item.hpp
index bcf81f27aa..38aa4b2a5d 100644
--- a/cpp/arcticdb/entity/descriptor_item.hpp
+++ b/cpp/arcticdb/entity/descriptor_item.hpp
@@ -16,27 +16,27 @@ namespace arcticdb {
 struct DescriptorItem {
     DescriptorItem(
         entity::AtomKey &&key,
-        std::optional<timestamp> start_index,
-        std::optional<timestamp> end_index,
-        std::optional<google::protobuf::Any> timeseries_descriptor) :
+        std::optional<timestamp> start_index,
+        std::optional<timestamp> end_index,
+        std::optional<arcticdb::proto::descriptors::TimeSeriesDescriptor>&& timeseries_descriptor) :
         key_(std::move(key)),
         start_index_(start_index),
         end_index_(end_index),
-        timeseries_descriptor_(timeseries_descriptor) {
+        timeseries_descriptor_(std::move(timeseries_descriptor)) {
     }
 
     DescriptorItem() = delete;
 
     entity::AtomKey key_;
-    std::optional<timestamp> start_index_;
-    std::optional<timestamp> end_index_;
-    std::optional<google::protobuf::Any> timeseries_descriptor_;
+    std::optional<timestamp> start_index_;
+    std::optional<timestamp> end_index_;
+    std::optional<arcticdb::proto::descriptors::TimeSeriesDescriptor> timeseries_descriptor_;
 
     std::string symbol() const { return fmt::format("{}", key_.id()); }
     uint64_t version() const { return key_.version_id(); }
     timestamp creation_ts() const { return key_.creation_ts(); }
-    std::optional<timestamp> start_index() const { return start_index_; }
-    std::optional<timestamp> end_index() const { return end_index_; }
-    std::optional<google::protobuf::Any> timeseries_descriptor() const { return timeseries_descriptor_; }
+    std::optional<timestamp> start_index() const { return start_index_; }
+    std::optional<timestamp> end_index() const { return end_index_; }
+    std::optional<arcticdb::proto::descriptors::TimeSeriesDescriptor> timeseries_descriptor() const { return timeseries_descriptor_; }
 };
 }
\ No newline at end of file
diff --git a/cpp/arcticdb/pipeline/write_frame.cpp b/cpp/arcticdb/pipeline/write_frame.cpp
index cf7af8031b..6b70204823 100644
--- a/cpp/arcticdb/pipeline/write_frame.cpp
+++ b/cpp/arcticdb/pipeline/write_frame.cpp
@@ -269,33 +269,6 @@ static RowRange partial_rewrite_row_range(
     }
 }
 
-/// @brief Find the index range of affected rows during a partial rewrite (on update)
-/// Similar to partial_rewrite_row_range the segment is affected either at the beginning
-/// or at the end.
-static IndexRange partial_rewrite_index_range(
-    const IndexRange& segment_range,
-    const IndexRange& update_range,
-    AffectedSegmentPart affected_part
-) {
-    if (affected_part == AffectedSegmentPart::START) {
-        util::check(
-            segment_range.start_ < update_range.start_,
-            "Unexpected index range in after: {} !< {}",
-            segment_range.start_,
-            update_range.start_
-        );
-        return {segment_range.start_, update_range.start_};
-    } else {
-        util::check(
-            segment_range.end_ > update_range.end_,
-            "Unexpected non-intersection of update indices: {} !> {}",
-            segment_range.end_,
-            update_range.end_
-        );
-        return {segment_range.end_, update_range.end_};
-    }
-}
-
 std::optional<SliceAndKey> rewrite_partial_segment(
     const SliceAndKey& existing,
     IndexRange index_range,
@@ -304,16 +277,17 @@ std::optional<SliceAndKey> rewrite_partial_segment(
     const std::shared_ptr<Store>& store
 ) {
     const auto& key = existing.key();
-    const IndexRange& existing_range = key.index_range();
     auto kv = store->read(key).get();
     const SegmentInMemory& segment = kv.second;
-    const IndexRange affected_index_range = partial_rewrite_index_range(existing_range, index_range, affected_part);
     const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
     const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
     if (num_rows <= 0) {
         return std::nullopt;
     }
     SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
+    const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
+    // +1 as in the key we store one nanosecond greater than the last index value in the segment
+    const IndexValue end_ts = std::get<timestamp>(TimeseriesIndex::end_value_for_segment(output)) + 1;
     FrameSlice new_slice{
         std::make_shared<StreamDescriptor>(output.descriptor()),
         existing.slice_.col_range,
@@ -324,8 +298,8 @@ std::optional<SliceAndKey> rewrite_partial_segment(
         key.type(),
         version_id,
         key.id(),
-        affected_index_range.start_,
-        affected_index_range.end_,
+        start_ts,
+        end_ts,
         std::move(output)
     );
     return SliceAndKey{std::move(new_slice), std::get<AtomKey>(std::move(fut_key).get())};
diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp
index 244b7c8fae..18061e84e4 100644
--- a/cpp/arcticdb/version/local_versioned_engine.cpp
+++ b/cpp/arcticdb/version/local_versioned_engine.cpp
@@ -363,53 +363,33 @@ folly::Future<DescriptorItem> LocalVersionedEngine::get_descriptor(
     const auto key = std::move(k);
     return store()->read(key)
         .thenValue([](auto&& key_seg_pair) -> DescriptorItem {
-            auto key_seg = std::move(std::get<0>(key_seg_pair));
-            auto seg = std::move(std::get<1>(key_seg_pair));
-            auto timeseries_descriptor = seg.has_metadata() ? std::make_optional(*seg.metadata()) : std::nullopt;
-            auto start_index = seg.column(position_t(index::Fields::start_index)).type().visit_tag([&](auto column_desc_tag) -> std::optional<timestamp> {
-                using ColumnDescriptorType = std::decay_t<decltype(column_desc_tag)>;
-                using ColumnTagType = typename ColumnDescriptorType::DataTypeTag;
-                if (seg.row_count() == 0) {
-                    return std::nullopt;
-                } else if constexpr (is_numeric_type(ColumnTagType::data_type)) {
-                    std::optional<timestamp> start_index;
-                    auto column_data = seg.column(position_t(index::Fields::start_index)).data();
-                    while (auto block = column_data.template next<ColumnDescriptorType>()) {
-                        auto ptr = reinterpret_cast<const typename ColumnTagType::raw_type*>(block.value().data());
-                        const auto row_count = block.value().row_count();
-                        for (auto i = 0u; i < row_count; ++i) {
-                            auto value = *ptr++;
-                            start_index = start_index.has_value() ? std::min(*start_index, value) : value;
-                        }
+            auto key = to_atom(std::move(key_seg_pair.first));
+            auto seg = std::move(key_seg_pair.second);
+            auto tsd = std::make_optional<arcticdb::proto::descriptors::TimeSeriesDescriptor>();
+            if (seg.has_metadata()) {
+                seg.metadata()->UnpackTo(&(*tsd));
+            }
+            std::optional<timestamp> start_index;
+            std::optional<timestamp> end_index;
+            if (seg.row_count() > 0) {
+                const auto& start_index_column = seg.column(position_t(index::Fields::start_index));
+                details::visit_type(start_index_column.type().data_type(), [&start_index_column, &start_index](auto column_desc_tag) {
+                    using type_info = ScalarTypeInfo<decltype(column_desc_tag)>;
+                    if constexpr (is_time_type(type_info::data_type)) {
+                        start_index = start_index_column.template scalar_at<timestamp>(0);
                     }
-                    return start_index;
-                } else {
-                    util::raise_rte("Unsupported index type {}", seg.column(position_t(index::Fields::start_index)).type());
-                }
-            });
+                });
 
-            auto end_index = seg.column(position_t(index::Fields::end_index)).type().visit_tag([&](auto column_desc_tag) -> std::optional<timestamp> {
-                using ColumnDescriptorType = std::decay_t<decltype(column_desc_tag)>;
-                using ColumnTagType = typename ColumnDescriptorType::DataTypeTag;
-                if (seg.row_count() == 0) {
-                    return std::nullopt;
-                } else if constexpr (is_numeric_type(ColumnTagType::data_type)) {
-                    std::optional<timestamp> end_index;
-                    auto column_data = seg.column(position_t(index::Fields::end_index)).data();
-                    while (auto block = column_data.template next<ColumnDescriptorType>()) {
-                        auto ptr = reinterpret_cast<const typename ColumnTagType::raw_type*>(block.value().data());
-                        const auto row_count = block.value().row_count();
-                        for (auto i = 0u; i < row_count; ++i) {
-                            auto value = *ptr++;
-                            end_index = end_index.has_value() ? std::max(*end_index, value) : value;
-                        }
+                const auto& end_index_column = seg.column(position_t(index::Fields::end_index));
+                details::visit_type(end_index_column.type().data_type(), [&end_index_column, &end_index, row_count=seg.row_count()](auto column_desc_tag) {
+                    using type_info = ScalarTypeInfo<decltype(column_desc_tag)>;
+                    if constexpr (is_time_type(type_info::data_type)) {
+                        // -1 as the end timestamp in the data keys is one nanosecond greater than the last value in the index column
+                        end_index = *end_index_column.template scalar_at<timestamp>(row_count - 1) - 1;
                     }
-                    return end_index;
-                } else {
-                    util::raise_rte("Unsupported index type {}", seg.column(position_t(index::Fields::end_index)).type());
-                }
-            });
-            return DescriptorItem{std::move(to_atom(key_seg)), std::move(start_index), std::move(end_index), std::move(timeseries_descriptor)};
+                });
+            }
+            return DescriptorItem{std::move(key), start_index, end_index, std::move(tsd)};
         });
 }
diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp
index 5bf1420ee7..a75911105a 100644
--- a/cpp/arcticdb/version/python_bindings.cpp
+++ b/cpp/arcticdb/version/python_bindings.cpp
@@ -200,16 +200,9 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
         .def_property_readonly("timeseries_descriptor", [](const DescriptorItem& self) {
-            py::object pyobj;
-            if (self.timeseries_descriptor().has_value()) {
-                arcticdb::proto::descriptors::TimeSeriesDescriptor tsd;
-                self.timeseries_descriptor()->UnpackTo(&tsd);
-                pyobj = python_util::pb_to_python(tsd);
-            } else {
-                pyobj = pybind11::none();
-            }
-            return pyobj;
+            // FUTURE: Use std::optional monadic operations in C++23
+            auto opt_tsd = self.timeseries_descriptor();
+            return opt_tsd.has_value() ? python_util::pb_to_python(*opt_tsd) : pybind11::none();
         });
 
     py::class_<FrameSlice, std::shared_ptr<FrameSlice>>(version, "FrameSlice")
diff --git a/python/tests/integration/arcticdb/test_arctic_batch.py b/python/tests/integration/arcticdb/test_arctic_batch.py
index 2152d1ee9b..dae9fd24cc 100644
--- a/python/tests/integration/arcticdb/test_arctic_batch.py
+++ b/python/tests/integration/arcticdb/test_arctic_batch.py
@@ -1316,8 +1316,8 @@ def test_read_description_batch_empty_nat(arctic_library):
     requests = [ReadInfoRequest("sym_" + str(sym)) for sym in range(num_symbols)]
     results_list = lib.get_description_batch(requests)
     for sym in range(num_symbols):
-        assert np.isnat(results_list[sym].date_range[0]) == True
-        assert np.isnat(results_list[sym].date_range[1]) == True
+        assert np.isnat(results_list[sym].date_range[0])
+        assert np.isnat(results_list[sym].date_range[1])
 
 
 def test_read_batch_mixed_with_snapshots(arctic_library):
diff --git a/python/tests/integration/arcticdb/version_store/test_deletion.py b/python/tests/integration/arcticdb/version_store/test_deletion.py
index 284de3e1e7..977bf858f1 100644
--- a/python/tests/integration/arcticdb/version_store/test_deletion.py
+++ b/python/tests/integration/arcticdb/version_store/test_deletion.py
@@ -568,6 +568,31 @@ def test_delete_date_range_remove_everything(version_store_factory, map_timeout)
     assert_frame_equal(vit.data, df)
 
 
+def test_delete_date_range_get_info(lmdb_version_store_tiny_segment):
+    lib = lmdb_version_store_tiny_segment
+    sym = "test_delete_date_range_get_info"
+    data = {
+        "col_0": [0, 1, 2, 3, 4],
+        "col_1": [5, 6, 7, 8, 9],
+        "col_2": [10, 11, 12, 13, 14],
+    }
+    df = pd.DataFrame(data, index=pd.date_range(pd.Timestamp(1000), freq="us", periods=5))
+    lib.write(sym, df)
+    date_range = lib.get_info(sym)["date_range"]
+    assert df.index[0] == date_range[0]
+    assert df.index[-1] == date_range[1]
+
+    lib.delete(sym, (pd.Timestamp(4000), pd.Timestamp(5000)))
+    received = lib.read(sym).data
+    assert_frame_equal(df.iloc[:3], received)
+    assert received.index[-1] == lib.get_info(sym)["date_range"][1]
+
+    lib.delete(sym, (pd.Timestamp(1000), pd.Timestamp(2000)))
+    received = lib.read(sym).data
+    assert_frame_equal(df.iloc[2:3], received)
+    assert received.index[0] == lib.get_info(sym)["date_range"][0]
+
+
 def test_delete_read_from_timestamp(basic_store):
     sym = "test_from_timestamp_with_delete"
     lib = basic_store