diff --git a/test/src/unit-ReadCellSlabIter.cc b/test/src/unit-ReadCellSlabIter.cc index 78f84f5eea8..bdeef4e9c22 100644 --- a/test/src/unit-ReadCellSlabIter.cc +++ b/test/src/unit-ReadCellSlabIter.cc @@ -183,7 +183,10 @@ void set_result_tile_dim( std::nullopt, std::nullopt, std::nullopt); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_coord_tile( constants::format_version, array_schema, diff --git a/test/src/unit-cppapi-consolidation-with-timestamps.cc b/test/src/unit-cppapi-consolidation-with-timestamps.cc index 11afefb37c9..23e09248c0a 100644 --- a/test/src/unit-cppapi-consolidation-with-timestamps.cc +++ b/test/src/unit-cppapi-consolidation-with-timestamps.cc @@ -636,7 +636,7 @@ TEST_CASE_METHOD( // Will only allow to load two tiles out of 3. Config cfg; - cfg.set("sm.mem.total_budget", "30000"); + cfg.set("sm.mem.total_budget", "50000"); cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15"); ctx_ = Context(cfg); @@ -685,7 +685,7 @@ TEST_CASE_METHOD( // Will only allow to load two tiles out of 3. Config cfg; - cfg.set("sm.mem.total_budget", "30000"); + cfg.set("sm.mem.total_budget", "50000"); cfg.set("sm.mem.reader.sparse_global_order.ratio_coords", "0.15"); ctx_ = Context(cfg); diff --git a/test/src/unit-result-tile.cc b/test/src/unit-result-tile.cc index 6bc1552add8..a04bba077b7 100644 --- a/test/src/unit-result-tile.cc +++ b/test/src/unit-result-tile.cc @@ -213,7 +213,10 @@ TEST_CASE_METHOD( 0, std::nullopt, std::nullopt); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; rt.init_coord_tile( constants::format_version, array_schema, @@ -230,7 +233,10 @@ TEST_CASE_METHOD( 0, std::nullopt, std::nullopt); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; rt.init_coord_tile( constants::format_version, array_schema, @@ -326,7 +332,10 @@ TEST_CASE_METHOD( 0, std::nullopt, std::nullopt); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; rt.init_coord_tile( constants::format_version, array_schema, @@ -343,7 +352,10 @@ TEST_CASE_METHOD( 0, std::nullopt, std::nullopt); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; rt.init_coord_tile( constants::format_version, array_schema, diff --git a/test/src/unit-sparse-global-order-reader.cc b/test/src/unit-sparse-global-order-reader.cc index 678ccd7112a..2a2307d780c 100644 --- a/test/src/unit-sparse-global-order-reader.cc +++ b/test/src/unit-sparse-global-order-reader.cc @@ -1993,9 +1993,10 @@ TEST_CASE_METHOD( } // FIXME: there is no per fragment budget anymore - // Two result tile (2 * (~3000 + 8) will be bigger than the per fragment - // budget (1000). 
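// Sketch (illustrative, not part of the patch) of the recurring test change above:
// ResultTile::TileData now pairs each filtered-data pointer with the
// ThreadPool::SharedTask that produces it (pair element types inferred from the
// hunks above). A default-constructed SharedTask stands for "no pending I/O".

// Tests and synchronous paths: no buffers, nothing to wait on.
ResultTile::TileData tile_data{
    {nullptr, ThreadPool::SharedTask()},   // fixed data
    {nullptr, ThreadPool::SharedTask()},   // var data
    {nullptr, ThreadPool::SharedTask()}};  // validity data

// Reader path (reader_base.cc below): the {pointer, task} pair comes straight
// from FilteredData, so the tile can wait for its own read on first access.
ResultTile::TileData async_tile_data{
    filtered_data.back().fixed_filtered_data(fragment.get(), tile),
    filtered_data.back().var_filtered_data(fragment.get(), tile),
    filtered_data.back().nullable_filtered_data(fragment.get(), tile)};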
- memory_.total_budget_ = "35000"; + // Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per + // fragment budget (50000 * 0.11 / 2 fragments = 2750), so only one result + // tile will be loaded each time. + memory_.total_budget_ = "60000"; memory_.ratio_coords_ = "0.11"; update_config(); @@ -2518,8 +2519,9 @@ TEST_CASE_METHOD( } // FIXME: there is no per fragment budget anymore - // Two result tile (2 * (~4000 + 8) will be bigger than the per fragment - // budget (1000). + // Two result tiles (2 * (2842 + 8)) = 5700 will be bigger than the per + // fragment budget (40000 * 0.22 /2 frag = 4400), so only one will be loaded + // each time. memory_.total_budget_ = "40000"; memory_.ratio_coords_ = "0.22"; update_config(); diff --git a/test/src/unit-sparse-unordered-with-dups-reader.cc b/test/src/unit-sparse-unordered-with-dups-reader.cc index 0a02607578d..406edda69f6 100644 --- a/test/src/unit-sparse-unordered-with-dups-reader.cc +++ b/test/src/unit-sparse-unordered-with-dups-reader.cc @@ -1064,9 +1064,12 @@ TEST_CASE_METHOD( if (one_frag) { CHECK(1 == loop_num->second); - } else { - CHECK(9 == loop_num->second); } + /** + * We can't do a similar check for multiple fragments as it is architecture + * dependent how many tiles fit in the memory budget. And thus also + * architecture dependent as to how many internal loops we have. + */ // Try to read multiple frags without partial tile offset reading. Should // fail diff --git a/tiledb/sm/filter/compression_filter.cc b/tiledb/sm/filter/compression_filter.cc index 40cc0c18680..5069fc63fec 100644 --- a/tiledb/sm/filter/compression_filter.cc +++ b/tiledb/sm/filter/compression_filter.cc @@ -636,7 +636,7 @@ Status CompressionFilter::decompress_var_string_coords( auto output_view = span( reinterpret_cast(output_buffer->data()), uncompressed_size); auto offsets_view = span( - offsets_tile->data_as(), uncompressed_offsets_size); + offsets_tile->data_as_unsafe(), uncompressed_offsets_size); if (compressor_ == Compressor::RLE) { uint8_t rle_len_bytesize, string_len_bytesize; diff --git a/tiledb/sm/filter/filter_pipeline.cc b/tiledb/sm/filter/filter_pipeline.cc index 158d9a6fa72..55fcea4a7aa 100644 --- a/tiledb/sm/filter/filter_pipeline.cc +++ b/tiledb/sm/filter/filter_pipeline.cc @@ -464,7 +464,7 @@ Status FilterPipeline::run_reverse( // If the pipeline is empty, just copy input to output. 
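// Sketch of the accessor split the hunks below rely on (see the data_as_unsafe /
// data_unsafe additions in tile.h further down; variable names here are
// hypothetical). The checked accessors go through the task that fills the
// buffer, while the *_unsafe variants skip that check, because the filter
// pipeline and decompression are themselves the task producing the data:
//
//   // producer side, inside unfiltering: must not re-check its own task
//   void* out = tile->data_as_unsafe<char>() + chunk_offset;
//
//   // consumer side, after unfiltering: checked access is the right call
//   uint64_t first = offsets_tile->data_as<uint64_t>()[0];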
if (filters_.empty()) { void* output_chunk_buffer = - tile->data_as() + chunk_data.chunk_offsets_[i]; + tile->data_as_unsafe() + chunk_data.chunk_offsets_[i]; RETURN_NOT_OK(input_data.copy_to(output_chunk_buffer)); continue; } @@ -487,7 +487,7 @@ Status FilterPipeline::run_reverse( bool last_filter = filter_idx == 0; if (last_filter) { void* output_chunk_buffer = - tile->data_as() + chunk_data.chunk_offsets_[i]; + tile->data_as_unsafe() + chunk_data.chunk_offsets_[i]; RETURN_NOT_OK(output_data.set_fixed_allocation( output_chunk_buffer, chunk.unfiltered_data_size_)); reader_stats->add_counter( diff --git a/tiledb/sm/filter/test/filter_test_support.cc b/tiledb/sm/filter/test/filter_test_support.cc index 907940fd013..19d3615fc45 100644 --- a/tiledb/sm/filter/test/filter_test_support.cc +++ b/tiledb/sm/filter/test/filter_test_support.cc @@ -203,7 +203,8 @@ Tile create_tile_for_unfiltering( tile->cell_size() * nelts, tile->filtered_buffer().data(), tile->filtered_buffer().size(), - tracker}; + tracker, + std::nullopt}; } void run_reverse( diff --git a/tiledb/sm/filter/test/tile_data_generator.h b/tiledb/sm/filter/test/tile_data_generator.h index 284b868c9f4..2f781b66f02 100644 --- a/tiledb/sm/filter/test/tile_data_generator.h +++ b/tiledb/sm/filter/test/tile_data_generator.h @@ -99,7 +99,8 @@ class TileDataGenerator { original_tile_size(), filtered_buffer.data(), filtered_buffer.size(), - memory_tracker); + memory_tracker, + std::nullopt); } /** Returns the size of the original unfiltered data. */ diff --git a/tiledb/sm/metadata/test/unit_metadata.cc b/tiledb/sm/metadata/test/unit_metadata.cc index 35c3ed06806..0b91d0246bd 100644 --- a/tiledb/sm/metadata/test/unit_metadata.cc +++ b/tiledb/sm/metadata/test/unit_metadata.cc @@ -123,7 +123,8 @@ TEST_CASE( tile1->size(), tile1->filtered_buffer().data(), tile1->filtered_buffer().size(), - tracker); + tracker, + ThreadPool::SharedTask()); memcpy(metadata_tiles[0]->data(), tile1->data(), tile1->size()); metadata_tiles[1] = tdb::make_shared( @@ -135,7 +136,8 @@ TEST_CASE( tile2->size(), tile2->filtered_buffer().data(), tile2->filtered_buffer().size(), - tracker); + tracker, + ThreadPool::SharedTask()); memcpy(metadata_tiles[1]->data(), tile2->data(), tile2->size()); metadata_tiles[2] = tdb::make_shared( @@ -147,7 +149,8 @@ TEST_CASE( tile3->size(), tile3->filtered_buffer().data(), tile3->filtered_buffer().size(), - tracker); + tracker, + ThreadPool::SharedTask()); memcpy(metadata_tiles[2]->data(), tile3->data(), tile3->size()); meta = Metadata::deserialize(metadata_tiles); diff --git a/tiledb/sm/query/readers/dense_reader.cc b/tiledb/sm/query/readers/dense_reader.cc index f59c6dbe4d9..28d0a333c10 100644 --- a/tiledb/sm/query/readers/dense_reader.cc +++ b/tiledb/sm/query/readers/dense_reader.cc @@ -453,6 +453,9 @@ Status DenseReader::dense_read() { // processing. if (qc_coords_mode_) { t_start = t_end; + if (compute_task.valid()) { + throw_if_not_ok(compute_task.wait()); + } continue; } @@ -769,8 +772,8 @@ DenseReader::compute_result_space_tiles( const auto fragment_num = (unsigned)frag_tile_domains.size(); const auto& tile_coords = subarray.tile_coords(); - // Keep track of the required memory to load the result space tiles. Split up - // filtered versus unfiltered. The memory budget is combined for all + // Keep track of the required memory to load the result space tiles. Split + // up filtered versus unfiltered. The memory budget is combined for all // query condition attributes. 
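// Sketch of the widened Tile constructor the call sites above now use: the last
// argument is an std::optional<ThreadPool::SharedTask> naming the I/O task that
// fills `filtered_data` (element type inferred from the callers; all argument
// names other than the last two are placeholders).

// Test / generic-tile-IO / writer paths: no asynchronous read backs the buffer.
Tile tile_without_io(
    constants::format_version, type, cell_size, zipped_coords_dim_num,
    tile_size, /*filtered_data=*/nullptr, /*filtered_size=*/0,
    tracker, std::nullopt);

// Reader path: hand the tile the SharedTask returned by the asynchronous VFS
// read so its first checked access can wait for the data to land.
Tile tile_with_io(
    constants::format_version, type, cell_size, zipped_coords_dim_num,
    tile_size, block_data_ptr, persisted_size,
    tracker, io_task);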
uint64_t required_memory_query_condition_unfiltered = 0; std::vector required_memory_unfiltered( @@ -786,28 +789,28 @@ DenseReader::compute_result_space_tiles( aggregate_only_field[n - condition_names.size()] = aggregate_only(name); } - // Here we estimate the size of the tile structures. First, we have to account - // the size of the space tile structure. We could go deeper in the class to - // account for other things but for now we keep it simpler. Second, we try to - // account for the tile subarray (DenseTileSubarray). This class will have a - // vector of ranges per dimensions, so 1 + dim_num * sizeof(vector). Here we - // choose 32 for the size of the vector to anticipate the conversion to a PMR - // vector. We also add dim_num * 2 * sizeof(DimType) to account for at least - // one range per dimension (this should be improved by accounting for the - // exact number of ranges). Finally for the original range index member, we - // have to add 1 + dim_num * sizeof(vector) as well and one uint64_t per - // dimension (this can also be improved by accounting for the - // exact number of ranges). + // Here we estimate the size of the tile structures. First, we have to + // account the size of the space tile structure. We could go deeper in the + // class to account for other things but for now we keep it simpler. Second, + // we try to account for the tile subarray (DenseTileSubarray). This class + // will have a vector of ranges per dimensions, so 1 + dim_num * + // sizeof(vector). Here we choose 32 for the size of the vector to + // anticipate the conversion to a PMR vector. We also add dim_num * 2 * + // sizeof(DimType) to account for at least one range per dimension (this + // should be improved by accounting for the exact number of ranges). Finally + // for the original range index member, we have to add 1 + dim_num * + // sizeof(vector) as well and one uint64_t per dimension (this can also be + // improved by accounting for the exact number of ranges). uint64_t est_tile_structs_size = sizeof(ResultSpaceTile) + (1 + dim_num) * 2 * 32 + dim_num * (2 * sizeof(DimType) + sizeof(uint64_t)); // Create the vector of result tiles to operate on. We stop once we reach - // the end or the memory budget. We either reach the tile upper memory limit, - // which is only for unfiltered data, or the limit of the available budget, - // which is for filtered data, unfiltered data and the tile structs. We try to - // process two tile batches at a time so the available memory is half of what - // we have available. + // the end or the memory budget. We either reach the tile upper memory + // limit, which is only for unfiltered data, or the limit of the available + // budget, which is for filtered data, unfiltered data and the tile structs. + // We try to process two tile batches at a time so the available memory is + // half of what we have available. uint64_t t_end = t_start; bool wait_compute_task_before_read = false; bool done = false; @@ -895,8 +898,8 @@ DenseReader::compute_result_space_tiles( uint64_t tile_memory_filtered = 0; uint64_t r_idx = n - condition_names.size(); - // We might not need to load this tile into memory at all for aggregation - // only. + // We might not need to load this tile into memory at all for + // aggregation only. 
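// Worked example of the estimate above (illustrative; sizeof(ResultSpaceTile)
// depends on the dimension type and platform). For dim_num = 2 and
// DimType = uint64_t:
//
//   est_tile_structs_size = sizeof(ResultSpaceTile<uint64_t>)
//                         + (1 + 2) * 2 * 32   // = 192, the range vectors
//                         + 2 * (2 * 8 + 8)    // =  48, one range + one index per dim
//
// i.e. roughly sizeof(ResultSpaceTile<uint64_t>) + 240 bytes per result space tile.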
if (aggregate_only_field[r_idx] && can_aggregate_tile_with_frag_md( names[n], result_space_tile, tiles_cell_num[t_end])) { @@ -953,13 +956,14 @@ DenseReader::compute_result_space_tiles( required_memory_unfiltered[r_idx] + est_tile_structs_size; - // Disable the multiple iterations if the tiles don't fit in the iteration - // budget. + // Disable the multiple iterations if the tiles don't fit in the + // iteration budget. if (total_memory > available_memory_iteration) { wait_compute_task_before_read = true; } - // If a single tile doesn't fit in the available memory, we can't proceed. + // If a single tile doesn't fit in the available memory, we can't + // proceed. if (total_memory > available_memory) { throw DenseReaderException( "Cannot process a single tile requiring " + @@ -1003,7 +1007,8 @@ std::vector DenseReader::result_tiles_to_load( const auto& tile_coords = subarray.tile_coords(); const bool agg_only = name.has_value() && aggregate_only(name.value()); - // If the result is already loaded in query condition, return the empty list; + // If the result is already loaded in query condition, return the empty + // list; std::vector ret; if (name.has_value() && condition_names.count(name.value()) != 0) { return ret; @@ -1033,8 +1038,8 @@ std::vector DenseReader::result_tiles_to_load( /** * Apply the query condition. The computation will be pushed on the compute - * thread pool in `compute_task`. Callers should wait on this task before using - * the results of the query condition. + * thread pool in `compute_task`. Callers should wait on this task before + * using the results of the query condition. */ template Status DenseReader::apply_query_condition( diff --git a/tiledb/sm/query/readers/filtered_data.h b/tiledb/sm/query/readers/filtered_data.h index 455fc71a2b0..915de7f28c3 100644 --- a/tiledb/sm/query/readers/filtered_data.h +++ b/tiledb/sm/query/readers/filtered_data.h @@ -121,6 +121,14 @@ class FilteredDataBlock { offset + size <= offset_ + size_; } + void set_io_task(ThreadPool::SharedTask task) { + io_task_ = std::move(task); + } + + ThreadPool::SharedTask io_task() { + return io_task_; + } + private: /* ********************************* */ /* PRIVATE ATTRIBUTES */ @@ -139,6 +147,9 @@ class FilteredDataBlock { /** Data for the data block. */ tdb::pmr::unique_ptr filtered_data_; + + /** IO Task to block on for data access. */ + ThreadPool::SharedTask io_task_; }; /** @@ -187,7 +198,6 @@ class FilteredData { const bool var_sized, const bool nullable, const bool validity_only, - std::vector& read_tasks, shared_ptr memory_tracker) : resources_(resources) , memory_tracker_(memory_tracker) @@ -201,7 +211,7 @@ class FilteredData { , fragment_metadata_(fragment_metadata) , var_sized_(var_sized) , nullable_(nullable) - , read_tasks_(read_tasks) { + , stats_(reader.stats()->create_child("FilteredData")) { if (result_tiles.size() == 0) { return; } @@ -319,56 +329,64 @@ class FilteredData { /* ********************************* */ /** - * Get the fixed filtered data for the result tile. + * Get a pointer to the fixed filtered data for the result tile and a future + * which signals when the data is valid. * * @param fragment Fragment metadata for the tile. * @param rt Result tile. * @return Fixed filtered data pointer. 
*/ - inline void* fixed_filtered_data( + inline std::pair fixed_filtered_data( const FragmentMetadata* fragment, const ResultTile* rt) { auto offset{ fragment->loaded_metadata()->file_offset(name_, rt->tile_idx())}; ensure_data_block_current(TileType::FIXED, fragment, rt, offset); - return current_data_block(TileType::FIXED)->data_at(offset); + return { + current_data_block(TileType::FIXED)->data_at(offset), + current_data_block(TileType::FIXED)->io_task()}; } /** - * Get the var filtered data for the result tile. - * + * Get a pointer to the var filtered data for the result tile and a future + * which signals when the data is valid. * * @param fragment Fragment metadata for the tile. * @param rt Result tile. * @return Var filtered data pointer. */ - inline void* var_filtered_data( + inline std::pair var_filtered_data( const FragmentMetadata* fragment, const ResultTile* rt) { if (!var_sized_) { - return nullptr; + return {nullptr, ThreadPool::SharedTask()}; } auto offset{ fragment->loaded_metadata()->file_var_offset(name_, rt->tile_idx())}; ensure_data_block_current(TileType::VAR, fragment, rt, offset); - return current_data_block(TileType::VAR)->data_at(offset); + return { + current_data_block(TileType::VAR)->data_at(offset), + current_data_block(TileType::VAR)->io_task()}; } /** - * Get the nullable filtered data for the result tile. + * Get a pointer to the nullable filtered data for the result tile and a + * future which signals when the data is valid. * * @param fragment Fragment metadata for the tile. * @param rt Result tile. * @return Nullable filtered data pointer. */ - inline void* nullable_filtered_data( + inline std::pair nullable_filtered_data( const FragmentMetadata* fragment, const ResultTile* rt) { if (!nullable_) { - return nullptr; + return {nullptr, ThreadPool::SharedTask()}; } auto offset{fragment->loaded_metadata()->file_validity_offset( name_, rt->tile_idx())}; ensure_data_block_current(TileType::NULLABLE, fragment, rt, offset); - return current_data_block(TileType::NULLABLE)->data_at(offset); + return { + current_data_block(TileType::NULLABLE)->data_at(offset), + current_data_block(TileType::NULLABLE)->io_task()}; } private: @@ -394,11 +412,13 @@ class FilteredData { auto data{block.data()}; auto size{block.size()}; URI uri{file_uri(fragment_metadata_[block.frag_idx()].get(), type)}; - auto task = resources_.io_tp().execute([this, offset, data, size, uri]() { - throw_if_not_ok(resources_.vfs().read(uri, offset, data, size, false)); - return Status::Ok(); - }); - read_tasks_.push_back(std::move(task)); + ThreadPool::SharedTask task = + resources_.io_tp().execute([this, offset, data, size, uri]() { + auto timer_se = stats_->start_timer("read"); + return resources_.vfs().read(uri, offset, data, size, false); + }); + // This should be changed once we use taskgraphs for modeling the data flow + block.set_io_task(task); } /** @return Data blocks corresponding to the tile type. */ @@ -635,8 +655,8 @@ class FilteredData { /** Is the attribute nullable? */ const bool nullable_; - /** Read tasks. */ - std::vector& read_tasks_; + /** Stats to track loading. */ + stats::Stats* stats_; }; } // namespace tiledb::sm diff --git a/tiledb/sm/query/readers/reader_base.cc b/tiledb/sm/query/readers/reader_base.cc index 874a7b34461..78045e5ce29 100644 --- a/tiledb/sm/query/readers/reader_base.cc +++ b/tiledb/sm/query/readers/reader_base.cc @@ -697,7 +697,6 @@ std::list ReaderBase::read_tiles( } uint64_t num_tiles_read{0}; - std::vector read_tasks; // Run all attributes independently. 
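// Sketch of the flow this loop now depends on (condensed from filtered_data.h
// above; pair/template arguments are inferred, the snippet is illustrative).
// Each FilteredDataBlock remembers the SharedTask of the asynchronous VFS read
// that fills it, and the per-tile accessors return a {pointer, task} pair:
//
//   ThreadPool::SharedTask task =
//       resources_.io_tp().execute([this, offset, data, size, uri]() {
//         return resources_.vfs().read(uri, offset, data, size, false);
//       });
//   block.set_io_task(task);  // the block now knows which read fills it
//
//   auto [ptr, io_task] =
//       filtered_data.back().fixed_filtered_data(fragment.get(), tile);
//   // ptr points into the block's buffer; io_task signals when it is valid.
//
// The pair is forwarded into ResultTile::TileData below, so waiting is deferred
// to whoever first touches the tile instead of happening inside read_tiles().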
for (auto n : names) { @@ -721,7 +720,6 @@ std::list ReaderBase::read_tiles( var_sized, nullable, val_only, - read_tasks, memory_tracker_); // Go through each tiles and create the attribute tiles. @@ -749,12 +747,14 @@ std::list ReaderBase::read_tiles( // 'TileData' objects should be returned by this function and passed into // 'unfilter_tiles' so that the filter pipeline can stop using the // 'ResultTile' object to get access to the filtered data. + std::pair t = { + nullptr, ThreadPool::SharedTask()}; ResultTile::TileData tile_data{ val_only ? - nullptr : + t : filtered_data.back().fixed_filtered_data(fragment.get(), tile), val_only ? - nullptr : + t : filtered_data.back().var_filtered_data(fragment.get(), tile), filtered_data.back().nullable_filtered_data(fragment.get(), tile)}; @@ -779,12 +779,6 @@ std::list ReaderBase::read_tiles( stats_->add_counter("num_tiles_read", num_tiles_read); - // Wait for the read tasks to finish. - auto statuses{resources_.io_tp().wait_all_status(read_tasks)}; - for (const auto& st : statuses) { - throw_if_not_ok(st); - } - return filtered_data; } @@ -854,7 +848,7 @@ Status ReaderBase::zip_tile_coordinates( array_schema_.filters(name).get_filter() != nullptr; auto version = tile->format_version(); if (version > 1 || using_compression) { - tile->zip_coordinates(); + tile->zip_coordinates_unsafe(); } } return Status::Ok(); @@ -880,16 +874,18 @@ Status ReaderBase::post_process_unfiltered_tile( return Status::Ok(); } - auto& t = tile_tuple->fixed_tile(); - t.clear_filtered_buffer(); + if (!validity_only) { + auto& t = tile_tuple->fixed_tile(); + t.clear_filtered_buffer(); - throw_if_not_ok(zip_tile_coordinates(name, &t)); + throw_if_not_ok(zip_tile_coordinates(name, &t)); - if (var_size && !validity_only) { - auto& t_var = tile_tuple->var_tile(); - t_var.clear_filtered_buffer(); - throw_if_not_ok(zip_tile_coordinates(name, &t_var)); - t.add_extra_offset(t_var); + if (var_size) { + auto& t_var = tile_tuple->var_tile(); + t_var.clear_filtered_buffer(); + throw_if_not_ok(zip_tile_coordinates(name, &t_var)); + t.add_extra_offset_unsafe(t_var); + } } if (nullable) { @@ -905,8 +901,9 @@ Status ReaderBase::unfilter_tiles( const std::string& name, const bool validity_only, const std::vector& result_tiles) { - const auto stat_type = (array_schema_.is_attr(name)) ? "unfilter_attr_tiles" : - "unfilter_coord_tiles"; + const auto stat_type = (array_schema_.is_attr(name)) ? + "unfilter_attr_tiles_builder" : + "unfilter_coord_tiles_builder"; const auto timer_se = stats_->start_timer(stat_type); auto var_size = array_schema_.var_size(name); @@ -936,10 +933,6 @@ Status ReaderBase::unfilter_tiles( std::vector tiles_chunk_data(num_tiles); std::vector tiles_chunk_var_data(num_tiles); std::vector tiles_chunk_validity_data(num_tiles); - // Vectors with the sizes of all unfiltered tile buffers - std::vector unfiltered_tile_size(num_tiles); - std::vector unfiltered_tile_var_size(num_tiles); - std::vector unfiltered_tile_validity_size(num_tiles); // Pre-compute chunk offsets. 
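// With the wait_all_status() call on read tasks removed from read_tiles() above,
// synchronization moves into the Tile itself (see tile.cc/tile.h below): every
// checked access to the filtered buffer performs the same guarded wait,
// sketched here from the members added in tile.h:
//
//   std::scoped_lock lock{filtered_data_io_task_mtx_};
//   if (filtered_data_io_task_.has_value()) {
//     if (filtered_data_io_task_.value().valid()) {
//       throw_if_not_ok(filtered_data_io_task_.value().wait());
//     } else {
//       throw std::future_error(std::future_errc::no_state);
//     }
//   }
//
// As a result, the parallel_for below can start unfiltering a tile as soon as
// that tile's own read completes, rather than after every read in the batch.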
auto status = parallel_for( @@ -955,9 +948,6 @@ Status ReaderBase::unfilter_tiles( tiles_chunk_var_data[i], tiles_chunk_validity_data[i]); throw_if_not_ok(st); - unfiltered_tile_size[i] = tile_size.value(); - unfiltered_tile_var_size[i] = tile_var_size.value(); - unfiltered_tile_validity_size[i] = tile_validity_size.value(); return Status::Ok(); }); RETURN_NOT_OK_ELSE(status, throw_if_not_ok(logger_->status(status))); diff --git a/tiledb/sm/query/readers/reader_base.h b/tiledb/sm/query/readers/reader_base.h index ae06525cfed..71b0bc524ad 100644 --- a/tiledb/sm/query/readers/reader_base.h +++ b/tiledb/sm/query/readers/reader_base.h @@ -543,7 +543,8 @@ class ReaderBase : public StrategyBase { /** * Concurrently executes across each name in `names` and each result tile - * in 'result_tiles'. + * in 'result_tiles'. Attaches a future to each result_tile that is signaling + * when reading the corresponding data from disk is done. * * This must be the entry point for reading attribute tiles because it * generates stats for reading attributes. @@ -559,7 +560,8 @@ class ReaderBase : public StrategyBase { /** * Concurrently executes across each name in `names` and each result tile - * in 'result_tiles'. + * in 'result_tiles'. Attaches a future to each result_tile that is signaling + * when reading the corresponding data from disk is done. * * This must be the entry point for reading coordinate tiles because it * generates stats for reading coordinates. @@ -578,7 +580,8 @@ class ReaderBase : public StrategyBase { * in the appropriate result tile. * * Concurrently executes across each name in `names` and each result tile - * in 'result_tiles'. + * in 'result_tiles'. Attaches a future to each result_tile that is signaling + * when reading the corresponding data from disk is done. * * @param names The field names. * @param result_tiles The retrieved tiles will be stored inside the diff --git a/tiledb/sm/query/readers/result_tile.cc b/tiledb/sm/query/readers/result_tile.cc index 5a2c0872aaa..bb4ec543482 100644 --- a/tiledb/sm/query/readers/result_tile.cc +++ b/tiledb/sm/query/readers/result_tile.cc @@ -91,6 +91,20 @@ ResultTile::ResultTile( coord_func_ = &ResultTile::zipped_coord; } +ResultTile::~ResultTile() { + try { + // Wait for all attribute tasks to be done + wait_all_tiles(attr_tiles_); + } catch (...) { + } + + try { + // Wait for all coordinates tasks to be done + wait_all_tiles(coord_tiles_); + } catch (...) 
{ + } +} + /* ****************************** */ /* API */ /* ****************************** */ @@ -130,7 +144,7 @@ void ResultTile::erase_tile(const std::string& name) { // Handle attribute tile for (auto& at : attr_tiles_) { - if (at.first == name) { + if (at.second.has_value() && at.first == name) { at.second.reset(); return; } @@ -261,6 +275,23 @@ ResultTile::TileTuple* ResultTile::tile_tuple(const std::string& name) { return nullptr; } +void ResultTile::wait_all_tiles( + const std::vector>>& tiles) + const { + for (auto& at : tiles) { + const auto& tile_tuple = at.second; + if (tile_tuple.has_value()) { + tile_tuple.value().fixed_tile().data(); + if (tile_tuple.value().var_tile_opt().has_value()) { + tile_tuple.value().var_tile_opt().value().data(); + } + if (tile_tuple.value().validity_tile_opt().has_value()) { + tile_tuple.value().validity_tile_opt().value().data(); + } + } + } +} + const void* ResultTile::unzipped_coord(uint64_t pos, unsigned dim_idx) const { const auto& coord_tile = coord_tiles_[dim_idx].second->fixed_tile(); const uint64_t offset = pos * coord_tile.cell_size(); @@ -383,7 +414,11 @@ Status ResultTile::read( if ((!is_dim && name != constants::coords && !use_fragment_ts) || (is_dim && !coord_tiles_[0].first.empty()) || (name == constants::coords && coords_tile_.has_value())) { - const auto& tile = this->tile_tuple(name)->fixed_tile(); + auto tile_tuple = this->tile_tuple(name); + if (tile_tuple == nullptr) { + return Status::Ok(); + } + const auto& tile = tile_tuple->fixed_tile(); auto cell_size = tile.cell_size(); auto nbytes = len * cell_size; auto offset = pos * cell_size; diff --git a/tiledb/sm/query/readers/result_tile.h b/tiledb/sm/query/readers/result_tile.h index afc1a26f91b..003a49f92eb 100644 --- a/tiledb/sm/query/readers/result_tile.h +++ b/tiledb/sm/query/readers/result_tile.h @@ -225,12 +225,38 @@ class ResultTile { /* CONSTRUCTORS & DESTRUCTORS */ /* ********************************* */ TileData( - void* fixed_filtered_data, - void* var_filtered_data, - void* validity_filtered_data) - : fixed_filtered_data_(fixed_filtered_data) - , var_filtered_data_(var_filtered_data) - , validity_filtered_data_(validity_filtered_data) { + std::pair fixed_filtered_data, + std::pair var_filtered_data, + std::pair validity_filtered_data) + : fixed_filtered_data_(fixed_filtered_data.first) + , var_filtered_data_(var_filtered_data.first) + , validity_filtered_data_(validity_filtered_data.first) + , fixed_filtered_data_task_(fixed_filtered_data.second) + , var_filtered_data_task_(var_filtered_data.second) + , validity_filtered_data_task_(validity_filtered_data.second) { + } + + ~TileData() { + try { + if (fixed_filtered_data_task_.valid()) { + auto st = fixed_filtered_data_task_.wait(); + } + } catch (...) { + } + + try { + if (var_filtered_data_task_.valid()) { + auto st = var_filtered_data_task_.wait(); + } + } catch (...) { + } + + try { + if (validity_filtered_data_task_.valid()) { + auto st = validity_filtered_data_task_.wait(); + } + } catch (...) { + } } /* ********************************* */ @@ -252,6 +278,21 @@ class ResultTile { return validity_filtered_data_; } + /** @return The fixed filtered data I/O task. */ + inline ThreadPool::SharedTask fixed_filtered_data_task() const { + return fixed_filtered_data_task_; + } + + /** @return The var filtered data I/O task. */ + inline ThreadPool::SharedTask var_filtered_data_task() const { + return var_filtered_data_task_; + } + + /** @return The validity filtered data I/O task. 
*/ + inline ThreadPool::SharedTask validity_filtered_data_task() const { + return validity_filtered_data_task_; + } + private: /* ********************************* */ /* PRIVATE ATTRIBUTES */ @@ -265,6 +306,15 @@ class ResultTile { /** Stores the validity filtered data pointer. */ void* validity_filtered_data_; + + /** Stores the fixed filtered data I/O task. */ + ThreadPool::SharedTask fixed_filtered_data_task_; + + /** Stores the var filtered data I/O task. */ + ThreadPool::SharedTask var_filtered_data_task_; + + /** Stores the validity filtered data I/O task. */ + ThreadPool::SharedTask validity_filtered_data_task_; }; /** @@ -298,7 +348,8 @@ class ResultTile { tile_sizes.tile_size(), tile_data.fixed_filtered_data(), tile_sizes.tile_persisted_size(), - memory_tracker_) { + memory_tracker_, + tile_data.fixed_filtered_data_task()) { if (tile_sizes.has_var_tile()) { auto type = array_schema.type(name); var_tile_.emplace( @@ -309,7 +360,8 @@ class ResultTile { tile_sizes.tile_var_size(), tile_data.var_filtered_data(), tile_sizes.tile_var_persisted_size(), - memory_tracker_); + memory_tracker_, + tile_data.var_filtered_data_task()); } if (tile_sizes.has_validity_tile()) { @@ -321,7 +373,8 @@ class ResultTile { tile_sizes.tile_validity_size(), tile_data.validity_filtered_data(), tile_sizes.tile_validity_persisted_size(), - memory_tracker_); + memory_tracker_, + tile_data.validity_filtered_data_task()); } } @@ -344,6 +397,16 @@ class ResultTile { return validity_tile_.value(); } + /** @returns Var tile. */ + const std::optional& var_tile_opt() const { + return var_tile_; + } + + /** @returns Validity tile. */ + const std::optional& validity_tile_opt() const { + return validity_tile_; + } + /** @returns Fixed tile. */ const Tile& fixed_tile() const { return fixed_tile_; @@ -404,8 +467,8 @@ class ResultTile { DISABLE_COPY_AND_COPY_ASSIGN(ResultTile); DISABLE_MOVE_AND_MOVE_ASSIGN(ResultTile); - /** Default destructor. */ - ~ResultTile() = default; + /** Destructor needs to be virtual, this is a base class. */ + virtual ~ResultTile(); /* ********************************* */ /* API */ @@ -691,6 +754,11 @@ class ResultTile { const uint64_t min_cell, const uint64_t max_cell) const; + /* Waits for all tiles results to be available */ + void wait_all_tiles( + const std::vector>>& tiles) + const; + protected: /* ********************************* */ /* PROTECTED ATTRIBUTES */ @@ -863,6 +931,9 @@ class ResultTileWithBitmap : public ResultTile { DISABLE_COPY_AND_COPY_ASSIGN(ResultTileWithBitmap); DISABLE_MOVE_AND_MOVE_ASSIGN(ResultTileWithBitmap); + /** Default destructor needs to be virtual, this is a base class. 
*/ + virtual ~ResultTileWithBitmap() = default; + public: /* ********************************* */ /* PUBLIC METHODS */ diff --git a/tiledb/sm/query/readers/sparse_global_order_reader.cc b/tiledb/sm/query/readers/sparse_global_order_reader.cc index 5ab35f807d8..4236ae60813 100644 --- a/tiledb/sm/query/readers/sparse_global_order_reader.cc +++ b/tiledb/sm/query/readers/sparse_global_order_reader.cc @@ -1545,7 +1545,6 @@ AddNextCellResult SparseGlobalOrderReader::add_next_cell_to_queue( } } } - std::unique_lock ul(tile_queue_mutex_); // Add all the cells in this tile with the same coordinates as this cell diff --git a/tiledb/sm/query/test/unit_query_condition.cc b/tiledb/sm/query/test/unit_query_condition.cc index a10809236f6..a90e941f3c6 100644 --- a/tiledb/sm/query/test/unit_query_condition.cc +++ b/tiledb/sm/query/test/unit_query_condition.cc @@ -1628,7 +1628,10 @@ void test_apply(const Datatype type, bool var_size, bool nullable) { nullable ? std::optional(cells * constants::cell_validity_size) : std::nullopt, nullable ? std::optional(0) : std::nullopt); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -1687,7 +1690,10 @@ void test_apply(const Datatype type, bool var_size, bool nullable) { nullable ? std::optional(0) : std::nullopt, nullable ? std::optional(0) : std::nullopt); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -1803,7 +1809,10 @@ TEST_CASE( std::nullopt, nullable ? std::optional(0) : std::nullopt); ResultTile result_tile(0, 0, *frag_md[0], memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -2345,7 +2354,10 @@ void test_apply_dense( std::nullopt, nullable ? std::optional(0) : std::nullopt); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -2404,7 +2416,10 @@ void test_apply_dense(const Datatype type, bool var_size, bool nullable) { nullable ? std::optional(0) : std::nullopt, nullable ? std::optional(0) : std::nullopt); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -2519,7 +2534,10 @@ TEST_CASE( std::nullopt, nullable ? 
std::optional(0) : std::nullopt); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -3054,7 +3072,10 @@ void test_apply_sparse( std::nullopt, nullable ? std::optional(0) : std::nullopt); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -3113,7 +3134,10 @@ void test_apply_sparse(const Datatype type, bool var_size, bool nullable) { nullable ? std::optional(0) : std::nullopt, nullable ? std::optional(0) : std::nullopt); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -3892,7 +3916,10 @@ TEST_CASE( std::nullopt, std::nullopt); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -4184,7 +4211,10 @@ TEST_CASE( std::nullopt, std::nullopt); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -4602,7 +4632,10 @@ TEST_CASE( std::nullopt, std::nullopt); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -4867,7 +4900,10 @@ TEST_CASE( cells * constants::cell_validity_size, 0); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, @@ -4973,7 +5009,10 @@ TEST_CASE( std::nullopt, nullable ? 
std::optional(0) : std::nullopt); ResultTile result_tile(0, 0, frag_md, memory_tracker); - ResultTile::TileData tile_data{nullptr, nullptr, nullptr}; + ResultTile::TileData tile_data{ + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}, + {nullptr, ThreadPool::SharedTask()}}; result_tile.init_attr_tile( constants::format_version, *array_schema, diff --git a/tiledb/sm/tile/CMakeLists.txt b/tiledb/sm/tile/CMakeLists.txt index 5cab70bf82c..a553c15a979 100644 --- a/tiledb/sm/tile/CMakeLists.txt +++ b/tiledb/sm/tile/CMakeLists.txt @@ -32,7 +32,7 @@ include(object_library) # commence(object_library tile) this_target_sources(tile.cc) - this_target_object_libraries(baseline buffer constants) + this_target_object_libraries(baseline buffer constants thread_pool) conclude(object_library) # diff --git a/tiledb/sm/tile/generic_tile_io.cc b/tiledb/sm/tile/generic_tile_io.cc index 7ebaef581d7..7f818a360f3 100644 --- a/tiledb/sm/tile/generic_tile_io.cc +++ b/tiledb/sm/tile/generic_tile_io.cc @@ -120,7 +120,8 @@ shared_ptr GenericTileIO::read_generic( header.tile_size, filtered_data.data(), header.persisted_size, - memory_tracker->get_resource(MemoryType::GENERIC_TILE_IO)); + memory_tracker->get_resource(MemoryType::GENERIC_TILE_IO), + std::nullopt); // Read the tile. throw_if_not_ok(resources_.vfs().read( diff --git a/tiledb/sm/tile/test/unit_tile.cc b/tiledb/sm/tile/test/unit_tile.cc index 35754dbbfe6..409ef3dc58a 100644 --- a/tiledb/sm/tile/test/unit_tile.cc +++ b/tiledb/sm/tile/test/unit_tile.cc @@ -57,7 +57,8 @@ TEST_CASE("Tile: Test basic IO", "[Tile][basic_io]") { tile_size, nullptr, 0, - tracker); + tracker, + std::nullopt); CHECK(tile.size() == tile_size); // Create a buffer to write to the test Tile. diff --git a/tiledb/sm/tile/tile.cc b/tiledb/sm/tile/tile.cc index afec0f51b36..f979687c1ab 100644 --- a/tiledb/sm/tile/tile.cc +++ b/tiledb/sm/tile/tile.cc @@ -31,6 +31,7 @@ */ #include "tiledb/sm/tile/tile.h" + #include "tiledb/common/exception/exception.h" #include "tiledb/common/heap_memory.h" #include "tiledb/common/memory_tracker.h" @@ -69,7 +70,8 @@ shared_ptr Tile::from_generic( tile_size, nullptr, 0, - memory_tracker->get_resource(MemoryType::GENERIC_TILE_IO)); + memory_tracker->get_resource(MemoryType::GENERIC_TILE_IO), + std::nullopt); } shared_ptr WriterTile::from_generic( @@ -132,7 +134,8 @@ Tile::Tile( const uint64_t size, void* filtered_data, uint64_t filtered_size, - shared_ptr memory_tracker) + shared_ptr memory_tracker, + std::optional data_io_task) : Tile( format_version, type, @@ -141,7 +144,8 @@ Tile::Tile( size, filtered_data, filtered_size, - memory_tracker->get_resource(MemoryType::TILE_DATA)) { + memory_tracker->get_resource(MemoryType::TILE_DATA), + std::move(data_io_task)) { } Tile::Tile( @@ -152,11 +156,13 @@ Tile::Tile( const uint64_t size, void* filtered_data, uint64_t filtered_size, - tdb::pmr::memory_resource* resource) + tdb::pmr::memory_resource* resource, + std::optional filtered_data_io_task) : TileBase(format_version, type, cell_size, size, resource) , zipped_coords_dim_num_(zipped_coords_dim_num) , filtered_data_(filtered_data) - , filtered_size_(filtered_size) { + , filtered_size_(filtered_size) + , filtered_data_io_task_(std::move(filtered_data_io_task)) { } WriterTile::WriterTile( @@ -205,7 +211,7 @@ void TileBase::write(const void* data, uint64_t offset, uint64_t nbytes) { size_ = std::max(offset + nbytes, size_); } -void Tile::zip_coordinates() { +void Tile::zip_coordinates_unsafe() { assert(zipped_coords_dim_num_ > 0); // For easy 
reference @@ -280,6 +286,15 @@ uint64_t Tile::load_chunk_data( ChunkData& unfiltered_tile, uint64_t expected_original_size) { assert(filtered()); + std::scoped_lock lock{filtered_data_io_task_mtx_}; + if (filtered_data_io_task_.has_value()) { + if (filtered_data_io_task_.value().valid()) { + throw_if_not_ok(filtered_data_io_task_.value().wait()); + } else { + throw std::future_error(std::future_errc::no_state); + } + } + Deserializer deserializer(filtered_data(), filtered_size()); // Make a pass over the tile to get the chunk information. diff --git a/tiledb/sm/tile/tile.h b/tiledb/sm/tile/tile.h index 343aaa3ce3c..e14ea0d1595 100644 --- a/tiledb/sm/tile/tile.h +++ b/tiledb/sm/tile/tile.h @@ -46,6 +46,7 @@ using namespace tiledb::common; namespace tiledb { namespace sm { +class FilteredData; class MemoryTracker; /** @@ -72,6 +73,8 @@ class TileBase { DISABLE_COPY_AND_COPY_ASSIGN(TileBase); DISABLE_MOVE_AND_MOVE_ASSIGN(TileBase); + virtual ~TileBase() = default; + /* ********************************* */ /* API */ /* ********************************* */ @@ -92,6 +95,16 @@ class TileBase { return static_cast(data()); } + /** + * Converts the data pointer to a specific type with no check on compute + * task. This is used for getting thte data from inside the compute thread + * itself for unfiltering. + */ + template + inline T* data_as_unsafe() const { + return static_cast(data_unsafe()); + } + /** Gets the size, considering the data as a specific type. */ template inline size_t size_as() const { @@ -103,6 +116,14 @@ class TileBase { return data_.get(); } + /** + * Returns the internal buffer. This is used for getting thte data from + * inside the compute thread itself for unfiltering. + */ + inline void* data_unsafe() const { + return data_.get(); + } + /** * Reads from the tile at the given offset into the input * buffer of size nbytes. Does not mutate the internal offset. @@ -134,8 +155,8 @@ class TileBase { * * @param var_tile Var tile. */ - void add_extra_offset(TileBase& var_tile) { - data_as()[size_ / cell_size_ - 1] = var_tile.size(); + void add_extra_offset_unsafe(TileBase& var_tile) { + data_as_unsafe()[size_ / cell_size_ - 1] = var_tile.size(); } protected: @@ -191,6 +212,8 @@ class Tile : public TileBase { * @param size The size of the tile. * @param filtered_data Pointer to the external filtered data. * @param filtered_size The filtered size to allocate. + * @param memory_tracker The memory resource to use. + * @param filtered_data_io_task The I/O task to wait on for data to be valid. */ Tile( const format_version_t format_version, @@ -200,7 +223,8 @@ class Tile : public TileBase { const uint64_t size, void* filtered_data, uint64_t filtered_size, - shared_ptr memory_tracker); + shared_ptr memory_tracker, + std::optional filtered_data_io_task); /** * Constructor. @@ -214,6 +238,7 @@ class Tile : public TileBase { * @param filtered_data Pointer to the external filtered data. * @param filtered_size The filtered size to allocate. * @param resource The memory resource to use. + * @param filtered_data_io_task The I/O task to wait on for data to be valid. 
*/ Tile( const format_version_t format_version, @@ -223,11 +248,14 @@ class Tile : public TileBase { const uint64_t size, void* filtered_data, uint64_t filtered_size, - tdb::pmr::memory_resource* resource); + tdb::pmr::memory_resource* resource, + std::optional filtered_data_io_task); DISABLE_MOVE_AND_MOVE_ASSIGN(Tile); DISABLE_COPY_AND_COPY_ASSIGN(Tile); + ~Tile() = default; + /* ********************************* */ /* API */ /* ********************************* */ @@ -253,17 +281,43 @@ class Tile : public TileBase { /** Returns the buffer that contains the filtered, on-disk format. */ inline char* filtered_data() { + std::scoped_lock lock{filtered_data_io_task_mtx_}; + if (filtered_data_io_task_.has_value()) { + if (filtered_data_io_task_.value().valid()) { + throw_if_not_ok(filtered_data_io_task_.value().wait()); + } else { + throw std::future_error(std::future_errc::no_state); + } + } return static_cast(filtered_data_); } /** Returns the data casted as a type. */ template inline T* filtered_data_as() { + std::scoped_lock lock{filtered_data_io_task_mtx_}; + if (filtered_data_io_task_.has_value()) { + if (filtered_data_io_task_.value().valid()) { + throw_if_not_ok(filtered_data_io_task_.value().wait()); + } else { + throw std::future_error(std::future_errc::no_state); + } + } + return static_cast(filtered_data_); } /** Clears the filtered buffer. */ void clear_filtered_buffer() { + std::scoped_lock lock{filtered_data_io_task_mtx_}; + if (filtered_data_io_task_.has_value()) { + if (filtered_data_io_task_.value().valid()) { + throw_if_not_ok(filtered_data_io_task_.value().wait()); + } else { + throw std::future_error(std::future_errc::no_state); + } + } + filtered_data_ = nullptr; filtered_size_ = 0; } @@ -278,8 +332,12 @@ class Tile : public TileBase { /** * Zips the coordinate values such that a cell's coordinates across * all dimensions appear contiguously in the buffer. + * + * This is marked unsafe because we don't check for unfiltering to be + * completed since this function is used by the unfiltering task itself as + * part of post processing. */ - void zip_coordinates(); + void zip_coordinates_unsafe(); /** * Reads the chunk data of a tile buffer and populates a chunk data structure. @@ -353,6 +411,17 @@ class Tile : public TileBase { /** The size of the filtered data. */ uint64_t filtered_size_; + + /** I/O task to check and block on if filtered data is ready. */ + mutable std::optional filtered_data_io_task_; + + /** + * Lock for checking task, since this tile can be used by multiple threads. + * The ThreadPool::SharedTask lets multiple threads copy the task, but it + * doesn't let multiple threads access a single task itself. Due to this we + * need a mutex since the tile will be accessed by multiple threads. + */ + mutable std::recursive_mutex filtered_data_io_task_mtx_; }; /** @@ -396,7 +465,7 @@ class WriterTile : public TileBase { * @param type The data type. * @param cell_size The cell size. * @param size The size of the tile. - * @param meory_tracker The memory tracker to use. + * @param memory_tracker The memory tracker to use. */ WriterTile( const format_version_t format_version,
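// End-to-end sketch of the ownership chain this diff introduces (condensed from
// reader_base.cc, result_tile.h, and tile.h above; illustrative, not part of
// the patch):
//
//   read_tiles()                            // reader_base.cc
//     -> FilteredData::*_filtered_data()    // returns {void*, SharedTask}
//     -> ResultTile::TileData               // holds the three tasks, waits in ~TileData()
//     -> Tile(..., filtered_data_io_task)   // keeps its own copy of the task
//
// Consumers only need the checked accessors; the wait happens lazily.
// Hypothetical helper, assuming the Tile API above:

char* unfiltered_chunk_start(tiledb::sm::Tile& tile, uint64_t chunk_offset) {
  // filtered_data() blocks until the asynchronous VFS read has completed, or
  // throws std::future_error if the stored task has no shared state.
  return tile.filtered_data() + chunk_offset;
}

// Destruction is guarded the same way: ~TileData() and ~ResultTile() wait on any
// task still in flight, which prevents a background read from writing into
// buffers that have already been freed.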