Skip to content

Commit

Permalink
setup sortition module against public key aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
ryardley committed Sep 13, 2024
1 parent b1dba82 commit b43e59d
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 54 deletions.
23 changes: 15 additions & 8 deletions packages/ciphernode/core/src/ciphernode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,27 @@ use crate::{
Subscribe,
};
use actix::prelude::*;
use alloy_primitives::Address;
use anyhow::Result;

pub struct Ciphernode {
fhe: Addr<Fhe>,
data: Addr<Data>,
bus: Addr<EventBus>,
address: Address
}

impl Actor for Ciphernode {
type Context = Context<Self>;
}

impl Ciphernode {
pub fn new(bus: Addr<EventBus>, fhe: Addr<Fhe>, data: Addr<Data>) -> Self {
Self { bus, fhe, data }
pub fn new(bus: Addr<EventBus>, fhe: Addr<Fhe>, data: Addr<Data>, address: Address) -> Self {
Self { bus, fhe, data, address }
}

pub async fn attach(bus: Addr<EventBus>, fhe: Addr<Fhe>, data: Addr<Data>) -> Addr<Self> {
let node = Ciphernode::new(bus.clone(), fhe, data).start();
pub async fn attach(bus: Addr<EventBus>, fhe: Addr<Fhe>, data: Addr<Data>, address: Address) -> Addr<Self> {
let node = Ciphernode::new(bus.clone(), fhe, data, address).start();
let _ = bus
.send(Subscribe::new("CiphernodeSelected", node.clone().into()))
.await;
Expand Down Expand Up @@ -58,7 +60,8 @@ impl Handler<CiphernodeSelected> for Ciphernode {
let fhe = self.fhe.clone();
let data = self.data.clone();
let bus = self.bus.clone();
Box::pin(async { on_ciphernode_selected(fhe, data, bus, event).await.unwrap() })
let address = self.address;
Box::pin(async move { on_ciphernode_selected(fhe, data, bus, event, address).await.unwrap() })
}
}

Expand All @@ -69,8 +72,9 @@ impl Handler<CiphertextOutputPublished> for Ciphernode {
let fhe = self.fhe.clone();
let data = self.data.clone();
let bus = self.bus.clone();
Box::pin(async {
on_decryption_requested(fhe, data, bus, event)
let address = self.address;
Box::pin(async move {
on_decryption_requested(fhe, data, bus, event, address)
.await
.unwrap()
})
Expand All @@ -82,6 +86,7 @@ async fn on_ciphernode_selected(
data: Addr<Data>,
bus: Addr<EventBus>,
event: CiphernodeSelected,
address: Address
) -> Result<()> {
let CiphernodeSelected { e3_id, .. } = event;
// generate keyshare
Expand All @@ -98,7 +103,7 @@ async fn on_ciphernode_selected(
data.do_send(Insert(format!("{}/pk", e3_id).into(), pubkey.clone()));

// broadcast the KeyshareCreated message
let event = EnclaveEvent::from(KeyshareCreated { pubkey, e3_id });
let event = EnclaveEvent::from(KeyshareCreated { pubkey, e3_id, node: address });
bus.do_send(event);

Ok(())
Expand All @@ -109,6 +114,7 @@ async fn on_decryption_requested(
data: Addr<Data>,
bus: Addr<EventBus>,
event: CiphertextOutputPublished,
address: Address
) -> Result<()> {
let CiphertextOutputPublished {
e3_id,
Expand All @@ -130,6 +136,7 @@ async fn on_decryption_requested(
let event = EnclaveEvent::from(DecryptionshareCreated {
e3_id,
decryption_share,
node: address
});

bus.do_send(event);
Expand Down
8 changes: 6 additions & 2 deletions packages/ciphernode/core/src/ciphernode_sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// TODO: if the ciphernode fails restart the node by replaying all stored events back to it

use actix::prelude::*;
use alloy_primitives::Address;

use crate::{Ciphernode, Data, EnclaveEvent, EventBus, Fhe};

Expand All @@ -11,14 +12,16 @@ pub struct CiphernodeSequencer {
data: Addr<Data>,
bus: Addr<EventBus>,
child: Option<Addr<Ciphernode>>,
address: Address,
}
impl CiphernodeSequencer {
pub fn new(fhe: Addr<Fhe>, data: Addr<Data>, bus: Addr<EventBus>) -> Self {
pub fn new(fhe: Addr<Fhe>, data: Addr<Data>, bus: Addr<EventBus>, address:Address) -> Self {
Self {
fhe,
bus,
data,
child: None,
address
}
}
}
Expand All @@ -32,9 +35,10 @@ impl Handler<EnclaveEvent> for CiphernodeSequencer {
let bus = self.bus.clone();
let fhe = self.fhe.clone();
let data = self.data.clone();
let address = self.address;
let sink = self
.child
.get_or_insert_with(|| Ciphernode::new(bus, fhe, data).start());
.get_or_insert_with(|| Ciphernode::new(bus, fhe, data, address).start());
sink.do_send(msg);
}
}
4 changes: 4 additions & 0 deletions packages/ciphernode/core/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,15 @@ impl fmt::Display for EnclaveEvent {
pub struct KeyshareCreated {
pub pubkey: Vec<u8>,
pub e3_id: E3id,
pub node: Address
}

#[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[rtype(result = "anyhow::Result<()>")]
pub struct DecryptionshareCreated {
pub decryption_share: Vec<u8>,
pub e3_id: E3id,
pub node: Address
}

#[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
Expand Down Expand Up @@ -343,6 +345,7 @@ mod tests {
events::extract_enclave_event_name, serializers::PublicKeyShareSerializer, E3id,
KeyshareCreated,
};
use alloy_primitives::address;
use fhe::{
bfv::{BfvParametersBuilder, SecretKey},
mbfv::{CommonRandomPoly, PublicKeyShare},
Expand Down Expand Up @@ -381,6 +384,7 @@ mod tests {
let kse = EnclaveEvent::from(KeyshareCreated {
e3_id: E3id::from(1001),
pubkey,
node: address!("d8dA6BF26964aF9D7eEd9e03E53415D37aA96045")
});
let kse_bytes = kse.to_bytes()?;
let _ = EnclaveEvent::from_bytes(&kse_bytes.clone());
Expand Down
12 changes: 9 additions & 3 deletions packages/ciphernode/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ mod tests {

// create ciphernode actor for managing ciphernode flow
let sortition = Sortition::attach(bus.clone());
CiphernodeSelector::attach(bus.clone(), sortition, addr);
Registry::attach(bus.clone(), data.clone(), rng).await;
CiphernodeSelector::attach(bus.clone(), sortition.clone(), addr);
Registry::attach(bus.clone(), data.clone(), sortition, rng, addr).await;
}

fn setup_bfv_params(
Expand Down Expand Up @@ -253,14 +253,17 @@ mod tests {
EnclaveEvent::from(KeyshareCreated {
pubkey: p1.clone(),
e3_id: e3_id.clone(),
node: eth_addrs[0]
}),
EnclaveEvent::from(KeyshareCreated {
pubkey: p2.clone(),
e3_id: e3_id.clone(),
node: eth_addrs[1]
}),
EnclaveEvent::from(KeyshareCreated {
pubkey: p3.clone(),
e3_id: e3_id.clone()
e3_id: e3_id.clone(),
node: eth_addrs[2]
}),
EnclaveEvent::from(PublicKeyAggregated {
pubkey: PublicKeySerializer::to_bytes(pubkey.clone(), params.clone())?,
Expand Down Expand Up @@ -321,14 +324,17 @@ mod tests {
EnclaveEvent::from(DecryptionshareCreated {
decryption_share: ds1.clone(),
e3_id: e3_id.clone(),
node: eth_addrs[0]
}),
EnclaveEvent::from(DecryptionshareCreated {
decryption_share: ds2.clone(),
e3_id: e3_id.clone(),
node: eth_addrs[1]
}),
EnclaveEvent::from(DecryptionshareCreated {
decryption_share: ds3.clone(),
e3_id: e3_id.clone(),
node: eth_addrs[2]
}),
EnclaveEvent::from(PlaintextAggregated {
e3_id: e3_id.clone(),
Expand Down
96 changes: 67 additions & 29 deletions packages/ciphernode/core/src/publickey_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ use crate::{
events::{E3id, EnclaveEvent, KeyshareCreated, PublicKeyAggregated},
fhe::{Fhe, GetAggregatePublicKey},
ordered_set::OrderedSet,
GetHasNode, Sortition,
};
use actix::prelude::*;
use anyhow::{anyhow, Result};
use anyhow::Result;

#[derive(Debug, Clone)]
pub enum PublicKeyAggregatorState {
Collecting {
nodecount: usize,
keyshares: OrderedSet<Vec<u8>>,
seed: u64,
},
Computing {
keyshares: OrderedSet<Vec<u8>>,
Expand All @@ -31,6 +33,7 @@ struct ComputeAggregate {
pub struct PublicKeyAggregator {
fhe: Addr<Fhe>,
bus: Addr<EventBus>,
sortition: Addr<Sortition>,
e3_id: E3id,
state: PublicKeyAggregatorState,
}
Expand All @@ -42,14 +45,23 @@ pub struct PublicKeyAggregator {
/// It is expected to change this mechanism as we work through adversarial scenarios and write tests
/// for them.
impl PublicKeyAggregator {
pub fn new(fhe: Addr<Fhe>, bus: Addr<EventBus>, e3_id: E3id, nodecount: usize) -> Self {
pub fn new(
fhe: Addr<Fhe>,
bus: Addr<EventBus>,
sortition: Addr<Sortition>,
e3_id: E3id,
nodecount: usize,
seed: u64,
) -> Self {
PublicKeyAggregator {
fhe,
bus,
e3_id,
sortition,
state: PublicKeyAggregatorState::Collecting {
nodecount,
keyshares: OrderedSet::new(),
seed,
},
}
}
Expand All @@ -58,6 +70,7 @@ impl PublicKeyAggregator {
let PublicKeyAggregatorState::Collecting {
nodecount,
keyshares,
..
} = &mut self.state
else {
return Err(anyhow::anyhow!("Can only add keyshare in Collecting state"));
Expand Down Expand Up @@ -94,41 +107,68 @@ impl Actor for PublicKeyAggregator {
impl Handler<EnclaveEvent> for PublicKeyAggregator {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
match msg {
EnclaveEvent::KeyshareCreated { data, .. } => ctx.notify(data),
_ => ()
if let EnclaveEvent::KeyshareCreated { data, .. } = msg {
ctx.notify(data)
}
}
}

impl Handler<KeyshareCreated> for PublicKeyAggregator {
type Result = Result<()>;

fn handle(&mut self, event: KeyshareCreated, ctx: &mut Self::Context) -> Self::Result {

if event.e3_id != self.e3_id {
return Err(anyhow!(
"Wrong e3_id sent to aggregator. This should not happen."
));
}
type Result = ResponseActFuture<Self, Result<()>>;

let PublicKeyAggregatorState::Collecting { .. } = self.state else {
return Err(anyhow!(
"Aggregator has been closed for collecting keyshares."
));
fn handle(&mut self, event: KeyshareCreated, _: &mut Self::Context) -> Self::Result {
let PublicKeyAggregatorState::Collecting {
nodecount, seed, ..
} = self.state.clone()
else {
return Box::pin(fut::ready(Ok(())));
};

// add the keyshare and
self.state = self.add_keyshare(event.pubkey)?;
let size = nodecount;
let address = event.node;
let e3_id = event.e3_id.clone();
let pubkey = event.pubkey.clone();

// Check the state and if it has changed to the computing
if let PublicKeyAggregatorState::Computing { keyshares } = &self.state {
ctx.notify(ComputeAggregate {
keyshares: keyshares.clone(),
})
}
Box::pin(
self.sortition
.send(GetHasNode {
address,
size,
seed,
})
.into_actor(self)
.map(move |res, act, ctx| {
// NOTE: Returning Ok(()) on errors as we probably dont need a result type here since
// we will not be doing a send
let has_node = res?;
if !has_node {
println!("Node not found in committee"); // TODO: log properly
return Ok(());
}

if e3_id != act.e3_id {
println!("Wrong e3_id sent to aggregator. This should not happen.");
return Ok(());
}

let PublicKeyAggregatorState::Collecting { .. } = act.state else {
println!("Aggregator has been closed for collecting keyshares."); // TODO: log properly
return Ok(());
};

// add the keyshare and
act.state = act.add_keyshare(pubkey)?;

// Check the state and if it has changed to the computing
if let PublicKeyAggregatorState::Computing { keyshares } = &act.state {
ctx.notify(ComputeAggregate {
keyshares: keyshares.clone(),
})
}

Ok(())
Ok(())
}),
)
}
}

Expand Down Expand Up @@ -170,5 +210,3 @@ impl Handler<ComputeAggregate> for PublicKeyAggregator {
)
}
}


Loading

0 comments on commit b43e59d

Please sign in to comment.