Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Add signature verification to gossip #1937

Merged
merged 17 commits into from
Dec 1, 2018
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions sdk/src/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ impl Signature {
}
}

pub trait Signable {
fn sign(&mut self, keypair: &Keypair) {
let data = self.signable_data();
self.set_signature(Signature::new(&keypair.sign(&data).as_ref()));
}
fn verify(&self) -> bool {
self.get_signature()
.verify(&self.pubkey().as_ref(), &self.signable_data())
}

fn pubkey(&self) -> Pubkey;
fn signable_data(&self) -> Vec<u8>;
fn get_signature(&self) -> Signature;
fn set_signature(&mut self, signature: Signature);
}

impl AsRef<[u8]> for Signature {
fn as_ref(&self) -> &[u8] {
&self.0[..]
Expand Down
151 changes: 127 additions & 24 deletions src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use rand::{thread_rng, Rng};
use rayon::prelude::*;
use result::Result;
use rpc::RPC_PORT;
use signature::{Keypair, KeypairUtil};
use signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{duration_as_ms, timestamp};
Expand Down Expand Up @@ -60,17 +60,64 @@ pub enum ClusterInfoError {
pub struct ClusterInfo {
/// The network
pub gossip: CrdsGossip,
/// set the keypair that will be used to sign crds values generated. It is unset only in tests.
keypair: Arc<Keypair>,
}

// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
#[derive(Debug, Deserialize, Serialize)]
pub struct PruneData {
/// Pubkey of the node that sent this prune data
pub pubkey: Pubkey,
/// Pubkeys of nodes that should be pruned
pub prunes: Vec<Pubkey>,
/// Signature of this Prune Message
pub signature: Signature,
/// The Pubkey of the intended node/destination for this message
pub destination: Pubkey,
/// Wallclock of the node that generated this message
pub wallclock: u64,
}

impl Signable for PruneData {
fn pubkey(&self) -> Pubkey {
self.pubkey
}

fn signable_data(&self) -> Vec<u8> {
#[derive(Serialize)]
struct SignData {
pubkey: Pubkey,
prunes: Vec<Pubkey>,
destination: Pubkey,
wallclock: u64,
}
let data = SignData {
pubkey: self.pubkey,
prunes: self.prunes.clone(),
destination: self.destination,
wallclock: self.wallclock,
};
serialize(&data).expect("serialize PruneData")
}

fn get_signature(&self) -> Signature {
self.signature
}

fn set_signature(&mut self, signature: Signature) {
self.signature = signature
}
}

// TODO These messages should go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize, Debug)]
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
enum Protocol {
/// Gosisp protocol messages
/// Gossip protocol messages
PullRequest(Bloom<Hash>, CrdsValue),
PullResponse(Pubkey, Vec<CrdsValue>),
PushMessage(Pubkey, Vec<CrdsValue>),
PruneMessage(Pubkey, Vec<Pubkey>),
PruneMessage(Pubkey, PruneData),

/// Window protocol messages
/// TODO: move this message to a different module
Expand All @@ -79,8 +126,13 @@ enum Protocol {

impl ClusterInfo {
pub fn new(node_info: NodeInfo) -> Self {
//Without a keypair, gossip will not function. Only useful for tests.
ClusterInfo::new_with_keypair(node_info, Arc::new(Keypair::new()))
}
pub fn new_with_keypair(node_info: NodeInfo, keypair: Arc<Keypair>) -> Self {
let mut me = ClusterInfo {
gossip: CrdsGossip::default(),
keypair,
};
let id = node_info.id;
me.gossip.set_self(id);
Expand All @@ -92,12 +144,14 @@ impl ClusterInfo {
let mut my_data = self.my_data();
let now = timestamp();
my_data.wallclock = now;
let entry = CrdsValue::ContactInfo(my_data);
let mut entry = CrdsValue::ContactInfo(my_data);
entry.sign(&self.keypair);
self.gossip.refresh_push_active_set();
self.gossip.process_push_message(&[entry], now);
}
pub fn insert_info(&mut self, node_info: NodeInfo) {
let value = CrdsValue::ContactInfo(node_info);
let mut value = CrdsValue::ContactInfo(node_info);
value.sign(&self.keypair);
let _ = self.gossip.crds.insert(value, timestamp());
}
pub fn id(&self) -> Pubkey {
Expand Down Expand Up @@ -165,13 +219,10 @@ impl ClusterInfo {
let prev = self.leader_id();
let self_id = self.gossip.id;
let now = timestamp();
let leader = LeaderId {
id: self_id,
leader_id: key,
wallclock: now,
};
let entry = CrdsValue::LeaderId(leader);
let leader = LeaderId::new(self_id, key, now);
let mut entry = CrdsValue::LeaderId(leader);
warn!("{}: LEADER_UPDATE TO {} from {}", self_id, key, prev);
entry.sign(&self.keypair);
self.gossip.process_push_message(&[entry], now);
}

Expand Down Expand Up @@ -743,15 +794,23 @@ impl ClusterInfo {
.gossip
.process_push_message(&data, timestamp());
if !prunes.is_empty() {
let mut wme = me.write().unwrap();
inc_new_counter_info!("cluster_info-push_message-prunes", prunes.len());
let rsp = Protocol::PruneMessage(self_id, prunes);
let ci = wme.lookup(from).cloned();
let pushes: Vec<_> = wme.new_push_requests();
let ci = me.read().unwrap().lookup(from).cloned();
let pushes: Vec<_> = me.write().unwrap().new_push_requests();
inc_new_counter_info!("cluster_info-push_message-pushes", pushes.len());
let mut rsp: Vec<_> = ci
.and_then(|ci| to_blob(rsp, ci.ncp).ok())
.into_iter()
.and_then(|ci| {
let mut prune_msg = PruneData {
pubkey: self_id,
prunes,
signature: Signature::default(),
destination: from,
wallclock: timestamp(),
};
prune_msg.sign(&me.read().unwrap().keypair);
let rsp = Protocol::PruneMessage(self_id, prune_msg);
to_blob(rsp, ci.ncp).ok()
}).into_iter()
.collect();
let mut blobs: Vec<_> = pushes
.into_iter()
Expand Down Expand Up @@ -821,19 +880,35 @@ impl ClusterInfo {
ledger_window: &mut Option<&mut LedgerWindow>,
) -> Vec<SharedBlob> {
match request {
// TODO sigverify these
// TODO verify messages faster
Protocol::PullRequest(filter, caller) => {
//Pulls don't need to be verified
Self::handle_pull_request(me, filter, caller, from_addr)
}
Protocol::PullResponse(from, data) => {
Protocol::PullResponse(from, mut data) => {
data.retain(|v| v.verify());
Self::handle_pull_response(me, from, data);
vec![]
}
Protocol::PushMessage(from, data) => Self::handle_push_message(me, from, &data),
Protocol::PushMessage(from, mut data) => {
data.retain(|v| v.verify());
Self::handle_push_message(me, from, &data)
}
Protocol::PruneMessage(from, data) => {
inc_new_counter_info!("cluster_info-prune_message", 1);
inc_new_counter_info!("cluster_info-prune_message-size", data.len());
me.write().unwrap().gossip.process_prune_msg(from, &data);
if data.verify() {
inc_new_counter_info!("cluster_info-prune_message", 1);
inc_new_counter_info!("cluster_info-prune_message-size", data.prunes.len());
me.write()
.unwrap()
.gossip
.process_prune_msg(
from,
data.destination,
&data.prunes,
data.wallclock,
timestamp(),
).ok();
}
vec![]
}
Protocol::RequestWindowIndex(from, ix) => {
Expand Down Expand Up @@ -1343,4 +1418,32 @@ mod tests {
assert!(node.sockets.repair.local_addr().unwrap().port() >= FULLNODE_PORT_RANGE.0);
assert!(node.sockets.repair.local_addr().unwrap().port() < FULLNODE_PORT_RANGE.1);
}

//test that all cluster_info objects only generate signed messages
//when constructed with keypairs
#[test]
fn test_gossip_signature_verification() {
//create new cluster info, leader, and peer
let keypair = Keypair::new();
let peer_keypair = Keypair::new();
let leader_keypair = Keypair::new();
let node_info = NodeInfo::new_localhost(keypair.pubkey(), 0);
let leader = NodeInfo::new_localhost(leader_keypair.pubkey(), 0);
let peer = NodeInfo::new_localhost(peer_keypair.pubkey(), 0);
let mut cluster_info = ClusterInfo::new_with_keypair(node_info.clone(), Arc::new(keypair));
cluster_info.set_leader(leader.id);
cluster_info.insert_info(peer.clone());
//check that all types of gossip messages are signed correctly
let (_, _, vals) = cluster_info.gossip.new_push_messages(timestamp());
// there should be some pushes ready
assert!(vals.len() > 0);
vals.par_iter().for_each(|v| assert!(v.verify()));

let (_, _, val) = cluster_info
.gossip
.new_pull_request(timestamp())
.ok()
.unwrap();
assert!(val.verify());
}
}
48 changes: 47 additions & 1 deletion src/contact_info.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bincode::serialize;
use rpc::RPC_PORT;
use signature::{Keypair, KeypairUtil};
use signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
Expand All @@ -8,6 +9,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ContactInfo {
pub id: Pubkey,
/// signature of this ContactInfo
pub signature: Signature,
/// gossip address
pub ncp: SocketAddr,
/// address to connect to for replication
Expand Down Expand Up @@ -52,6 +55,7 @@ impl Default for ContactInfo {
rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(),
wallclock: 0,
signature: Signature::default(),
}
}
}
Expand All @@ -69,6 +73,7 @@ impl ContactInfo {
) -> Self {
ContactInfo {
id,
signature: Signature::default(),
ncp,
tvu,
tpu,
Expand Down Expand Up @@ -161,6 +166,47 @@ impl ContactInfo {
}
}

impl Signable for ContactInfo {
fn pubkey(&self) -> Pubkey {
self.id
}

fn signable_data(&self) -> Vec<u8> {
#[derive(Serialize)]
struct SignData {
id: Pubkey,
ncp: SocketAddr,
tvu: SocketAddr,
tpu: SocketAddr,
storage_addr: SocketAddr,
rpc: SocketAddr,
rpc_pubsub: SocketAddr,
wallclock: u64,
}

let me = self;
let data = SignData {
id: me.id,
ncp: me.ncp,
tvu: me.tvu,
tpu: me.tpu,
storage_addr: me.storage_addr,
rpc: me.rpc,
rpc_pubsub: me.rpc_pubsub,
wallclock: me.wallclock,
};
serialize(&data).expect("failed to serialize ContactInfo")
}

fn get_signature(&self) -> Signature {
self.signature
}

fn set_signature(&mut self, signature: Signature) {
self.signature = signature
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading