Skip to content

Commit

Permalink
sm::array_open_for_reads: fetch fragments async instead of schema
Browse files Browse the repository at this point in the history
Invert the tasking order for async loading established in
  #2061

Now load the fragments in an async task and the array schema on the main thread.
Fixes several mutex ownership issues which surfaced on Windows.

(1) We need to wait on the async task in all return clauses to avoid
    the following situation which causes an exit on Windows:
    if the context owning the storage manager is destroyed
    without joining the async thread, then we get an error
    on Windows due to attempting to destruct a locked mutex.

(2) We cannot call open_array->mtx_unlock() on the main thread after
    running the array_open call async on the thread pool (which locks
    the open_array.mtx_), because locking and unlocking a mutex on
    different threads is undefined behavior and a runtime error on Windows.
  • Loading branch information
ihnorton committed Feb 5, 2021
1 parent ce00c29 commit 95acab5
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 46 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
## Deprecations

## Bug fixes
* Fix mutex locking bugs on Windows due to unlocking on different thread and missing task join [#2077](https://github.com/TileDB-Inc/TileDB/pull/2077)

## API additions

Expand Down
108 changes: 62 additions & 46 deletions tiledb/sm/storage_manager/storage_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,48 +204,58 @@ Status StorageManager::array_open_for_reads(
std::vector<FragmentMetadata*>* fragment_metadata) {
STATS_START_TIMER(stats::Stats::TimerType::READ_ARRAY_OPEN)

// Open array without fragments async. This loads the array schema which
// is not needed in this function, so it can safely be loaded in parallel
// to listing of the fragment metadata and loading the consolidated fragment
// metadata file
auto open_array = (OpenArray*)nullptr;
std::vector<ThreadPool::Task> load_array_schema_task;
load_array_schema_task.emplace_back(io_tp_->execute([&, this]() {
RETURN_NOT_OK_ELSE(
array_open_without_fragments(array_uri, enc_key, &open_array),
*array_schema = nullptr);
return Status::Ok();
}));

// Determine which fragments to load
std::vector<TimestampedURI> fragments_to_load;
std::vector<URI> fragment_uris;
URI meta_uri;
RETURN_NOT_OK(get_fragment_uris(array_uri, &fragment_uris, &meta_uri));
RETURN_NOT_OK(get_sorted_uris(fragment_uris, timestamp, &fragments_to_load));

// Get the consolidated fragment metadata
Buffer f_buff;
std::unordered_map<std::string, uint64_t> offsets;
uint32_t meta_version = 0;
RETURN_NOT_OK(load_consolidated_fragment_meta(
meta_uri, enc_key, &f_buff, &offsets, &meta_version));

// Wait for array schema to be loaded
RETURN_NOT_OK(io_tp_->wait_all(load_array_schema_task));
// Fetch array fragments async
std::vector<ThreadPool::Task> load_array_fragments_task;
load_array_fragments_task.emplace_back(io_tp_->execute([&, this]() {
// Determine which fragments to load
RETURN_NOT_OK(get_fragment_uris(array_uri, &fragment_uris, &meta_uri));
RETURN_NOT_OK(
get_sorted_uris(fragment_uris, timestamp, &fragments_to_load));
// Get the consolidated fragment metadata
RETURN_NOT_OK(load_consolidated_fragment_meta(
meta_uri, enc_key, &f_buff, &offsets, &meta_version));
return Status::Ok();
}));

auto open_array = (OpenArray*)nullptr;
Status st = array_open_without_fragments(array_uri, enc_key, &open_array);

if (!st.ok()) {
io_tp_->wait_all(load_array_fragments_task);
*array_schema = nullptr;
return st;
}

// Wait for array fragments to be loaded
st = io_tp_->wait_all(load_array_fragments_task);

if (!st.ok()) {
open_array->mtx_unlock();
array_close_for_reads(array_uri);
*array_schema = nullptr;
return st;
}

// Retrieve array schema
*array_schema = open_array->array_schema();

// Get fragment metadata in the case of reads, if not fetched already
Status st = load_fragment_metadata(
st = load_fragment_metadata(
open_array,
enc_key,
fragments_to_load,
&f_buff,
offsets,
meta_version,
fragment_metadata);

if (!st.ok()) {
open_array->mtx_unlock();
array_close_for_reads(array_uri);
Expand All @@ -270,53 +280,59 @@ Status StorageManager::array_open_for_reads(
std::vector<FragmentMetadata*>* fragment_metadata) {
STATS_START_TIMER(stats::Stats::TimerType::READ_ARRAY_OPEN)

// Open array without fragments async. This loads the array schema which
// is not needed in this function, so it can safely be loaded in parallel
// to listing of the fragment metadata and loading the consolidated fragment
// metadata file
auto open_array = (OpenArray*)nullptr;
std::vector<ThreadPool::Task> load_array_schema_task;
load_array_schema_task.emplace_back(io_tp_->execute([&, this]() {
RETURN_NOT_OK_ELSE(
array_open_without_fragments(array_uri, enc_key, &open_array),
*array_schema = nullptr);
return Status::Ok();
}));

// Determine which fragments to load
std::vector<TimestampedURI> fragments_to_load;
const auto& fragments = fragment_info.fragments();
for (const auto& fragment : fragments)
fragments_to_load.emplace_back(fragment.uri(), fragment.timestamp_range());

// Get the consolidated fragment metadata URI
URI meta_uri;
std::vector<URI> uris;
RETURN_NOT_OK(vfs_->ls(array_uri.add_trailing_slash(), &uris));
RETURN_NOT_OK(get_consolidated_fragment_meta_uri(uris, &meta_uri));

// Get the consolidated fragment metadata
Buffer f_buff;
std::unordered_map<std::string, uint64_t> offsets;
uint32_t meta_version = 0;
RETURN_NOT_OK(load_consolidated_fragment_meta(
meta_uri, enc_key, &f_buff, &offsets, &meta_version));

std::vector<ThreadPool::Task> load_array_fragments_task;
load_array_fragments_task.emplace_back(io_tp_->execute([&, this]() {
RETURN_NOT_OK(vfs_->ls(array_uri.add_trailing_slash(), &uris));
// Get the consolidated fragment metadata URI
RETURN_NOT_OK(get_consolidated_fragment_meta_uri(uris, &meta_uri));
// Get the consolidated fragment metadata
RETURN_NOT_OK(load_consolidated_fragment_meta(
meta_uri, enc_key, &f_buff, &offsets, &meta_version));
return Status::Ok();
}));

auto open_array = (OpenArray*)nullptr;
Status st = array_open_without_fragments(array_uri, enc_key, &open_array);
if (!st.ok()) {
io_tp_->wait_all(load_array_fragments_task);
*array_schema = nullptr;
return st;
}

// Wait for array schema to be loaded
RETURN_NOT_OK(io_tp_->wait_all(load_array_schema_task));
st = io_tp_->wait_all(load_array_fragments_task);
if (!st.ok()) {
open_array->mtx_unlock();
array_close_for_reads(array_uri);
*array_schema = nullptr;
return st;
}

// Retrieve array schema
// Assign array schema
*array_schema = open_array->array_schema();

// Get fragment metadata in the case of reads, if not fetched already
Status st = load_fragment_metadata(
st = load_fragment_metadata(
open_array,
enc_key,
fragments_to_load,
&f_buff,
offsets,
meta_version,
fragment_metadata);

if (!st.ok()) {
open_array->mtx_unlock();
array_close_for_reads(array_uri);
Expand Down

0 comments on commit 95acab5

Please sign in to comment.