Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gossip: push messages #14

Merged
merged 22 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
build-unix:
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
os: [ubuntu-latest]

runs-on: ${{matrix.os}}

Expand Down
16 changes: 16 additions & 0 deletions src/bincode/bincode.zig
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ pub fn writeToSlice(slice: []u8, data: anytype, params: Params) ![]u8 {
var writer = stream.writer();
var s = serializer(writer, params);
const ss = s.serializer();

try getty.serialize(null, data, ss);
return stream.getWritten();
}
Expand All @@ -534,6 +535,18 @@ pub fn write(alloc: ?std.mem.Allocator, writer: anytype, data: anytype, params:
try getty.serialize(alloc, data, ss);
}

pub fn get_serialized_size(alloc: std.mem.Allocator, data: anytype, params: Params) !usize {
var list = try std.ArrayList(u8).initCapacity(alloc, @bitSizeOf(@TypeOf(data)));
defer list.deinit();

var writer = list.writer();
var s = serializer(writer, params);
const ss = s.serializer();
try getty.serialize(alloc, data, ss);

return list.items.len;
}

// can call if dont require an allocator
pub fn readFromSlice(alloc: ?std.mem.Allocator, comptime T: type, slice: []const u8, params: Params) !T {
var stream = std.io.fixedBufferStream(slice);
Expand Down Expand Up @@ -606,6 +619,9 @@ test "bincode: custom field serialization" {
std.debug.print("{any}", .{out});
try std.testing.expect(out[out.len - 1] != 20); // skip worked

var size = try get_serialized_size(std.testing.allocator, foo, Params{});
try std.testing.expect(size > 0);

var r = try readFromSlice(std.testing.allocator, Foo, out, Params{});
defer free(std.testing.allocator, r);
std.debug.print("{any}", .{r});
Expand Down
2 changes: 1 addition & 1 deletion src/core/pubkey.zig
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub const Pubkey = struct {
return Self{ .data = bytes[0..32].*, .cached_str = dest[0..44].* };
}

pub fn equals(self: *const Self, other: *Pubkey) bool {
pub fn equals(self: *const Self, other: *const Pubkey) bool {
return std.mem.eql(u8, &self.data, &other.data);
}

Expand Down
20 changes: 20 additions & 0 deletions src/core/transaction.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ pub const Transaction = struct {
.message = Message.default(),
};
}

pub fn sanitize(self: *const Transaction) !void {
const num_required_sigs = self.message.header.num_required_signatures;
const num_signatures = self.signatures.len;
if (num_required_sigs > num_signatures) {
return error.InsufficientSignatures;
}

const num_account_keys = self.message.account_keys.len;
if (num_signatures > num_account_keys) {
return error.TooManySignatures;
}
try self.message.sanitize();
}
};

pub const Message = struct {
Expand All @@ -40,6 +54,12 @@ pub const Message = struct {
.instructions = &[_]CompiledInstruction{},
};
}

pub fn sanitize(self: *const Message) !void {
// TODO:
std.debug.print("sanitize not implemented for type: Message\n", .{});
_ = self;
}
};

pub const MessageHeader = struct {
Expand Down
114 changes: 114 additions & 0 deletions src/gossip/active_set.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
const std = @import("std");
const ClusterInfo = @import("cluster_info.zig").ClusterInfo;
const network = @import("zig-network");
const EndPoint = network.EndPoint;
const Packet = @import("packet.zig").Packet;
const PACKET_DATA_SIZE = @import("packet.zig").PACKET_DATA_SIZE;
const Channel = @import("../sync/channel.zig").Channel;
const Thread = std.Thread;
const AtomicBool = std.atomic.Atomic(bool);
const UdpSocket = network.Socket;
const Tuple = std.meta.Tuple;
const SocketAddr = @import("net.zig").SocketAddr;
const Protocol = @import("protocol.zig").Protocol;
const Ping = @import("ping_pong.zig").Ping;
const bincode = @import("../bincode/bincode.zig");
const crds = @import("../gossip/crds.zig");
const CrdsValue = crds.CrdsValue;

const KeyPair = std.crypto.sign.Ed25519.KeyPair;
const Pubkey = @import("../core/pubkey.zig").Pubkey;
const get_wallclock = @import("../gossip/crds.zig").get_wallclock;

const _crds_table = @import("../gossip/crds_table.zig");
const CrdsTable = _crds_table.CrdsTable;
const CrdsError = _crds_table.CrdsError;
const Logger = @import("../trace/log.zig").Logger;
const GossipRoute = _crds_table.GossipRoute;

const pull_request = @import("../gossip/pull_request.zig");
const CrdsFilter = pull_request.CrdsFilter;
const MAX_NUM_PULL_REQUESTS = pull_request.MAX_NUM_PULL_REQUESTS;

const pull_response = @import("../gossip/pull_response.zig");
const GossipService = @import("../gossip/gossip_service.zig").GossipService;

var gpa_allocator = std.heap.GeneralPurposeAllocator(.{}){};
var gpa = gpa_allocator.allocator();

const PacketChannel = Channel(Packet);
const ProtocolMessage = struct { from_addr: EndPoint, message: Protocol };
const ProtocolChannel = Channel(ProtocolMessage);

const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
const MAX_BYTES_PER_PUSH: u64 = PACKET_DATA_SIZE * 64;
const PUSH_MESSAGE_MAX_PAYLOAD_SIZE: usize = PACKET_DATA_SIZE - 44;

const GOSSIP_SLEEP_MILLIS: u64 = 100;

const NUM_ACTIVE_SET_ENTRIES: usize = 25;
const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;

pub const ActiveSet = struct {
// store pubkeys as keys in crds table bc the data can change
peers: [NUM_ACTIVE_SET_ENTRIES]Pubkey,
len: u8,

const Self = @This();

pub fn init() Self {
return Self{
.peers = undefined,
.len = 0,
};
}

pub fn get_fanout_peers(
self: *const Self,
allocator: std.mem.Allocator,
crds_table: *CrdsTable,
) !std.ArrayList(EndPoint) {
var active_set_endpoints = std.ArrayList(EndPoint).init(allocator);
errdefer active_set_endpoints.deinit();

for (self.peers[0..self.len]) |peer_pubkey| {
const peer_info = crds_table.get(crds.CrdsValueLabel{
.LegacyContactInfo = peer_pubkey,
}).?;
const peer_gossip_addr = peer_info.value.data.LegacyContactInfo.gossip;

crds.sanitize_socket(&peer_gossip_addr) catch continue;
try active_set_endpoints.append(peer_gossip_addr.toEndpoint());

if (active_set_endpoints.items.len == CRDS_GOSSIP_PUSH_FANOUT) {
break;
}
}

return active_set_endpoints;
}

pub fn reset(self: *Self, crds_table: *CrdsTable, my_pubkey: Pubkey, my_shred_version: u16) !void {
const now = get_wallclock();
var buf: [NUM_ACTIVE_SET_ENTRIES]crds.LegacyContactInfo = undefined;
var crds_peers = try GossipService.get_gossip_nodes(
crds_table,
&my_pubkey,
my_shred_version,
&buf,
NUM_ACTIVE_SET_ENTRIES,
now,
);

const size = @min(crds_peers.len, NUM_ACTIVE_SET_ENTRIES);
var rng = std.rand.DefaultPrng.init(get_wallclock());
pull_request.shuffle_first_n(rng.random(), crds.LegacyContactInfo, crds_peers, size);

for (0..size) |i| {
self.peers[i] = crds_peers[i].id;
}
self.len = size;
}
};
Loading
Loading