Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return errors when session consistency would be broken #4351

Closed
Closed
1 change: 1 addition & 0 deletions include/ccf/odata_error.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ namespace ccf
ERROR(PrimaryNotFound)
ERROR(RequestAlreadyForwarded)
ERROR(NodeNotRetiredCommitted)
ERROR(SessionConsistencyLost)

// node-to-node (/join and /create):
ERROR(ConsensusTypeMismatch)
Expand Down
2 changes: 2 additions & 0 deletions include/ccf/rpc_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ namespace ccf
// Only set in the case of a forwarded RPC
bool is_forwarded = false;

std::optional<ccf::TxID> last_tx_id = std::nullopt;

SessionContext(
size_t client_session_id_,
const std::vector<uint8_t>& caller_cert_,
Expand Down
3 changes: 3 additions & 0 deletions src/enclave/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache 2.0 License.
#pragma once

#include <span>
#include <sys/socket.h>
#include <vector>

Expand All @@ -14,5 +15,7 @@ namespace ccf

virtual void recv(const uint8_t* data, size_t size, sockaddr) = 0;
virtual void send(std::vector<uint8_t>&& data, sockaddr) = 0;

// Optional hook giving the endpoint a chance to inspect the raw bytes of a
// response. The default implementation does nothing, so endpoint types
// which have no use for the information need not override it.
virtual void record_response_txid(std::span<const uint8_t> raw_response) {}
};
}
1 change: 1 addition & 0 deletions src/enclave/rpc_sessions.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ namespace ccf

LOG_DEBUG_FMT("Replying to session {}", id);

search->second.second->record_response_txid({data.data(), data.size()});
search->second.second->send(std::move(data), {});
return true;
}
Expand Down
55 changes: 50 additions & 5 deletions src/http/http_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ namespace http
tls::ConnID session_id;
ccf::ListenInterfaceID interface_id;

// Lazily creates the SessionContext for this connection. An existing
// context is kept as-is, so repeated calls are harmless and any state
// already stored on the context is preserved.
void update_session_ctx()
{
  if (session_ctx != nullptr)
  {
    // Already created by an earlier call
    return;
  }

  session_ctx = std::make_shared<ccf::SessionContext>(
    session_id, peer_cert(), interface_id);
}

public:
HTTPServerEndpoint(
std::shared_ptr<ccf::RPCMap> rpc_map,
Expand All @@ -174,6 +183,46 @@ namespace http
send_raw(std::move(data));
}

// Extracts the transaction ID header from a serialised HTTP response and
// stores it as the session's last_tx_id.
//
// raw_response: the serialised response bytes, inspected in place.
//
// Responses without the header (or where the header section is not
// terminated within raw_response) are valid and leave the session
// untouched. A header whose value cannot be parsed as a TxID is logged and
// otherwise ignored.
void record_response_txid(std::span<const uint8_t> raw_response) override
{
  // To avoid a full HTTP parse, search for the desired header directly
  const std::string_view response(
    reinterpret_cast<const char*>(raw_response.data()), raw_response.size());

  constexpr std::string_view crlf = "\r\n";
  constexpr std::string_view end_of_headers = "\r\n\r\n";
  constexpr std::string_view optional_whitespace = " \t";

  // Never look at the body: it is application-controlled and may contain
  // the header name. Keep the CRLF which terminates the final header line,
  // so that every header value in the searched region is followed by '\r'.
  const auto headers_len = response.find(end_of_headers);
  const std::string_view headers = headers_len == std::string_view::npos ?
    response :
    response.substr(0, headers_len + crlf.size());

  const std::string_view name(http::headers::CCF_TX_ID);

  // Only accept the name where it is an actual field name: at the start of
  // a line and immediately followed by ':'. A bare substring search would
  // also match the name embedded in the value of another header, eg:
  //   x-my-evil-header: x-ms-ccf-transaction-id: 9.99\r
  //   x-ms-ccf-transaction-id: 2.1
  // and record the bogus value.
  auto name_pos = std::string_view::npos;
  for (auto pos = headers.find(name); pos != std::string_view::npos;
       pos = headers.find(name, pos + 1))
  {
    const bool at_line_start = pos >= crlf.size() &&
      headers.substr(pos - crlf.size(), crlf.size()) == crlf;
    const auto colon_pos = pos + name.size();
    const bool is_field_name =
      colon_pos < headers.size() && headers[colon_pos] == ':';
    if (at_line_start && is_field_name)
    {
      name_pos = pos;
      break;
    }
  }

  if (name_pos == std::string_view::npos)
  {
    // Response has no TxID header - this is valid
    return;
  }

  const auto value_begin = name_pos + name.size() + 1;
  const auto value_end = headers.find('\r', value_begin);
  if (value_end == std::string_view::npos)
  {
    // Response has no TxID header value - this is valid
    return;
  }

  // Strip optional whitespace surrounding the value
  std::string_view header_value =
    headers.substr(value_begin, value_end - value_begin);
  const auto first_kept = header_value.find_first_not_of(optional_whitespace);
  if (first_kept == std::string_view::npos)
  {
    header_value = {};
  }
  else
  {
    const auto last_kept = header_value.find_last_not_of(optional_whitespace);
    header_value = header_value.substr(first_kept, last_kept - first_kept + 1);
  }

  const auto tx_id = ccf::TxID::from_str(header_value);
  if (!tx_id.has_value())
  {
    LOG_FAIL_FMT(
      "Error parsing TxID from response header: {}",
      headers.substr(name_pos, value_end - name_pos));
    return;
  }

  update_session_ctx();

  session_ctx->last_tx_id = tx_id;
}

void handle_request(
llhttp_method verb,
const std::string_view& url,
Expand All @@ -189,11 +238,7 @@ namespace http

try
{
if (session_ctx == nullptr)
{
session_ctx = std::make_shared<ccf::SessionContext>(
session_id, peer_cert(), interface_id);
}
update_session_ctx();

std::shared_ptr<http::HttpRpcContext> rpc_ctx = nullptr;
try
Expand Down
70 changes: 70 additions & 0 deletions src/node/rpc/frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,51 @@ namespace ccf
return true;
}

// Describes why a session can no longer be considered consistent: a
// transaction ID previously recorded for the session, paired with the
// commit point and transaction status that were observed for it.
struct SessionConsistencyConflict
{
// Transaction ID which was recorded for the session
ccf::TxID session_tx_id;
// Committed view and seqno at the time the conflict was detected
ccf::TxID commit_tx_id;
// Status evaluated for session_tx_id against that commit point
ccf::TxStatus status;
};

// Checks whether the last transaction ID recorded on this session is still
// compatible with what consensus currently knows.
//
// Returns a populated SessionConsistencyConflict only when that transaction
// ID is now known to be Invalid. Returns nullopt when the session has no
// recorded transaction ID, when consensus is unavailable, or when the
// status is anything other than Invalid.
std::optional<SessionConsistencyConflict> check_session_consistency(
  std::shared_ptr<ccf::SessionContext> session_ctx)
{
  if (!session_ctx->last_tx_id.has_value())
  {
    // Nothing has been recorded for this session, so nothing can conflict
    return std::nullopt;
  }

  if (consensus == nullptr)
  {
    LOG_FAIL_FMT("consensus deleted after TxID assigned");
    return std::nullopt;
  }

  const auto reported = session_ctx->last_tx_id.value();

  const auto view_of_reported = consensus->get_view(reported.seqno);
  const auto commit_seqno = consensus->get_committed_seqno();
  const auto commit_view = consensus->get_view(commit_seqno);

  const auto status = ccf::evaluate_tx_status(
    reported.view,
    reported.seqno,
    view_of_reported,
    commit_view,
    commit_seqno);

  // Pending and Committed obviously retain session consistency. It's also
  // possible for the status to be Unknown - the last_tx_id was populated
  // from a forwarded response about state this node doesn't yet know.
  // Only once this Tx is known to be invalidated do we report that
  // session consistency has been lost.
  if (status != ccf::TxStatus::Invalid)
  {
    return std::nullopt;
  }

  SessionConsistencyConflict conflict;
  conflict.session_tx_id = reported;
  conflict.commit_tx_id.view = commit_view;
  conflict.commit_tx_id.seqno = commit_seqno;
  conflict.status = status;
  return conflict;
}

std::unique_ptr<AuthnIdentity> get_authenticated_identity(
std::shared_ptr<ccf::RpcContextImpl> ctx,
kv::CommittableTx& tx,
Expand Down Expand Up @@ -409,6 +454,30 @@ namespace ccf

endpoints.execute_endpoint(endpoint, args);

{
// NB: Do this check after execution, to ensure this transaction has
// claimed a read version if it is going to. If we do it any
// earlier, it is possible that we report that the previous TxID is
// still valid, but invalidate it between that point and when this
// transaction executes.
const auto scc =
check_session_consistency(ctx->get_session_context());
if (scc.has_value())
{
ctx->set_error(
HTTP_STATUS_INTERNAL_SERVER_ERROR,
ccf::errors::SessionConsistencyLost,
fmt::format(
"Previously reported transaction ID {} on this session, but "
"this node is now at {}, making the previous transaction ID "
"{}. Please start a new TLS session.",
scc->session_tx_id.to_str(),
scc->commit_tx_id.to_str(),
tx_status_to_str(scc->status)));
return;
}
}

if (!ctx->should_apply_writes())
{
update_metrics(ctx);
Expand Down Expand Up @@ -464,6 +533,7 @@ namespace ccf
auto tx_id = tx.get_txid();
if (tx_id.has_value() && consensus != nullptr)
{
ctx->get_session_context()->last_tx_id = tx_id;
try
{
// Only transactions that acquired one or more map handles
Expand Down
20 changes: 20 additions & 0 deletions tests/infra/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,26 @@ def find_primary_and_any_backup(self, timeout=3):
backup = random.choice(backups)
return primary, backup

def resize(self, target_count, args):
    """
    Grow or shrink the network until it has target_count joined nodes.

    Nodes are added by creating, joining and trusting a new local node;
    nodes are removed by retiring a backup picked by
    find_primary_and_any_backup. Once the target is reached, waits for all
    nodes to commit.

    Returns the number of joined nodes before resizing.
    """
    initial_node_count = len(self.get_joined_nodes())
    LOG.info(f"Resizing network from {initial_node_count} to {target_count} nodes")

    # Grow: one new trusted node per missing slot
    missing = target_count - initial_node_count
    while missing > 0:
        added = self.create_node("local://localhost")
        self.join_node(added, args.package, args)
        self.trust_node(added, args)
        missing -= 1

    # Shrink: retire one backup per surplus node
    surplus = initial_node_count - target_count
    while surplus > 0:
        current_primary, some_backup = self.find_primary_and_any_backup()
        self.retire_node(current_primary, some_backup)
        surplus -= 1

    primary, _ = self.find_primary()
    self.wait_for_all_nodes_to_commit(primary)
    LOG.success(
        f"Resized network from {initial_node_count} to {target_count} nodes"
    )
    return initial_node_count

def wait_for_all_nodes_to_commit(self, primary=None, tx_id=None, timeout=10):
"""
Wait for all nodes to have joined the network and committed all transactions
Expand Down
9 changes: 6 additions & 3 deletions tests/infra/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ def client(
signing_identity=None,
interface_name=infra.interfaces.PRIMARY_RPC_INTERFACE,
verify_ca=True,
description=None,
**kwargs,
):
if self.network_state == NodeNetworkState.stopped:
Expand Down Expand Up @@ -627,9 +628,11 @@ def client(
akwargs["http2"] = True
akwargs.update(self.session_auth(identity))
akwargs.update(self.signing_auth(signing_identity))
akwargs[
"description"
] = f"[{self.local_node_id}|{identity or ''}|{signing_identity or ''}]"
description = (
description
or f"{self.local_node_id}|{identity or ''}|{signing_identity or ''}"
)
akwargs["description"] = f"[{description}]"
akwargs.update(kwargs)

if self.curl:
Expand Down
Loading