Backport sort and finalize fixes (#1859)
#### Reference Issues/PRs
Backporting sort and finalize fixes to 4.5.1rc1.

It also depends on #1856.

Removes `read_dataframe_merged` from the Python bindings. It was removed on master by #1698, which is too large to cherry-pick into this backport.

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?
</details>


---------

Co-authored-by: Vasil Pashov <vasil.pashov@man.com>
Co-authored-by: Ivo Dilov <iddilov@gmail.com>
3 people authored Oct 1, 2024
1 parent 44a5a67 commit 243d58a
Showing 50 changed files with 2,329 additions and 583 deletions.
2 changes: 1 addition & 1 deletion cpp/arcticdb/CMakeLists.txt
@@ -492,6 +492,7 @@ set(arcticdb_srcs
util/offset_string.cpp
util/sparse_utils.cpp
util/string_utils.cpp
+util/timer.cpp
util/trace.cpp
util/type_handler.cpp
version/local_versioned_engine.cpp
@@ -816,7 +817,6 @@ if(${TEST})
util/test/test_string_utils.cpp
util/test/test_tracing_allocator.cpp
version/test/test_append.cpp
-version/test/test_merge.cpp
version/test/test_sparse.cpp
version/test/test_stream_version_data.cpp
version/test/test_symbol_list.cpp
11 changes: 8 additions & 3 deletions cpp/arcticdb/column_store/column.hpp
@@ -280,10 +280,15 @@ class Column {
return TypedColumnIterator<TagType, const RawType>(*this, false);
}

-template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
+template<typename T>
+requires std::integral<T> || std::floating_point<T>
void set_scalar(ssize_t row_offset, T val) {
-util::check(sizeof(T) == get_type_size(type_.data_type()), "Type mismatch in set_scalar, expected {}",
-get_type_size(type_.data_type()));
+util::check(
+sizeof(T) == get_type_size(type_.data_type()),
+"Type mismatch in set_scalar, expected {} byte scalar got {} byte scalar",
+get_type_size(type_.data_type()),
+sizeof(T)
+);

auto prev_logical_row = last_logical_row_;
last_logical_row_ = row_offset;
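
The hunks above are representative of a pattern repeated throughout this backport: `std::enable_if_t`-based SFINAE constraints are replaced with C++20 `requires` clauses. A minimal sketch of the equivalence, with hypothetical function names rather than ArcticDB's own:

```cpp
#include <concepts>
#include <type_traits>

// Old style: SFINAE via a defaulted non-type template parameter.
template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
void set_scalar_old(T) {}

// New style: the requires-clause states the constraint directly; a bad
// call is reported as "constraints not satisfied" instead of as a
// substitution failure buried inside the signature.
template<typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar_new(T) {}

int main() {
    set_scalar_old(42);    // OK under both styles
    set_scalar_new(3.14);  // OK: double models std::floating_point
    // set_scalar_new("x"); // rejected: constraints not satisfied
}
```
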
28 changes: 17 additions & 11 deletions cpp/arcticdb/column_store/memory_segment.hpp
@@ -72,13 +72,15 @@ class SegmentInMemory {
impl_->end_row();
}

template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
template<typename T>
requires std::integral<T> || std::floating_point<T>
void set_scalar(position_t idx, T val) {
impl_->set_scalar(idx, val);
}

-template<class T, std::enable_if_t<std::is_same_v<std::decay_t<T>, std::string>, int> = 0>
-void set_scalar(position_t idx, T val) {
+template<typename T>
+requires std::same_as<std::decay_t<T>, std::string>
+void set_scalar(position_t idx, const T& val) {
impl_->set_string(idx, val);
}

@@ -119,16 +121,14 @@ class SegmentInMemory {
impl_->init_column_map();
}

-template<class T, template<class> class Tensor, std::enable_if_t<
-std::is_integral_v<T> || std::is_floating_point_v<T>,
-int> = 0>
+template<class T, template<class> class Tensor>
+requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, Tensor<T> &val) {
impl_->set_array(pos, val);
}

-template<class T, std::enable_if_t<
-std::is_integral_v<T> || std::is_floating_point_v<T>,
-int> = 0>
+template<class T>
+requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, py::array_t<T>& val) {
impl_->set_array(pos, val);
}
@@ -263,12 +263,14 @@ class SegmentInMemory {
impl_->sparsify();
}

-template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
+template<class T>
+requires std::integral<T> || std::floating_point<T>
void set_external_block(position_t idx, T *val, size_t size) {
impl_->set_external_block(idx, val, size);
}

-template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
+template<class T>
+requires std::integral<T> || std::floating_point<T>
void set_sparse_block(position_t idx, T *val, size_t rows_to_write) {
impl_->set_sparse_block(idx, val, rows_to_write);
}
@@ -477,6 +479,10 @@ class SegmentInMemory {
return output;
}

+void drop_empty_columns() {
+impl_->drop_empty_columns();
+}

private:
explicit SegmentInMemory(std::shared_ptr<SegmentInMemoryImpl> impl) :
impl_(std::move(impl)) {}
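
`SegmentInMemory` now carries two concept-constrained `set_scalar` overloads: arithmetic types are taken by value, `std::string` by const reference (avoiding a copy at the call boundary). A toy sketch of how overload resolution dispatches between them — the `Segment` stand-in below is hypothetical, not ArcticDB's class:

```cpp
#include <concepts>
#include <iostream>
#include <string>
#include <type_traits>

struct Segment {
    template<typename T>
    requires std::integral<T> || std::floating_point<T>
    void set_scalar(long idx, T val) {            // arithmetic: cheap to copy
        std::cout << "numeric col " << idx << " = " << val << '\n';
    }

    template<typename T>
    requires std::same_as<std::decay_t<T>, std::string>
    void set_scalar(long idx, const T& val) {     // string: passed by reference
        std::cout << "string col " << idx << " = " << val << '\n';
    }
};

int main() {
    Segment seg;
    seg.set_scalar(0, 42);                    // integral overload
    seg.set_scalar(1, 2.5);                   // floating-point overload
    seg.set_scalar(2, std::string{"hello"});  // string overload
}
```
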
25 changes: 24 additions & 1 deletion cpp/arcticdb/column_store/memory_segment_impl.cpp
@@ -9,8 +9,8 @@
#include <arcticdb/column_store/string_pool.hpp>
#include <arcticdb/entity/type_utils.hpp>
#include <arcticdb/pipeline/string_pool_utils.hpp>
+#include <arcticdb/util/preconditions.hpp>

-#include <google/protobuf/any.h>
#include <google/protobuf/any.pb.h>


@@ -673,4 +673,27 @@ const google::protobuf::Any* SegmentInMemoryImpl::metadata() const {
return metadata_.get();
}

+void SegmentInMemoryImpl::drop_empty_columns() {
+internal::check<ErrorCode::E_ASSERTION_FAILURE>(
+row_count() > 0,
+"Dropping all empty columns from an empty segment would result in removing all columns"
+);
+const StreamDescriptor& original = descriptor();
+StreamDescriptor only_non_empty_cols;
+only_non_empty_cols.set_id(original.id());
+only_non_empty_cols.set_index(descriptor().index());
+size_t field_index = 0;
+while (field_index < original.index().field_count()) {
+only_non_empty_cols.add_field(original.field(field_index++));
+}
+while (field_index < original.field_count()) {
+const Column& col = column(field_index);
+if (col.row_count() > 0) {
+only_non_empty_cols.add_field(original.field(field_index));
+}
+field_index++;
+}
+change_schema(std::move(only_non_empty_cols));
+}

}
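
The new `drop_empty_columns` keeps every index field, retains only the data columns with a non-zero row count, and installs the rebuilt descriptor via `change_schema`. A self-contained sketch of the same filtering pass, using hypothetical stand-in types:

```cpp
#include <cstddef>
#include <string>
#include <vector>

struct Field { std::string name; };      // stand-in for a descriptor field
struct Col   { std::size_t row_count; }; // stand-in for a Column

// Index fields always survive; a data column is kept only if it holds
// at least one value.
std::vector<Field> keep_non_empty(const std::vector<Field>& fields,
                                  const std::vector<Col>& cols,
                                  std::size_t index_field_count) {
    std::vector<Field> kept;
    std::size_t i = 0;
    for (; i < index_field_count; ++i)
        kept.push_back(fields[i]);
    for (; i < fields.size(); ++i)
        if (cols[i].row_count > 0)
            kept.push_back(fields[i]);
    return kept;
}

int main() {
    // "ts" is the index; "a" is empty and dropped, "b" survives.
    auto kept = keep_non_empty({{"ts"}, {"a"}, {"b"}}, {{3}, {0}, {3}}, 1);
    return static_cast<int>(kept.size());  // 2
}
```
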
49 changes: 26 additions & 23 deletions cpp/arcticdb/column_store/memory_segment_impl.hpp
@@ -67,31 +67,31 @@ class SegmentInMemoryImpl {

template<class Callable>
auto visit(Callable &&c) const {
-return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c=std::forward<Callable>(c)](auto type_desc_tag) {
+return entity::visit_field(parent_->descriptor().field(column_id_), [this, c=std::forward<Callable>(c)](auto type_desc_tag) {
using RawType = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag::raw_type;
-return c(that->parent_->scalar_at<RawType>(that->row_id_, that->column_id_));
+return c(parent_->scalar_at<RawType>(row_id_, column_id_));
});
}

template<class Callable>
auto visit_string(Callable &&c) const {
-return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c = std::forward<Callable>(c)](auto type_desc_tag) {
+return entity::visit_field(parent_->descriptor().field(column_id_), [this, c = std::forward<Callable>(c)](auto type_desc_tag) {
using DTT = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag;
if constexpr(is_sequence_type(DTT::data_type))
-return c(that->parent_->string_at(that->row_id_, position_t(that->column_id_)));
+return c(parent_->string_at(row_id_, position_t(column_id_)));
});
}

template<class Callable>
auto visit_field(Callable &&c) const {
const auto& field = parent_->descriptor().field(column_id_);
-return entity::visit_field(parent_->descriptor().field(column_id_), [&field, that=this, c = std::forward<Callable>(c)](auto type_desc_tag) {
+return entity::visit_field(field, [&field, this, c = std::forward<Callable>(c)](auto type_desc_tag) {
using DataTypeTag = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag;
using RawType = typename DataTypeTag::raw_type;
if constexpr (is_sequence_type(DataTypeTag::data_type))
-return c(that->parent_->string_at(that->row_id_, position_t(that->column_id_)), std::string_view{field.name()}, field.type());
+return c(parent_->string_at(row_id_, position_t(column_id_)), std::string_view{field.name()}, type_desc_tag);
else if constexpr (is_numeric_type(DataTypeTag::data_type) || is_bool_type(DataTypeTag::data_type))
-return c(that->parent_->scalar_at<RawType>(that->row_id_, that->column_id_), std::string_view{field.name()}, field.type());
+return c(parent_->scalar_at<RawType>(row_id_, column_id_), std::string_view{field.name()}, type_desc_tag);
else if constexpr(is_empty_type(DataTypeTag::data_type))
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("visit_field does not support empty-type columns");
else
@@ -101,9 +101,9 @@ class SegmentInMemoryImpl {

template<class Callable>
auto visit(Callable &&c) {
-return entity::visit_field(parent_->descriptor().field(column_id_), [that=this, c=std::forward<Callable>(c)](auto type_desc_tag) {
+return entity::visit_field(parent_->descriptor().field(column_id_), [this, c=std::forward<Callable>(c)](auto type_desc_tag) {
using RawType = typename std::decay_t<decltype(type_desc_tag)>::DataTypeTag::raw_type;
-return c(that->parent_->reference_at<RawType>(that->row_id_, that->column_id_));
+return c(parent_->reference_at<RawType>(row_id_, column_id_));
});
}

@@ -456,18 +456,21 @@ class SegmentInMemoryImpl {
});
}

-template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
-void set_scalar(position_t idx, T val) {
+template<class T>
+requires std::integral<T> || std::floating_point<T>
+void set_scalar(position_t idx, T val) {
ARCTICDB_TRACE(log::version(), "Segment setting scalar {} at row {} column {}", val, row_id_ + 1, idx);
column(idx).set_scalar(row_id_ + 1, val);
}

-template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
-void set_external_block(position_t idx, T *val, size_t size) {
+template<class T>
+requires std::integral<T> || std::floating_point<T>
+void set_external_block(position_t idx, T *val, size_t size) {
column_unchecked(idx).set_external_block(row_id_ + 1, val, size);
}

-template<class T, std::enable_if_t<std::is_integral_v<T> || std::is_floating_point_v<T>, int> = 0>
+template<class T>
+requires std::integral<T> || std::floating_point<T>
void set_sparse_block(position_t idx, T *val, size_t rows_to_write) {
column_unchecked(idx).set_sparse_block(row_id_ + 1, val, rows_to_write);
}
@@ -480,23 +483,22 @@ class SegmentInMemoryImpl {
column_unchecked(idx).set_sparse_block(std::move(buffer), std::move(shapes), std::move(bitset));
}

-template<class T, std::enable_if_t<std::is_same_v<std::decay_t<T>, std::string>, int> = 0>
-void set_scalar(position_t idx, T val) {
+template<class T>
+requires std::same_as<std::decay_t<T>, std::string>
+void set_scalar(position_t idx, const T& val) {
set_string(idx, val);
}

-template<class T, template<class> class Tensor, std::enable_if_t<
-std::is_integral_v<T> || std::is_floating_point_v<T>,
-int> = 0>
-void set_array(position_t pos, Tensor<T> &val) {
+template<class T, template<class> class Tensor>
+requires std::integral<T> || std::floating_point<T>
+void set_array(position_t pos, Tensor<T> &val) {
magic_.check();
ARCTICDB_SAMPLE(MemorySegmentSetArray, 0)
column_unchecked(pos).set_array(row_id_ + 1, val);
}

-template<class T, std::enable_if_t<
-std::is_integral_v<T> || std::is_floating_point_v<T>,
-int> = 0>
+template<class T>
+requires std::integral<T> || std::floating_point<T>
void set_array(position_t pos, py::array_t<T>& val) {
magic_.check();
ARCTICDB_SAMPLE(MemorySegmentSetArray, 0)
@@ -808,6 +810,7 @@ class SegmentInMemoryImpl {
const std::vector<uint64_t>& segment_counts) const;

std::vector<std::shared_ptr<SegmentInMemoryImpl>> split(size_t rows) const;
+void drop_empty_columns();

private:
ssize_t row_id_ = -1;
21 changes: 19 additions & 2 deletions cpp/arcticdb/entity/merge_descriptors.cpp
@@ -13,7 +13,7 @@
namespace arcticdb {
StreamDescriptor merge_descriptors(
const StreamDescriptor &original,
-const std::vector<std::shared_ptr<FieldCollection>> &entries,
+std::span<const std::shared_ptr<FieldCollection>> entries,
const std::unordered_set<std::string_view> &filtered_set,
const std::optional<IndexDescriptorImpl>& default_index) {
using namespace arcticdb::stream;
@@ -34,6 +34,7 @@ StreamDescriptor merge_descriptors(
merged_fields.emplace_back(idx.name());
merged_fields_map.try_emplace(idx.name(), TypeDescriptor{typename IndexType::TypeDescTag{}});
});
+index = default_index_type_from_descriptor(*default_index);
} else {
util::raise_rte("Descriptor has uninitialized index and no default supplied");
}
@@ -71,7 +72,12 @@ StreamDescriptor merge_descriptors(
if(new_descriptor) {
merged_fields_map[field.name()] = *new_descriptor;
} else {
-util::raise_rte("No valid common type between {} and {} for column {}", existing_type_desc, type_desc, field.name());
+schema::raise<ErrorCode::E_DESCRIPTOR_MISMATCH>(
+"No valid common type between {} and {} for column {}",
+existing_type_desc,
+type_desc,
+field.name()
+);
}
}
} else {
@@ -99,6 +105,17 @@ StreamDescriptor merge_descriptors(
return merge_descriptors(original, entries, filtered_set, default_index);
}

+StreamDescriptor merge_descriptors(
+const StreamDescriptor& original,
+std::span<const std::shared_ptr<FieldCollection>> entries,
+const std::optional<std::vector<std::string>>& filtered_columns,
+const std::optional<IndexDescriptorImpl>& default_index) {
+std::unordered_set<std::string_view> filtered_set = filtered_columns.has_value()
+? std::unordered_set<std::string_view>(filtered_columns->begin(), filtered_columns->end())
+: std::unordered_set<std::string_view>{};
+return merge_descriptors(original, entries, filtered_set, default_index);
+}

StreamDescriptor merge_descriptors(
const StreamDescriptor &original,
const std::vector<pipelines::SliceAndKey> &entries,
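
Taking `entries` as `std::span<const std::shared_ptr<FieldCollection>>` instead of `const std::vector<...>&` lets callers pass any contiguous range — notably the stack-allocated `std::array` that `get_merged_tsd` in `index_utils.cpp` now builds in place of a temporary vector. A generic sketch of that flexibility, with a toy element type:

```cpp
#include <array>
#include <cstdio>
#include <memory>
#include <span>
#include <vector>

using Entry = std::shared_ptr<int>;  // toy stand-in for std::shared_ptr<FieldCollection>

// A span parameter accepts any contiguous sequence without forcing the
// caller to materialise a std::vector.
void consume(std::span<const Entry> entries) {
    std::printf("%zu entries\n", entries.size());
}

int main() {
    std::vector<Entry> vec{std::make_shared<int>(1)};
    const std::array arr{std::make_shared<int>(2)};  // no heap-allocated container
    consume(vec);  // a vector converts to span
    consume(arr);  // so does std::array, as at the get_merged_tsd call site
}
```
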
9 changes: 8 additions & 1 deletion cpp/arcticdb/entity/merge_descriptors.hpp
@@ -7,11 +7,12 @@

#include <arcticdb/entity/stream_descriptor.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
+#include <span>

namespace arcticdb {
StreamDescriptor merge_descriptors(
const StreamDescriptor &original,
-const std::vector<std::shared_ptr<FieldCollection>> &entries,
+std::span<const std::shared_ptr<FieldCollection>> entries,
const std::unordered_set<std::string_view> &filtered_set,
const std::optional<IndexDescriptorImpl>& default_index);

@@ -21,6 +22,12 @@ entity::StreamDescriptor merge_descriptors(
const std::optional<std::vector<std::string>> &filtered_columns,
const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt);

+entity::StreamDescriptor merge_descriptors(
+const entity::StreamDescriptor& original,
+std::span<const std::shared_ptr<FieldCollection>> entries,
+const std::optional<std::vector<std::string>>& filtered_columns,
+const std::optional<entity::IndexDescriptorImpl>& default_index = std::nullopt);

entity::StreamDescriptor merge_descriptors(
const entity::StreamDescriptor &original,
const std::vector<pipelines::SliceAndKey> &entries,
4 changes: 2 additions & 2 deletions cpp/arcticdb/entity/types.hpp
@@ -427,11 +427,11 @@ struct TypeDescriptor {
template<typename Callable>
constexpr auto visit_tag(Callable &&callable) const;

-bool operator==(const TypeDescriptor &o) const {
+[[nodiscard]] constexpr bool operator==(const TypeDescriptor& o) const {
return data_type_ == o.data_type_ && dimension_ == o.dimension_;
}

-bool operator!=(const TypeDescriptor &o) const {
+[[nodiscard]] constexpr bool operator!=(const TypeDescriptor& o) const {
return !(*this == o);
}

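
Marking the comparators `constexpr` (and `[[nodiscard]]`) lets `TypeDescriptor` equality be evaluated at compile time and warns when a comparison result is ignored. A toy illustration of the pattern:

```cpp
// Hypothetical descriptor mirroring the change; not ArcticDB's type.
struct Desc {
    int data_type;
    int dimension;
    [[nodiscard]] constexpr bool operator==(const Desc& o) const {
        return data_type == o.data_type && dimension == o.dimension;
    }
    [[nodiscard]] constexpr bool operator!=(const Desc& o) const {
        return !(*this == o);
    }
};

static_assert(Desc{1, 0} == Desc{1, 0});  // checked during compilation
static_assert(Desc{1, 0} != Desc{2, 0});

int main() {}
```
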
2 changes: 0 additions & 2 deletions cpp/arcticdb/pipeline/frame_slice.hpp
@@ -83,7 +83,6 @@ struct FrameSlice {
hash_bucket_(hash),
num_buckets_(num_buckets),
indices_(std::move(indices)) {
-util::check(col_range.diff() > 0 || row_range.diff() > 0, "Expected non-zero column or row range");
}

FrameSlice(const ColRange& col_range, const RowRange& row_range,
@@ -93,7 +92,6 @@
row_range(row_range),
hash_bucket_(hash_bucket),
num_buckets_(num_buckets) {
-util::check(col_range.diff() > 0 || row_range.diff() > 0, "Expected non-zero column or row range");
}

explicit FrameSlice(const SegmentInMemory& seg);
3 changes: 3 additions & 0 deletions cpp/arcticdb/pipeline/frame_utils.cpp
@@ -100,6 +100,9 @@ size_t adjust_slice_rowcounts(std::vector<pipelines::SliceAndKey> & slice_and_keys
}

size_t get_slice_rowcounts(std::vector<pipelines::SliceAndKey> & slice_and_keys) {
+if (slice_and_keys.empty()) {
+return 0;
+}
auto current_col = slice_and_keys[0].slice_.col_range.first;
size_t rowcount = 0u;
for (auto& slice_and_key : slice_and_keys) {
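
Without the new guard, an empty `slice_and_keys` would reach `slice_and_keys[0]`, which is undefined behaviour on an empty vector. A reduced sketch of the fixed shape — the stand-in type is hypothetical, and the real function also tracks column ranges:

```cpp
#include <cstddef>
#include <vector>

struct Slice { std::size_t rows; };  // stand-in for SliceAndKey

std::size_t total_rows(const std::vector<Slice>& slices) {
    if (slices.empty())
        return 0;                 // the backported early-out
    std::size_t rowcount = 0;     // past this point, slices[0] is safe
    for (const auto& s : slices)
        rowcount += s.rows;
    return rowcount;
}

int main() {
    return static_cast<int>(total_rows({}));  // 0, no out-of-bounds access
}
```
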
3 changes: 2 additions & 1 deletion cpp/arcticdb/pipeline/index_utils.cpp
@@ -92,9 +92,10 @@ TimeseriesDescriptor get_merged_tsd(
}
else if (dynamic_schema) {
// In case of dynamic schema
+const std::array fields_ptr = {new_frame->desc.fields_ptr()};
merged_descriptor = merge_descriptors(
existing_descriptor,
-std::vector<std::shared_ptr<FieldCollection>>{new_frame->desc.fields_ptr()},
+fields_ptr,
{}
);
} else {
(Diff truncated: the remaining 39 changed files are not shown.)
