diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a99461e3797..4b59325dba2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [3.0.8] + +### Fixed + +- Fixed a bug where historical query fetches could stall when requesting a range of large ledger entries (#5026, #5058). + ## [3.0.7] [3.0.7]: https://github.com/microsoft/CCF/releases/tag/ccf-3.0.7 diff --git a/src/host/ledger.h b/src/host/ledger.h index ab4dacc0dbe5..abe4e8537251 100644 --- a/src/host/ledger.h +++ b/src/host/ledger.h @@ -125,6 +125,12 @@ namespace asynchost return match; } + struct LedgerReadResult + { + std::vector data; + size_t end_idx; + }; + class LedgerFile { private: @@ -378,6 +384,13 @@ namespace asynchost if (from == to) { // Request one entry that is too large: no entries are found + LOG_TRACE_FMT( + "Single ledger entry at {} in file {} is too large for remaining " + "space (size {} > max {})", + from, + file_name, + size, + max_size.value()); return {0, 0}; } size_t to_ = from + (to - from) / 2; @@ -397,7 +410,7 @@ namespace asynchost return {size, to}; } - std::optional, size_t>> read_entries( + std::optional read_entries( size_t from, size_t to, std::optional max_size = std::nullopt) { if ((from < start_idx) || (to > get_last_idx()) || (to < from)) @@ -428,7 +441,7 @@ namespace asynchost file_name)); } - return std::make_pair(entries, to_); + return LedgerReadResult{entries, to_}; } bool truncate(size_t idx) @@ -762,11 +775,10 @@ namespace asynchost return files.back(); } - std::optional> read_entries_range( + std::optional read_entries_range( size_t from, size_t to, bool read_cache_only = false, - bool strict = true, std::optional max_entries_size = std::nullopt) { // Note: if max_entries_size is set, this returns contiguous ledger @@ -777,20 +789,17 @@ namespace asynchost return std::nullopt; } - // If non-strict, return as many entries as possible + // During recovery or other low-knowledge batch operations, we might + // request entries past the end of the ledger - truncate to the true end + // here. if (to > last_idx) { - if (strict) - { - return std::nullopt; - } - else - { - to = last_idx; - } + to = last_idx; } - std::vector entries = {}; + LedgerReadResult rr; + rr.end_idx = to; + size_t idx = from; while (idx <= to) { @@ -804,19 +813,19 @@ namespace asynchost std::optional max_size = std::nullopt; if (max_entries_size.has_value()) { - max_size = max_entries_size.value() - entries.size(); + max_size = max_entries_size.value() - rr.data.size(); } auto v = f_from->read_entries(idx, to_, max_size); if (!v.has_value()) { - return std::nullopt; + break; } - auto& [e, to_read] = v.value(); - entries.insert( - entries.end(), - std::make_move_iterator(e.begin()), - std::make_move_iterator(e.end())); - if (to_read != to_) + rr.end_idx = v->end_idx; + rr.data.insert( + rr.data.end(), + std::make_move_iterator(v->data.begin()), + std::make_move_iterator(v->data.end())); + if (v->end_idx != to_) { // If all the entries requested from a file are not returned (i.e. // because the requested entries are larger than max_entries_size), @@ -827,7 +836,14 @@ namespace asynchost idx = to_ + 1; } - return entries; + if (!rr.data.empty()) + { + return rr; + } + else + { + return std::nullopt; + } } void ignore_ledger_file(const std::string& file_name) @@ -1113,7 +1129,7 @@ namespace asynchost recovery_start_idx = idx; } - std::optional> read_entry(size_t idx) + std::optional read_entry(size_t idx) { TimeBoundLogger log_if_slow( fmt::format("Reading ledger entry at {}", idx)); @@ -1121,16 +1137,15 @@ namespace asynchost return read_entries_range(idx, idx); } - std::optional> read_entries( + std::optional read_entries( size_t from, size_t to, - bool strict = true, std::optional max_entries_size = std::nullopt) { TimeBoundLogger log_if_slow( fmt::format("Reading ledger entries from {} to {}", from, to)); - return read_entries_range(from, to, false, strict, max_entries_size); + return read_entries_range(from, to, false, max_entries_size); } size_t write_entry(const uint8_t* data, size_t size, bool committable) @@ -1293,30 +1308,31 @@ namespace asynchost Ledger* ledger; size_t from_idx; size_t to_idx; + size_t max_size; // First argument is ledger entries (or nullopt if not found) // Second argument is uv status code, which may indicate a cancellation using ResultCallback = - std::function>&&, int)>; + std::function&&, int)>; ResultCallback result_cb; // Final result - std::optional> entries = std::nullopt; + std::optional read_result = std::nullopt; }; static void on_ledger_get_async(uv_work_t* req) { auto data = static_cast(req->data); - data->entries = - data->ledger->read_entries_range(data->from_idx, data->to_idx, true); + data->read_result = data->ledger->read_entries_range( + data->from_idx, data->to_idx, true, data->max_size); } static void on_ledger_get_async_complete(uv_work_t* req, int status) { auto data = static_cast(req->data); - data->result_cb(std::move(data->entries), status); + data->result_cb(std::move(data->read_result), status); delete data; delete req; @@ -1325,18 +1341,18 @@ namespace asynchost void write_ledger_get_range_response( size_t from_idx, size_t to_idx, - std::optional>&& entries, + std::optional&& read_result, consensus::LedgerRequestPurpose purpose) { - if (entries.has_value()) + if (read_result.has_value()) { RINGBUFFER_WRITE_MESSAGE( consensus::ledger_entry_range, to_enclave, from_idx, - to_idx, + read_result->end_idx, purpose, - entries.value()); + read_result->data); } else { @@ -1401,11 +1417,6 @@ namespace asynchost auto [from_idx, to_idx, purpose] = ringbuffer::read_message(data, size); - // Recovery reads ledger in fixed-size batches until it reaches the - // end of the ledger. When the end of the ledger is reached, we return - // as many entries as possible including the very last one. - bool strict = purpose != consensus::LedgerRequestPurpose::Recovery; - // Ledger entries response has metadata so cap total entries size // accordingly constexpr size_t write_ledger_range_response_metadata_size = 2048; @@ -1423,21 +1434,15 @@ namespace asynchost job->ledger = this; job->from_idx = from_idx; job->to_idx = to_idx; - job->result_cb = [this, - from_idx = from_idx, - to_idx = to_idx, - purpose = purpose, - strict = strict, - max_entries_size = - max_entries_size](auto&& entry, int status) { - // NB: Even if status is cancelled (and entry is empty), we - // want to write this result back to the enclave - write_ledger_get_range_response( - from_idx, - to_idx, - read_entries(from_idx, to_idx, strict, max_entries_size), - purpose); - }; + job->max_size = max_entries_size; + job->result_cb = + [this, from_idx = from_idx, to_idx = to_idx, purpose = purpose]( + auto&& read_result, int status) { + // NB: Even if status is cancelled (and entry is empty), we + // want to write this result back to the enclave + write_ledger_get_range_response( + from_idx, to_idx, std::move(read_result), purpose); + }; work_handle->data = job; } @@ -1455,7 +1460,7 @@ namespace asynchost write_ledger_get_range_response( from_idx, to_idx, - read_entries(from_idx, to_idx, strict, max_entries_size), + read_entries(from_idx, to_idx, max_entries_size), purpose); } }); diff --git a/src/host/node_connections.h b/src/host/node_connections.h index be336203ecd5..51e3fea328f7 100644 --- a/src/host/node_connections.h +++ b/src/host/node_connections.h @@ -337,17 +337,42 @@ namespace asynchost // Find the total frame size, and write it along with the header. uint32_t frame = (uint32_t)size_to_send; - std::optional> framed_entries = std::nullopt; - framed_entries = ledger.read_entries(ae.prev_idx + 1, ae.idx); - if (framed_entries.has_value()) + if (ae.idx > ae.prev_idx) { - frame += (uint32_t)framed_entries->size(); - outbound_connection->write(sizeof(uint32_t), (uint8_t*)&frame); - outbound_connection->write(size_to_send, data_to_send); + std::optional read_result = + ledger.read_entries(ae.prev_idx + 1, ae.idx); - frame = (uint32_t)framed_entries->size(); - outbound_connection->write(frame, framed_entries->data()); + if (!read_result.has_value()) + { + LOG_FAIL_FMT( + "Unable to send AppendEntries ({}, {}]: Ledger read failed", + ae.prev_idx, + ae.idx); + return; + } + else if (ae.idx != read_result->end_idx) + { + // NB: This should never happen since we do not pass a max_size + // to read_entries + LOG_FAIL_FMT( + "Unable to send AppendEntries ({}, {}]: Ledger read returned " + "entries to {}", + ae.prev_idx, + ae.idx, + read_result->end_idx); + return; + } + else + { + const auto& framed_entries = read_result->data; + frame += (uint32_t)framed_entries.size(); + outbound_connection->write(sizeof(uint32_t), (uint8_t*)&frame); + outbound_connection->write(size_to_send, data_to_send); + + outbound_connection->write( + framed_entries.size(), framed_entries.data()); + } } else { diff --git a/src/host/test/ledger.cpp b/src/host/test/ledger.cpp index 13983e3dce7e..ccb8b11e9632 100644 --- a/src/host/test/ledger.cpp +++ b/src/host/test/ledger.cpp @@ -135,8 +135,11 @@ size_t number_of_recovery_files_in_ledger_dir() } void verify_framed_entries_range( - const std::vector& framed_entries, size_t from, size_t to) + const asynchost::LedgerReadResult& read_result, size_t from, size_t to) { + REQUIRE(read_result.end_idx <= to); + + const auto& framed_entries = read_result.data; size_t idx = from; for (size_t pos = 0; pos < framed_entries.size();) { @@ -152,7 +155,7 @@ void verify_framed_entries_range( idx++; } - REQUIRE(idx == to + 1); + REQUIRE(idx == read_result.end_idx + 1); } void read_entry_from_ledger(Ledger& ledger, size_t idx) @@ -160,7 +163,7 @@ void read_entry_from_ledger(Ledger& ledger, size_t idx) auto framed_entry = ledger.read_entry(idx); REQUIRE(framed_entry.has_value()); - auto& entry = framed_entry.value(); + auto& entry = framed_entry->data; const uint8_t* data = entry.data(); auto size = entry.size(); auto header = serialized::read(data, size); @@ -170,13 +173,13 @@ void read_entry_from_ledger(Ledger& ledger, size_t idx) REQUIRE(TestLedgerEntry(data, size).value() == idx); } -void read_entries_range_from_ledger( +size_t read_entries_range_from_ledger( Ledger& ledger, size_t from, size_t to, std::optional max_entries_size = std::nullopt) { - auto entries = ledger.read_entries(from, to, true, max_entries_size); + auto entries = ledger.read_entries(from, to, max_entries_size); if (!entries.has_value()) { throw std::logic_error( @@ -184,6 +187,8 @@ void read_entries_range_from_ledger( } verify_framed_entries_range(entries.value(), from, to); + + return entries->end_idx; } // Keeps track of ledger entries written to the ledger. @@ -233,8 +238,13 @@ class TestEntrySubmitter if (idx > 0) { read_entries_range_from_ledger(ledger, 1, idx); + + { + auto truncated_read_result = ledger.read_entries(1, idx + 1); + REQUIRE(truncated_read_result.has_value()); + REQUIRE(truncated_read_result->end_idx == idx); + } } - REQUIRE_FALSE(ledger.read_entries(1, idx + 1).has_value()); if (idx < last_idx) { @@ -398,9 +408,15 @@ TEST_CASE("Regular chunking") // Reading from 0 fails REQUIRE_FALSE(ledger.read_entries(0, end_of_first_chunk_idx).has_value()); - // Reading in the future fails - REQUIRE_FALSE(ledger.read_entries(1, last_idx + 1).has_value()); - REQUIRE_FALSE(ledger.read_entries(last_idx, last_idx + 1).has_value()); + { + // Reading in the future reports correct end-of-ledger + auto result = ledger.read_entries(1, last_idx + 1); + REQUIRE(result.has_value()); + REQUIRE(result->end_idx == last_idx); + result = ledger.read_entries(last_idx, last_idx + 1); + REQUIRE(result.has_value()); + REQUIRE(result->end_idx == last_idx); + } // Reading from the start to any valid index succeeds read_entries_range_from_ledger(ledger, 1, 1); @@ -422,15 +438,14 @@ TEST_CASE("Regular chunking") read_entries_range_from_ledger( ledger, end_of_first_chunk_idx + 1, last_idx - 1); - // Non strict - bool strict = false; - auto entries = ledger.read_entries(1, last_idx, strict); + // Non strict, always capped by last_idx + auto entries = ledger.read_entries(1, last_idx); verify_framed_entries_range(entries.value(), 1, last_idx); - entries = ledger.read_entries(1, last_idx + 1, strict); + entries = ledger.read_entries(1, last_idx + 1); verify_framed_entries_range(entries.value(), 1, last_idx); - entries = ledger.read_entries(end_of_first_chunk_idx, 2 * last_idx, strict); + entries = ledger.read_entries(end_of_first_chunk_idx, 2 * last_idx); verify_framed_entries_range( entries.value(), end_of_first_chunk_idx, last_idx); } @@ -441,23 +456,28 @@ TEST_CASE("Regular chunking") // Reading entries larger than the max entries size fails REQUIRE_FALSE( - ledger.read_entries(1, 1, true, 0 /* max_entries_size */).has_value()); + ledger.read_entries(1, 1, 0 /* max_entries_size */).has_value()); REQUIRE_FALSE( - ledger.read_entries(1, end_of_first_chunk_idx + 1, true, 0).has_value()); + ledger.read_entries(1, end_of_first_chunk_idx + 1, 0).has_value()); // Reading entries larger than max entries size returns some entries size_t max_entries_size = chunk_threshold / entries_per_chunk; - auto e = - ledger.read_entries(1, end_of_first_chunk_idx, true, max_entries_size); + auto e = ledger.read_entries(1, end_of_first_chunk_idx, max_entries_size); REQUIRE(e.has_value()); verify_framed_entries_range(e.value(), 1, 1); - e = ledger.read_entries( - 1, end_of_first_chunk_idx + 1, true, max_entries_size); + e = ledger.read_entries(1, end_of_first_chunk_idx + 1, max_entries_size); REQUIRE(e.has_value()); verify_framed_entries_range(e.value(), 1, 1); + // Even over chunk boundaries + e = ledger.read_entries( + end_of_first_chunk_idx, end_of_first_chunk_idx + 1, max_entries_size); + REQUIRE(e.has_value()); + verify_framed_entries_range( + e.value(), end_of_first_chunk_idx, end_of_first_chunk_idx + 1); + max_entries_size = 2 * chunk_threshold; // All entries are returned @@ -472,13 +492,12 @@ TEST_CASE("Regular chunking") ledger, last_idx - 1, last_idx, max_entries_size); // Only some entries are returned - e = ledger.read_entries( - 1, 2 * end_of_first_chunk_idx, true, max_entries_size); + e = ledger.read_entries(1, 2 * end_of_first_chunk_idx, max_entries_size); REQUIRE(e.has_value()); verify_framed_entries_range(e.value(), 1, end_of_first_chunk_idx + 1); e = ledger.read_entries( - end_of_first_chunk_idx + 1, last_idx, true, max_entries_size); + end_of_first_chunk_idx + 1, last_idx, max_entries_size); REQUIRE(e.has_value()); verify_framed_entries_range( e.value(), end_of_first_chunk_idx + 1, 2 * end_of_first_chunk_idx + 1); @@ -641,7 +660,9 @@ TEST_CASE("Commit") last_idx = entry_submitter.get_last_idx(); ledger.truncate(last_idx - 1); // Deletes entry at last_idx read_entries_range_from_ledger(ledger, 1, last_idx - 1); - REQUIRE_FALSE(ledger.read_entries(1, last_idx).has_value()); + const auto truncated_read_result = ledger.read_entries(1, last_idx); + REQUIRE(truncated_read_result.has_value()); + REQUIRE(truncated_read_result->end_idx == last_idx - 1); } } @@ -1508,7 +1529,8 @@ TEST_CASE("Recovery") // Recovery files in read-only ledger directory are ignored on startup REQUIRE(number_of_recovery_files_in_ledger_dir() == 2); - REQUIRE_THROWS(read_entries_range_from_ledger(new_ledger, 1, last_idx)); + auto read_to = read_entries_range_from_ledger(new_ledger, 1, last_idx); + REQUIRE(read_to < last_idx); // Entries pre-recovery can still be read read_entries_range_from_ledger(new_ledger, 1, pre_recovery_last_idx); @@ -1521,7 +1543,8 @@ TEST_CASE("Recovery") // Recovery files in main ledger directory are automatically deleted on // ledger creation REQUIRE(number_of_recovery_files_in_ledger_dir() == 0); - REQUIRE_THROWS(read_entries_range_from_ledger(new_ledger, 1, last_idx)); + auto read_to = read_entries_range_from_ledger(new_ledger, 1, last_idx); + REQUIRE(read_to < last_idx); // Entries pre-recovery can still be read read_entries_range_from_ledger(new_ledger, 1, pre_recovery_last_idx); diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index a0d084880c96..d374db6e81d7 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -377,6 +377,8 @@ TEST_CASE_TEMPLATE( while (indexer.update_strategies(step_time, kv_store.current_txid()) || handled_writes < writes.size()) { + cache->tick(ccf::historical::slow_fetch_threshold / 2); + // Do the fetch, simulating an asynchronous fetch by the historical // query system for (auto it = writes.begin() + handled_writes; it != writes.end(); ++it) @@ -576,6 +578,8 @@ TEST_CASE( while (indexer.update_strategies(step_time, kv_store.current_txid()) || handled_writes < writes.size()) { + cache->tick(ccf::historical::slow_fetch_threshold / 2); + // Do the fetch, simulating an asynchronous fetch by the historical // query system for (auto it = writes.begin() + handled_writes; it != writes.end(); diff --git a/src/node/historical_queries.h b/src/node/historical_queries.h index 654f905eda14..ee0b964c776b 100644 --- a/src/node/historical_queries.h +++ b/src/node/historical_queries.h @@ -101,18 +101,6 @@ namespace ccf::historical using LedgerEntry = std::vector; - struct LedgerSecretRecoveryInfo - { - ccf::SeqNo target_seqno = 0; - LedgerSecretPtr last_ledger_secret; - - LedgerSecretRecoveryInfo( - ccf::SeqNo target_seqno_, LedgerSecretPtr last_ledger_secret_) : - target_seqno(target_seqno_), - last_ledger_secret(last_ledger_secret_) - {} - }; - ccf::VersionedLedgerSecret get_earliest_known_ledger_secret() { if (historical_ledger_secrets->is_empty()) @@ -131,6 +119,7 @@ namespace ccf::historical struct StoreDetails { + std::chrono::milliseconds time_until_fetch; RequestStage current_stage = RequestStage::Fetching; crypto::Sha256Hash entry_digest = {}; ccf::ClaimsDigest claims_digest = {}; @@ -171,12 +160,32 @@ namespace ccf::historical } }; using StoreDetailsPtr = std::shared_ptr; + using RequestedStores = std::map; + + using WeakStoreDetailsPtr = std::weak_ptr; + using AllRequestedStores = std::map; + + struct LedgerSecretRecoveryInfo + { + ccf::SeqNo target_seqno = 0; + LedgerSecretPtr last_ledger_secret; + StoreDetailsPtr target_details; + + LedgerSecretRecoveryInfo( + ccf::SeqNo target_seqno_, + LedgerSecretPtr last_ledger_secret_, + StoreDetailsPtr target_details_) : + target_seqno(target_seqno_), + last_ledger_secret(last_ledger_secret_), + target_details(target_details_) + {} + }; struct Request { - ccf::SeqNo first_requested_seqno = 0; - SeqNoCollection requested_seqnos; - std::map requested_stores; + AllRequestedStores& all_stores; + + RequestedStores my_stores; std::chrono::milliseconds time_to_expiry; bool include_receipts = false; @@ -184,82 +193,114 @@ namespace ccf::historical // Entries from outside the requested range (such as the next signature) // may be needed to produce receipts. They are stored here, distinct from // user-requested stores. - std::map supporting_signatures; + RequestedStores supporting_signatures; // Only set when recovering ledger secrets std::unique_ptr ledger_secret_recovery_info = nullptr; - Request() {} + Request(AllRequestedStores& all_stores_) : all_stores(all_stores_) {} StoreDetailsPtr get_store_details(ccf::SeqNo seqno) const { - auto it = requested_stores.find(seqno); - if (it != requested_stores.end()) - { - return it->second; - } - - auto supporting_it = supporting_signatures.find(seqno); - if (supporting_it != supporting_signatures.end()) + auto it = all_stores.find(seqno); + if (it != all_stores.end()) { - return supporting_it->second; + return it->second.lock(); } return nullptr; } - using SeqNoRange = std::pair; - - // Keep as many existing entries as possible, return indices that weren't - // already present to indicate they should be fetched. For example, if we - // were previously fetching: - // 2 3 4 5 - // and then we adjust to: - // 4 5 - // we don't need to fetch anything new; this is a subrange. But if we - // adjust to: - // 0 1 2 3 4 5 6 - // we need to start fetching 0, 1, and 6. - SeqNoCollection adjust_ranges( - const SeqNoCollection& new_seqnos, bool should_include_receipts) + ccf::SeqNo first_requested_seqno() const { - HISTORICAL_LOG( - "Adjusting ranges, previously {}, new {} ({} vs {})", - requested_seqnos.size(), - new_seqnos.size(), - include_receipts, - should_include_receipts); - if ( - new_seqnos == requested_seqnos && - should_include_receipts == include_receipts) + if (!my_stores.empty()) { - // This is precisely the request we're already tracking - do nothing - HISTORICAL_LOG("Already have this range"); - return {}; + return my_stores.begin()->first; } - std::set newly_requested; - std::map new_stores; + return {}; + } + + void adjust_ranges( + const SeqNoCollection& new_seqnos, + bool should_include_receipts, + SeqNo earliest_ledger_secret_seqno) + { + bool any_diff = false; + + // If a seqno is earlier than the earliest known ledger secret, we will + // store that it was requested with a nullptr in `my_stores`, but not + // add it to `all_stores` to begin fetching until a sufficiently early + // secret has been retrieved. To avoid awkwardly sharding requests (and + // delaying the secret-fetch with a large request for a later range), we + // extend that to say that if _any_ seqno is too early, then _all_ + // subsequent seqnos will be pending. This bool tracks that behaviour. + bool any_too_early = false; - for (auto seqno : new_seqnos) { - auto existing_details = get_store_details(seqno); - if (existing_details == nullptr) + auto prev_it = my_stores.begin(); + auto new_it = new_seqnos.begin(); + while (new_it != new_seqnos.end()) { - newly_requested.insert(seqno); - new_stores[seqno] = std::make_shared(); - HISTORICAL_LOG("{} is new", seqno); + if (*new_it == prev_it->first) + { + // Asking for a seqno which was also requested previously - do + // nothing and advance to compare next entries + ++new_it; + ++prev_it; + } + else if (*new_it > prev_it->first) + { + // No longer looking for a seqno which was previously requested. + // Remove it from my_stores + prev_it = my_stores.erase(prev_it); + any_diff |= true; + } + else + { + // *new_it < prev_it->first + // Asking for a seqno which was not previously being fetched => + // check if another request was fetching it, else create new + // details to track it + if (*new_it < earliest_ledger_secret_seqno || any_too_early) + { + // If this is too early for known secrets, just record that it + // was requested but don't add it to all_stores yet + prev_it = my_stores.insert_or_assign(prev_it, *new_it, nullptr); + any_too_early = true; + } + else + { + auto all_it = all_stores.find(*new_it); + auto details = + all_it == all_stores.end() ? nullptr : all_it->second.lock(); + if (details == nullptr) + { + HISTORICAL_LOG("{} is newly requested", *new_it); + details = std::make_shared(); + all_stores.insert_or_assign(all_it, *new_it, details); + } + prev_it = my_stores.insert_or_assign(prev_it, *new_it, details); + } + any_diff |= true; + } } - else + + if (prev_it != my_stores.end()) { - new_stores[seqno] = std::move(existing_details); - HISTORICAL_LOG("Found {} already", seqno); + // If we have a suffix of seqnos previously requested, now + // unrequested, purge them + my_stores.erase(prev_it, my_stores.end()); + any_diff |= true; } } - requested_stores = std::move(new_stores); - first_requested_seqno = new_seqnos.front(); + if (!any_diff && (should_include_receipts == include_receipts)) + { + HISTORICAL_LOG("Identical to previous request"); + return; + } // If the range has changed, forget what ledger secrets we may have been // fetching - the caller can begin asking for them again @@ -268,7 +309,6 @@ namespace ccf::historical const auto newly_requested_receipts = should_include_receipts && !include_receipts; - requested_seqnos = new_seqnos; include_receipts = should_include_receipts; HISTORICAL_LOG( @@ -283,214 +323,155 @@ namespace ccf::historical for (auto seqno : new_seqnos) { - const auto next_seqno = populate_receipts(seqno); - if (next_seqno.has_value()) - { - newly_requested.insert(*next_seqno); - supporting_signatures[*next_seqno] = - std::make_shared(); - } + populate_receipts(seqno); } } - - return SeqNoCollection(newly_requested.begin(), newly_requested.end()); } - std::optional populate_receipts(ccf::SeqNo new_seqno) + void populate_receipts(ccf::SeqNo new_seqno) { HISTORICAL_LOG( "Looking at {}, and populating receipts from it", new_seqno); auto new_details = get_store_details(new_seqno); - if (new_details->store != nullptr) + if (new_details != nullptr && new_details->store != nullptr) { if (new_details->is_signature) { HISTORICAL_LOG("{} is a signature", new_seqno); - // Iterate through earlier indices. If this signature covers them - // then create a receipt for them - const auto sig = get_signature(new_details->store); - ccf::MerkleTreeHistory tree(get_tree(new_details->store).value()); - - for (auto seqno : requested_seqnos) - { - if (seqno >= new_seqno) - { - break; - } - if (tree.in_range(seqno)) - { - auto details = get_store_details(seqno); - if (details != nullptr && details->store != nullptr) - { - auto proof = tree.get_proof(seqno); - details->transaction_id = {sig->view, seqno}; - details->receipt = std::make_shared( - sig->sig, - proof.get_root(), - proof.get_path(), - sig->node, - sig->cert, - details->entry_digest, - details->get_commit_evidence(), - details->claims_digest); - HISTORICAL_LOG( - "Assigned a sig for {} after given signature at {}", - seqno, - new_seqno); - } - } - } + fill_receipts_from_signature(new_details); } else { + // This isn't a signature. To find the signature for this, we look + // through every subsequent transaction, until we find either a gap + // (a seqno that hasn't been fetched yet), or a signature. If it is + // a signature, and we've found a contiguous range of seqnos to it, + // then it must be a signature over this seqno. Else we find a gap + // first, and fetch it in case it is the signature. It's possible + // that we already have the later signature, and wastefully fill in + // the gaps, but this reduces the cases we have to consider so makes + // the code much simpler. + HISTORICAL_LOG("{} is not a signature", new_seqno); - const auto sig_it = supporting_signatures.find(new_seqno); - if (sig_it != supporting_signatures.end()) + supporting_signatures.erase(new_seqno); + + auto next_seqno = new_seqno + 1; + while (true) { - // This was a search for a supporting signature, but this entry is - // _not_ a signature - fetch the next - // NB: We skip any entries we already have here. It is possible we - // are fetching 10, previously had entries at 13, 14, 15, and the - // signature for all of these is at 20. The supporting signature - // for 10 tries 11, then 12. Next, it should try 16, not 13. - auto next_seqno = new_seqno + 1; - while (requested_seqnos.contains(next_seqno)) + auto all_it = all_stores.find(next_seqno); + auto details = + all_it == all_stores.end() ? nullptr : all_it->second.lock(); + if (details == nullptr) { - ++next_seqno; + HISTORICAL_LOG( + "Looking for new supporting signature at {}", next_seqno); + details = std::make_shared(); + all_stores.insert_or_assign(all_it, next_seqno, details); } - HISTORICAL_LOG( - "{} was a supporting signature attempt, fetch next {}", - new_seqno, - next_seqno); - return {next_seqno}; - } - else if (new_details->receipt == nullptr) - { - HISTORICAL_LOG( - "{} also has no receipt - looking for later signature", - new_seqno); - // Iterate through later indices, see if there's a signature that - // covers this one - const auto& untrusted_digest = new_details->entry_digest; - bool sig_seen = false; - std::optional end_of_matching_range = std::nullopt; - for (const auto& [first_seqno, additional] : - requested_seqnos.get_ranges()) + + if (details->store == nullptr) { - if (first_seqno + additional < new_seqno) - { - HISTORICAL_LOG( - "Ignoring range starting at {} - too early", first_seqno); - continue; - } + // Whether we just started fetching or someone else was already + // looking for this, it's the first gap we've found so _may_ be + // our signature + HISTORICAL_LOG( + "Assigning {} as potential signature for {}", + next_seqno, + new_seqno); + supporting_signatures[next_seqno] = details; + return; + } + else if (details->is_signature) + { + const auto filled_this = + fill_receipts_from_signature(details, new_seqno); - if (!end_of_matching_range.has_value()) + if ( + !filled_this && my_stores.find(new_seqno) != my_stores.end()) { - end_of_matching_range = first_seqno + additional; + throw std::logic_error(fmt::format( + "Unexpected: Found a signature at {}, and contiguous range " + "of transactions from {}, yet signature does not cover " + "this seqno!", + next_seqno, + new_seqno)); } - for (auto seqno = first_seqno; - seqno <= first_seqno + additional; - ++seqno) - { - if (seqno <= new_seqno) - { - HISTORICAL_LOG("Ignoring {} - too early", seqno); - continue; - } + return; + } + else + { + // This is a normal transaction, and its already fetched. + // Nothing to do, consider the next. + ++next_seqno; + } + } + } + } + } - auto details = get_store_details(seqno); - if (details != nullptr) - { - if (details->store != nullptr && details->is_signature) - { - const auto sig = get_signature(details->store); - ccf::MerkleTreeHistory tree( - get_tree(details->store).value()); - if (tree.in_range(new_seqno)) - { - auto proof = tree.get_proof(new_seqno); - new_details->transaction_id = {sig->view, new_seqno}; - new_details->receipt = std::make_shared( - sig->sig, - proof.get_root(), - proof.get_path(), - sig->node, - sig->cert, - new_details->entry_digest, - details->get_commit_evidence(), - new_details->claims_digest); - return std::nullopt; - } - - // Break here - if this signature doesn't cover us, no - // later one can - sig_seen = true; - HISTORICAL_LOG( - "Found a sig for {} at {}", new_seqno, seqno); - break; - } - } - } + private: + bool fill_receipts_from_signature( + const std::shared_ptr& sig_details, + std::optional should_fill = std::nullopt) + { + // Iterate through earlier indices. If this signature covers them + // then create a receipt for them + const auto sig = get_signature(sig_details->store); + ccf::MerkleTreeHistory tree(get_tree(sig_details->store).value()); - if (sig_seen) - { - break; - } - } + // This is either pointing at the sig itself, or the closest larger + // seqno we're holding + auto sig_lower_bound_it = + my_stores.lower_bound(sig_details->transaction_id.seqno); - if (!sig_seen) + if (sig_lower_bound_it != my_stores.begin()) // Skip empty map edge case + { + // Construct reverse iterator to search backwards from here + auto search_rit = std::reverse_iterator(sig_lower_bound_it); + while (search_rit != my_stores.rend()) + { + auto seqno = search_rit->first; + if (tree.in_range(seqno)) + { + auto details = search_rit->second; + if (details != nullptr && details->store != nullptr) { - auto sig_it = supporting_signatures.lower_bound(new_seqno); - if (sig_it != supporting_signatures.end()) + auto proof = tree.get_proof(seqno); + details->transaction_id = {sig->view, seqno}; + details->receipt = std::make_shared( + sig->sig, + proof.get_root(), + proof.get_path(), + sig->node, + sig->cert, + details->entry_digest, + details->get_commit_evidence(), + details->claims_digest); + HISTORICAL_LOG( + "Assigned a sig for {} after given signature at {}", + seqno, + sig_details->transaction_id.to_str()); + + if (should_fill.has_value() && seqno == *should_fill) { - const auto& [sig_seqno, details] = *sig_it; - HISTORICAL_LOG( - "Considering a supporting signature for {} at {}", - new_seqno, - sig_seqno); - if (details->store != nullptr && details->is_signature) - { - const auto sig = get_signature(details->store); - ccf::MerkleTreeHistory tree( - get_tree(details->store).value()); - if (tree.in_range(new_seqno)) - { - auto proof = tree.get_proof(new_seqno); - new_details->transaction_id = {sig->view, new_seqno}; - new_details->receipt = std::make_shared( - sig->sig, - proof.get_root(), - proof.get_path(), - sig->node, - sig->cert, - new_details->entry_digest, - details->get_commit_evidence(), - new_details->claims_digest); - } - } + should_fill.reset(); } } - // If still have no receipt, after considering every larger value - // we have, and the best-guess at a supporting signature, then we - // may need to fetch another supporting signature. Request the - // first entry after the range - if ( - new_details->receipt == nullptr && - end_of_matching_range.has_value()) - { - HISTORICAL_LOG( - "Still nothing, better fetch {}", - end_of_matching_range.value() + 1); - return {end_of_matching_range.value() + 1}; - } + ++search_rit; + } + else + { + // Found a seqno which this signature doesn't cover. It can't + // cover anything else, so break here + break; } } } - return std::nullopt; + return !should_fill.has_value(); } }; @@ -500,11 +481,10 @@ namespace ccf::historical // Track all things currently requested by external callers std::map requests; - // Store each seqno that is currently being fetched by the host, to avoid - // spamming it with duplicate requests, and how long it has been fetched - // for. If this gap gets too large, we will log a warning and allow it to be - // re-fetched - std::unordered_map pending_fetches; + // A map containing (weak pointers to) _all_ of the stores for active + // requests, allowing distinct requests for the same seqnos to share the + // same underlying state (and benefit from faster lookup) + AllRequestedStores all_stores; ExpiryDuration default_expiry_duration = std::chrono::seconds(1800); @@ -515,43 +495,14 @@ namespace ccf::historical void fetch_entries_range(ccf::SeqNo from, ccf::SeqNo to) { - std::optional unfetched_from = std::nullopt; - std::optional unfetched_to = std::nullopt; - LOG_TRACE_FMT("fetch_entries_range({}, {})", from, to); - for (auto seqno = from; seqno <= to; ++seqno) - { - const auto ib = pending_fetches.try_emplace(seqno, 0); - if ( - // Newly requested fetch - ib.second || - // Fetch in-progress, but looks slow enough to retry - ib.first->second >= slow_fetch_threshold) - { - if (!unfetched_from.has_value()) - { - unfetched_from = seqno; - } - unfetched_to = seqno; - ib.first->second = std::chrono::milliseconds(0); - } - } - - if (unfetched_from.has_value()) - { - // Newly requested seqnos - LOG_TRACE_FMT( - "Writing to ringbuffer ledger_get_range({}, {})", - unfetched_from.value(), - unfetched_to.value()); - RINGBUFFER_WRITE_MESSAGE( - consensus::ledger_get_range, - to_host, - static_cast(unfetched_from.value()), - static_cast(unfetched_to.value()), - consensus::LedgerRequestPurpose::HistoricalQuery); - } + RINGBUFFER_WRITE_MESSAGE( + consensus::ledger_get_range, + to_host, + static_cast(from), + static_cast(to), + consensus::LedgerRequestPurpose::HistoricalQuery); } std::unique_ptr fetch_supporting_secret_if_needed( @@ -576,19 +527,29 @@ namespace ccf::historical const auto seqno_to_fetch = previous_secret_stored_version.value(); LOG_TRACE_FMT( "Requesting historical entry at {} but first known ledger " - "secret is applicable from {} - requesting older secret now", + "secret is applicable from {}", seqno, earliest_ledger_secret_seqno); - fetch_entry_at(seqno_to_fetch); + auto it = all_stores.find(seqno_to_fetch); + auto details = it == all_stores.end() ? nullptr : it->second.lock(); + if (details == nullptr) + { + LOG_TRACE_FMT("Requesting older secret at {} now", seqno_to_fetch); + details = std::make_shared(); + all_stores.insert_or_assign(it, seqno_to_fetch, details); + fetch_entry_at(seqno_to_fetch); + } + return std::make_unique( - seqno_to_fetch, earliest_ledger_secret); + seqno_to_fetch, earliest_ledger_secret, details); } return nullptr; } void process_deserialised_store( + const StoreDetailsPtr& details, const kv::StorePtr& store, const crypto::Sha256Hash& entry_digest, ccf::SeqNo seqno, @@ -596,6 +557,36 @@ namespace ccf::historical ccf::ClaimsDigest&& claims_digest, bool has_commit_evidence) { + // Deserialisation includes a GCM integrity check, so all entries + // have been verified by the time we get here. + details->current_stage = RequestStage::Trusted; + details->has_commit_evidence = has_commit_evidence; + + details->entry_digest = entry_digest; + if (!claims_digest.empty()) + details->claims_digest = std::move(claims_digest); + + CCF_ASSERT_FMT( + details->store == nullptr, + "Cache already has store for seqno {}", + seqno); + details->store = store; + + details->is_signature = is_signature; + if (is_signature) + { + // Construct a signature receipt. + // We do this whether it was requested or not, because we have all + // the state to do so already, and it's simpler than constructing + // the receipt _later_ for an already-fetched signature + // transaction. + const auto sig = get_signature(details->store); + assert(sig.has_value()); + details->transaction_id = {sig->view, sig->seqno}; + details->receipt = std::make_shared( + sig->sig, sig->root.h, nullptr, sig->node, sig->cert); + } + auto request_it = requests.begin(); while (request_it != requests.end()) { @@ -620,7 +611,7 @@ namespace ccf::historical } auto new_secret_fetch = - fetch_supporting_secret_if_needed(request.first_requested_seqno); + fetch_supporting_secret_if_needed(request.first_requested_seqno()); if (new_secret_fetch != nullptr) { request.ledger_secret_recovery_info = std::move(new_secret_fetch); @@ -628,12 +619,24 @@ namespace ccf::historical else { // Newly have all required secrets - begin fetching the actual - // entries - for (const auto& [first_requested_seqno, num_following] : - request.requested_seqnos.get_ranges()) + // entries. Note this is adding them to `all_stores`, from where + // they'll be requested on the next tick. + auto my_stores_it = request.my_stores.begin(); + while (my_stores_it != request.my_stores.end()) { - fetch_entries_range( - first_requested_seqno, first_requested_seqno + num_following); + auto [seqno, _] = *my_stores_it; + auto it = all_stores.find(seqno); + auto details = + it == all_stores.end() ? nullptr : it->second.lock(); + + if (details == nullptr) + { + details = std::make_shared(); + all_stores.insert_or_assign(it, seqno, details); + } + + my_stores_it->second = details; + ++my_stores_it; } } @@ -642,58 +645,19 @@ namespace ccf::historical continue; } - auto details = request.get_store_details(seqno); - if ( - details != nullptr && - details->current_stage == RequestStage::Fetching) + if (request.include_receipts) { - // Deserialisation includes a GCM integrity check, so all entries have - // been verified by the time we get here. - details->current_stage = RequestStage::Trusted; - details->has_commit_evidence = has_commit_evidence; - - details->entry_digest = entry_digest; - if (!claims_digest.empty()) - details->claims_digest = std::move(claims_digest); - - CCF_ASSERT_FMT( - details->store == nullptr, - "Request {} already has store for seqno {}", - handle, - seqno); - details->store = store; - - details->is_signature = is_signature; - if (is_signature) + const bool seqno_in_this_request = + (request.my_stores.find(seqno) != request.my_stores.end() || + request.supporting_signatures.find(seqno) != + request.supporting_signatures.end()); + if (seqno_in_this_request) { - // Construct a signature receipt. - // We do this whether it was requested or not, because we have all - // the state to do so already, and it's simpler than constructing - // the receipt _later_ for an already-fetched signature transaction. - const auto sig = get_signature(details->store); - assert(sig.has_value()); - details->transaction_id = {sig->view, sig->seqno}; - details->receipt = std::make_shared( - sig->sig, sig->root.h, nullptr, sig->node, sig->cert); + request.populate_receipts(seqno); } - - if (request.include_receipts) - { - const auto next_seqno = request.populate_receipts(seqno); - if (next_seqno.has_value()) - { - request.supporting_signatures.erase(seqno); - fetch_entry_at(*next_seqno); - request.supporting_signatures[*next_seqno] = - std::make_shared(); - } - ++request_it; - } - } - else - { - ++request_it; } + + ++request_it; } } @@ -765,21 +729,31 @@ namespace ccf::historical if (it == requests.end()) { // This is a new handle - insert a newly created Request for it - it = requests.emplace_hint(it, handle, Request()); + it = requests.emplace_hint(it, handle, Request(all_stores)); HISTORICAL_LOG("First time I've seen handle {}", handle); } Request& request = it->second; - // Update this Request to represent the currently requested ranges, - // returning any newly requested indices - auto new_seqnos = request.adjust_ranges(seqnos, include_receipts); + auto [earliest_ledger_secret_seqno, _] = + get_earliest_known_ledger_secret(); + + // Update this Request to represent the currently requested ranges + HISTORICAL_LOG( + "Adjusting handle {} to cover {} seqnos starting at {} " + "(include_receipts={})", + handle, + seqnos.size(), + *seqnos.begin(), + include_receipts); + request.adjust_ranges( + seqnos, include_receipts, earliest_ledger_secret_seqno); // If the earliest target entry cannot be deserialised with the earliest // known ledger secret, record the target seqno and begin fetching the // previous historical ledger secret. auto secret_fetch = - fetch_supporting_secret_if_needed(request.first_requested_seqno); + fetch_supporting_secret_if_needed(request.first_requested_seqno()); if (secret_fetch != nullptr) { if ( @@ -790,16 +764,6 @@ namespace ccf::historical request.ledger_secret_recovery_info = std::move(secret_fetch); } } - else - { - // If we have sufficiently early secrets, begin fetching any newly - // requested entries. If we don't fall into this branch, they'll only - // begin to be fetched once the secret arrives. - for (const auto& [start_seqno, additional] : new_seqnos.get_ranges()) - { - fetch_entries_range(start_seqno, start_seqno + additional); - } - } // Reset the expiry timer as this has just been requested request.time_to_expiry = ms_until_expiry; @@ -809,12 +773,9 @@ namespace ccf::historical for (auto seqno : seqnos) { auto target_details = request.get_store_details(seqno); - if (target_details == nullptr) - { - throw std::logic_error("Request isn't tracking state for seqno"); - } if ( + target_details != nullptr && target_details->current_stage == RequestStage::Trusted && (!request.include_receipts || target_details->receipt != nullptr)) { @@ -1004,6 +965,7 @@ namespace ccf::historical { std::lock_guard guard(requests_lock); const auto erased_count = requests.erase(handle); + HISTORICAL_LOG("Dropping historical request {}", handle); return erased_count > 0; } @@ -1015,15 +977,16 @@ namespace ccf::historical bool handle_ledger_entry(ccf::SeqNo seqno, const uint8_t* data, size_t size) { std::lock_guard guard(requests_lock); - const auto it = pending_fetches.find(seqno); - if (it == pending_fetches.end()) + const auto it = all_stores.find(seqno); + auto details = it == all_stores.end() ? nullptr : it->second.lock(); + if ( + details == nullptr || details->current_stage != RequestStage::Fetching) { - // Unexpected entry - ignore it? + // Unexpected entry, we already have it or weren't asking for it - + // ignore this resubmission return false; } - pending_fetches.erase(it); - kv::ApplyResult deserialise_result; ccf::ClaimsDigest claims_digest; bool has_commit_evidence; @@ -1085,6 +1048,7 @@ namespace ccf::historical (size_t)deserialise_result); const auto entry_digest = crypto::Sha256Hash({data, size}); process_deserialised_store( + details, store, entry_digest, seqno, @@ -1123,9 +1087,16 @@ namespace ccf::historical ++seqno; } - CCF_ASSERT_FMT( - seqno == to_seqno + 1, - "Ledger entry range doesn't contain claimed entries"); + if (seqno != to_seqno + 1) + { + LOG_FAIL_FMT( + "Claimed ledger entries: [{}, {}), actual [{}, {}]", + from_seqno, + to_seqno, + from_seqno, + seqno); + } + return all_accepted; } @@ -1146,12 +1117,12 @@ namespace ccf::historical // forget about it and drop any requests which were looking for it - // don't have a mechanism for remembering this failure and reporting it // to users. - const auto fetches_it = pending_fetches.find(seqno); - if (fetches_it != pending_fetches.end()) + const auto fetches_it = all_stores.find(seqno); + if (fetches_it != all_stores.end()) { delete_all_interested_requests(seqno); - pending_fetches.erase(fetches_it); + all_stores.erase(fetches_it); } } } @@ -1228,33 +1199,78 @@ namespace ccf::historical void tick(const std::chrono::milliseconds& elapsed_ms) { std::lock_guard guard(requests_lock); - auto it = requests.begin(); - while (it != requests.end()) + { - auto& request = it->second; - if (elapsed_ms >= request.time_to_expiry) - { - it = requests.erase(it); - } - else + auto it = requests.begin(); + while (it != requests.end()) { - request.time_to_expiry -= elapsed_ms; - ++it; + auto& request = it->second; + if (elapsed_ms >= request.time_to_expiry) + { + LOG_DEBUG_FMT( + "Dropping expired historical query with handle {}", it->first); + it = requests.erase(it); + } + else + { + request.time_to_expiry -= elapsed_ms; + ++it; + } } } - for (auto& [seqno, time_since_sent] : pending_fetches) { - const auto time_after = time_since_sent + elapsed_ms; - // Log once, when we cross the time threshold - if ( - time_since_sent < slow_fetch_threshold && - time_after >= slow_fetch_threshold) + auto it = all_stores.begin(); + std::optional> range_to_request = + std::nullopt; + while (it != all_stores.end()) { - LOG_FAIL_FMT( - "Fetch for seqno {} is taking an unusually long time", seqno); + auto details = it->second.lock(); + if (details == nullptr) + { + it = all_stores.erase(it); + } + else + { + if (details->current_stage == RequestStage::Fetching) + { + details->time_until_fetch -= elapsed_ms; + if (details->time_until_fetch.count() <= 0) + { + details->time_until_fetch = slow_fetch_threshold; + + const auto seqno = it->first; + if ( + range_to_request.has_value() && + range_to_request->second + 1 == seqno) + { + range_to_request->second = seqno; + } + else + { + if (range_to_request.has_value()) + { + // Submit fetch for previously tracked range + fetch_entries_range( + range_to_request->first, range_to_request->second); + } + + // Track new range + range_to_request = std::make_pair(seqno, seqno); + } + } + } + + ++it; + } + } + + if (range_to_request.has_value()) + { + // Submit fetch for final tracked range + fetch_entries_range( + range_to_request->first, range_to_request->second); } - time_since_sent = time_after; } } }; diff --git a/src/node/test/historical_queries.cpp b/src/node/test/historical_queries.cpp index 8635ab64a0e0..5803298b5276 100644 --- a/src/node/test/historical_queries.cpp +++ b/src/node/test/historical_queries.cpp @@ -277,45 +277,15 @@ TEST_CASE("StateCache point queries") } { + INFO("No stores are requested until tick occurs"); + REQUIRE(stub_writer->writes.empty()); + + cache.tick(std::chrono::milliseconds(0)); + + REQUIRE(!stub_writer->writes.empty()); INFO( "If too much time passes without providing request seqnos, failures are " "logged"); - - size_t fails_count = 0; - - struct FailCountingStub : public logger::AbstractLogger - { - size_t& fails_count; - - FailCountingStub(size_t& fc) : fails_count(fc) {} - - void write( - const logger::LogLine& ll, - const std::optional& enclave_offset = std::nullopt) override - { - if (ll.log_level == logger::FAIL) - { - ++fails_count; - } - } - }; - - logger::config::loggers().emplace_back( - std::make_unique(fails_count)); - - // Less time than threshold has elapsed => No failed logging - cache.tick(ccf::historical::slow_fetch_threshold / 2); - REQUIRE(fails_count == 0); - - // Threshold elapsed => Failed logging - cache.tick(ccf::historical::slow_fetch_threshold / 2); - REQUIRE(fails_count == 3); - - // Much more time passes => No more logging - cache.tick(ccf::historical::slow_fetch_threshold * 4); - REQUIRE(fails_count == 3); - - logger::config::loggers().clear(); } { @@ -344,13 +314,14 @@ TEST_CASE("StateCache point queries") } { - INFO("Once sufficient time has passed, re-requesting will re-fetch"); + INFO( + "Once sufficient time passed, requested but unprovided entries will be " + "re-requested"); + REQUIRE(stub_writer->writes.empty()); cache.tick(ccf::historical::slow_fetch_threshold); - REQUIRE(cache.get_state_at(default_handle, low_seqno) == nullptr); - REQUIRE(cache.get_state_at(default_handle, unsigned_seqno) == nullptr); - REQUIRE(!stub_writer->writes.empty()); + const auto& write = stub_writer->writes[0]; const uint8_t* data = write.contents.data(); size_t size = write.contents.size(); @@ -373,8 +344,8 @@ TEST_CASE("StateCache point queries") { INFO("Cache doesn't accept arbitrary entries"); - REQUIRE(!provide_ledger_entry(high_seqno - 1)); - REQUIRE(!provide_ledger_entry(high_seqno + 1)); + REQUIRE_FALSE(provide_ledger_entry(high_seqno - 1)); + REQUIRE_FALSE(provide_ledger_entry(high_seqno + 1)); } { @@ -382,6 +353,11 @@ TEST_CASE("StateCache point queries") "Cache accepts requested entries, and then range of supporting entries"); REQUIRE(provide_ledger_entry(high_seqno)); + { + INFO("Cache doesn't accept repeat submissions for known entries"); + REQUIRE_FALSE(provide_ledger_entry(high_seqno)); + } + // Count up to next signature for (size_t i = high_seqno + 1; i < high_signature_transaction; ++i) { @@ -430,13 +406,19 @@ TEST_CASE("StateCache point queries") } { + // Drop state so it doesn't interfere with next test + cache.drop_cached_states(low_handle); + cache.drop_cached_states(high_handle); + INFO(fmt::format( "Signature transactions can be requested between {} and {}", low_signature_transaction, high_signature_transaction)); for (const auto i : {low_signature_transaction, high_signature_transaction}) { + INFO(fmt::format("Requesting signature at {}", i)); auto state_at_seqno = cache.get_state_at(default_handle, i); + REQUIRE(state_at_seqno == nullptr); REQUIRE(provide_ledger_entry(i)); @@ -464,17 +446,19 @@ TEST_CASE("StateCache point queries") const auto state = cache.get_state_at(default_handle, high_signature_transaction); REQUIRE(state == nullptr); + cache.drop_cached_states(default_handle); } { INFO("Handles are dropped automatically after their expiry duration"); + const auto high_handle_expiry_time = std::chrono::seconds(30); // Initial requests - low uses default expiry while high gets custom // expiry cache.set_default_expiry_duration(std::chrono::seconds(60)); cache.get_state_at(low_handle, low_signature_transaction); cache.get_state_at( - high_handle, high_signature_transaction, std::chrono::seconds(30)); + high_handle, high_signature_transaction, high_handle_expiry_time); REQUIRE(provide_ledger_entry(low_signature_transaction)); REQUIRE(provide_ledger_entry(high_signature_transaction)); @@ -487,7 +471,7 @@ TEST_CASE("StateCache point queries") cache.get_state_at(low_handle, low_signature_transaction) != nullptr); REQUIRE( cache.get_state_at( - high_handle, high_signature_transaction, std::chrono::seconds(30)) != + high_handle, high_signature_transaction, high_handle_expiry_time) != nullptr); // Some time passes, but not enough for either expiry @@ -496,7 +480,7 @@ TEST_CASE("StateCache point queries") cache.get_state_at(low_handle, low_signature_transaction) != nullptr); REQUIRE( cache.get_state_at( - high_handle, high_signature_transaction, std::chrono::seconds(30)) != + high_handle, high_signature_transaction, high_handle_expiry_time) != nullptr); // More time passes, and one request expires @@ -505,7 +489,7 @@ TEST_CASE("StateCache point queries") cache.get_state_at(low_handle, low_signature_transaction) != nullptr); REQUIRE( cache.get_state_at( - high_handle, high_signature_transaction, std::chrono::seconds(30)) == + high_handle, high_signature_transaction, high_handle_expiry_time) == nullptr); // More time passes, and both requests expire @@ -514,7 +498,7 @@ TEST_CASE("StateCache point queries") cache.get_state_at(low_handle, low_signature_transaction) == nullptr); REQUIRE( cache.get_state_at( - high_handle, high_signature_transaction, std::chrono::seconds(30)) == + high_handle, high_signature_transaction, high_handle_expiry_time) == nullptr); } } @@ -529,7 +513,7 @@ TEST_CASE("StateCache get store vs get state") { INFO("Build some interesting state in the store"); - signature_transaction = write_transactions_and_signature(kv_store, 20); + signature_transaction = write_transactions_and_signature(kv_store, 8); REQUIRE(kv_store.current_version() == signature_transaction); } @@ -777,11 +761,10 @@ TEST_CASE("StateCache range queries") std::random_device rd; std::mt19937 g(rd()); - auto next_handle = 0; auto fetch_and_validate_range = [&]( kv::Version range_start, kv::Version range_end) { - const auto this_handle = next_handle++; + constexpr auto this_handle = 42; { auto stores = cache.get_store_range(this_handle, range_start, range_end); REQUIRE(stores.empty()); @@ -823,6 +806,9 @@ TEST_CASE("StateCache range queries") } } } + + // Drop state to allow fresh test next time + cache.drop_cached_states(this_handle); }; { @@ -850,6 +836,113 @@ TEST_CASE("StateCache range queries") } } +TEST_CASE("Incremental progress") +{ + const auto seed = time(NULL); + INFO("Using seed: ", seed); + srand(seed); + + // If host takes multiple attempts to fulfill the range (eg - because the + // entries are too large for a single write), then collaborative retries will + // make eventual progress + auto state = create_and_init_state(); + auto& kv_store = *state.kv_store; + + std::vector signature_versions; + + const auto begin_seqno = kv_store.current_version() + 1; + + { + INFO("Build some interesting state in the store"); + for (size_t batch_size : + {rand() % 10 + 1, rand() % 10 + 1, rand() % 10 + 1, rand() % 10 + 1}) + { + signature_versions.push_back( + write_transactions_and_signature(kv_store, batch_size)); + } + } + + const auto end_seqno = kv_store.current_version(); + + auto stub_writer = std::make_shared(); + ccf::historical::StateCache cache( + kv_store, state.ledger_secrets, stub_writer); + auto ledger = construct_host_ledger(state.kv_store->get_consensus()); + + auto provide_ledger_entries = [&](size_t from, size_t to) { + std::vector combined; + for (auto seqno = from; seqno <= to; ++seqno) + { + const auto it = ledger.find(seqno); + REQUIRE(it != ledger.end()); + combined.insert(combined.end(), it->second.begin(), it->second.end()); + } + bool accepted = cache.handle_ledger_entries(from, to, combined); + return accepted; + }; + + auto require_single_read_request = [&](size_t from, size_t to) { + REQUIRE(stub_writer->writes.size() == 1); + const auto& write = stub_writer->writes[0]; + const uint8_t* data = write.contents.data(); + size_t size = write.contents.size(); + REQUIRE(write.m == consensus::ledger_get_range); + auto [from_seqno_, to_seqno_, purpose_] = + ringbuffer::read_message(data, size); + auto& purpose = purpose_; + auto& from_seqno = from_seqno_; + auto& to_seqno = to_seqno_; + REQUIRE(purpose == consensus::LedgerRequestPurpose::HistoricalQuery); + REQUIRE(from_seqno == from); + REQUIRE(to_seqno == to); + + stub_writer->writes.clear(); + }; + + constexpr auto this_handle = 0; + + { + auto stores = cache.get_store_range(this_handle, begin_seqno, end_seqno); + REQUIRE(stores.empty()); + + cache.tick({}); + require_single_read_request(begin_seqno, end_seqno); + } + + auto sub_range_begin = begin_seqno; + while (true) + { + auto sub_range_end = + std::min(sub_range_begin + 1 + (rand() % 8), end_seqno); + + REQUIRE(provide_ledger_entries(sub_range_begin, sub_range_end)); + + auto stores = cache.get_store_range(this_handle, begin_seqno, end_seqno); + if (sub_range_end == end_seqno) + { + REQUIRE(!stores.empty()); + break; + } + + // Stores aren't returned as complete range is not yet available + REQUIRE(stores.empty()); + + // Additional request has not yet caused re-request to host + REQUIRE(stub_writer->writes.empty()); + + // When enough time has passed, another query will result in a re-request + // for the still-pending range + cache.tick(ccf::historical::slow_fetch_threshold); + stores = cache.get_store_range(this_handle, begin_seqno, end_seqno); + REQUIRE(stores.empty()); + + const auto next_sub_range_begin = sub_range_end + 1; + require_single_read_request(next_sub_range_begin, end_seqno); + + sub_range_begin = next_sub_range_begin; + } +} + TEST_CASE("StateCache sparse queries") { auto state = create_and_init_state(); @@ -896,9 +989,8 @@ TEST_CASE("StateCache sparse queries") std::random_device rd; std::mt19937 g(rd()); - auto next_handle = 0; auto fetch_and_validate_sparse_set = [&](const ccf::SeqNoCollection& seqnos) { - const auto this_handle = next_handle++; + constexpr auto this_handle = 42; { auto stores = cache.get_stores_for(this_handle, seqnos); REQUIRE(stores.empty()); @@ -943,6 +1035,8 @@ TEST_CASE("StateCache sparse queries") } } } + + cache.drop_cached_states(this_handle); }; { @@ -1057,6 +1151,7 @@ TEST_CASE("StateCache concurrent access") } } + cache.tick(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(1)); } }); @@ -1065,7 +1160,7 @@ TEST_CASE("StateCache concurrent access") using Clock = std::chrono::system_clock; // Add a watchdog timeout. Even in Debug+SAN this entire test takes <3 secs, - // so 10 seconds for any single entry is surely deadlock + // so taking this long for any single entry is surely deadlock const auto too_long = std::chrono::seconds(3); auto fetch_until_timeout = [&]( @@ -1328,6 +1423,7 @@ TEST_CASE("StateCache concurrent access") // Explicitly test some problematic cases { + INFO("Problem case 1"); std::vector previously_requested; const auto i = 0; const auto handle = 42; @@ -1344,6 +1440,7 @@ TEST_CASE("StateCache concurrent access") query_random_sparse_set_states(seqnos, handle, error_printer); } { + INFO("Problem case 2"); std::vector previously_requested; const auto i = 0; const auto handle = 42; @@ -1356,6 +1453,7 @@ TEST_CASE("StateCache concurrent access") query_random_range_states(14, 17, handle, error_printer); } { + INFO("Problem case 3"); std::vector previously_requested; const auto i = 0; const auto handle = 42; @@ -1376,6 +1474,7 @@ TEST_CASE("StateCache concurrent access") query_random_sparse_set_states(seqnos, handle, error_printer); } { + INFO("Problem case 4"); std::vector previously_requested; const auto i = 0; const auto handle = 42; @@ -1413,6 +1512,7 @@ TEST_CASE("StateCache concurrent access") } } { + INFO("Problem case 6"); std::vector previously_requested; const auto i = 0; const auto handle = 42; diff --git a/tests/e2e_logging.py b/tests/e2e_logging.py index 61deea39437b..c32c66ca011e 100644 --- a/tests/e2e_logging.py +++ b/tests/e2e_logging.py @@ -862,7 +862,11 @@ def id_for(i): idx = id_for(i) network.txs.issue( - network, repeat=True, idx=idx, wait_for_sync=False, log_capture=[] + network, + repeat=True, + idx=idx, + wait_for_sync=False, + log_capture=[], ) _, tx = network.txs.get_last_tx(idx=idx, priv=False) msg = tx["msg"] diff --git a/tests/infra/logging_app.py b/tests/infra/logging_app.py index a928b7504b2a..c5bbe88ebb2d 100644 --- a/tests/infra/logging_app.py +++ b/tests/infra/logging_app.py @@ -207,7 +207,7 @@ def verify_range_for_idx( self, idx, node=None, - timeout=5, + timeout=10, log_capture=None, from_seqno=None, to_seqno=None,