Skip to content

Commit

Permalink
[release/3.x] Cherry pick: Historical queries fixes (#5026, #5040, #5058) (#5033)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyashton authored Feb 27, 2023
1 parent e7dc04c commit 0ffb5a5
Show file tree
Hide file tree
Showing 9 changed files with 712 additions and 529 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [3.0.8]

### Fixed

- Fixed a bug where historical query fetches could stall when requesting a range of large ledger entries (#5026, #5058).

## [3.0.7]

[3.0.7]: https://github.com/microsoft/CCF/releases/tag/ccf-3.0.7
Expand Down
119 changes: 62 additions & 57 deletions src/host/ledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ namespace asynchost
return match;
}

// Result of reading one or more ledger entries: the concatenated
// serialised entry bytes, plus the index of the last entry actually
// included. end_idx may be smaller than the upper bound the caller
// requested when a max-size cap truncates the returned range.
struct LedgerReadResult
{
  // Serialised ledger entries, concatenated in index order
  std::vector<uint8_t> data;
  // Index of the last entry included in `data`
  size_t end_idx;
};

class LedgerFile
{
private:
Expand Down Expand Up @@ -378,6 +384,13 @@ namespace asynchost
if (from == to)
{
// Request one entry that is too large: no entries are found
LOG_TRACE_FMT(
"Single ledger entry at {} in file {} is too large for remaining "
"space (size {} > max {})",
from,
file_name,
size,
max_size.value());
return {0, 0};
}
size_t to_ = from + (to - from) / 2;
Expand All @@ -397,7 +410,7 @@ namespace asynchost
return {size, to};
}

std::optional<std::pair<std::vector<uint8_t>, size_t>> read_entries(
std::optional<LedgerReadResult> read_entries(
size_t from, size_t to, std::optional<size_t> max_size = std::nullopt)
{
if ((from < start_idx) || (to > get_last_idx()) || (to < from))
Expand Down Expand Up @@ -428,7 +441,7 @@ namespace asynchost
file_name));
}

return std::make_pair(entries, to_);
return LedgerReadResult{entries, to_};
}

bool truncate(size_t idx)
Expand Down Expand Up @@ -762,11 +775,10 @@ namespace asynchost
return files.back();
}

std::optional<std::vector<uint8_t>> read_entries_range(
std::optional<LedgerReadResult> read_entries_range(
size_t from,
size_t to,
bool read_cache_only = false,
bool strict = true,
std::optional<size_t> max_entries_size = std::nullopt)
{
// Note: if max_entries_size is set, this returns contiguous ledger
Expand All @@ -777,20 +789,17 @@ namespace asynchost
return std::nullopt;
}

// If non-strict, return as many entries as possible
// During recovery or other low-knowledge batch operations, we might
// request entries past the end of the ledger - truncate to the true end
// here.
if (to > last_idx)
{
if (strict)
{
return std::nullopt;
}
else
{
to = last_idx;
}
to = last_idx;
}

std::vector<uint8_t> entries = {};
LedgerReadResult rr;
rr.end_idx = to;

size_t idx = from;
while (idx <= to)
{
Expand All @@ -804,19 +813,19 @@ namespace asynchost
std::optional<size_t> max_size = std::nullopt;
if (max_entries_size.has_value())
{
max_size = max_entries_size.value() - entries.size();
max_size = max_entries_size.value() - rr.data.size();
}
auto v = f_from->read_entries(idx, to_, max_size);
if (!v.has_value())
{
return std::nullopt;
break;
}
auto& [e, to_read] = v.value();
entries.insert(
entries.end(),
std::make_move_iterator(e.begin()),
std::make_move_iterator(e.end()));
if (to_read != to_)
rr.end_idx = v->end_idx;
rr.data.insert(
rr.data.end(),
std::make_move_iterator(v->data.begin()),
std::make_move_iterator(v->data.end()));
if (v->end_idx != to_)
{
// If all the entries requested from a file are not returned (i.e.
// because the requested entries are larger than max_entries_size),
Expand All @@ -827,7 +836,14 @@ namespace asynchost
idx = to_ + 1;
}

return entries;
if (!rr.data.empty())
{
return rr;
}
else
{
return std::nullopt;
}
}

void ignore_ledger_file(const std::string& file_name)
Expand Down Expand Up @@ -1113,24 +1129,23 @@ namespace asynchost
recovery_start_idx = idx;
}

std::optional<std::vector<uint8_t>> read_entry(size_t idx)
/// Read the single ledger entry at index @p idx.
/// Returns the entry bytes (and the index actually read) on success,
/// or std::nullopt if the entry could not be read.
std::optional<LedgerReadResult> read_entry(size_t idx)
{
  // Emit a warning if this read turns out to be slow
  TimeBoundLogger log_if_slow(
    fmt::format("Reading ledger entry at {}", idx));

  // A single entry is simply the degenerate one-element range [idx, idx]
  auto single_entry = read_entries_range(idx, idx);
  return single_entry;
}

/// Read the inclusive range of ledger entries [from, to].
///
/// @param from First index to read
/// @param to Last index to read (inclusive)
/// @param max_entries_size If set, cap on the total size of returned
///   entries; the result may then cover fewer entries than requested
///   (the result's end_idx records how far the read actually got)
/// @return Entry bytes and final index on success, std::nullopt if
///   nothing could be read
///
// NOTE(review): the diff residue (the removed `bool strict` parameter)
// has been dropped here to restore a valid parameter list, matching the
// post-change call site which passes only max_entries_size through.
std::optional<LedgerReadResult> read_entries(
  size_t from,
  size_t to,
  std::optional<size_t> max_entries_size = std::nullopt)
{
  // Emit a warning if this read turns out to be slow
  TimeBoundLogger log_if_slow(
    fmt::format("Reading ledger entries from {} to {}", from, to));

  return read_entries_range(from, to, false, max_entries_size);
}

size_t write_entry(const uint8_t* data, size_t size, bool committable)
Expand Down Expand Up @@ -1293,30 +1308,31 @@ namespace asynchost
Ledger* ledger;
size_t from_idx;
size_t to_idx;
size_t max_size;

// First argument is ledger entries (or nullopt if not found)
// Second argument is uv status code, which may indicate a cancellation
using ResultCallback =
std::function<void(std::optional<std::vector<uint8_t>>&&, int)>;
std::function<void(std::optional<LedgerReadResult>&&, int)>;
ResultCallback result_cb;

// Final result
std::optional<std::vector<uint8_t>> entries = std::nullopt;
std::optional<LedgerReadResult> read_result = std::nullopt;
};

// libuv work callback: performs the (potentially slow) ledger read on a
// worker thread, storing the result on the request's AsyncLedgerGet for
// on_ledger_get_async_complete to consume on the loop thread.
//
// NOTE(review): the stale pre-change assignment to `data->entries` (diff
// residue) has been removed; only the post-change read into read_result
// remains.
static void on_ledger_get_async(uv_work_t* req)
{
  auto data = static_cast<AsyncLedgerGet*>(req->data);

  // Third argument is read_cache_only=true — presumably because file
  // handling is owned by the main thread; confirm against
  // read_entries_range's contract
  data->read_result = data->ledger->read_entries_range(
    data->from_idx, data->to_idx, true, data->max_size);
}

static void on_ledger_get_async_complete(uv_work_t* req, int status)
{
auto data = static_cast<AsyncLedgerGet*>(req->data);

data->result_cb(std::move(data->entries), status);
data->result_cb(std::move(data->read_result), status);

delete data;
delete req;
Expand All @@ -1325,18 +1341,18 @@ namespace asynchost
void write_ledger_get_range_response(
size_t from_idx,
size_t to_idx,
std::optional<std::vector<uint8_t>>&& entries,
std::optional<LedgerReadResult>&& read_result,
consensus::LedgerRequestPurpose purpose)
{
if (entries.has_value())
if (read_result.has_value())
{
RINGBUFFER_WRITE_MESSAGE(
consensus::ledger_entry_range,
to_enclave,
from_idx,
to_idx,
read_result->end_idx,
purpose,
entries.value());
read_result->data);
}
else
{
Expand Down Expand Up @@ -1401,11 +1417,6 @@ namespace asynchost
auto [from_idx, to_idx, purpose] =
ringbuffer::read_message<consensus::ledger_get_range>(data, size);

// Recovery reads ledger in fixed-size batches until it reaches the
// end of the ledger. When the end of the ledger is reached, we return
// as many entries as possible including the very last one.
bool strict = purpose != consensus::LedgerRequestPurpose::Recovery;

// Ledger entries response has metadata so cap total entries size
// accordingly
constexpr size_t write_ledger_range_response_metadata_size = 2048;
Expand All @@ -1423,21 +1434,15 @@ namespace asynchost
job->ledger = this;
job->from_idx = from_idx;
job->to_idx = to_idx;
job->result_cb = [this,
from_idx = from_idx,
to_idx = to_idx,
purpose = purpose,
strict = strict,
max_entries_size =
max_entries_size](auto&& entry, int status) {
// NB: Even if status is cancelled (and entry is empty), we
// want to write this result back to the enclave
write_ledger_get_range_response(
from_idx,
to_idx,
read_entries(from_idx, to_idx, strict, max_entries_size),
purpose);
};
job->max_size = max_entries_size;
job->result_cb =
[this, from_idx = from_idx, to_idx = to_idx, purpose = purpose](
auto&& read_result, int status) {
// NB: Even if status is cancelled (and entry is empty), we
// want to write this result back to the enclave
write_ledger_get_range_response(
from_idx, to_idx, std::move(read_result), purpose);
};

work_handle->data = job;
}
Expand All @@ -1455,7 +1460,7 @@ namespace asynchost
write_ledger_get_range_response(
from_idx,
to_idx,
read_entries(from_idx, to_idx, strict, max_entries_size),
read_entries(from_idx, to_idx, max_entries_size),
purpose);
}
});
Expand Down
41 changes: 33 additions & 8 deletions src/host/node_connections.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,17 +337,42 @@ namespace asynchost

// Find the total frame size, and write it along with the header.
uint32_t frame = (uint32_t)size_to_send;
std::optional<std::vector<uint8_t>> framed_entries = std::nullopt;

framed_entries = ledger.read_entries(ae.prev_idx + 1, ae.idx);
if (framed_entries.has_value())
if (ae.idx > ae.prev_idx)
{
frame += (uint32_t)framed_entries->size();
outbound_connection->write(sizeof(uint32_t), (uint8_t*)&frame);
outbound_connection->write(size_to_send, data_to_send);
std::optional<asynchost::LedgerReadResult> read_result =
ledger.read_entries(ae.prev_idx + 1, ae.idx);

frame = (uint32_t)framed_entries->size();
outbound_connection->write(frame, framed_entries->data());
if (!read_result.has_value())
{
LOG_FAIL_FMT(
"Unable to send AppendEntries ({}, {}]: Ledger read failed",
ae.prev_idx,
ae.idx);
return;
}
else if (ae.idx != read_result->end_idx)
{
// NB: This should never happen since we do not pass a max_size
// to read_entries
LOG_FAIL_FMT(
"Unable to send AppendEntries ({}, {}]: Ledger read returned "
"entries to {}",
ae.prev_idx,
ae.idx,
read_result->end_idx);
return;
}
else
{
const auto& framed_entries = read_result->data;
frame += (uint32_t)framed_entries.size();
outbound_connection->write(sizeof(uint32_t), (uint8_t*)&frame);
outbound_connection->write(size_to_send, data_to_send);

outbound_connection->write(
framed_entries.size(), framed_entries.data());
}
}
else
{
Expand Down
Loading

0 comments on commit 0ffb5a5

Please sign in to comment.