Skip to content

Commit

Permalink
Fixes for sort and finalize (#1763)
Browse files Browse the repository at this point in the history
Fixes: #1737 #1736 #1735

<details>
  <summary>
   Checklist for code changes...
  </summary>

- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
 - [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->

---------

Co-authored-by: Vasil Pashov <vasil.pashov@man.com>
  • Loading branch information
vasil-pashov and Vasil Pashov committed Sep 30, 2024
1 parent 44a5a67 commit a51ff8b
Show file tree
Hide file tree
Showing 14 changed files with 317 additions and 45 deletions.
2 changes: 0 additions & 2 deletions cpp/arcticdb/pipeline/frame_slice.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ struct FrameSlice {
hash_bucket_(hash),
num_buckets_(num_buckets),
indices_(std::move(indices)) {
util::check(col_range.diff() > 0 || row_range.diff() > 0, "Expected non-zero column or row range");
}

FrameSlice(const ColRange& col_range, const RowRange& row_range,
Expand All @@ -93,7 +92,6 @@ struct FrameSlice {
row_range(row_range),
hash_bucket_(hash_bucket),
num_buckets_(num_buckets) {
util::check(col_range.diff() > 0 || row_range.diff() > 0, "Expected non-zero column or row range");
}

explicit FrameSlice(const SegmentInMemory& seg);
Expand Down
3 changes: 3 additions & 0 deletions cpp/arcticdb/pipeline/frame_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ size_t adjust_slice_rowcounts(std::vector<pipelines::SliceAndKey> & slice_and_ke
}

size_t get_slice_rowcounts(std::vector<pipelines::SliceAndKey> & slice_and_keys) {
if (slice_and_keys.empty()) {
return 0;
}
auto current_col = slice_and_keys[0].slice_.col_range.first;
size_t rowcount = 0u;
for (auto& slice_and_key : slice_and_keys) {
Expand Down
20 changes: 8 additions & 12 deletions cpp/arcticdb/pipeline/index_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,17 @@ inline std::vector<SliceAndKey> unfiltered_index(const index::IndexSegmentReader

template<typename RowType>
std::optional<IndexValue> index_value_from_row(const RowType &row, IndexDescriptorImpl::Type index_type, int field_num) {
std::optional<IndexValue> index_value;
switch (index_type) {
case IndexDescriptorImpl::Type::TIMESTAMP:
case IndexDescriptorImpl::Type::ROWCOUNT:
index_value = row.template scalar_at<timestamp>(field_num);
break;
case IndexDescriptorImpl::Type::STRING: {
auto opt = row.string_at(field_num);
index_value = opt ? std::make_optional<IndexValue>(std::string(opt.value())) : std::nullopt;
break;
}
default:
util::raise_rte("Unknown index type {} for column {}", int(index_type), field_num);
case IndexDescriptorImpl::Type::ROWCOUNT: return row.template scalar_at<timestamp>(field_num);
case IndexDescriptorImpl::Type::STRING: {
auto opt = row.string_at(field_num);
return opt ? std::make_optional<IndexValue>(std::string(opt.value())) : std::nullopt;
}
return index_value;
default:
util::raise_rte("Unknown index type {} for column {}", int(index_type), field_num);
}
return std::nullopt;
}

template<typename RowType>
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/read_pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ inline std::optional<util::BitSet> overall_column_bitset(

inline void generate_filtered_field_descriptors(PipelineContext& context, const std::optional<std::vector<std::string>>& columns) {
if (columns.has_value()) {
std::unordered_set<std::string_view> column_set{std::begin(*columns), std::end(*columns)};
const ankerl::unordered_dense::set<std::string_view> column_set{std::begin(*columns), std::end(*columns)};

context.filter_columns_ = std::make_shared<FieldCollection>();
const auto& desc = context.descriptor();
Expand Down
7 changes: 6 additions & 1 deletion cpp/arcticdb/processing/clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ std::vector<std::vector<size_t>> structure_by_row_slice(
ranges_and_keys.erase(ranges_and_keys.begin(), ranges_and_keys.begin() + start_from);

std::vector<std::vector<size_t>> res;
RowRange previous_row_range;
RowRange previous_row_range{-1, -1};
for (const auto& [idx, ranges_and_key]: folly::enumerate(ranges_and_keys)) {
RowRange current_row_range{ranges_and_key.row_range_};
if (current_row_range != previous_row_range) {
Expand Down Expand Up @@ -941,6 +941,11 @@ std::optional<std::vector<std::vector<EntityId>>> MergeClause::repartition(std::
auto compare =
[](const std::unique_ptr<SegmentWrapper> &left,
const std::unique_ptr<SegmentWrapper> &right) {
if (left->seg_.row_count() == 0) {
return false;
} else if (right->seg_.row_count() == 0) {
return true;
}
const auto left_index = index::index_value_from_row(left->row(), IndexDescriptorImpl::Type::TIMESTAMP, 0);
const auto right_index = index::index_value_from_row(right->row(), IndexDescriptorImpl::Type::TIMESTAMP, 0);
return left_index > right_index;
Expand Down
17 changes: 12 additions & 5 deletions cpp/arcticdb/stream/append_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,24 @@ SegmentInMemory incomplete_segment_from_frame(
auto index_tensor = std::move(frame->index_tensor);
const bool has_index = frame->has_index();
const auto index = std::move(frame->index);
SegmentInMemory output;
auto field_tensors = std::move(frame->field_tensors);

std::visit([&](const auto& idx) {
auto field_tensors = std::move(frame->field_tensors);
auto output = std::visit([&](const auto& idx) {
using IdxType = std::decay_t<decltype(idx)>;
using SingleSegmentAggregator = Aggregator<IdxType, FixedSchema, NeverSegmentPolicy>;
auto copy_prev_key = prev_key;
auto timeseries_desc = index_descriptor_from_frame(frame, existing_rows, std::move(prev_key));
util::check(!timeseries_desc.fields().empty(), "Expected fields not to be empty in incomplete segment");
auto norm_meta = timeseries_desc.proto().normalization();
auto descriptor = timeseries_desc.as_stream_descriptor();

SegmentInMemory output;
if (num_rows == 0) {
output = SegmentInMemory(FixedSchema{descriptor, index}.default_descriptor(), 0, false, false);
output.set_timeseries_descriptor(pack_timeseries_descriptor(descriptor, existing_rows, std::move(copy_prev_key), std::move(norm_meta)));
return output;
}

SingleSegmentAggregator agg{FixedSchema{descriptor, index}, [&](auto&& segment) {
auto tsd = pack_timeseries_descriptor(descriptor, existing_rows + num_rows, std::move(copy_prev_key), std::move(norm_meta));
segment.set_timeseries_descriptor(tsd);
Expand Down Expand Up @@ -194,7 +201,8 @@ SegmentInMemory incomplete_segment_from_frame(

agg.end_block_write(num_rows);
agg.commit();
}, index);
return output;
}, index);

ARCTICDB_DEBUG(log::version(), "Constructed segment from frame of {} rows and {} columns at offset {}", output.row_count(), output.num_columns(), output.offset());
return output;
Expand Down Expand Up @@ -241,7 +249,6 @@ std::vector<SliceAndKey> get_incomplete(
bool load_data) {
using namespace arcticdb::pipelines;

std::unique_ptr<TimeseriesDescriptor> unused;
auto entries = get_incomplete_append_slices_for_stream_id(store, stream_id, via_iteration, load_data);

util::variant_match(range,
Expand Down
3 changes: 3 additions & 0 deletions cpp/arcticdb/stream/merge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ void do_merge(
while (!input_streams.empty()) {
auto next = input_streams.pop_top();

if (next->row().parent_->row_count() == 0) {
continue;
}
agg.start_row(pipelines::index::index_value_from_row(next->row(), IndexDescriptorImpl::Type::TIMESTAMP, 0).value()) ([&next, add_symbol_column](auto &rb) {
if(add_symbol_column)
rb.set_scalar_by_name("symbol", std::string_view(std::get<StringId>(next->id())), DataType::UTF_DYNAMIC64);
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/util/error_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ inline std::unordered_map<ErrorCategory, const char*> get_error_category_names()
ERROR_CODE(7001, E_INVALID_DECIMAL_STRING) \
ERROR_CODE(7002, E_INVALID_CHAR_IN_NAME) \
ERROR_CODE(7003, E_NAME_TOO_LONG) \
ERROR_CODE(7004, E_NO_STAGED_SEGMENTS) \
ERROR_CODE(8000, E_UNRECOGNISED_COLUMN_STATS_VERSION) \
ERROR_CODE(9000, E_DECODE_ERROR) \
ERROR_CODE(9001, E_UNKNOWN_CODEC) \
Expand Down
5 changes: 4 additions & 1 deletion cpp/arcticdb/version/version_core-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,15 @@ void do_compact(
};

for(auto it = target_start; it != target_end; ++it) {
decltype(auto) sk = [&it](){
auto sk = [&it](){
if constexpr(std::is_same_v<IteratorType, pipelines::PipelineContext::iterator>)
return it->slice_and_key();
else
return *it;
}();
if (sk.slice().rows().diff() == 0) {
continue;
}
aggregator.add_segment(
std::move(sk.segment(store)),
sk.slice(),
Expand Down
47 changes: 38 additions & 9 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,22 +789,32 @@ void read_incompletes_to_pipeline(
if(incomplete_segments.empty())
return;

// In order to have the right normalization metadata and descriptor we need to find the first non-empty segment.
// Picking an empty segment when there are non-empty ones will impact the index type and column namings.
// If all segments are empty we will procede as if were appending/writing and empty dataframe.
debug::check<ErrorCode::E_ASSERTION_FAILURE>(!incomplete_segments.empty(), "Incomplete segments must be non-empty");
const auto* seg = &incomplete_segments.front().segment(store);
for (auto& slice : incomplete_segments) {
if (slice.segment(store).row_count() > 0) {
seg = &slice.segment_.value();
break;
}
}
// Mark the start point of the incompletes, so we know that there is no column slicing after this point
pipeline_context->incompletes_after_ = pipeline_context->slice_and_keys_.size();

// If there are only incompletes we need to add the index here
if(pipeline_context->slice_and_keys_.empty()) {
add_index_columns_to_query(read_query, incomplete_segments.begin()->segment(store).index_descriptor());
add_index_columns_to_query(read_query, seg->index_descriptor());
}

auto first_seg = incomplete_segments.begin()->segment(store);

if (!pipeline_context->desc_)
pipeline_context->desc_ = first_seg.descriptor();
pipeline_context->desc_ = seg->descriptor();

if (!pipeline_context->norm_meta_) {
pipeline_context->norm_meta_ = std::make_unique<arcticdb::proto::descriptors::NormalizationMetadata>();
auto segment_tsd = first_seg.index_descriptor();
pipeline_context->norm_meta_->CopyFrom(segment_tsd.proto().normalization());
pipeline_context->norm_meta_->CopyFrom(seg->index_descriptor().proto().normalization());
ensure_timeseries_norm_meta(*pipeline_context->norm_meta_, pipeline_context->stream_id_, sparsify);
}

Expand Down Expand Up @@ -847,11 +857,16 @@ void check_incompletes_index_ranges_dont_overlap(const std::shared_ptr<PipelineC
// Use ordered set so we only need to compare adjacent elements
std::set<TimestampRange> unique_timestamp_ranges;
for (auto it = pipeline_context->incompletes_begin(); it!= pipeline_context->end(); it++) {
if (it->slice_and_key().slice().rows().diff() == 0) {
continue;
}
sorting::check<ErrorCode::E_UNSORTED_DATA>(
!last_existing_index_value.has_value() || it->slice_and_key().key().start_time() >= *last_existing_index_value,
"Cannot append staged segments to existing data as incomplete segment contains index value < existing data (in UTC): {} <= {}",
date_and_time(it->slice_and_key().key().start_time()),
date_and_time(*last_existing_index_value));
// Should never reach "" but the standard mandates that all function arguments are evaluated
last_existing_index_value ? date_and_time(*last_existing_index_value) : ""
);
auto [_, inserted] = unique_timestamp_ranges.insert({it->slice_and_key().key().start_time(), it->slice_and_key().key().end_time()});
// This is correct because incomplete segments aren't column sliced
sorting::check<ErrorCode::E_UNSORTED_DATA>(
Expand Down Expand Up @@ -1278,6 +1293,10 @@ VersionedItem sort_merge_impl(
options.convert_int_to_float_,
options.via_iteration_,
options.sparsify_);
user_input::check<ErrorCode::E_NO_STAGED_SEGMENTS>(
pipeline_context->slice_and_keys_.size() > 0,
"Finalizing staged data is not allowed with empty staging area"
);

std::vector<entity::VariantKey> delete_keys;
for(auto sk = pipeline_context->incompletes_begin(); sk != pipeline_context->end(); ++sk) {
Expand All @@ -1299,6 +1318,15 @@ VersionedItem sort_merge_impl(

read_query.clauses_.emplace_back(std::make_shared<Clause>(MergeClause{timeseries_index, SparseColumnPolicy{}, stream_id, pipeline_context->descriptor()}));
auto segments = read_and_process(store, pipeline_context, read_query, ReadOptions{}, pipeline_context->incompletes_after());
if (options.append_ && update_info.previous_index_key_.has_value() &&
update_info.previous_index_key_->end_time() - 1 > std::get<timestamp>(TimeseriesIndex::start_value_for_segment(segments[0].segment_.value()))) {
store->remove_keys(delete_keys).get();
sorting::raise<ErrorCode::E_UNSORTED_DATA>(
"Cannot append staged segments to existing data as incomplete segment contains index value < existing data (in UTC): {} <= {}",
date_and_time(std::get<timestamp>(TimeseriesIndex::start_value_for_segment(segments[0].segment_.value()))),
date_and_time(update_info.previous_index_key_->end_time() - 1)
);
}
pipeline_context->total_rows_ = num_versioned_rows + get_slice_rowcounts(segments);

auto index = index_type_from_descriptor(pipeline_context->descriptor());
Expand Down Expand Up @@ -1375,9 +1403,10 @@ VersionedItem compact_incomplete_impl(
options.convert_int_to_float_,
options.via_iteration_,
options.sparsify_);
if (pipeline_context->slice_and_keys_.size() == prev_size) {
util::raise_rte("No incomplete segments found for {}", stream_id);
}
user_input::check<ErrorCode::E_NO_STAGED_SEGMENTS>(
pipeline_context->slice_and_keys_.size() != prev_size,
"Finalizing staged data is not allowed with empty staging area"
);
if (options.validate_index_) {
check_incompletes_index_ranges_dont_overlap(pipeline_context, previous_sorted_value);
}
Expand Down
5 changes: 3 additions & 2 deletions python/arcticdb/version_store/_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1951,7 +1951,7 @@ def compact_incomplete(
metadata: Optional[Any] = None,
prune_previous_version: Optional[bool] = None,
validate_index: bool = False,
):
) -> VersionedItem:
"""
Compact previously written un-indexed chunks of data, produced by a tick collector or parallel
writes/appends.
Expand Down Expand Up @@ -1988,9 +1988,10 @@ def compact_incomplete(
"prune_previous_version", self._write_options(), global_default=False, existing_value=prune_previous_version
)
udm = normalize_metadata(metadata) if metadata is not None else None
return self.version_store.compact_incomplete(
vit = self.version_store.compact_incomplete(
symbol, append, convert_int_to_float, via_iteration, sparsify, udm, prune_previous_version, validate_index
)
return self._convert_thin_cxx_item_to_python(vit, metadata)

@staticmethod
def _get_index_columns_from_descriptor(descriptor):
Expand Down
29 changes: 23 additions & 6 deletions python/arcticdb/version_store/library.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import pandas as pd
import numpy as np
import logging
from arcticdb.version_store._normalization import normalize_metadata

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -902,7 +903,7 @@ def finalize_staged_data(
prune_previous_versions: bool = False,
metadata: Any = None,
validate_index=True,
):
) -> VersionedItem:
"""
Finalizes staged data, making it available for reads.
Expand All @@ -924,12 +925,18 @@ def finalize_staged_data(
are non-overlapping with each other, and, in the case of `StagedDataFinalizeMethod.APPEND`, fall after the
last index value in the previous version.
Returns
-------
VersionedItem
Structure containing metadata and version number of the written symbol in the store.
The data member will be None.
See Also
--------
write
Documentation on the ``staged`` parameter explains the concept of staged data in more detail.
"""
self._nvs.compact_incomplete(
return self._nvs.compact_incomplete(
symbol,
append=mode == StagedDataFinalizeMethod.APPEND,
convert_int_to_float=False,
Expand All @@ -943,7 +950,8 @@ def sort_and_finalize_staged_data(
symbol: str,
mode: Optional[StagedDataFinalizeMethod] = StagedDataFinalizeMethod.WRITE,
prune_previous_versions: bool = False,
):
metadata: Any = None
) -> VersionedItem:
"""
sort_merge will sort and finalize staged data. This differs from `finalize_staged_data` in that it
can support staged segments with interleaved time periods - the end result will be ordered. This requires
Expand All @@ -961,18 +969,27 @@ def sort_and_finalize_staged_data(
prune_previous_versions : bool, default=False
Removes previous (non-snapshotted) versions from the database.
metadata : Any, default=None
Optional metadata to persist along with the symbol.
Returns
-------
VersionedItem
Structure containing metadata and version number of the written symbol in the store.
The data member will be None.
See Also
--------
write
Documentation on the ``staged`` parameter explains the concept of staged data in more detail.
"""

self._nvs.version_store.sort_merge(
vit = self._nvs.version_store.sort_merge(
symbol,
None,
normalize_metadata(metadata) if metadata is not None else None,
mode == StagedDataFinalizeMethod.APPEND,
prune_previous_versions=prune_previous_versions,
)
return self._nvs._convert_thin_cxx_item_to_python(vit, metadata)

def get_staged_symbols(self) -> List[str]:
"""
Expand Down
Loading

0 comments on commit a51ff8b

Please sign in to comment.