Skip to content

Commit

Permalink
add a lot of docs
Browse files Browse the repository at this point in the history
  • Loading branch information
0xNineteen committed Aug 23, 2023
1 parent d0e4e59 commit fe30bc4
Showing 1 changed file with 105 additions and 12 deletions.
117 changes: 105 additions & 12 deletions src/gossip/gossip_service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ pub const GossipService = struct {
}
}

/// spawns required threads for the gossip serivce.
/// including: 1) socket reciever 2) packet verifier 3) packet processor 4) build message loop (to send outgoing message)
/// and 5) a socket responder (to send outgoing packets)
pub fn run(self: *Self, logger: *Logger) !void {
const id = self.cluster_info.our_contact_info.pubkey;
logger.infof("running gossip service at {any} with pubkey {s}", .{ self.gossip_socket.getLocalEndPoint(), id.cached_str.? });
Expand Down Expand Up @@ -193,6 +196,9 @@ pub const GossipService = struct {
build_message_loop_handle.join();
}

/// main logic for deserializing Packets into Protocol messages
/// and verifing they have valid values, and have valid signatures.
/// Verified Protocol messages are then sent to the verified_channel.
fn verify_packets(
allocator: std.mem.Allocator,
packet_channel: *PacketChannel,
Expand Down Expand Up @@ -229,13 +235,17 @@ pub const GossipService = struct {
}
}

/// main logic for recieving and processing `Protocol` messages.
pub fn process_messages(
allocator: std.mem.Allocator,
/// channel which sends verified Protocol messages
verified_channel: *ProtocolChannel,
/// channel which sends outgoing Packets
responder_channel: *PacketChannel,
crds_table_rw: *RwMux(CrdsTable),
active_set_rw: *RwMux(ActiveSet),
failed_pull_hashes_mux: *Mux(HashTimeQueue),
/// the localnode's keypair to sign messages
my_keypair: *KeyPair,
logger: *Logger,
) !void {
Expand All @@ -262,7 +272,7 @@ pub const GossipService = struct {

if (failed_insert_origins.count() == 0) continue;

var prune_packets = build_prune_messages(allocator, crds_table_rw, &failed_insert_origins, push_from, my_keypair) catch |err| {
var prune_packets = build_prune_message(allocator, crds_table_rw, &failed_insert_origins, push_from, my_keypair) catch |err| {
logger.warnf("error building prune messages: {s}", .{@errorName(err)});
continue;
};
Expand Down Expand Up @@ -337,14 +347,24 @@ pub const GossipService = struct {
}
}

/// main gossip loop for periodically sending new protocol messages.
/// this includes sending push messages, pull requests, and triming old
/// gossip data (in the crds_table, active_set, and failed_pull_hashes).
fn build_message_loop(
allocator: std.mem.Allocator,
/// channel to send outgoing packets to
responder_channel: *PacketChannel,
/// the crds table
crds_table_rw: *RwMux(CrdsTable),
/// the active set to send push messages to (is also periodically rotated)
active_set_rw: *RwMux(ActiveSet),
/// the failed pull hashes queue to include in new pull requests
failed_pull_hashes_mux: *Mux(HashTimeQueue),
/// the queue of crds values which should be periodically pushed out
push_msg_queue_mux: *Mux(std.ArrayList(CrdsValue)),
/// local node's keypair used to sign outgoing messages
my_keypair: *KeyPair,
/// logger used for debugging
logger: *Logger,
) !void {
var last_push_ts: u64 = 0;
Expand Down Expand Up @@ -467,11 +487,18 @@ pub const GossipService = struct {
}
}

/// logic for building new push messages which are sent to peers from the
/// active set and serialized into packets.
fn build_push_messages(
allocator: std.mem.Allocator,
/// crds table to read new values from
crds_table_rw: *RwMux(CrdsTable),
/// the active set to get peers to send push messages to
active_set_rw: *RwMux(ActiveSet),
/// the local node's pubkey used to build the push message
my_pubkey: Pubkey,
/// push messages include crds values which have been inserted past this cursor
/// note: this cursor is updated to record new values which are included in the push messages
push_cursor: *u64,
) !std.ArrayList(Packet) {
// TODO: find a better static value?
Expand Down Expand Up @@ -572,12 +599,19 @@ pub const GossipService = struct {
return push_packets;
}

/// builds new pull request messages and serializes it into a list of Packets
/// to be sent to a random set of gossip nodes.
fn build_pull_requests(
allocator: std.mem.Allocator,
/// the crds table used to build the pull request
crds_table: *RwMux(CrdsTable),
/// failed pull hashes to include in the pull request
failed_pull_hashes: *const std.ArrayList(Hash),
/// the bloomsize of the pull request's filters
bloom_size: usize,
/// crds value used to construct the pull request message
my_contact_info: CrdsValue,
/// the logger to use for debuggin
logger: *Logger,
) !std.ArrayList(Packet) {
// NOTE: these filters need to be de-init at some point
Expand Down Expand Up @@ -636,12 +670,20 @@ pub const GossipService = struct {
return output;
}

/// logic for handling a pull request message
/// values which are missing in the pull request filter are returned as a pull response
/// which are serialized into packets.
fn handle_pull_request(
allocator: std.mem.Allocator,
/// the crds table to search in
crds_table_rw: *RwMux(CrdsTable),
/// the crds value associated with the pull request
pull_value: CrdsValue,
/// the crds filter of the pull request
pull_filter: CrdsFilter,
/// the endpoint of the peer sending the pull request (/who to send the pull response to)
pull_from_endpoint: EndPoint,
/// the local nodes pubkey used to build the pull response message
my_pubkey: Pubkey,
) error{ SerializationError, OutOfMemory }!std.ArrayList(Packet) {
const now = get_wallclock();
Expand Down Expand Up @@ -689,10 +731,17 @@ pub const GossipService = struct {
);
}

/// logic for handling a pull response message.
/// successful inserted values, have their origin value timestamps updated.
/// failed inserts (ie, too old or duplicate values) are added to the failed pull hashes so that they can be
/// included in the next pull request (so we dont receive them again).
fn handle_pull_response(
allocator: std.mem.Allocator,
/// the crds table to insert the values into
crds_table_rw: *RwMux(CrdsTable),
/// the failed pull hashes to update with the values which fail the insertion
failed_pull_hashes_mux: *Mux(HashTimeQueue),
/// the array of values to insert into the crds table
crds_values: []CrdsValue,
) error{OutOfMemory}!void {
// TODO: benchmark and compare with labs' preprocessing
Expand Down Expand Up @@ -754,9 +803,15 @@ pub const GossipService = struct {
}
}

/// logic for handling a prune message. verifies the prune message
/// is not too old, and that the destination pubkey is the local node,
/// then updates the active set to prune the list of origin Pubkeys.
fn handle_prune_message(
/// the prune message to process
prune_msg: *const PruneData,
/// the active set to update
active_set_rw: *RwMux(ActiveSet),
/// the local nodes pubkey to verify the prune message is for us
my_pubkey: *const Pubkey,
) error{ PruneMessageTooOld, BadDestination }!void {
const now = get_wallclock();
Expand Down Expand Up @@ -786,19 +841,25 @@ pub const GossipService = struct {
}
}

fn build_prune_messages(
/// builds a prune message for a list of origin Pubkeys and serializes the values
/// into packets to send to the prune_destination.
fn build_prune_message(
allocator: std.mem.Allocator,
/// the crds table used to lookup the contact info of the `prune_destination` pubkey
crds_table_rw: *RwMux(CrdsTable),
/// origin Pubkeys which will be pruned
failed_origins: *const std.AutoArrayHashMap(Pubkey, void),
push_from: Pubkey,
/// the pubkey of the node which we will send the prune message to
prune_destination: Pubkey,
/// our keypair to sign the prune message
my_keypair: *KeyPair,
) error{ CantFindContactInfo, InvalidGossipAddress, OutOfMemory, SignatureError }!std.ArrayList(Packet) {
const from_contact_info = blk: {
var crds_table_lg = crds_table_rw.read();
defer crds_table_lg.unlock();

const crds_table: *const CrdsTable = crds_table_lg.get();
break :blk crds_table.get(crds.CrdsValueLabel{ .LegacyContactInfo = push_from }) orelse {
break :blk crds_table.get(crds.CrdsValueLabel{ .LegacyContactInfo = prune_destination }) orelse {
return error.CantFindContactInfo;
};
};
Expand All @@ -825,7 +886,7 @@ pub const GossipService = struct {
const is_last_iter = i == failed_origin_len - 1;
if (origin_count == MAX_PRUNE_DATA_NODES or is_last_iter) {
// create protocol message
var prune_data = PruneData.init(my_pubkey, origin_buf[0..origin_count], push_from, now);
var prune_data = PruneData.init(my_pubkey, origin_buf[0..origin_count], prune_destination, now);
prune_data.sign(my_keypair) catch return error.SignatureError;

// put it into a packet
Expand All @@ -843,9 +904,14 @@ pub const GossipService = struct {
return prune_packets;
}

/// logic for handling push messages. crds values from the push message
/// are inserted into the crds table. the origin pubkeys of values which
/// fail the insertion are returned to generate prune messages.
fn handle_push_message(
allocator: std.mem.Allocator,
/// push message values to insert into the crds table
push_values: []CrdsValue,
/// the crds table
crds_table_rw: *RwMux(CrdsTable),
) error{OutOfMemory}!std.AutoArrayHashMap(Pubkey, void) {
const failed_insert_indexs = blk: {
Expand Down Expand Up @@ -879,9 +945,14 @@ pub const GossipService = struct {
return failed_origins;
}

/// builds a corresponding Pong message for a given Ping message and serializes the
/// protocol message into a Packet.
fn handle_ping_message(
/// the ping message to build a Pong message for
ping: *const Ping,
/// the keypair used to sign the Pong message
my_keypair: *KeyPair,
/// the endpoint to send the Pong message
from_endpoint: EndPoint,
) error{ SignatureError, SerializationError }!Packet {
const pong = try Pong.init(ping, my_keypair);
Expand All @@ -897,9 +968,16 @@ pub const GossipService = struct {
return packet;
}

/// removes old values from the crds table and failed pull hashes struct
/// based on the current time. This includes triming the purged values from the
/// crds table, triming the max number of pubkeys in the crds table, and removing
/// old labels from the crds table.
fn trim_memory(
/// the crds table to remove old values from
crds_table_rw: *RwMux(CrdsTable),
/// the failed pull hashes struct to remove old values from
failed_pull_hashes_mux: *Mux(HashTimeQueue),
/// the current time
now: u64,
) error{OutOfMemory}!void {
const purged_cutoff_timestamp = now -| (5 * CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS);
Expand All @@ -920,9 +998,14 @@ pub const GossipService = struct {
failed_pull_hashes_lg.unlock();
}

/// drains values from the push queue and inserts them into the crds table.
/// when inserting values in the crds table, any errors are ignored.
fn drain_push_queue_to_crds_table(
/// the crds table to insert values into
crds_table_rw: *RwMux(CrdsTable),
/// the push queue to drain
push_msg_queue_mux: *Mux(std.ArrayList(CrdsValue)),
/// the current time to insert the values with
now: u64,
) void {
var push_msg_queue_lg = push_msg_queue_mux.lock();
Expand All @@ -938,13 +1021,23 @@ pub const GossipService = struct {
}
}

/// returns a list of valid gossip nodes. this works by reading
/// the contact infos from the crds table and filtering out
/// nodes that are 1) too old, 2) have a different shred version, or 3) have
/// an invalid gossip address.
pub fn get_gossip_nodes(
crds_table_rw: *RwMux(CrdsTable), // reads to get contact infos
my_pubkey: *const Pubkey, // used to filter out ourself
my_shred_version: u16, // used to filter matching shredversions
nodes: []crds.LegacyContactInfo, // output
comptime MAX_SIZE: usize, // max_size == nodes.len but comptime for init of stack array
now: u64, // filters old values
/// the crds table to read contact infos from
crds_table_rw: *RwMux(CrdsTable),
/// the pubkey of ourself (used to filter out ourself)
my_pubkey: *const Pubkey,
/// the shred version of ourself (returns only nodes with the same shred version)
my_shred_version: u16,
/// the output slice which will be filled with gossip nodes
nodes: []crds.LegacyContactInfo,
/// the maximum number of nodes to return ( max_size == nodes.len but comptime for init of stack array)
comptime MAX_SIZE: usize,
/// current time (used to filter out nodes that are too old)
now: u64,
) []crds.LegacyContactInfo {
std.debug.assert(MAX_SIZE == nodes.len);

Expand Down Expand Up @@ -1275,7 +1368,7 @@ test "gossip.gossip_service: test build prune messages and handle_push_msgs" {
defer failed_origins.deinit();
try std.testing.expect(failed_origins.keys().len > 0);

var prune_packets = try GossipService.build_prune_messages(
var prune_packets = try GossipService.build_prune_message(
allocator,
&crds_table_rw,
&failed_origins,
Expand Down

0 comments on commit fe30bc4

Please sign in to comment.