Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async update method. Improve the performance of update by parallelising reads. #2087

Open
wants to merge 18 commits into
base: master
Choose a base branch
from

Conversation

vasil-pashov
Copy link
Collaborator

@vasil-pashov vasil-pashov commented Dec 19, 2024

Reference Issues/PRs

Implement async_update_impl function which returns a future. The synchronous version for update just calls it and waits for the future just like append does.

This keeps most of the code for update the same, however instead of calling .get on futures it will chain then and return a future. In the process of doing this the reads needed by update were made in parallel. Thus the regular update will have improved performance.

Slight refactor of C++ unit tests, using std::array instead of std::vector for fixed size collections and placing const and constexpr specifiers. No functional changes.

What does this implement or fix?

Any other comments?

Checklist

Checklist for code changes...
  • Have you updated the relevant docstrings, documentation and copyright notice?
  • Is this contribution tested against all ArcticDB's features?
  • Do all exceptions introduced raise appropriate error messages?
  • Are API changes highlighted in the PR description?
  • Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

@vasil-pashov vasil-pashov marked this pull request as ready for review December 20, 2024 14:54
auto versioned_item = VersionedItem(to_atom(std::move(version_key)));
ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", stream_id, update_info.next_version_id_);
ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", frame->desc.id(), update_info.next_version_id_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this set on the input frame? If not, can use the stream ID from the version_key

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is. PythonVersionStore::update passes the result of convert::py_ndf_to_frame to update_internal

@@ -20,8 +20,13 @@ using namespace arcticdb::proto::descriptors;
namespace arcticdb::pipelines::index {

IndexSegmentReader get_index_reader(const AtomKey &prev_index, const std::shared_ptr<Store> &store) {
auto [key, seg] = store->read_sync(prev_index);
return index::IndexSegmentReader{std::move(seg)};
return async_get_index_reader(prev_index, store).get();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should leave this implementation using read_sync, so that the scheduling overhead can be avoided if necessary

@@ -61,8 +61,7 @@ void foreach_active_bit(const util::BitSet &bs, C &&visitor) {
}
}

template<typename ContainerType>
std::vector<SliceAndKey> filter_index(const ContainerType &container, std::optional<CombinedQuery<ContainerType>> &&query) {
inline std::vector<SliceAndKey> filter_index(const index::IndexSegmentReader& container, std::optional<CombinedQuery<index::IndexSegmentReader>> &&query) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the template parameter always resolved to IndexSegmentReader? Maybe rename container to match as well now?

std::back_inserter(unaffected_keys));

util::check(affected_keys.size() + unaffected_keys.size() == index_segment_reader.size(), "Unaffected vs affected keys split was inconsistent {} + {} != {}",
return index::async_get_index_reader(*(update_info.previous_index_key_), store).thenValue([=](index::IndexSegmentReader&& index_segment_reader) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we capturing everything by copy?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole implementation is inside async_get_index_reader and we need to propagate the input params. I used it as a shorthand. In theory, options can be moved but it's consisted only of PODs so it won't do anything.

frame,
get_slicing_policy(options, *frame),
IndexPartialKey{stream_id, update_info.next_version_id_}, store
).thenValue([
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slice_and_write finishes on the IO executor, but I think we want to be on CPU for the next task

std::move(output)
);
return SliceAndKey{std::move(new_slice), std::get<AtomKey>(std::move(fut_key).get())};
return store->read(existing.key()).thenValue([=](std::pair<VariantKey, SegmentInMemory>&& key_segment) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capture by copy?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also be a thenValueInline

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as #2087 (comment) the implementation is in the lambda and it uses all variables. So I need to pass them to the future. Can't capture by ref as they'll die when this is put in the queue and the function returns.

const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
if (num_rows <= 0) {
return folly::Future<std::optional<SliceAndKey>>{std::nullopt};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs folly::Future, if you specify the return type of the lambda can probably just return std::nullopt

IndexRange original;
};

folly::Future<AtomKey> async_update_impl(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This methods a bit of a monster now, if there's a clean way to break it up a bit it should get more readable

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't think way to split things in logically rather than pulling them for the sake of splitting. I'll give it another try, now that everything compiles and all tests are passing there will be less unknowns.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants