Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move channel manager into node state #2780

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions edl/ccf.edl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ enclave {
from "openenclave/edl/time.edl" import *;

include "common/enclave_interface_types.h"
include "enclave/consensus_type.h"

trusted {

Expand All @@ -24,7 +23,6 @@ enclave {
size_t enclave_version_size,
[out] size_t* enclave_version_len,
StartType start_type,
ConsensusType consensus_type,
size_t num_worker_thread,
[user_check] void* time_location,
);
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/consensus_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace consensus
{
struct Configuration
{
ConsensusType consensus_type;
size_t raft_request_timeout;
size_t raft_election_timeout;
size_t bft_view_change_timeout;
Expand All @@ -18,6 +19,7 @@ namespace consensus
DECLARE_JSON_TYPE(Configuration);
DECLARE_JSON_REQUIRED_FIELDS(
Configuration,
consensus_type,
raft_request_timeout,
raft_election_timeout,
bft_view_change_timeout,
Expand Down
3 changes: 0 additions & 3 deletions src/enclave/ccf_v.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ extern "C"
size_t,
size_t*,
StartType,
ConsensusType,
size_t,
void*);

Expand Down Expand Up @@ -121,7 +120,6 @@ extern "C"
size_t enclave_version_size,
size_t* enclave_version_len,
StartType start_type,
ConsensusType consensus_type,
size_t num_worker_thread,
void* time_location)
{
Expand All @@ -142,7 +140,6 @@ extern "C"
enclave_version_size,
enclave_version_len,
start_type,
consensus_type,
num_worker_thread,
time_location);
return OE_OK;
Expand Down
38 changes: 4 additions & 34 deletions src/enclave/enclave.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@ namespace enclave
oversized::WriterFactory writer_factory;
ccf::NetworkState network;
ccf::ShareManager share_manager;
std::shared_ptr<ccf::NodeToNode> n2n_channels;
std::shared_ptr<RPCMap> rpc_map;
std::shared_ptr<RPCSessions> rpcsessions;
std::unique_ptr<ccf::NodeState> node;
std::shared_ptr<ccf::Forwarder<ccf::NodeToNode>> cmd_forwarder;
ringbuffer::WriterPtr to_host = nullptr;
std::chrono::microseconds last_tick_time;
ENGINE* rdrand_engine = nullptr;
Expand Down Expand Up @@ -67,7 +65,6 @@ namespace enclave
Enclave(
const EnclaveConfig& ec,
const CCFConfig::SignatureIntervals& signature_intervals,
const ConsensusType& consensus_type_,
const consensus::Configuration& consensus_config,
const CurveID& curve_id) :
circuit(
Expand All @@ -79,13 +76,10 @@ namespace enclave
ec.from_enclave_buffer_offsets}),
basic_writer_factory(circuit),
writer_factory(basic_writer_factory, ec.writer_config),
network(consensus_type_),
network(consensus_config.consensus_type),
share_manager(network),
n2n_channels(std::make_shared<ccf::NodeToNodeImpl>(writer_factory)),
rpc_map(std::make_shared<RPCMap>()),
rpcsessions(std::make_shared<RPCSessions>(writer_factory, rpc_map)),
cmd_forwarder(std::make_shared<ccf::Forwarder<ccf::NodeToNode>>(
rpcsessions, n2n_channels, rpc_map, consensus_type_))
rpcsessions(std::make_shared<RPCSessions>(writer_factory, rpc_map))
{
ccf::initialize_oe();

Expand Down Expand Up @@ -130,19 +124,10 @@ namespace enclave
rpc_map->register_frontend<ccf::ActorsType::nodes>(
std::make_unique<ccf::NodeRpcFrontend>(network, *context));

for (auto& [actor, fe] : rpc_map->frontends())
{
fe->set_sig_intervals(
signature_intervals.sig_tx_interval,
signature_intervals.sig_ms_interval);
fe->set_cmd_forwarder(cmd_forwarder);
}

node->initialize(
consensus_config,
n2n_channels,
rpc_map,
cmd_forwarder,
rpcsessions,
signature_intervals.sig_tx_interval,
signature_intervals.sig_ms_interval);
}
Expand Down Expand Up @@ -275,22 +260,7 @@ namespace enclave

DISPATCHER_SET_MESSAGE_HANDLER(
bp, ccf::node_inbound, [this](const uint8_t* data, size_t size) {
const auto [body] =
ringbuffer::read_message<ccf::node_inbound>(data, size);

auto p = body.data();
auto psize = body.size();

if (
serialized::peek<ccf::NodeMsgType>(p, psize) ==
ccf::NodeMsgType::forwarded_msg)
{
cmd_forwarder->recv_message(p, psize);
}
else
{
node->node_msg(std::move(body));
}
node->recv_node_inbound(data, size);
});

DISPATCHER_SET_MESSAGE_HANDLER(
Expand Down
7 changes: 1 addition & 6 deletions src/enclave/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ extern "C"
size_t enclave_version_size,
size_t* enclave_version_len,
StartType start_type,
ConsensusType consensus_type,
size_t num_worker_threads,
void* time_location)
{
Expand Down Expand Up @@ -144,11 +143,7 @@ extern "C"
#endif

auto enclave = new enclave::Enclave(
ec,
cc.signature_intervals,
consensus_type,
cc.consensus_config,
cc.curve_id);
ec, cc.signature_intervals, cc.consensus_config, cc.curve_id);

if (!enclave->create_new_node(
start_type,
Expand Down
2 changes: 0 additions & 2 deletions src/host/enclave.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ namespace host
std::vector<uint8_t>& node_cert,
std::vector<uint8_t>& network_cert,
StartType start_type,
ConsensusType consensus_type,
size_t num_worker_thread,
void* time_location)
{
Expand Down Expand Up @@ -122,7 +121,6 @@ namespace host
enclave_version_buf.size(),
&enclave_version_len,
start_type,
consensus_type,
num_worker_thread,
time_location);

Expand Down
4 changes: 2 additions & 2 deletions src/host/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,8 @@ int main(int argc, char** argv)
#endif

CCFConfig ccf_config;
ccf_config.consensus_config = {raft_timeout,
ccf_config.consensus_config = {consensus,
raft_timeout,
raft_election_timeout,
bft_view_change_timeout,
bft_status_interval};
Expand Down Expand Up @@ -846,7 +847,6 @@ int main(int argc, char** argv)
node_cert,
network_cert,
start_type,
consensus,
num_worker_threads,
time_updater->behaviour.get_value());

Expand Down
71 changes: 42 additions & 29 deletions src/node/node_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,22 +293,31 @@ namespace ccf
//
void initialize(
const consensus::Configuration& consensus_config_,
std::shared_ptr<NodeToNode> n2n_channels_,
std::shared_ptr<enclave::RPCMap> rpc_map_,
std::shared_ptr<Forwarder<NodeToNode>> cmd_forwarder_,
std::shared_ptr<enclave::AbstractRPCResponder> rpc_sessions_,
size_t sig_tx_interval_,
size_t sig_ms_interval_)
{
std::lock_guard<std::mutex> guard(lock);
sm.expect(State::uninitialized);

consensus_config = consensus_config_;
n2n_channels = n2n_channels_;
rpc_map = rpc_map_;
cmd_forwarder = cmd_forwarder_;
sig_tx_interval = sig_tx_interval_;
sig_ms_interval = sig_ms_interval_;

n2n_channels = std::make_shared<ccf::NodeToNodeImpl>(writer_factory);

cmd_forwarder = std::make_shared<ccf::Forwarder<ccf::NodeToNode>>(
rpc_sessions_, n2n_channels, rpc_map, consensus_config.consensus_type);

sm.advance(State::initialized);

for (auto& [actor, fe] : rpc_map->frontends())
{
fe->set_sig_intervals(sig_tx_interval, sig_ms_interval);
fe->set_cmd_forwarder(cmd_forwarder);
}
}

//
Expand Down Expand Up @@ -1405,41 +1414,45 @@ namespace ccf
consensus->periodic_end();
}

void node_msg(const std::vector<uint8_t>& data)
void recv_node_inbound(const uint8_t* payload_data, size_t payload_size)
{
// Only process messages once part of network
if (
!sm.check(State::partOfNetwork) &&
!sm.check(State::partOfPublicNetwork) &&
!sm.check(State::readingPrivateLedger))
{
return;
}

const uint8_t* payload_data = data.data();
size_t payload_size = data.size();

NodeMsgType msg_type =
serialized::overlay<NodeMsgType>(payload_data, payload_size);
NodeId from = serialized::read<NodeId::Value>(payload_data, payload_size);

switch (msg_type)
if (msg_type == ccf::NodeMsgType::forwarded_msg)
jumaffre marked this conversation as resolved.
Show resolved Hide resolved
{
case channel_msg:
{
n2n_channels->recv_message(from, payload_data, payload_size);
break;
}
case consensus_msg:
cmd_forwarder->recv_message(from, payload_data, payload_size);
}
else
{
// Only process messages once part of network
if (
!sm.check(State::partOfNetwork) &&
!sm.check(State::partOfPublicNetwork) &&
!sm.check(State::readingPrivateLedger))
{
consensus->recv_message(from, payload_data, payload_size);
break;
return;
}

default:
switch (msg_type)
{
LOG_FAIL_FMT("Unknown node message type: {}", msg_type);
return;
case channel_msg:
{
n2n_channels->recv_message(from, payload_data, payload_size);
break;
}
case consensus_msg:
{
consensus->recv_message(from, payload_data, payload_size);
break;
}

default:
{
LOG_FAIL_FMT("Unknown node message type: {}", msg_type);
return;
}
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/node/rpc/forwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,10 @@ namespace ccf
return m;
}

void recv_message(const uint8_t* data, size_t size)
void recv_message(const NodeId& from, const uint8_t* data, size_t size)
{
try
{
serialized::skip(data, size, sizeof(NodeMsgType));

NodeId from = serialized::read<NodeId::Value>(data, size);
auto forwarded_msg = serialized::peek<ForwardedMsg>(data, size);

switch (forwarded_msg)
Expand Down