[Backport release-2.27] Improve readers by parallelizing I/O and compute operations (#5401) #5451

Merged: 40 commits, Feb 14, 2025

Commits
be16f0b  Move I/O wait to datablock instead of reader. (Shelnutt2, Oct 29, 2024)
de943f1  Switch to SharedTask in order to allow multi-threaded access to futur… (Shelnutt2, Oct 29, 2024)
f3d5359  Fix unit test compilation (Shelnutt2, Oct 29, 2024)
628e805  WIP: parallelize filter pipeline and interleave comparisons (Shelnutt2, Oct 30, 2024)
453cb26  Switch to ThreadPool::SharedTask instead of shared_ptr (Shelnutt2, Oct 30, 2024)
3b821dd  Add recursive_mutex for thread-safety of tile ThreadPool::SharedTask … (Shelnutt2, Oct 31, 2024)
89551db  WIP: try to store reference to FilterData on result tile, need to fix… (Shelnutt2, Oct 31, 2024)
9b9ae5e  Adjust lambdas and avoid task components going out of scope. (Shelnutt2, Oct 31, 2024)
560e9d7  Add new data_unsafe to Tile accessors. (Shelnutt2, Oct 31, 2024)
ef36088  Add stats tracking to new tasks for reading and unfiltering tiles (Shelnutt2, Oct 31, 2024)
b0c82a4  Fix unit test compilation (Shelnutt2, Oct 31, 2024)
98c9a92  Add new zip_coordinates_unsafe (Shelnutt2, Nov 1, 2024)
602d96b  Wait until tasks are done before freeing tiles (ypatia, Nov 7, 2024)
92690a6  Remove redundant shared future get (ypatia, Nov 11, 2024)
903b7a8  Fix null counts, check tasks are valid and other fixes (ypatia, Nov 15, 2024)
470d8b1  Fix RLE and dict decompression (ypatia, Nov 15, 2024)
c6f9227  Fix budget tests, g.o.r. result tile is now 3496 bytes in size (ypatia, Nov 18, 2024)
59a3926  Adaptations to new threadpool (ypatia, Nov 25, 2024)
9579c0c  Fix compute task outliving dense reader (ypatia, Nov 28, 2024)
914c480  Remove mutex that causes problems (TBD) (ypatia, Nov 29, 2024)
9adb1c0  Adapt some tests after threadpool changes (ypatia, Dec 4, 2024)
6b8f228  Fix deadlock in merge_result_cell_slabs (ypatia, Dec 4, 2024)
ffaa95f  Fix linux compilation issue (ypatia, Dec 6, 2024)
964c644  Fix gcc future exception (Shelnutt2, Dec 6, 2024)
6849756  Fix missing unit test threadpool linkage (Shelnutt2, Dec 6, 2024)
76e6116  Fix tile missing threadpool linkage (ypatia, Dec 12, 2024)
cc9980d  Remove duplicate library in cmake (ypatia, Dec 12, 2024)
995fa66  Disable temporarily flaky test (ypatia, Dec 13, 2024)
b950d84  Attempt to fix asan error (ypatia, Dec 13, 2024)
4ab0a99  Fix segfault in legacy reader (ypatia, Dec 16, 2024)
a43f837  Revert "Attempt to fix asan error" (ypatia, Dec 17, 2024)
d0fbf49  Fix some windows tests (ypatia, Dec 18, 2024)
35ae5fe  Fix lifetime issues and some namings (ypatia, Dec 18, 2024)
cf92e21  Fix ASAN: Destructors of base classes must be virtual (ypatia, Dec 19, 2024)
cffe2bf  Some more PR cleanup (ypatia, Dec 19, 2024)
08e2a0b  Constrain non blocking tasks to I/O (ypatia, Jan 22, 2025)
2e0a7fd  Fixes to tests (ypatia, Jan 23, 2025)
7f178b5  Address most review comments (ypatia, Feb 4, 2025)
e446aa4  Address last comment and fix test for windows (ypatia, Feb 5, 2025)
f9488c5  Fix object libraries tests (ypatia, Feb 6, 2025)
Files changed
test/src/unit-ReadCellSlabIter.cc (4 additions, 1 deletion)

@@ -183,7 +183,10 @@ void set_result_tile_dim(
       std::nullopt,
       std::nullopt,
       std::nullopt);
-  ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
+  ResultTile::TileData tile_data{
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()}};
   result_tile.init_coord_tile(
       constants::format_version,
       array_schema,
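This test-only change mirrors the new shape of `ResultTile::TileData`: each buffer pointer now travels with the `ThreadPool::SharedTask` that produces its contents, and a default-constructed task means no pending I/O. A hedged sketch of the idea; the field names and the `std::shared_future` stand-in are assumptions, not TileDB's actual definitions:

```cpp
#include <future>
#include <utility>

// Sketch of the pair-per-buffer layout implied by the diff above. The three
// members are assumed to be the fixed, var, and validity buffers.
struct TileDataSketch {
  std::pair<void*, std::shared_future<void>> fixed;
  std::pair<void*, std::shared_future<void>> var;
  std::pair<void*, std::shared_future<void>> validity;
};

// Equivalent of {nullptr, ThreadPool::SharedTask()} three times: no buffer,
// and an invalid task meaning there is nothing to wait for.
TileDataSketch empty_tile_data() {
  return TileDataSketch{
      {nullptr, std::shared_future<void>()},
      {nullptr, std::shared_future<void>()},
      {nullptr, std::shared_future<void>()}};
}
```

A default-constructed `std::shared_future` reports `valid() == false`, which matches how the tests use a default `ThreadPool::SharedTask` to signal that the buffer needs no synchronization.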
test/src/unit-cppapi-consolidation-with-timestamps.cc (2 additions, 2 deletions)

@@ -636,7 +636,7 @@ TEST_CASE_METHOD(

   // Will only allow to load two tiles out of 3.
   Config cfg;
-  cfg.set("sm.mem.total_budget", "30000");
+  cfg.set("sm.mem.total_budget", "50000");
   cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
   ctx_ = Context(cfg);

@@ -685,7 +685,7 @@

   // Will only allow to load two tiles out of 3.
   Config cfg;
-  cfg.set("sm.mem.total_budget", "30000");
+  cfg.set("sm.mem.total_budget", "50000");
   cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15");
   ctx_ = Context(cfg);
test/src/unit-result-tile.cc (16 additions, 4 deletions)

@@ -213,7 +213,10 @@ TEST_CASE_METHOD(
       0,
       std::nullopt,
       std::nullopt);
-  ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
+  ResultTile::TileData tile_data{
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()}};
   rt.init_coord_tile(
       constants::format_version,
       array_schema,
@@ -230,7 +233,10 @@
       0,
       std::nullopt,
       std::nullopt);
-  ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
+  ResultTile::TileData tile_data{
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()}};
   rt.init_coord_tile(
       constants::format_version,
       array_schema,
@@ -326,7 +332,10 @@
       0,
       std::nullopt,
       std::nullopt);
-  ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
+  ResultTile::TileData tile_data{
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()}};
   rt.init_coord_tile(
       constants::format_version,
       array_schema,
@@ -343,7 +352,10 @@
       0,
       std::nullopt,
       std::nullopt);
-  ResultTile::TileData tile_data{nullptr, nullptr, nullptr};
+  ResultTile::TileData tile_data{
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()},
+      {nullptr, ThreadPool::SharedTask()}};
   rt.init_coord_tile(
       constants::format_version,
       array_schema,
test/src/unit-sparse-global-order-reader.cc (7 additions, 5 deletions)

@@ -1993,9 +1993,10 @@ TEST_CASE_METHOD(
   }

   // FIXME: there is no per fragment budget anymore
-  // Two result tile (2 * (~3000 + 8) will be bigger than the per fragment
-  // budget (1000).
-  memory_.total_budget_ = "35000";
+  // Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per
+  // fragment budget (50000 * 0.11 / 2 fragments = 2750), so only one result
+  // tile will be loaded each time.
+  memory_.total_budget_ = "60000";
   memory_.ratio_coords_ = "0.11";
   update_config();

@@ -2518,8 +2519,9 @@
   }

   // FIXME: there is no per fragment budget anymore
-  // Two result tile (2 * (~4000 + 8) will be bigger than the per fragment
-  // budget (1000).
+  // Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per
+  // fragment budget (40000 * 0.22 /2 frag = 4400), so only one will be loaded
+  // each time.
   memory_.total_budget_ = "40000";
   memory_.ratio_coords_ = "0.22";
   update_config();
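Checking the updated comments' arithmetic: two result tiles cost 2 * (2842 + 8) = 5700 bytes. In the first hunk the comment derives the per-fragment coordinate budget from a 50000 total (50000 * 0.11 / 2 = 2750) even though the test actually sets 60000, which would give 60000 * 0.11 / 2 = 3300; either value is below 5700, so the stated conclusion, one tile loaded at a time, holds. The second hunk's figure, 40000 * 0.22 / 2 = 4400, is likewise below 5700.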
test/src/unit-sparse-unordered-with-dups-reader.cc (5 additions, 2 deletions)

@@ -1064,9 +1064,12 @@ TEST_CASE_METHOD(

   if (one_frag) {
     CHECK(1 == loop_num->second);
-  } else {
-    CHECK(9 == loop_num->second);
   }
+  /**
+   * We can't do a similar check for multiple fragments as it is architecture
+   * dependent how many tiles fit in the memory budget. And thus also
+   * architecture dependent as to how many internal loops we have.
+   */

   // Try to read multiple frags without partial tile offset reading. Should
   // fail
tiledb/sm/filter/compression_filter.cc (1 addition, 1 deletion)

@@ -636,7 +636,7 @@ Status CompressionFilter::decompress_var_string_coords(
   auto output_view = span<std::byte>(
       reinterpret_cast<std::byte*>(output_buffer->data()), uncompressed_size);
   auto offsets_view = span<uint64_t>(
-      offsets_tile->data_as<offsets_t>(), uncompressed_offsets_size);
+      offsets_tile->data_as_unsafe<offsets_t>(), uncompressed_offsets_size);

   if (compressor_ == Compressor::RLE) {
     uint8_t rle_len_bytesize, string_len_bytesize;
tiledb/sm/filter/filter_pipeline.cc (2 additions, 2 deletions)

@@ -464,7 +464,7 @@ Status FilterPipeline::run_reverse(
     // If the pipeline is empty, just copy input to output.
     if (filters_.empty()) {
       void* output_chunk_buffer =
-          tile->data_as<char>() + chunk_data.chunk_offsets_[i];
+          tile->data_as_unsafe<char>() + chunk_data.chunk_offsets_[i];
       RETURN_NOT_OK(input_data.copy_to(output_chunk_buffer));
       continue;
     }
@@ -487,7 +487,7 @@
       bool last_filter = filter_idx == 0;
       if (last_filter) {
         void* output_chunk_buffer =
-            tile->data_as<char>() + chunk_data.chunk_offsets_[i];
+            tile->data_as_unsafe<char>() + chunk_data.chunk_offsets_[i];
         RETURN_NOT_OK(output_data.set_fixed_allocation(
             output_chunk_buffer, chunk.unfiltered_data_size_));
         reader_stats->add_counter(
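These `data_as` to `data_as_unsafe` swaps follow from the tile data now being guarded by a task. The inference (not verified against the TileDB sources) is that the plain accessor synchronizes with the pending unfiltering task, while the `_unsafe` variant skips that wait for callers that already run inside the task, where blocking would be redundant or self-deadlocking. A sketch of the split:

```cpp
#include <future>

// Illustrative only: data_as / data_as_unsafe are real names from the diff,
// but this wait-vs-no-wait behavior is inferred.
class TileSketch {
 public:
  template <class T>
  T* data_as() {
    // Safe path: callers outside the unfiltering task wait for it to finish.
    if (unfilter_task_.valid()) {
      unfilter_task_.wait();
    }
    return static_cast<T*>(data_);
  }

  template <class T>
  T* data_as_unsafe() {
    // Unsafe path: the caller is the unfiltering task itself (as in
    // FilterPipeline::run_reverse), so it must not wait on its own task.
    return static_cast<T*>(data_);
  }

 private:
  void* data_ = nullptr;
  std::shared_future<void> unfilter_task_;  // ThreadPool::SharedTask stand-in
};
```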
tiledb/sm/filter/test/filter_test_support.cc (2 additions, 1 deletion)

@@ -203,7 +203,8 @@ Tile create_tile_for_unfiltering(
       tile->cell_size() * nelts,
       tile->filtered_buffer().data(),
       tile->filtered_buffer().size(),
-      tracker};
+      tracker,
+      std::nullopt};
 }

 void run_reverse(
tiledb/sm/filter/test/tile_data_generator.h (2 additions, 1 deletion)

@@ -99,7 +99,8 @@ class TileDataGenerator {
         original_tile_size(),
         filtered_buffer.data(),
         filtered_buffer.size(),
-        memory_tracker);
+        memory_tracker,
+        std::nullopt);
   }

   /** Returns the size of the original unfiltered data. */
tiledb/sm/metadata/test/unit_metadata.cc (6 additions, 3 deletions)

@@ -123,7 +123,8 @@ TEST_CASE(
       tile1->size(),
       tile1->filtered_buffer().data(),
       tile1->filtered_buffer().size(),
-      tracker);
+      tracker,
+      ThreadPool::SharedTask());
   memcpy(metadata_tiles[0]->data(), tile1->data(), tile1->size());

   metadata_tiles[1] = tdb::make_shared<Tile>(
@@ -135,7+136,8 @@
       tile2->size(),
       tile2->filtered_buffer().data(),
       tile2->filtered_buffer().size(),
-      tracker);
+      tracker,
+      ThreadPool::SharedTask());
   memcpy(metadata_tiles[1]->data(), tile2->data(), tile2->size());

   metadata_tiles[2] = tdb::make_shared<Tile>(
@@ -147,7 +149,8 @@
       tile3->size(),
       tile3->filtered_buffer().data(),
       tile3->filtered_buffer().size(),
-      tracker);
+      tracker,
+      ThreadPool::SharedTask());
   memcpy(metadata_tiles[2]->data(), tile3->data(), tile3->size());

   meta = Metadata::deserialize(metadata_tiles);
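The `Tile` constructor calls above grew a trailing argument: the task that fills the tile's buffer (`ThreadPool::SharedTask()` here, `std::nullopt` in the two filter-test files, which suggests the parameter is optional or optional-like). Since these metadata tiles are filled synchronously by the `memcpy` that follows, an empty task is passed. A reduced sketch of that calling convention, with all names hypothetical:

```cpp
#include <cstddef>
#include <cstring>
#include <future>
#include <vector>

// Hypothetical tile type recording the task that produces its data.
struct SketchTile {
  std::vector<char> data;
  std::shared_future<void> fill_task;  // ThreadPool::SharedTask stand-in

  SketchTile(std::size_t size, std::shared_future<void> task)
      : data(size)
      , fill_task(std::move(task)) {
  }
};

int main() {
  // Metadata path: contents are copied in immediately, so the tile carries a
  // default-constructed (empty) task, like ThreadPool::SharedTask() above.
  SketchTile meta_tile(64, std::shared_future<void>());
  const char payload[] = "serialized metadata";
  std::memcpy(meta_tile.data.data(), payload, sizeof(payload));
  return 0;
}
```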
tiledb/sm/query/readers/dense_reader.cc (32 additions, 27 deletions)

@@ -453,6 +453,9 @@ Status DenseReader::dense_read() {
     // processing.
     if (qc_coords_mode_) {
       t_start = t_end;
+      if (compute_task.valid()) {
+        throw_if_not_ok(compute_task.wait());
+      }
       continue;
     }

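This first hunk adds the guard that several of the commits above circle around ("check tasks are valid", "Fix compute task outliving dense reader"): before leaving an iteration early, the reader now waits on the in-flight compute task so it cannot outlive the state it references. The shape of the guard, sketched with a standard-library stand-in:

```cpp
#include <future>

// Mirrors `if (compute_task.valid()) throw_if_not_ok(compute_task.wait());`
// using std::shared_future<void>; here failures surface as exceptions from
// get() rather than a TileDB Status.
void wait_for_compute(std::shared_future<void>& compute_task) {
  if (compute_task.valid()) {  // default-constructed task: nothing pending
    compute_task.get();        // blocks, and rethrows the task's exception
  }
}
```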
@@ -769,8 +772,8 @@
   const auto fragment_num = (unsigned)frag_tile_domains.size();
   const auto& tile_coords = subarray.tile_coords();

-  // Keep track of the required memory to load the result space tiles. Split up
-  // filtered versus unfiltered. The memory budget is combined for all
+  // Keep track of the required memory to load the result space tiles. Split
+  // up filtered versus unfiltered. The memory budget is combined for all
   // query condition attributes.
   uint64_t required_memory_query_condition_unfiltered = 0;
   std::vector<uint64_t> required_memory_unfiltered(
@@ -786,28 +789,28 @@
     aggregate_only_field[n - condition_names.size()] = aggregate_only(name);
   }

-  // Here we estimate the size of the tile structures. First, we have to account
-  // the size of the space tile structure. We could go deeper in the class to
-  // account for other things but for now we keep it simpler. Second, we try to
-  // account for the tile subarray (DenseTileSubarray). This class will have a
-  // vector of ranges per dimensions, so 1 + dim_num * sizeof(vector). Here we
-  // choose 32 for the size of the vector to anticipate the conversion to a PMR
-  // vector. We also add dim_num * 2 * sizeof(DimType) to account for at least
-  // one range per dimension (this should be improved by accounting for the
-  // exact number of ranges). Finally for the original range index member, we
-  // have to add 1 + dim_num * sizeof(vector) as well and one uint64_t per
-  // dimension (this can also be improved by accounting for the
-  // exact number of ranges).
+  // Here we estimate the size of the tile structures. First, we have to
+  // account the size of the space tile structure. We could go deeper in the
+  // class to account for other things but for now we keep it simpler. Second,
+  // we try to account for the tile subarray (DenseTileSubarray). This class
+  // will have a vector of ranges per dimensions, so 1 + dim_num *
+  // sizeof(vector). Here we choose 32 for the size of the vector to
+  // anticipate the conversion to a PMR vector. We also add dim_num * 2 *
+  // sizeof(DimType) to account for at least one range per dimension (this
+  // should be improved by accounting for the exact number of ranges). Finally
+  // for the original range index member, we have to add 1 + dim_num *
+  // sizeof(vector) as well and one uint64_t per dimension (this can also be
+  // improved by accounting for the exact number of ranges).
   uint64_t est_tile_structs_size =
       sizeof(ResultSpaceTile<DimType>) + (1 + dim_num) * 2 * 32 +
       dim_num * (2 * sizeof(DimType) + sizeof(uint64_t));

   // Create the vector of result tiles to operate on. We stop once we reach
-  // the end or the memory budget. We either reach the tile upper memory limit,
-  // which is only for unfiltered data, or the limit of the available budget,
-  // which is for filtered data, unfiltered data and the tile structs. We try to
-  // process two tile batches at a time so the available memory is half of what
-  // we have available.
+  // the end or the memory budget. We either reach the tile upper memory
+  // limit, which is only for unfiltered data, or the limit of the available
+  // budget, which is for filtered data, unfiltered data and the tile structs.
+  // We try to process two tile batches at a time so the available memory is
+  // half of what we have available.
   uint64_t t_end = t_start;
   bool wait_compute_task_before_read = false;
   bool done = false;
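As a worked instance of the estimate above (illustrative numbers, not from the source): for a 2-D array with uint64_t dimensions, est_tile_structs_size = sizeof(ResultSpaceTile<DimType>) + (1 + 2) * 2 * 32 + 2 * (2 * 8 + 8) = sizeof(ResultSpaceTile<DimType>) + 192 + 48 bytes per result space tile.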
@@ -895,8 +898,8 @@
       uint64_t tile_memory_filtered = 0;
       uint64_t r_idx = n - condition_names.size();

-      // We might not need to load this tile into memory at all for aggregation
-      // only.
+      // We might not need to load this tile into memory at all for
+      // aggregation only.
       if (aggregate_only_field[r_idx] &&
           can_aggregate_tile_with_frag_md(
               names[n], result_space_tile, tiles_cell_num[t_end])) {
@@ -953,13 +956,14 @@
           required_memory_unfiltered[r_idx] +
           est_tile_structs_size;

-      // Disable the multiple iterations if the tiles don't fit in the iteration
-      // budget.
+      // Disable the multiple iterations if the tiles don't fit in the
+      // iteration budget.
       if (total_memory > available_memory_iteration) {
         wait_compute_task_before_read = true;
       }

-      // If a single tile doesn't fit in the available memory, we can't proceed.
+      // If a single tile doesn't fit in the available memory, we can't
+      // proceed.
       if (total_memory > available_memory) {
         throw DenseReaderException(
             "Cannot process a single tile requiring " +
@@ -1003,7 +1007,8 @@ std::vector<ResultTile*> DenseReader::result_tiles_to_load(
   const auto& tile_coords = subarray.tile_coords();
   const bool agg_only = name.has_value() && aggregate_only(name.value());

-  // If the result is already loaded in query condition, return the empty list;
+  // If the result is already loaded in query condition, return the empty
+  // list;
   std::vector<ResultTile*> ret;
   if (name.has_value() && condition_names.count(name.value()) != 0) {
     return ret;
@@ -1033,8 +1038,8 @@
 /**
  * Apply the query condition. The computation will be pushed on the compute
- * thread pool in `compute_task`. Callers should wait on this task before using
- * the results of the query condition.
+ * thread pool in `compute_task`. Callers should wait on this task before
+ * using the results of the query condition.
  */
 template <class DimType, class OffType>
 Status DenseReader::apply_query_condition(