Skip to content

Commit

Permalink
Make socket_tag a type-safe enum
Browse files Browse the repository at this point in the history
  • Loading branch information
InKryption committed Jun 13, 2024
1 parent b807264 commit 6adc450
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 55 deletions.
6 changes: 3 additions & 3 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ fn validator() !void {
ip_echo_data.shred_version, // TODO atomic owned at top level? or owned by gossip is good?
ip_echo_data.ip,
&.{
.{ .tag = socket_tag.REPAIR, .port = repair_port },
.{ .tag = socket_tag.TURBINE_RECV, .port = turbine_recv_port },
.{ .tag = .REPAIR, .port = repair_port },
.{ .tag = .TURBINE_RECV, .port = turbine_recv_port },
},
);
defer gossip_service.deinit();
Expand Down Expand Up @@ -515,7 +515,7 @@ fn initGossip(
entrypoints: []const SocketAddr,
shred_version: u16,
gossip_host_ip: IpAddr,
sockets: []const struct { tag: u8, port: u16 },
sockets: []const struct { tag: socket_tag, port: u16 },
) !GossipService {
const gossip_port: u16 = config.current.gossip.port;
logger.infof("gossip host: {any}", .{gossip_host_ip});
Expand Down
103 changes: 51 additions & 52 deletions src/gossip/data.zig
Original file line number Diff line number Diff line change
Expand Up @@ -977,23 +977,26 @@ pub const SnapshotHashes = struct {
}
};

pub const socket_tag = struct {
pub const GOSSIP: u8 = 0;
pub const REPAIR: u8 = 1;
pub const RPC: u8 = 2;
pub const RPC_PUBSUB: u8 = 3;
pub const SERVE_REPAIR: u8 = 4;
pub const TPU: u8 = 5;
pub const TPU_FORWARDS: u8 = 6;
pub const TPU_FORWARDS_QUIC: u8 = 7;
pub const TPU_QUIC: u8 = 8;
pub const TPU_VOTE: u8 = 9;
pub const socket_tag = enum(u8) {
GOSSIP = 0,
REPAIR = 1,
RPC = 2,
RPC_PUBSUB = 3,
SERVE_REPAIR = 4,
TPU = 5,
TPU_FORWARDS = 6,
TPU_FORWARDS_QUIC = 7,
TPU_QUIC = 8,
TPU_VOTE = 9,
/// Analogous to [SOCKET_TAG_TVU](https://github.com/anza-xyz/agave/blob/0d34a1a160129c4293dac248e14231e9e773b4ce/gossip/src/contact_info.rs#L36)
pub const TURBINE_RECV: u8 = 10;
TURBINE_RECV = 10,
/// Analogous to [SOCKET_TAG_TVU_QUIC](https://github.com/anza-xyz/agave/blob/0d34a1a160129c4293dac248e14231e9e773b4ce/gossip/src/contact_info.rs#L37)
pub const TURBINE_RECV_QUIC: u8 = 11;
TURBINE_RECV_QUIC = 11,
_,

pub const BincodeSize = u8;
};
pub const SOCKET_CACHE_SIZE: usize = socket_tag.TURBINE_RECV_QUIC + 1;
pub const SOCKET_CACHE_SIZE: usize = @intFromEnum(socket_tag.TURBINE_RECV_QUIC) + 1;

pub const ContactInfo = struct {
pubkey: Pubkey,
Expand Down Expand Up @@ -1065,7 +1068,7 @@ pub const ContactInfo = struct {
}

for (0..6) |_| {
sockets.append(.{ .key = 10, .index = 20, .offset = 30 }) catch unreachable;
sockets.append(.{ .key = .TURBINE_RECV, .index = 20, .offset = 30 }) catch unreachable;
}

return ContactInfo{
Expand All @@ -1084,26 +1087,30 @@ pub const ContactInfo = struct {
try sanitizeWallclock(self.wallclock);
}

pub fn getSocket(self: *const Self, key: u8) ?SocketAddr {
if (self.cache[key].eql(&SocketAddr.UNSPECIFIED)) {
pub fn getSocket(self: *const Self, key: socket_tag) ?SocketAddr {
const socket = &self.cache[@intFromEnum(key)];
if (socket.eql(&SocketAddr.UNSPECIFIED)) {
return null;
}
return self.cache[key];
return socket.*;
}

pub fn setSocket(self: *Self, key: u8, socket_addr: SocketAddr) !void {
pub fn setSocket(self: *Self, key: socket_tag, socket_addr: SocketAddr) !void {
self.removeSocket(key);

var offset = socket_addr.port();
var index: ?usize = null;
for (self.sockets.items, 0..) |socket, idx| {
offset = std.math.sub(u16, offset, socket.offset) catch {
index = idx;
break;
};
}
const offset: u16, const index: ?usize = blk: {
var offset = socket_addr.port();
const index = for (self.sockets.items, 0..) |socket, idx| {
offset = std.math.sub(u16, offset, socket.offset) catch break idx;
} else null;
break :blk .{ offset, index };
};

const entry = SocketEntry.init(key, try self.pushAddr(socket_addr.ip()), offset);
const entry: SocketEntry = .{
.key = key,
.index = try self.pushAddr(socket_addr.ip()),
.offset = offset,
};

if (index) |i| {
self.sockets.items[i].offset -= entry.offset;
Expand All @@ -1112,18 +1119,14 @@ pub const ContactInfo = struct {
try self.sockets.append(entry);
}

self.cache[key] = socket_addr;
self.cache[@intFromEnum(key)] = socket_addr;
}

pub fn removeSocket(self: *Self, key: u8) void {
pub fn removeSocket(self: *Self, key: socket_tag) void {
// find existing socket index
var existing_socket_index: ?usize = null;
for (self.sockets.items, 0..) |socket, idx| {
if (socket.key == key) {
existing_socket_index = idx;
break;
}
}
const existing_socket_index = for (self.sockets.items, 0..) |socket, idx| {
if (socket.key == key) break idx;
} else null;
// if found, remove it, it's associated IpAddr, set cache[key] to unspecified
if (existing_socket_index) |index| {
// first we remove this existing socket
Expand All @@ -1134,7 +1137,7 @@ pub const ContactInfo = struct {
next_entry.offset += removed_entry.offset;
}
self.removeAddrIfUnused(removed_entry.index);
self.cache[key] = SocketAddr.unspecified();
self.cache[@intFromEnum(key)] = SocketAddr.unspecified();
}
}

Expand Down Expand Up @@ -1218,21 +1221,16 @@ const Sockets = struct {
};

pub const SocketEntry = struct {
key: u8, // GossipMessageidentifier, e.g. turbine_recv, tpu, etc
index: u8, // IpAddr index in the accompanying addrs vector.
offset: u16, // Port offset with respect to the previous entry.

pub const @"!bincode-config:offset" = var_int_config_u16;
/// GossipMessageIdentifier, e.g. turbine_recv, tpu, etc
key: socket_tag,
/// IpAddr index in the accompanying addrs vector.
index: u8,
/// Port offset with respect to the previous entry.
offset: u16,

const Self = @This();

pub fn init(key: u8, index: u8, offset: u16) Self {
return Self{
.key = key,
.index = index,
.offset = offset,
};
}
pub const @"!bincode-config:offset" = var_int_config_u16;

pub fn eql(self: *const Self, other: *const Self) bool {
return self.key == other.key and
Expand Down Expand Up @@ -1327,7 +1325,7 @@ test "gossip.data: set & get socket on contact info" {
var set_socket = ci.getSocket(socket_tag.RPC);
try testing.expect(set_socket.?.eql(&SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899)));
try testing.expect(ci.addrs.items[0].eql(&IpAddr.newIpv4(127, 0, 0, 1)));
try testing.expect(ci.sockets.items[0].eql(&SocketEntry.init(socket_tag.RPC, 0, 8899)));
try testing.expect(ci.sockets.items[0].eql(&.{ .key = .RPC, .index = 0, .offset = 8899 }));
}

test "gossip.data: contact info bincode serialize matches rust bincode" {
Expand Down Expand Up @@ -1388,7 +1386,8 @@ test "gossip.data: ContactInfo bincode roundtrip maintains data integrity" {
test "gossip.data: SocketEntry serializer works" {
testing.log_level = .debug;

const se = SocketEntry.init(3, 3, 30304);
comptime std.debug.assert(@intFromEnum(socket_tag.RPC_PUBSUB) == 3);
const se: SocketEntry = .{ .key = .RPC_PUBSUB, .index = 3, .offset = 30304 };

var buf = std.ArrayList(u8).init(testing.allocator);
defer buf.deinit();
Expand Down

0 comments on commit 6adc450

Please sign in to comment.