Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async update method. Improve the performance of update by parallelising reads. #2087

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cpp/arcticdb/pipeline/index_segment_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ IndexSegmentReader get_index_reader(const AtomKey &prev_index, const std::shared
return index::IndexSegmentReader{std::move(seg)};
}

folly::Future<IndexSegmentReader> async_get_index_reader(const AtomKey &prev_index, const std::shared_ptr<Store> &store) {
return store->read(prev_index).thenValueInline([](std::pair<VariantKey, SegmentInMemory>&& key_seg) {
return IndexSegmentReader{std::move(key_seg.second)};
});
}

IndexSegmentReader::IndexSegmentReader(SegmentInMemory&& s) :
seg_(std::move(s)) {
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/arcticdb/pipeline/index_segment_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@
#pragma once

#include <arcticdb/column_store/memory_segment.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/pipeline/index_fields.hpp>

#include <boost/noncopyable.hpp>

#include <cstdint>
#include <folly/futures/Future.h>

namespace arcticdb {
class Store;
Expand Down Expand Up @@ -135,6 +131,10 @@ index::IndexSegmentReader get_index_reader(
const AtomKey &prev_index,
const std::shared_ptr<Store> &store);

folly::Future<IndexSegmentReader> async_get_index_reader(
const AtomKey &prev_index,
const std::shared_ptr<Store> &store);

IndexRange get_index_segment_range(
const AtomKey &prev_index,
const std::shared_ptr<Store> &store);
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/pipeline/index_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,8 @@ TimeseriesDescriptor get_merged_tsd(
);
}

bool is_timeseries_index(const IndexDescriptorImpl& index_desc) {
return index_desc.type() == IndexDescriptor::Type::TIMESTAMP || index_desc.type() == IndexDescriptor::Type::EMPTY;
}

} //namespace arcticdb::pipelines::index
2 changes: 2 additions & 0 deletions cpp/arcticdb/pipeline/index_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,6 @@ TimeseriesDescriptor get_merged_tsd(
const TimeseriesDescriptor& existing_tsd,
const std::shared_ptr<pipelines::InputTensorFrame>& new_frame);

[[nodiscard]] bool is_timeseries_index(const IndexDescriptorImpl& index_desc);

} //namespace arcticdb::pipelines::index
40 changes: 17 additions & 23 deletions cpp/arcticdb/pipeline/query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,10 @@
#include <arcticdb/util/bitset.hpp>
#include <arcticdb/entity/index_range.hpp>
#include <arcticdb/processing/expression_context.hpp>
#include <arcticdb/entity/versioned_item.hpp>
#include <arcticdb/pipeline/python_output_frame.hpp>
#include <arcticdb/pipeline/write_frame.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/util/constructors.hpp>
#include <arcticdb/util/variant.hpp>
#include <arcticdb/pipeline/index_segment_reader.hpp>
#include <arcticdb/pipeline/input_tensor_frame.hpp>
#include <arcticdb/pipeline/read_options.hpp>
#include <arcticdb/stream/stream_utils.hpp>
#include <arcticdb/processing/clause.hpp>
#include <arcticdb/util/simple_string_hash.hpp>
Expand All @@ -28,9 +23,12 @@
#include <vector>
#include <string>
#include <variant>
#include <ranges>

namespace arcticdb::pipelines {

namespace ranges = std::ranges;

using FilterRange = std::variant<std::monostate, IndexRange, RowRange>;

/*
Expand Down Expand Up @@ -405,41 +403,37 @@ inline FilterRange get_query_index_range(
return RowRange{std::get<NumericIndex>(index_range.start_), std::get<NumericIndex>(index_range.end_)};
}

inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, const std::vector<SliceAndKey> &input) {
inline std::vector<SliceAndKey> strictly_before(const FilterRange &range, std::span<const SliceAndKey> input) {
std::vector<SliceAndKey> output;
util::variant_match(range,
[&](const RowRange &row_range) {
std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
[&](const auto &sk) {
return sk.slice_.row_range.second < row_range.first;
});
ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
return sk.slice_.row_range.second < row_range.first;
});
},
[&](const IndexRange &index_range) {
std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
[&](const auto &sk) {
return sk.key().index_range().end_ < index_range.start_;
});
ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
return sk.key().index_range().end_ < index_range.start_;
});
},
[&](const auto &) {
util::raise_rte("Expected specified range ");
});
return output;
}

inline std::vector<SliceAndKey> strictly_after(const FilterRange &range, const std::vector<SliceAndKey> &input) {
inline std::vector<SliceAndKey> strictly_after(const FilterRange &range, std::span<const SliceAndKey> input) {
std::vector<SliceAndKey> output;
util::variant_match(range,
[&input, &output](const RowRange &row_range) {
std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
[&](const auto &sk) {
return sk.slice_.row_range.first > row_range.second;
});
ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
return sk.slice_.row_range.first > row_range.second;
});
},
[&input, &output](const IndexRange &index_range) {
std::copy_if(std::begin(input), std::end(input), std::back_inserter(output),
[&](const auto &sk) {
return sk.key().index_range().start_ > index_range.end_;
});
ranges::copy_if(input, std::back_inserter(output), [&](const auto &sk) {
return sk.key().index_range().start_ > index_range.end_;
});
},
[](const auto &) {
util::raise_rte("Expected specified range ");
Expand Down
33 changes: 10 additions & 23 deletions cpp/arcticdb/pipeline/read_pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,9 @@
#pragma once

#include <variant>

#include <folly/futures/Future.h>
#include <boost/noncopyable.hpp>

#include <arcticdb/entity/types.hpp>
#include <arcticdb/stream/index.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <pybind11/pybind11.h>

#include <arcticdb/stream/stream_sink.hpp>
#include <arcticdb/stream/stream_source.hpp>
#include <arcticdb/entity/native_tensor.hpp>
#include <arcticdb/entity/performance_tracing.hpp>
#include <arcticdb/entity/atom_key.hpp>
#include <arcticdb/util/bitset.hpp>
#include <arcticdb/util/constructors.hpp>
#include <folly/executors/FutureExecutor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/pipeline/python_output_frame.hpp>
#include <arcticdb/pipeline/query.hpp>
Expand Down Expand Up @@ -61,22 +46,24 @@ void foreach_active_bit(const util::BitSet &bs, C &&visitor) {
}
}

template<typename ContainerType>
std::vector<SliceAndKey> filter_index(const ContainerType &container, std::optional<CombinedQuery<ContainerType>> &&query) {
inline std::vector<SliceAndKey> filter_index(
const index::IndexSegmentReader& index_segment_reader,
std::optional<CombinedQuery<index::IndexSegmentReader>> &&query
) {
ARCTICDB_SAMPLE_DEFAULT(FilterIndex)
std::vector<SliceAndKey> output{};
if (container.size()> 0) {
if (!index_segment_reader.empty()) {
if(query) {
auto row_bitset = (*query)(container);
auto row_bitset = (*query)(index_segment_reader);
ARCTICDB_DEBUG(log::version(), "Row bitset has {} bits set of {}", row_bitset->count(), row_bitset->size());
output.reserve(row_bitset->count());
foreach_active_bit(*row_bitset, [&](auto r) {
output.emplace_back(get_row(container, r));
output.emplace_back(get_row(index_segment_reader, r));
});
} else {
output.reserve(container.size());
for(auto i = 0u; i < container.size(); ++i) {
output.emplace_back(get_row(container, i));
output.reserve(index_segment_reader.size());
for(auto i = 0u; i < index_segment_reader.size(); ++i) {
output.emplace_back(get_row(index_segment_reader, i));
}
}
}
Expand Down
70 changes: 38 additions & 32 deletions cpp/arcticdb/pipeline/write_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@
#include <arcticdb/stream/aggregator.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/util/variant.hpp>
#include <arcticdb/python/python_types.hpp>
#include <arcticdb/pipeline/frame_utils.hpp>
#include <arcticdb/pipeline/write_frame.hpp>
#include <arcticdb/stream/append_map.hpp>
#include <arcticdb/async/task_scheduler.hpp>
#include <arcticdb/util/format_date.hpp>
#include <vector>
#include <array>
#include <ranges>


namespace arcticdb::pipelines {

using namespace arcticdb::entity;
using namespace arcticdb::stream;
namespace ranges = std::ranges;

WriteToSegmentTask::WriteToSegmentTask(
std::shared_ptr<InputTensorFrame> frame,
Expand Down Expand Up @@ -252,40 +253,46 @@ static RowRange partial_rewrite_row_range(
}
}

std::optional<SliceAndKey> rewrite_partial_segment(
folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
const SliceAndKey& existing,
const IndexRange& index_range,
VersionId version_id,
AffectedSegmentPart affected_part,
const std::shared_ptr<Store>& store) {
const auto& key = existing.key();
auto kv = store->read(key).get();
const SegmentInMemory& segment = kv.second;
const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
if (num_rows <= 0) {
return std::nullopt;
}
SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
// +1 as in the key we store one nanosecond greater than the last index value in the segment
const IndexValue end_ts = std::get<NumericIndex>(TimeseriesIndex::end_value_for_segment(output)) + 1;
FrameSlice new_slice{
std::make_shared<StreamDescriptor>(output.descriptor()),
existing.slice_.col_range,
RowRange{0, num_rows},
existing.slice_.hash_bucket(),
existing.slice_.num_buckets()};

auto fut_key = store->write(
key.type(),
return store->read(existing.key()).thenValueInline([
existing,
index_range,
version_id,
key.id(),
start_ts,
end_ts,
std::move(output)
);
return SliceAndKey{std::move(new_slice), std::get<AtomKey>(std::move(fut_key).get())};
affected_part,
store](std::pair<VariantKey, SegmentInMemory>&& key_segment) -> folly::Future<std::optional<SliceAndKey>> {
const auto& key = existing.key();
const SegmentInMemory& segment = key_segment.second;
const RowRange affected_row_range = partial_rewrite_row_range(segment, index_range, affected_part);
const int64_t num_rows = affected_row_range.end() - affected_row_range.start();
if (num_rows <= 0) {
return std::nullopt;
}
SegmentInMemory output = segment.truncate(affected_row_range.start(), affected_row_range.end(), true);
const IndexValue start_ts = TimeseriesIndex::start_value_for_segment(output);
// +1 as in the key we store one nanosecond greater than the last index value in the segment
const IndexValue end_ts = std::get<NumericIndex>(TimeseriesIndex::end_value_for_segment(output)) + 1;
FrameSlice new_slice{
std::make_shared<StreamDescriptor>(output.descriptor()),
existing.slice_.col_range,
RowRange{0, num_rows},
existing.slice_.hash_bucket(),
existing.slice_.num_buckets()};
return store->write(
key.type(),
version_id,
key.id(),
start_ts,
end_ts,
std::move(output)
).thenValueInline([new_slice=std::move(new_slice)](VariantKey&& k) {
return std::make_optional<SliceAndKey>(std::move(new_slice), std::get<AtomKey>(std::move(k)));
});
});
}

std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<SliceAndKey>, 5>& groups, size_t& global_count) {
Expand All @@ -301,10 +308,9 @@ std::vector<SliceAndKey> flatten_and_fix_rows(const std::array<std::vector<Slice
return std::max(a, sk.slice_.row_range.second);
});

std::transform(std::begin(group), std::end(group), std::back_inserter(output), [&](SliceAndKey sk) {
ranges::transform(group, std::back_inserter(output), [&](SliceAndKey sk) {
auto range_start = global_count + (sk.slice_.row_range.first - group_start);
auto new_range = RowRange{range_start, range_start + (sk.slice_.row_range.diff())};
sk.slice_.row_range = new_range;
sk.slice_.row_range = RowRange{range_start, range_start + sk.slice_.row_range.diff()};
return sk;
});

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/write_frame.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ enum class AffectedSegmentPart {
END
};

std::optional<SliceAndKey> rewrite_partial_segment(
folly::Future<std::optional<SliceAndKey>> async_rewrite_partial_segment(
const SliceAndKey& existing,
const IndexRange& index_range,
VersionId version_id,
Expand Down
4 changes: 1 addition & 3 deletions cpp/arcticdb/processing/test/benchmark_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ using namespace arcticdb;

SegmentInMemory get_segment_for_merge(const StreamId &id, size_t num_rows, size_t start, size_t step){
auto segment = SegmentInMemory{
get_test_descriptor<stream::TimeseriesIndex>(id, {
scalar_field(DataType::UINT8, "column")
}),
get_test_descriptor<stream::TimeseriesIndex>(id, std::array{scalar_field(DataType::UINT8, "column")}),
num_rows
};
auto& index_col = segment.column(0);
Expand Down
1 change: 0 additions & 1 deletion cpp/arcticdb/stream/stream_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#pragma once

#include <arcticdb/column_store/memory_segment.hpp>
#include <arcticdb/codec/segment.hpp>
#include <arcticdb/storage/storage.hpp>
#include <arcticdb/storage/storage_options.hpp>
#include <arcticdb/async/batch_read_args.hpp>
Expand Down
Loading
Loading