Abstract out building of a merged descriptor
- Deduplicates descriptor merging inside update and append
- Fixes fixed-string to dynamic-string conversion #1204
- Fixes inconsistent column ordering when appending #1349
IvoDD committed Feb 21, 2024
1 parent f640012 commit 92c32c2
Showing 6 changed files with 65 additions and 47 deletions.
2 changes: 2 additions & 0 deletions cpp/arcticdb/entity/merge_descriptors.hpp
@@ -8,6 +8,8 @@
#include <arcticdb/entity/stream_descriptor.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>

+#pragma once
+
namespace arcticdb {
StreamDescriptor merge_descriptors(
        const StreamDescriptor &original,
51 changes: 51 additions & 0 deletions cpp/arcticdb/pipeline/index_utils.cpp
@@ -9,6 +9,8 @@
#include <arcticdb/storage/store.hpp>
#include <arcticdb/pipeline/index_writer.hpp>
#include <arcticdb/pipeline/frame_utils.hpp>
+#include <arcticdb/version/version_utils.hpp>
+#include <arcticdb/entity/merge_descriptors.hpp>

namespace arcticdb::pipelines::index {

@@ -77,4 +79,53 @@ std::pair<index::IndexSegmentReader, std::vector<SliceAndKey>> read_index_to_vec
    return {std::move(index_segment_reader), std::move(slice_and_keys)};
}

+TimeseriesDescriptor get_merged_tsd(
+        int row_count,
+        bool dynamic_schema,
+        const pipelines::index::IndexSegmentReader& existing_isr,
+        const std::shared_ptr<pipelines::InputTensorFrame>& new_frame) {
+    auto existing_descriptor = existing_isr.tsd().as_stream_descriptor();
+    auto merged_descriptor = existing_descriptor;
+    if (existing_isr.tsd().proto().total_rows() == 0) {
+        // If the existing dataframe is empty, we use the descriptor of the new_frame
+        merged_descriptor = new_frame->desc;
+    }
+    else if (dynamic_schema) {
+        // In case of dynamic schema
+        merged_descriptor = merge_descriptors(
+            existing_descriptor,
+            std::vector<std::shared_ptr<FieldCollection>>{new_frame->desc.fields_ptr()},
+            {}
+        );
+    } else {
+        // In case of static schema, we only promote empty types and fixed->dynamic strings
+        const auto &new_fields = new_frame->desc.fields();
+        for (size_t i = 0; i < new_fields.size(); ++i) {
+            const auto &new_type = new_fields.at(i).type();
+            TypeDescriptor &result_type = merged_descriptor.mutable_field(i).mutable_type();
+            // We allow promoting empty types
+            if (is_empty_type(result_type.data_type()) && !is_empty_type(new_type.data_type())) {
+                result_type = new_type;
+            }
+            // We allow promoting fixed strings to dynamic strings
+            else if (is_sequence_type(result_type.data_type()) &&
+                     is_sequence_type(new_type.data_type()) &&
+                     !is_dynamic_string_type(result_type.data_type()) &&
+                     is_dynamic_string_type(new_type.data_type())) {
+                result_type = new_type;
+            }
+        }
+    }
+    merged_descriptor.set_sorted(deduce_sorted(existing_isr.get_sorted(), new_frame->desc.get_sorted()));
+    return make_timeseries_descriptor(
+        row_count,
+        std::move(merged_descriptor),
+        std::move(new_frame->norm_meta),
+        std::move(new_frame->user_meta),
+        std::nullopt,
+        std::nullopt,
+        new_frame->bucketize_dynamic
+    );
+}
+
} //namespace arcticdb::pipelines::index
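
For illustration, here is a minimal standalone sketch of the two promotion rules applied in the static-schema branch of get_merged_tsd above. The DataType enum and the three predicates are simplified stand-ins for the ArcticDB internals, not the library's real API; only the promotion logic mirrors the code:

    // Simplified model of the static-schema promotion rules (assumed semantics).
    #include <cassert>

    enum class DataType { EMPTY, INT64, FIXED_STRING, DYNAMIC_STRING };

    bool is_empty_type(DataType t) { return t == DataType::EMPTY; }
    bool is_sequence_type(DataType t) {
        return t == DataType::FIXED_STRING || t == DataType::DYNAMIC_STRING;
    }
    bool is_dynamic_string_type(DataType t) { return t == DataType::DYNAMIC_STRING; }

    // Type an existing column takes after appending a column of type `incoming`:
    // empty columns adopt the incoming type, and fixed-width string columns are
    // widened to dynamic strings; any other combination leaves the type unchanged.
    DataType promote(DataType existing, DataType incoming) {
        if (is_empty_type(existing) && !is_empty_type(incoming))
            return incoming;
        if (is_sequence_type(existing) && is_sequence_type(incoming) &&
            !is_dynamic_string_type(existing) && is_dynamic_string_type(incoming))
            return incoming;
        return existing;
    }

    int main() {
        assert(promote(DataType::EMPTY, DataType::INT64) == DataType::INT64);
        assert(promote(DataType::FIXED_STRING, DataType::DYNAMIC_STRING) == DataType::DYNAMIC_STRING);
        // The reverse direction is not promoted: a dynamic-string column stays dynamic.
        assert(promote(DataType::DYNAMIC_STRING, DataType::FIXED_STRING) == DataType::DYNAMIC_STRING);
        return 0;
    }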
7 changes: 7 additions & 0 deletions cpp/arcticdb/pipeline/index_utils.hpp
@@ -136,4 +136,11 @@ std::pair<index::IndexSegmentReader, std::vector<SliceAndKey>> read_index_to_vec
        const std::shared_ptr<Store>& store,
        const AtomKey& index_key);

+// Combines the stream descriptors of an existing index key and a new frame.
+TimeseriesDescriptor get_merged_tsd(
+        int row_count,
+        bool dynamic_schema,
+        const pipelines::index::IndexSegmentReader& existing_isr,
+        const std::shared_ptr<pipelines::InputTensorFrame>& new_frame);
+
} //namespace arcticdb::pipelines::index
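
The dynamic-schema path instead delegates to merge_descriptors. Below is a simplified model of the ordering behaviour that path relies on, assuming the merge keeps the existing descriptor's field order and appends previously unseen columns at the end, which is what stabilises column order across appends (#1349). Field here is a bare stand-in for the ArcticDB type of the same name:

    // Assumed merge-order semantics: existing fields keep their positions,
    // fields that appear only in the incoming frame are appended at the end.
    #include <cassert>
    #include <string>
    #include <unordered_set>
    #include <vector>

    struct Field { std::string name; };

    std::vector<Field> merge_fields(const std::vector<Field>& existing,
                                    const std::vector<Field>& incoming) {
        std::vector<Field> merged = existing;  // the stored order wins
        std::unordered_set<std::string> seen;
        for (const auto& f : existing)
            seen.insert(f.name);
        for (const auto& f : incoming)
            if (seen.insert(f.name).second)
                merged.push_back(f);  // columns not seen before go at the end
        return merged;
    }

    int main() {
        // Appending {index, b, c} onto {index, a, b} keeps the stored order
        // and adds only the genuinely new column "c" at the end.
        auto merged = merge_fields({{"index"}, {"a"}, {"b"}},
                                   {{"index"}, {"b"}, {"c"}});
        assert(merged.size() == 4);
        assert(merged[0].name == "index" && merged[1].name == "a");
        assert(merged[2].name == "b" && merged[3].name == "c");
        return 0;
    }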
31 changes: 2 additions & 29 deletions cpp/arcticdb/pipeline/write_frame.cpp
Expand Up @@ -247,35 +247,8 @@ folly::Future<entity::AtomKey> append_frame(
.thenValue([dynamic_schema, slices_to_write=std::move(existing_slices), frame=frame, index_segment_reader=std::move(index_segment_reader), key=std::move(key), store](auto&& slice_and_keys_to_append) mutable {
slices_to_write.insert(std::end(slices_to_write), std::make_move_iterator(std::begin(slice_and_keys_to_append)), std::make_move_iterator(std::end(slice_and_keys_to_append)));
std::sort(std::begin(slices_to_write), std::end(slices_to_write));
if(dynamic_schema) {
auto merged_descriptor = merge_descriptors(
frame->desc,
std::vector<std::shared_ptr<FieldCollection>>{index_segment_reader.tsd().fields_ptr()},
{}
);
merged_descriptor.set_sorted(deduce_sorted(index_segment_reader.get_sorted(), frame->desc.get_sorted()));
auto tsd = make_timeseries_descriptor(
frame->num_rows + frame->offset,
std::move(merged_descriptor),
std::move(frame->norm_meta),
std::move(frame->user_meta),
std::nullopt,
std::nullopt,
frame->bucketize_dynamic
);
return index::write_index(stream::index_type_from_descriptor(frame->desc), std::move(tsd), std::move(slices_to_write), key, store);
} else {
const FieldCollection& new_fields{index_segment_reader.tsd().fields()};
for (size_t i = 0; i < new_fields.size(); ++i) {
const Field& new_field = new_fields.at(i);
TypeDescriptor& original_type = frame->desc.mutable_field(i).mutable_type();
if (is_empty_type(original_type.data_type()) && !is_empty_type(new_field.type().data_type())) {
original_type = new_field.type();
}
}
frame->desc.set_sorted(deduce_sorted(index_segment_reader.get_sorted(), frame->desc.get_sorted()));
return index::write_index(frame, std::move(slices_to_write), key, store);
}
auto tsd = get_merged_tsd(frame->num_rows + frame->offset, dynamic_schema, index_segment_reader, frame);
return index::write_index(stream::index_type_from_descriptor(tsd.as_stream_descriptor()), std::move(tsd), std::move(slices_to_write), key, store);
});
}

13 changes: 2 additions & 11 deletions cpp/arcticdb/version/version_core.cpp
@@ -333,7 +333,6 @@ VersionedItem update_impl(
    sorted_data_check_update(*frame, index_segment_reader);
    bool bucketize_dynamic = index_segment_reader.bucketize_dynamic();
    (void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, bucketize_dynamic);
-   auto combined_sorting_info = deduce_sorted(frame->desc.get_sorted(), index_segment_reader.get_sorted());
    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame);

    std::vector<FilterQuery<index::IndexSegmentReader>> queries =
@@ -394,16 +393,8 @@ VersionedItem update_impl(
        unaffected_keys.size(), new_keys_size, affected_keys.size(), flattened_slice_and_keys.size());

    std::sort(std::begin(flattened_slice_and_keys), std::end(flattened_slice_and_keys));
-   auto existing_desc = index_segment_reader.tsd().as_stream_descriptor();
-   auto desc = merge_descriptors(existing_desc, std::vector<std::shared_ptr<FieldCollection>>{ frame->desc.fields_ptr() }, {});
-   desc.set_sorted(combined_sorting_info);
-   auto time_series = make_timeseries_descriptor(row_count, std::move(desc), std::move(frame->norm_meta), std::move(frame->user_meta), std::nullopt, std::nullopt, bucketize_dynamic);
-   auto index = index_type_from_descriptor(time_series.as_stream_descriptor());
-
-   auto version_key_fut = util::variant_match(index, [&time_series, &flattened_slice_and_keys, &stream_id, &update_info, &store] (auto idx) {
-       using IndexType = decltype(idx);
-       return index::write_index<IndexType>(std::move(time_series), std::move(flattened_slice_and_keys), IndexPartialKey{stream_id, update_info.next_version_id_}, store);
-   });
+   auto tsd = get_merged_tsd(row_count, dynamic_schema, index_segment_reader, frame);
+   auto version_key_fut = index::write_index(stream::index_type_from_descriptor(tsd.as_stream_descriptor()), std::move(tsd), std::move(flattened_slice_and_keys), IndexPartialKey{stream_id, update_info.next_version_id_}, store);
    auto version_key = std::move(version_key_fut).get();
    auto versioned_item = VersionedItem(to_atom(std::move(version_key)));
    ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", stream_id, update_info.next_version_id_);
@@ -46,13 +46,7 @@ def test_changing_numeric_type(version_store_factory, dynamic_schema):
@pytest.mark.parametrize("dynamic_schema, dynamic_strings_first", [
    (True, True),
    (True, False),
-   pytest.param(False,
-                True,
-                marks=pytest.mark.xfail(
-                    reason="""Issue with appending/updating a dynamic string column with fixed-width strings
-                    https://github.com/man-group/ArcticDB/issues/1204"""
-                )
-                ),
+   (False, True),
    (False, False),
])
def test_changing_string_type(version_store_factory, dynamic_schema, dynamic_strings_first):
