Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix 1683: Add validate_index arg to staged data finalization in both V1 and V2 APIs #1694

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions cpp/arcticdb/stream/append_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,13 @@ folly::Future<arcticdb::entity::VariantKey> write_incomplete_frame(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame,
bool validate_index,
std::optional<AtomKey>&& next_key) {
using namespace arcticdb::pipelines;

if (!index_is_not_timeseries_or_is_sorted_ascending(*frame)) {
sorting::raise<ErrorCode::E_UNSORTED_DATA>("When writing/appending staged data in parallel, input data must be sorted.");
}
sorting::check<ErrorCode::E_UNSORTED_DATA>(
!validate_index || index_is_not_timeseries_or_is_sorted_ascending(*frame),
"When writing/appending staged data in parallel, input data must be sorted.");

auto index_range = frame->index_range;
auto segment = incomplete_segment_from_frame(frame, 0, std::move(next_key), false);
Expand All @@ -225,9 +226,10 @@ folly::Future<arcticdb::entity::VariantKey> write_incomplete_frame(
void write_parallel(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame) {
const std::shared_ptr<InputTensorFrame>& frame,
bool validate_index) {
// TODO: dynamic bucketize doesn't work with incompletes
(void)write_incomplete_frame(store, stream_id, frame, std::nullopt).get();
(void)write_incomplete_frame(store, stream_id, frame, validate_index, std::nullopt).get();
}

std::vector<SliceAndKey> get_incomplete(
Expand Down Expand Up @@ -367,7 +369,8 @@ AppendMapEntry entry_from_key(const std::shared_ptr<StreamSource>& store, const
void append_incomplete(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame) {
const std::shared_ptr<InputTensorFrame>& frame,
bool validate_index) {
using namespace arcticdb::proto::descriptors;
using namespace arcticdb::stream;
ARCTICDB_SAMPLE_DEFAULT(AppendIncomplete)
Expand All @@ -377,7 +380,7 @@ void append_incomplete(
const auto num_rows = frame->num_rows;
total_rows += num_rows;
auto desc = frame->desc.clone();
auto new_key = write_incomplete_frame(store, stream_id, frame, std::move(next_key)).get();
auto new_key = write_incomplete_frame(store, stream_id, frame, validate_index, std::move(next_key)).get();


ARCTICDB_DEBUG(log::version(), "Wrote incomplete frame for stream {}, {} rows, total rows {}", stream_id, num_rows, total_rows);
Expand Down
6 changes: 4 additions & 2 deletions cpp/arcticdb/stream/append_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ void remove_incomplete_segments(
void write_parallel(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
const std::shared_ptr<pipelines::InputTensorFrame>& frame);
const std::shared_ptr<pipelines::InputTensorFrame>& frame,
bool validate_index);

void write_head(
const std::shared_ptr<Store>& store,
Expand All @@ -68,7 +69,8 @@ void append_incomplete_segment(
void append_incomplete(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
const std::shared_ptr<pipelines::InputTensorFrame>& frame);
const std::shared_ptr<pipelines::InputTensorFrame>& frame,
bool validate_index);

std::optional<int64_t> latest_incomplete_timestamp(
const std::shared_ptr<Store>& store,
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/stream/test/test_append_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ TEST(Append, Simple) {
auto wrapper = get_test_simple_frame(stream_id, 10, 0);
auto& frame = wrapper.frame_;
auto desc = frame->desc.clone();
append_incomplete(store, stream_id, frame);
append_incomplete(store, stream_id, frame, true);
pipelines::FilterRange range;
auto pipeline_context = std::make_shared<PipelineContext>(desc);
pipeline_context->selected_columns_ = util::BitSet(2);
Expand Down
29 changes: 12 additions & 17 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,8 +978,9 @@ std::set<StreamId> LocalVersionedEngine::get_active_incomplete_refs() {

void LocalVersionedEngine::append_incomplete_frame(
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame) const {
arcticdb::append_incomplete(store_, stream_id, frame);
const std::shared_ptr<InputTensorFrame>& frame,
bool validate_index) const {
arcticdb::append_incomplete(store_, stream_id, frame, validate_index);
}

void LocalVersionedEngine::append_incomplete_segment(
Expand All @@ -990,27 +991,21 @@ void LocalVersionedEngine::append_incomplete_segment(

void LocalVersionedEngine::write_parallel_frame(
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame) const {
write_parallel(store_, stream_id, frame);
const std::shared_ptr<InputTensorFrame>& frame,
bool validate_index) const {
write_parallel(store_, stream_id, frame, validate_index);
}

VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(
const StreamId& stream_id,
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
bool append,
bool convert_int_to_float,
bool via_iteration,
bool sparsify,
bool prune_previous_versions) {
const CompactIncompleteOptions& options) {
log::version().debug("Compacting incomplete symbol {}", stream_id);

auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id, VersionQuery{});
auto versioned_item = compact_incomplete_impl(
store_, stream_id, user_meta, update_info,
append, convert_int_to_float, via_iteration, sparsify, get_write_options());
auto versioned_item = compact_incomplete_impl(store_, stream_id, user_meta, update_info, options, get_write_options());

write_version_and_prune_previous(
prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);

if(cfg_.symbol_list())
symbol_list().add_symbol(store_, stream_id, update_info.next_version_id_);
Expand Down Expand Up @@ -1683,10 +1678,10 @@ std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>> Local
VersionedItem LocalVersionedEngine::sort_merge_internal(
const StreamId& stream_id,
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
const SortMergeOptions& option) {
const CompactIncompleteOptions& options) {
auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id, VersionQuery{});
auto versioned_item = sort_merge_impl(store_, stream_id, user_meta, update_info, option.append_, option.convert_int_to_float_, option.via_iteration_, option.sparsify_);
write_version_and_prune_previous(option.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
auto versioned_item = sort_merge_impl(store_, stream_id, user_meta, update_info, options);
write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
return versioned_item;
}

Expand Down
22 changes: 6 additions & 16 deletions cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,6 @@ struct KeySizesInfo {
size_t uncompressed_size; // bytes
};

struct SortMergeOptions {
bool append_;
bool convert_int_to_float_;
bool via_iteration_;
bool sparsify_;
bool prune_previous_versions_;
};

folly::Future<folly::Unit> delete_trees_responsibly(
std::shared_ptr<Store> store,
std::shared_ptr<VersionMap> &version_map,
Expand Down Expand Up @@ -112,7 +104,8 @@ class LocalVersionedEngine : public VersionedEngine {

void append_incomplete_frame(
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame) const override;
const std::shared_ptr<InputTensorFrame>& frame,
bool validate_index) const override;

void remove_incomplete(
const StreamId& stream_id
Expand Down Expand Up @@ -163,7 +156,8 @@ class LocalVersionedEngine : public VersionedEngine {

void write_parallel_frame(
const StreamId& stream_id,
const std::shared_ptr<InputTensorFrame>& frame) const override;
const std::shared_ptr<InputTensorFrame>& frame,
bool validate_index) const override;

void delete_tree(
const std::vector<IndexTypeKey>& idx_to_be_deleted,
Expand Down Expand Up @@ -262,7 +256,7 @@ class LocalVersionedEngine : public VersionedEngine {
VersionedItem sort_merge_internal(
const StreamId& stream_id,
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
const SortMergeOptions& option);
const CompactIncompleteOptions& options);

std::vector<folly::Future<AtomKey>> batch_write_internal(
const std::vector<VersionId>& version_ids,
Expand Down Expand Up @@ -419,11 +413,7 @@ class LocalVersionedEngine : public VersionedEngine {
VersionedItem compact_incomplete_dynamic(
const StreamId& stream_id,
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
bool append,
bool convert_int_to_float,
bool via_iteration,
bool sparsify,
bool prune_previous_versions);
const CompactIncompleteOptions& options);

/**
* Take tombstoned indexes that have been pruned in the version map and perform the actual deletion
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/version/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
py::arg("sparsify") = false,
py::arg("user_meta") = std::nullopt,
py::arg("prune_previous_versions") = false,
py::arg("validate_index") = false,
py::call_guard<SingleThreadMutexHolder>(), "Compact incomplete segments")
.def("sort_merge",
&PythonVersionStore::sort_merge,
Expand Down
21 changes: 15 additions & 6 deletions cpp/arcticdb/version/test/test_version_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ TEST_F(VersionStoreTest, SortMerge) {
using namespace arcticdb::storage;
using namespace arcticdb::stream;
using namespace arcticdb::pipelines;
using namespace arcticdb::version_store;

size_t count = 0;

Expand All @@ -191,10 +192,18 @@ TEST_F(VersionStoreTest, SortMerge) {
std::shuffle(data.begin(), data.end(), mt);

for(auto&& frame : data) {
test_store_->append_incomplete_frame(symbol, std::move(frame.input_frame_));
test_store_->append_incomplete_frame(symbol, std::move(frame.input_frame_), true);
}

test_store_->sort_merge_internal(symbol, std::nullopt, arcticdb::version_store::SortMergeOptions{true, false, false, false, false});
CompactIncompleteOptions options{
.prune_previous_versions_=false,
.append_=true,
.convert_int_to_float_=false,
.via_iteration_=false,
.sparsify_=false
};

test_store_->sort_merge_internal(symbol, std::nullopt, options);
}

TEST_F(VersionStoreTest, CompactIncompleteDynamicSchema) {
Expand Down Expand Up @@ -238,7 +247,7 @@ TEST_F(VersionStoreTest, CompactIncompleteDynamicSchema) {
std::shuffle(data.begin(), data.end(), mt);

for(auto& frame : data) {
test_store_->write_parallel_frame(symbol, std::move(frame.input_frame_));
test_store_->write_parallel_frame(symbol, std::move(frame.input_frame_), true);
}

auto vit = test_store_->compact_incomplete(symbol, false, false, true, false);
Expand Down Expand Up @@ -283,17 +292,17 @@ TEST_F(VersionStoreTest, GetIncompleteSymbols) {
std::string stream_id1{"thing1"};
auto wrapper1 = get_test_simple_frame(stream_id1, 15, 2);
auto& frame1 = wrapper1.frame_;
test_store_->append_incomplete_frame(stream_id1, std::move(frame1));
test_store_->append_incomplete_frame(stream_id1, std::move(frame1), true);

std::string stream_id2{"thing2"};
auto wrapper2 = get_test_simple_frame(stream_id2, 15, 2);
auto& frame2 = wrapper2.frame_;
test_store_->append_incomplete_frame(stream_id2, std::move(frame2));
test_store_->append_incomplete_frame(stream_id2, std::move(frame2), true);

std::string stream_id3{"thing3"};
auto wrapper3 = get_test_simple_frame(stream_id3, 15, 2);
auto& frame3 = wrapper3.frame_;
test_store_->append_incomplete_frame(stream_id3, std::move(frame3));
test_store_->append_incomplete_frame(stream_id3, std::move(frame3), true);

std::set<StreamId> expected{ stream_id1, stream_id2, stream_id3};
auto result = test_store_->get_incomplete_symbols();
Expand Down
Loading
Loading