Bugfix/1518/get description date range incorrect after delete data in range #1523

Merged
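The user-visible effect of the fix, sketched with the public arcticdb Library API against a local LMDB store; the library and symbol names below are illustrative and not taken from the PR:

# Sketch of the scenario behind issue #1518: after deleting a date range,
# get_description should report the date range of the data that remains.
import pandas as pd
from arcticdb import Arctic

ac = Arctic("lmdb://./arctic_demo")
lib = ac.get_library("demo", create_if_missing=True)

df = pd.DataFrame(
    {"col": range(5)},
    index=pd.date_range("2024-01-01", periods=5, freq="D"),
)
lib.write("sym", df)

# Remove the last two index values.
lib.delete_data_in_range("sym", (pd.Timestamp("2024-01-04"), pd.Timestamp("2024-01-05")))

# With this fix the reported range ends at 2024-01-03, the last remaining
# index value, rather than at the pre-delete end of 2024-01-05.
print(lib.get_description("sym").date_range)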
20 changes: 10 additions & 10 deletions cpp/arcticdb/entity/descriptor_item.hpp
@@ -16,27 +16,27 @@ namespace arcticdb {
struct DescriptorItem {
DescriptorItem(
entity::AtomKey &&key,
std::optional<IndexValue> start_index,
std::optional<IndexValue> end_index,
std::optional<google::protobuf::Any> timeseries_descriptor) :
std::optional<timestamp> start_index,
std::optional<timestamp> end_index,
std::optional<arcticdb::proto::descriptors::TimeSeriesDescriptor>&& timeseries_descriptor) :
key_(std::move(key)),
start_index_(start_index),
end_index_(end_index),
timeseries_descriptor_(timeseries_descriptor) {
timeseries_descriptor_(std::move(timeseries_descriptor)) {
}

DescriptorItem() = delete;

entity::AtomKey key_;
std::optional<IndexValue> start_index_;
std::optional<IndexValue> end_index_;
std::optional<google::protobuf::Any> timeseries_descriptor_;
std::optional<timestamp> start_index_;
std::optional<timestamp> end_index_;
std::optional<arcticdb::proto::descriptors::TimeSeriesDescriptor> timeseries_descriptor_;

std::string symbol() const { return fmt::format("{}", key_.id()); }
uint64_t version() const { return key_.version_id(); }
timestamp creation_ts() const { return key_.creation_ts(); }
std::optional<IndexValue> start_index() const { return start_index_; }
std::optional<IndexValue> end_index() const { return end_index_; }
std::optional<google::protobuf::Any> timeseries_descriptor() const { return timeseries_descriptor_; }
std::optional<timestamp> start_index() const { return start_index_; }
std::optional<timestamp> end_index() const { return end_index_; }
std::optional<arcticdb::proto::descriptors::TimeSeriesDescriptor> timeseries_descriptor() const { return timeseries_descriptor_; }
};
}
36 changes: 5 additions & 31 deletions cpp/arcticdb/pipeline/write_frame.cpp
@@ -269,33 +269,6 @@ static RowRange partial_rewrite_row_range(
}
}

/// @brief Find the index range of affected rows during a partial rewrite (on update)
/// Similar to partial_rewrite_row_range the segment is affected either at the beginning
/// or at the end.
static IndexRange partial_rewrite_index_range(
const IndexRange& segment_range,
const IndexRange& update_range,
AffectedSegmentPart affected_part
) {
if (affected_part == AffectedSegmentPart::START) {
util::check(
segment_range.start_ < update_range.start_,
"Unexpected index range in after: {} !< {}",
segment_range.start_,
update_range.start_
);
return {segment_range.start_, update_range.start_};
} else {
util::check(
segment_range.end_ > update_range.end_,
"Unexpected non-intersection of update indices: {} !> {}",
segment_range.end_,
update_range.end_
);
return {segment_range.end_, update_range.end_};
}
}

std::optional<SliceAndKey> rewrite_partial_segment(
const SliceAndKey& existing,
IndexRange index_range,
@@ -304,16 +277,17 @@
const std::shared_ptr<Store>& store
) {
const auto& key = existing.key();
const IndexRange& existing_range = key.index_range();
auto kv = store->read(key).get();
const SegmentInMemory& segment = kv.second;
const IndexRange affected_index_range = partial_rewrite_index_range(existing_range, index_range, affected_part);
const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
if (num_rows <= 0) {
return std::nullopt;
}
SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
// +1 as in the key we store one nanosecond greater than the last index value in the segment
const IndexValue end_ts = std::get<NumericIndex>(TimeseriesIndex::end_value_for_segment(output)) + 1;
FrameSlice new_slice{
std::make_shared<StreamDescriptor>(output.descriptor()),
existing.slice_.col_range,
@@ -324,8 +298,8 @@
key.type(),
version_id,
key.id(),
affected_index_range.start_,
affected_index_range.end_,
start_ts,
end_ts,
std::move(output)
);
return SliceAndKey{std::move(new_slice), std::get<AtomKey>(std::move(fut_key).get())};
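The `+1` in the comment above encodes the convention that index keys store an exclusive end value, one nanosecond past the last timestamp in the segment; the matching `-1` in `get_descriptor` (next file) converts back to the last real index value. A small pandas illustration of that arithmetic, assuming the same microsecond index the new test uses:

# Half-open end-index convention assumed by the +1/-1 comments in this PR.
import pandas as pd

index = pd.date_range(pd.Timestamp(1000), freq="us", periods=5)
last_value = index[-1]

stored_end = last_value + pd.Timedelta(1, unit="ns")    # value written into the key
reported_end = stored_end - pd.Timedelta(1, unit="ns")  # value surfaced to the user

assert reported_end == last_value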
68 changes: 24 additions & 44 deletions cpp/arcticdb/version/local_versioned_engine.cpp
@@ -363,53 +363,33 @@ folly::Future<DescriptorItem> LocalVersionedEngine::get_descriptor(
const auto key = std::move(k);
return store()->read(key)
.thenValue([](auto&& key_seg_pair) -> DescriptorItem {
auto key_seg = std::move(std::get<0>(key_seg_pair));
auto seg = std::move(std::get<1>(key_seg_pair));
auto timeseries_descriptor = seg.has_metadata() ? std::make_optional<google::protobuf::Any>(*seg.metadata()) : std::nullopt;
auto start_index = seg.column(position_t(index::Fields::start_index)).type().visit_tag([&](auto column_desc_tag) -> std::optional<NumericIndex> {
using ColumnDescriptorType = std::decay_t<decltype(column_desc_tag)>;
using ColumnTagType = typename ColumnDescriptorType::DataTypeTag;
if (seg.row_count() == 0) {
return std::nullopt;
} else if constexpr (is_numeric_type(ColumnTagType::data_type)) {
std::optional<NumericIndex> start_index;
auto column_data = seg.column(position_t(index::Fields::start_index)).data();
while (auto block = column_data.template next<ColumnDescriptorType>()) {
auto ptr = reinterpret_cast<const NumericIndex *>(block.value().data());
const auto row_count = block.value().row_count();
for (auto i = 0u; i < row_count; ++i) {
auto value = *ptr++;
start_index = start_index.has_value() ? std::min(*start_index, value) : value;
}
auto key = to_atom(std::move(key_seg_pair.first));
auto seg = std::move(key_seg_pair.second);
auto tsd = std::make_optional<arcticdb::proto::descriptors::TimeSeriesDescriptor>();
if (seg.has_metadata()) {
seg.metadata()->UnpackTo(&(*tsd));
}
std::optional<timestamp> start_index;
std::optional<timestamp> end_index;
if (seg.row_count() > 0) {
const auto& start_index_column = seg.column(position_t(index::Fields::start_index));
details::visit_type(start_index_column.type().data_type(), [&start_index_column, &start_index](auto column_desc_tag) {
using type_info = ScalarTypeInfo<decltype(column_desc_tag)>;
if constexpr (is_time_type(type_info::data_type)) {
start_index = start_index_column.template scalar_at<timestamp>(0);

Review thread on the line above:
Collaborator: Was there a reason not to use start_value_for_segment and end_value_for_segment from index.hpp?
Author: This is an index key, not a data key.

}
return start_index;
} else {
util::raise_rte("Unsupported index type {}", seg.column(position_t(index::Fields::start_index)).type());
}
});
});

auto end_index = seg.column(position_t(index::Fields::end_index)).type().visit_tag([&](auto column_desc_tag) -> std::optional<NumericIndex> {
using ColumnDescriptorType = std::decay_t<decltype(column_desc_tag)>;
using ColumnTagType = typename ColumnDescriptorType::DataTypeTag;
if (seg.row_count() == 0) {
return std::nullopt;
} else if constexpr (is_numeric_type(ColumnTagType::data_type)) {
std::optional<NumericIndex> end_index;
auto column_data = seg.column(position_t(index::Fields::end_index)).data();
while (auto block = column_data.template next<ColumnDescriptorType>()) {
auto ptr = reinterpret_cast<const NumericIndex *>(block.value().data());
const auto row_count = block.value().row_count();
for (auto i = 0u; i < row_count; ++i) {
auto value = *ptr++;
end_index = end_index.has_value() ? std::max(*end_index, value) : value;
}
const auto& end_index_column = seg.column(position_t(index::Fields::end_index));
details::visit_type(end_index_column.type().data_type(), [&end_index_column, &end_index, row_count=seg.row_count()](auto column_desc_tag) {
using type_info = ScalarTypeInfo<decltype(column_desc_tag)>;
if constexpr (is_time_type(type_info::data_type)) {
// -1 as the end timestamp in the data keys is one nanosecond greater than the last value in the index column
end_index = *end_index_column.template scalar_at<timestamp>(row_count - 1) - 1;
}
return end_index;
} else {
util::raise_rte("Unsupported index type {}", seg.column(position_t(index::Fields::end_index)).type());
}
});
return DescriptorItem{std::move(to_atom(key_seg)), std::move(start_index), std::move(end_index), std::move(timeseries_descriptor)};
});
}
return DescriptorItem{std::move(key), start_index, end_index, std::move(tsd)};
});
}
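
Conceptually, the rewritten get_descriptor derives the date range from the rows of the index key's segment: the start index of the first row and the end index of the last row minus one nanosecond, or no range at all (NaT/NaT on the Python side) when the segment is empty. A rough Python model of that logic, not the actual implementation:

# Rough model: each index-key row carries (start, end_exclusive) for one data key.
import pandas as pd

def date_range_from_index_rows(rows):
    """rows: list of (start, end_exclusive) pd.Timestamp pairs, ordered by time."""
    if not rows:
        # Empty symbol: NaT/NaT, as asserted in test_read_description_batch_empty_nat.
        return (pd.NaT, pd.NaT)
    start = rows[0][0]
    end = rows[-1][1] - pd.Timedelta(1, unit="ns")  # stored end is exclusive
    return (start, end)

print(date_range_from_index_rows([]))
print(date_range_from_index_rows([
    (pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02") + pd.Timedelta(1, unit="ns")),
    (pd.Timestamp("2024-01-03"), pd.Timestamp("2024-01-05") + pd.Timedelta(1, unit="ns")),
]))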

13 changes: 3 additions & 10 deletions cpp/arcticdb/version/python_bindings.cpp
@@ -200,16 +200,9 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
.def_property_readonly("end_index", &DescriptorItem::end_index)
.def_property_readonly("creation_ts", &DescriptorItem::creation_ts)
.def_property_readonly("timeseries_descriptor", [](const DescriptorItem& self) {
py::object pyobj;
auto timeseries_descriptor = self.timeseries_descriptor();
if (timeseries_descriptor.has_value()) {
arcticdb::proto::descriptors::TimeSeriesDescriptor tsd;
timeseries_descriptor->UnpackTo(&tsd);
pyobj = python_util::pb_to_python(tsd);
} else {
pyobj = pybind11::none();
}
return pyobj;
// FUTURE: Use std::optional monadic operations in C++23
auto opt_tsd = self.timeseries_descriptor();
return opt_tsd.has_value() ? python_util::pb_to_python(*opt_tsd) : pybind11::none();
});

py::class_<pipelines::FrameSlice, std::shared_ptr<pipelines::FrameSlice>>(version, "FrameSlice")
4 changes: 2 additions & 2 deletions python/tests/integration/arcticdb/test_arctic_batch.py
@@ -1316,8 +1316,8 @@ def test_read_description_batch_empty_nat(arctic_library):
requests = [ReadInfoRequest("sym_" + str(sym)) for sym in range(num_symbols)]
results_list = lib.get_description_batch(requests)
for sym in range(num_symbols):
assert np.isnat(results_list[sym].date_range[0]) == True
assert np.isnat(results_list[sym].date_range[1]) == True
assert np.isnat(results_list[sym].date_range[0])
assert np.isnat(results_list[sym].date_range[1])


def test_read_batch_mixed_with_snapshots(arctic_library):
@@ -568,6 +568,31 @@ def test_delete_date_range_remove_everything(version_store_factory, map_timeout)
assert_frame_equal(vit.data, df)


def test_delete_date_range_get_info(lmdb_version_store_tiny_segment):
lib = lmdb_version_store_tiny_segment
sym = "test_delete_date_range_get_info"
data = {
"col_0": [0, 1, 2, 3, 4],
"col_1": [5, 6, 7, 8, 9],
"col_2": [10, 11, 12, 13, 14],
}
df = pd.DataFrame(data, index=pd.date_range(pd.Timestamp(1000), freq="us", periods=5))
lib.write(sym, df)
date_range = lib.get_info(sym)["date_range"]
assert df.index[0] == date_range[0]
assert df.index[-1] == date_range[1]

lib.delete(sym, (pd.Timestamp(4000), pd.Timestamp(5000)))
received = lib.read(sym).data
assert_frame_equal(df.iloc[:3], received)
assert received.index[-1] == lib.get_info(sym)["date_range"][1]

lib.delete(sym, (pd.Timestamp(1000), pd.Timestamp(2000)))
received = lib.read(sym).data
assert_frame_equal(df.iloc[2:3], received)
assert received.index[0] == lib.get_info(sym)["date_range"][0]


def test_delete_read_from_timestamp(basic_store):
sym = "test_from_timestamp_with_delete"
lib = basic_store