[Backport release-2.27] Revert "improve readers by parallelizing I/O and compute operations." (#5471) #5472

Merged 2 commits on Mar 5, 2025
5 changes: 1 addition & 4 deletions test/src/unit-ReadCellSlabIter.cc
@@ -183,10 +183,7 @@ void set_result_tile_dim(
std::nullopt,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
result_tile.init_coord_tile(
constants::format_version,
array_schema,
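Note on the pattern being reverted: the parallel-I/O change pairs each TileData pointer with a ThreadPool::SharedTask, and this revert restores plain pointers. A minimal standalone sketch of that buffer-plus-task pairing idea (generic types only; std::shared_future stands in for ThreadPool::SharedTask, none of this is TileDB's actual implementation):

```cpp
// Minimal sketch of pairing a buffer with the task that fills it, so that
// consumers wait before reading.
#include <cstddef>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

struct TileBuffer {
  std::vector<int> data;
  std::shared_future<void> ready;  // task that populates `data`
};

int main() {
  std::promise<void> done;
  TileBuffer tile{std::vector<int>(4), done.get_future().share()};

  std::thread producer([&] {
    for (std::size_t i = 0; i < tile.data.size(); ++i)
      tile.data[i] = static_cast<int>(i * i);
    done.set_value();  // publish: the buffer is now safe to read
  });

  tile.ready.wait();  // consumers block until the producing task completes
  for (int v : tile.data)
    std::cout << v << ' ';
  std::cout << '\n';

  producer.join();
  return 0;
}
```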
4 changes: 2 additions & 2 deletions test/src/unit-cppapi-consolidation-with-timestamps.cc
@@ -636,7 +636,7 @@ TEST_CASE_METHOD(

// Will only allow to load two tiles out of 3.
Config cfg;
cfg.set("sm.mem.total_budget", "50000");
cfg.set("sm.mem.total_budget", "30000");
cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
ctx_ = Context(cfg);

@@ -685,7 +685,7 @@ TEST_CASE_METHOD(

// Will only allow to load two tiles out of 3.
Config cfg;
cfg.set("sm.mem.total_budget", "50000");
cfg.set("sm.mem.total_budget", "30000");
cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
ctx_ = Context(cfg);

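The budget keys adjusted above are also settable through the public C++ API. A minimal sketch, assuming a locally installed TileDB and using the same illustrative values as the test:

```cpp
// Sketch: cap the reader's total memory budget and the coordinate-tile ratio,
// mirroring what the test above does through the internal Config.
#include <tiledb/tiledb>

int main() {
  tiledb::Config cfg;
  cfg.set("sm.mem.total_budget", "30000");
  cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
  tiledb::Context ctx(cfg);
  // Readers created from `ctx` can now only hold as many tiles as the budget allows.
  return 0;
}
```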
20 changes: 4 additions & 16 deletions test/src/unit-result-tile.cc
@@ -213,10 +213,7 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
rt.init_coord_tile(
constants::format_version,
array_schema,
@@ -233,10 +230,7 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
rt.init_coord_tile(
constants::format_version,
array_schema,
@@ -332,10 +326,7 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
rt.init_coord_tile(
constants::format_version,
array_schema,
@@ -352,10 +343,7 @@ TEST_CASE_METHOD(
0,
std::nullopt,
std::nullopt);
ResultTile::TileData tile_data{
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()},
{nullptr, ThreadPool::SharedTask()}};
ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
rt.init_coord_tile(
constants::format_version,
array_schema,
12 changes: 5 additions & 7 deletions test/src/unit-sparse-global-order-reader.cc
@@ -2202,10 +2202,9 @@ TEST_CASE_METHOD(
}

// FIXME: there is no per fragment budget anymore
// Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per
// fragment budget (50000 * 0.11 / 2 fragments = 2750), so only one result
// tile will be loaded each time.
memory_.total_budget_ = "60000";
// Two result tiles (2 * (~3000 + 8)) will be bigger than the per fragment
// budget (1000).
memory_.total_budget_ = "35000";
memory_.ratio_coords_ = "0.11";
update_config();

@@ -2728,9 +2727,8 @@ TEST_CASE_METHOD(
}

// FIXME: there is no per fragment budget anymore
// Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per
// fragment budget (40000 * 0.22 /2 frag = 4400), so only one will be loaded
// each time.
// Two result tiles (2 * (~4000 + 8)) will be bigger than the per fragment
// budget (1000).
memory_.total_budget_ = "40000";
memory_.ratio_coords_ = "0.22";
update_config();
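The arithmetic in the old comments can be spelled out. A self-contained sketch using the numbers from the removed comment (the per-fragment split comes from that comment, not from current reader behavior):

```cpp
// Worked example: per-fragment coords budget vs. the size of two result tiles.
#include <iostream>

int main() {
  const double total_budget = 50000;   // sm.mem.total_budget
  const double ratio_coords = 0.11;    // sm.mem.reader.sparse_global_order.ratio_coords
  const double num_fragments = 2;
  const double per_fragment_budget = total_budget * ratio_coords / num_fragments;  // 2750

  const double two_tiles = 2 * (2842 + 8);  // 5700, from the removed comment

  std::cout << "per-fragment budget: " << per_fragment_budget << "\n"
            << "two result tiles:    " << two_tiles << "\n"
            << "both tiles fit:      " << std::boolalpha
            << (two_tiles <= per_fragment_budget) << "\n";  // false -> load one at a time
  return 0;
}
```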
7 changes: 2 additions & 5 deletions test/src/unit-sparse-unordered-with-dups-reader.cc
@@ -1064,12 +1064,9 @@ TEST_CASE_METHOD(

if (one_frag) {
CHECK(1 == loop_num->second);
} else {
CHECK(9 == loop_num->second);
}
/**
* We can't do a similar check for multiple fragments as it is architecture
* dependent how many tiles fit in the memory budget. And thus also
* architecture dependent as to how many internal loops we have.
*/

// Try to read multiple frags without partial tile offset reading. Should
// fail
2 changes: 1 addition & 1 deletion tiledb/sm/filter/compression_filter.cc
@@ -636,7 +636,7 @@ Status CompressionFilter::decompress_var_string_coords(
auto output_view = span<std::byte>(
reinterpret_cast<std::byte*>(output_buffer->data()), uncompressed_size);
auto offsets_view = span<uint64_t>(
offsets_tile->data_as_unsafe<offsets_t>(), uncompressed_offsets_size);
offsets_tile->data_as<offsets_t>(), uncompressed_offsets_size);

if (compressor_ == Compressor::RLE) {
uint8_t rle_len_bytesize, string_len_bytesize;
4 changes: 2 additions & 2 deletions tiledb/sm/filter/filter_pipeline.cc
@@ -464,7 +464,7 @@ Status FilterPipeline::run_reverse(
// If the pipeline is empty, just copy input to output.
if (filters_.empty()) {
void* output_chunk_buffer =
tile->data_as_unsafe<char>() + chunk_data.chunk_offsets_[i];
tile->data_as<char>() + chunk_data.chunk_offsets_[i];
RETURN_NOT_OK(input_data.copy_to(output_chunk_buffer));
continue;
}
@@ -487,7 +487,7 @@
bool last_filter = filter_idx == 0;
if (last_filter) {
void* output_chunk_buffer =
tile->data_as_unsafe<char>() + chunk_data.chunk_offsets_[i];
tile->data_as<char>() + chunk_data.chunk_offsets_[i];
RETURN_NOT_OK(output_data.set_fixed_allocation(
output_chunk_buffer, chunk.unfiltered_data_size_));
reader_stats->add_counter(
3 changes: 1 addition & 2 deletions tiledb/sm/filter/test/filter_test_support.cc
@@ -203,8 +203,7 @@ Tile create_tile_for_unfiltering(
tile->cell_size() * nelts,
tile->filtered_buffer().data(),
tile->filtered_buffer().size(),
tracker,
std::nullopt};
tracker};
}

void run_reverse(
3 changes: 1 addition & 2 deletions tiledb/sm/filter/test/tile_data_generator.h
@@ -99,8 +99,7 @@ class TileDataGenerator {
original_tile_size(),
filtered_buffer.data(),
filtered_buffer.size(),
memory_tracker,
std::nullopt);
memory_tracker);
}

/** Returns the size of the original unfiltered data. */
9 changes: 3 additions & 6 deletions tiledb/sm/metadata/test/unit_metadata.cc
@@ -123,8 +123,7 @@ TEST_CASE(
tile1->size(),
tile1->filtered_buffer().data(),
tile1->filtered_buffer().size(),
tracker,
ThreadPool::SharedTask());
tracker);
memcpy(metadata_tiles[0]->data(), tile1->data(), tile1->size());

metadata_tiles[1] = tdb::make_shared<Tile>(
@@ -136,8 +135,7 @@
tile2->size(),
tile2->filtered_buffer().data(),
tile2->filtered_buffer().size(),
tracker,
ThreadPool::SharedTask());
tracker);
memcpy(metadata_tiles[1]->data(), tile2->data(), tile2->size());

metadata_tiles[2] = tdb::make_shared<Tile>(
@@ -149,8 +147,7 @@
tile3->size(),
tile3->filtered_buffer().data(),
tile3->filtered_buffer().size(),
tracker,
ThreadPool::SharedTask());
tracker);
memcpy(metadata_tiles[2]->data(), tile3->data(), tile3->size());

meta = Metadata::deserialize(metadata_tiles);
59 changes: 27 additions & 32 deletions tiledb/sm/query/readers/dense_reader.cc
@@ -453,9 +453,6 @@ Status DenseReader::dense_read() {
// processing.
if (qc_coords_mode_) {
t_start = t_end;
if (compute_task.valid()) {
throw_if_not_ok(compute_task.wait());
}
continue;
}

@@ -772,8 +769,8 @@
const auto fragment_num = (unsigned)frag_tile_domains.size();
const auto& tile_coords = subarray.tile_coords();

// Keep track of the required memory to load the result space tiles. Split
// up filtered versus unfiltered. The memory budget is combined for all
// Keep track of the required memory to load the result space tiles. Split up
// filtered versus unfiltered. The memory budget is combined for all
// query condition attributes.
uint64_t required_memory_query_condition_unfiltered = 0;
std::vector<uint64_t> required_memory_unfiltered(
Expand All @@ -789,28 +786,28 @@ DenseReader::compute_result_space_tiles(
aggregate_only_field[n - condition_names.size()] = aggregate_only(name);
}

// Here we estimate the size of the tile structures. First, we have to
// account the size of the space tile structure. We could go deeper in the
// class to account for other things but for now we keep it simpler. Second,
// we try to account for the tile subarray (DenseTileSubarray). This class
// will have a vector of ranges per dimensions, so 1 + dim_num *
// sizeof(vector). Here we choose 32 for the size of the vector to
// anticipate the conversion to a PMR vector. We also add dim_num * 2 *
// sizeof(DimType) to account for at least one range per dimension (this
// should be improved by accounting for the exact number of ranges). Finally
// for the original range index member, we have to add 1 + dim_num *
// sizeof(vector) as well and one uint64_t per dimension (this can also be
// improved by accounting for the exact number of ranges).
// Here we estimate the size of the tile structures. First, we have to account
// the size of the space tile structure. We could go deeper in the class to
// account for other things but for now we keep it simpler. Second, we try to
// account for the tile subarray (DenseTileSubarray). This class will have a
// vector of ranges per dimensions, so 1 + dim_num * sizeof(vector). Here we
// choose 32 for the size of the vector to anticipate the conversion to a PMR
// vector. We also add dim_num * 2 * sizeof(DimType) to account for at least
// one range per dimension (this should be improved by accounting for the
// exact number of ranges). Finally for the original range index member, we
// have to add 1 + dim_num * sizeof(vector) as well and one uint64_t per
// dimension (this can also be improved by accounting for the
// exact number of ranges).
uint64_t est_tile_structs_size =
sizeof(ResultSpaceTile<DimType>) + (1 + dim_num) * 2 * 32 +
dim_num * (2 * sizeof(DimType) + sizeof(uint64_t));

// Create the vector of result tiles to operate on. We stop once we reach
// the end or the memory budget. We either reach the tile upper memory
// limit, which is only for unfiltered data, or the limit of the available
// budget, which is for filtered data, unfiltered data and the tile structs.
// We try to process two tile batches at a time so the available memory is
// half of what we have available.
// the end or the memory budget. We either reach the tile upper memory limit,
// which is only for unfiltered data, or the limit of the available budget,
// which is for filtered data, unfiltered data and the tile structs. We try to
// process two tile batches at a time so the available memory is half of what
// we have available.
uint64_t t_end = t_start;
bool wait_compute_task_before_read = false;
bool done = false;
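For concreteness, the size estimate described in the comment above can be evaluated for a hypothetical 2-D array with uint64_t dimensions; the sizeof(ResultSpaceTile<DimType>) term is internal to TileDB, so an assumed 256 bytes stands in for it here:

```cpp
// Sketch of the est_tile_structs_size formula with dim_num = 2, DimType = uint64_t.
#include <cstdint>
#include <iostream>

int main() {
  const uint64_t dim_num = 2;
  const uint64_t assumed_space_tile_size = 256;  // stand-in for sizeof(ResultSpaceTile<DimType>)

  const uint64_t est_tile_structs_size =
      assumed_space_tile_size +
      (1 + dim_num) * 2 * 32 +                              // (1 + dim_num) vectors, ~32 bytes each, two members
      dim_num * (2 * sizeof(uint64_t) + sizeof(uint64_t));  // one range and one index per dimension

  std::cout << est_tile_structs_size << " bytes per result space tile\n";  // 256 + 192 + 48 = 496
  return 0;
}
```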
@@ -898,8 +895,8 @@
uint64_t tile_memory_filtered = 0;
uint64_t r_idx = n - condition_names.size();

// We might not need to load this tile into memory at all for
// aggregation only.
// We might not need to load this tile into memory at all for aggregation
// only.
if (aggregate_only_field[r_idx] &&
can_aggregate_tile_with_frag_md(
names[n], result_space_tile, tiles_cell_num[t_end])) {
@@ -956,14 +953,13 @@
required_memory_unfiltered[r_idx] +
est_tile_structs_size;

// Disable the multiple iterations if the tiles don't fit in the
// iteration budget.
// Disable the multiple iterations if the tiles don't fit in the iteration
// budget.
if (total_memory > available_memory_iteration) {
wait_compute_task_before_read = true;
}

// If a single tile doesn't fit in the available memory, we can't
// proceed.
// If a single tile doesn't fit in the available memory, we can't proceed.
if (total_memory > available_memory) {
throw DenseReaderException(
"Cannot process a single tile requiring " +
@@ -1007,8 +1003,7 @@ std::vector<ResultTile*> DenseReader::result_tiles_to_load(
const auto& tile_coords = subarray.tile_coords();
const bool agg_only = name.has_value() && aggregate_only(name.value());

// If the result is already loaded in query condition, return the empty
// list;
// If the result is already loaded in query condition, return the empty list;
std::vector<ResultTile*> ret;
if (name.has_value() && condition_names.count(name.value()) != 0) {
return ret;
@@ -1038,8 +1033,8 @@

/**
* Apply the query condition. The computation will be pushed on the compute
* thread pool in `compute_task`. Callers should wait on this task before
* using the results of the query condition.
* thread pool in `compute_task`. Callers should wait on this task before using
* the results of the query condition.
*/
template <class DimType, class OffType>
Status DenseReader::apply_query_condition(
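The doc comment above describes the async contract: apply_query_condition pushes its work onto the compute thread pool and callers must wait on compute_task before consuming the results. A generic sketch of that caller-side contract, with std::async standing in for TileDB's ThreadPool and a hypothetical apply_condition_async helper:

```cpp
// Sketch: a routine schedules work asynchronously and returns a shared task;
// the caller waits on it before reading the results the task fills in.
#include <cstddef>
#include <future>
#include <iostream>
#include <vector>

std::shared_future<void> apply_condition_async(std::vector<char>& result_bitmap) {
  return std::async(std::launch::async, [&result_bitmap] {
           for (std::size_t i = 0; i < result_bitmap.size(); ++i)
             result_bitmap[i] = (i % 2 == 0) ? 1 : 0;  // pretend query-condition evaluation
         })
      .share();
}

int main() {
  std::vector<char> bitmap(8, 0);
  std::shared_future<void> compute_task = apply_condition_async(bitmap);
  compute_task.wait();  // wait before using the query-condition results
  for (char keep : bitmap)
    std::cout << static_cast<int>(keep);
  std::cout << '\n';
  return 0;
}
```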