Skip to content

Commit

Permalink
socket_utils simplifications & gossip fixes
Browse files Browse the repository at this point in the history
* Always use nonblocking mode for recvMmsg.
* We don't care about windows, so eschew the win32 heap allocator.
* Rename `task_allocator` to `message_allocator`, and pass it to
  `processMessages` as well to properly free the metadata with
  `shallowFree`.
  • Loading branch information
InKryption committed May 30, 2024
1 parent a74dec5 commit f321947
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 43 deletions.
4 changes: 2 additions & 2 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const requestIpEcho = @import("../net/echo.zig").requestIpEcho;
const servePrometheus = @import("../prometheus/http.zig").servePrometheus;
const parallelUnpackZstdTarBall = @import("../accountsdb/snapshots.zig").parallelUnpackZstdTarBall;
const downloadSnapshotsFromGossip = @import("../accountsdb/download.zig").downloadSnapshotsFromGossip;
const SOCKET_TIMEOUT = @import("../net/socket_utils.zig").SOCKET_TIMEOUT;
const SOCKET_TIMEOUT_US = @import("../net/socket_utils.zig").SOCKET_TIMEOUT_US;

const config = @import("config.zig");
// var validator_config = config.current;
Expand Down Expand Up @@ -411,7 +411,7 @@ fn validator() !void {
// repair
var repair_socket = try Socket.create(network.AddressFamily.ipv4, network.Protocol.udp);
try repair_socket.bindToPort(repair_port);
try repair_socket.setReadTimeout(SOCKET_TIMEOUT);
try repair_socket.setReadTimeout(SOCKET_TIMEOUT_US);

var repair_svc = try initRepair(
logger,
Expand Down
43 changes: 20 additions & 23 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ pub const GossipService = struct {
const gossip_address = my_contact_info.getSocket(socket_tag.GOSSIP) orelse return error.GossipAddrUnspecified;
var gossip_socket = UdpSocket.create(.ipv4, .udp) catch return error.SocketCreateFailed;
gossip_socket.bindToPort(gossip_address.port()) catch return error.SocketBindFailed;
gossip_socket.setReadTimeout(socket_utils.SOCKET_TIMEOUT) catch return error.SocketSetTimeoutFailed; // 1 second
gossip_socket.setReadTimeout(socket_utils.SOCKET_TIMEOUT_US) catch return error.SocketSetTimeoutFailed; // 1 second

const failed_pull_hashes = HashTimeQueue.init(allocator);
const push_msg_q = ArrayList(SignedGossipData).init(allocator);
Expand Down Expand Up @@ -295,19 +295,12 @@ pub const GossipService = struct {
// TODO: use better allocator, unless GPA becomes more performant.
var gp_task_allocator: std.heap.GeneralPurposeAllocator(.{ .thread_safe = !builtin.single_threaded }) = .{};
defer _ = gp_task_allocator.deinit();
const message_allocator = gp_task_allocator.allocator();

var heap_task_allocator = if (builtin.os.tag == .windows) std.heap.HeapAllocator.init() else {};
defer if (builtin.os.tag == .windows) heap_task_allocator.deinit();

const task_allocator = switch (builtin.os.tag) {
.windows => heap_task_allocator.allocator(),
else => gp_task_allocator.allocator(),
};

const packet_verifier_handle = try Thread.spawn(.{}, verifyPackets, .{ self, task_allocator });
const packet_verifier_handle = try Thread.spawn(.{}, verifyPackets, .{ self, message_allocator });
defer self.joinAndExit(packet_verifier_handle);

const packet_handle = try Thread.spawn(.{}, processMessages, .{self});
const packet_handle = try Thread.spawn(.{}, processMessages, .{ self, message_allocator });
defer self.joinAndExit(packet_handle);

const maybe_build_messages_handle = if (!spy_node) try Thread.spawn(.{}, buildMessages, .{self}) else null;
Expand All @@ -332,7 +325,7 @@ pub const GossipService = struct {

const VerifyMessageTask = ThreadPoolTask(VerifyMessageEntry);
const VerifyMessageEntry = struct {
allocator: std.mem.Allocator,
message_allocator: std.mem.Allocator,
packet_batch: ArrayList(Packet),
verified_incoming_channel: *Channel(GossipMessageWithEndpoint),
logger: Logger,
Expand All @@ -342,7 +335,7 @@ pub const GossipService = struct {

for (@as([]const Packet, self.packet_batch.items)) |*packet| {
var message = bincode.readFromSlice(
self.allocator,
self.message_allocator,
GossipMessage,
packet.data[0..packet.size],
bincode.Params.standard,
Expand All @@ -353,7 +346,7 @@ pub const GossipService = struct {

message.sanitize() catch {
self.logger.errf("gossip: packet_verify: failed to sanitize", .{});
bincode.free(self.allocator, message);
bincode.free(self.message_allocator, message);
continue;
};

Expand All @@ -362,7 +355,7 @@ pub const GossipService = struct {
"gossip: packet_verify: failed to verify signature: {} from {}",
.{ e, packet.addr },
);
bincode.free(self.allocator, message);
bincode.free(self.message_allocator, message);
continue;
};

Expand All @@ -382,14 +375,14 @@ pub const GossipService = struct {
self: *Self,
/// Must be thread-safe. Can be a specific allocator which will
/// only be contended for by the tasks spawned by this function.
task_allocator: std.mem.Allocator,
message_allocator: std.mem.Allocator,
) !void {
const tasks = try VerifyMessageTask.init(self.allocator, GOSSIP_VERIFY_PACKET_PARALLEL_TASKS);
defer self.allocator.free(tasks);

// pre-allocate all the tasks
for (tasks) |*task| task.entry = .{
.allocator = task_allocator,
.message_allocator = message_allocator,
.verified_incoming_channel = self.verified_incoming_channel,
.packet_batch = undefined,
.logger = self.logger,
Expand Down Expand Up @@ -418,8 +411,7 @@ pub const GossipService = struct {
task_ptr.entry.packet_batch = packet_batch;
task_ptr.result catch |err| self.logger.errf("VerifyMessageTask encountered error: {s}", .{@errorName(err)});

const batch = Batch.from(&task_ptr.task);
self.thread_pool.schedule(batch);
self.thread_pool.schedule(task_ptr.asBatch());
}
}

Expand Down Expand Up @@ -460,7 +452,12 @@ pub const GossipService = struct {
};

/// main logic for receiving and processing gossip messages.
pub fn processMessages(self: *Self) !void {
pub fn processMessages(
self: *Self,
/// Should be the same allocator passed to the corresponding `verifyPackets` thread,
/// used for freeing message metadata.
message_allocator: std.mem.Allocator,
) !void {
var timer = std.time.Timer.start() catch unreachable;
var last_table_trim_ts: u64 = 0;
var msg_count: usize = 0;
Expand Down Expand Up @@ -529,7 +526,7 @@ pub const GossipService = struct {
// would be safer. For more info, see:
// - GossipTable.remove
// - https://github.com/Syndica/sig/pull/69
msg.message.shallowFree(self.allocator);
msg.message.shallowFree(message_allocator);
}
self.verified_incoming_channel.allocator.free(messages);
}
Expand Down Expand Up @@ -2768,7 +2765,7 @@ test "gossip.gossip_service: process contact info push packet" {
var packet_handle = try Thread.spawn(
.{},
GossipService.processMessages,
.{&gossip_service},
.{ &gossip_service, std.testing.allocator },
);

// send a push message
Expand Down Expand Up @@ -2937,7 +2934,7 @@ pub const BenchmarkGossipServiceGeneral = struct {
// reset stats
defer gossip_service.stats.reset();

var packet_handle = try Thread.spawn(.{}, GossipService.run, .{
const packet_handle = try Thread.spawn(.{}, GossipService.run, .{
&gossip_service,
true, // dont build any outgoing messages
false,
Expand Down
23 changes: 9 additions & 14 deletions src/net/socket_utils.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ const Packet = @import("packet.zig").Packet;
const PACKET_DATA_SIZE = @import("packet.zig").PACKET_DATA_SIZE;
const Channel = @import("../sync/channel.zig").Channel;
const std = @import("std");
const builtin = @import("builtin");
const Logger = @import("../trace/log.zig").Logger;

pub const SOCKET_TIMEOUT: usize = 1000000;
pub const SOCKET_TIMEOUT_US: usize = 1000000;
pub const PACKETS_PER_BATCH: usize = 64;

pub fn readSocket(
Expand All @@ -28,13 +29,16 @@ pub fn readSocket(
var packet_batch = try std.ArrayList(Packet).initCapacity(allocator, PACKETS_PER_BATCH);
errdefer packet_batch.deinit();

// NOTE: usually this would be null (ie, blocking)
// but in order to exit cleanly in tests we set to 1 second
try socket.setReadTimeout(1 * std.time.ms_per_s);
try socket.setReadTimeout(SOCKET_TIMEOUT_US);

if (builtin.is_test) {
// NOTE: usually this would be null (ie, blocking)
// but in order to exit cleanly in tests we set to 1 second
try socket.setReadTimeout(1 * std.time.ms_per_s);
}

// recv packets into batch
while (true) {
const old_count = packet_batch.items.len;
packet_batch.items.len += recvMmsg(socket, packet_batch.unusedCapacitySlice(), exit) catch |err| {
logger.debugf("readSocket error: {s}", .{@errorName(err)});
continue;
Expand All @@ -44,11 +48,6 @@ pub fn readSocket(
if (packet_batch.items.len == packet_batch.capacity) {
break;
}

if (old_count == 0) {
// set to nonblocking mode
try socket.setReadTimeout(SOCKET_TIMEOUT);
}
}

packet_batch.shrinkAndFree(packet_batch.items.len);
Expand Down Expand Up @@ -86,10 +85,6 @@ pub fn recvMmsg(
packet.addr = recv_meta.sender;
packet.size = bytes_read;

if (count == 0) {
// nonblocking mode
try socket.setReadTimeout(SOCKET_TIMEOUT);
}
count += 1;
}

Expand Down
7 changes: 3 additions & 4 deletions src/utils/tar.zig
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn parallelUntarToFileSystem(
const strip_components: u32 = 0;
loop: while (true) {
const header_buf = try allocator.alloc(u8, 512);
if (header_buf.len != try reader.readAtLeast(header_buf, 512)) {
if (try reader.readAtLeast(header_buf, 512) != 512) {
std.debug.panic("Actual file size too small for header (< 512).", .{});
}

Expand Down Expand Up @@ -140,7 +140,7 @@ pub fn parallelUntarToFileSystem(

const contents = try allocator.alloc(u8, file_size);
const actual_contents_len = try reader.readAtLeast(contents, file_size);
if (contents.len != actual_contents_len) {
if (actual_contents_len != file_size) {
std.debug.panic("Reported file ({d}) size does not match actual file size ({d})", .{ contents.len, actual_contents_len });
}

Expand All @@ -156,8 +156,7 @@ pub fn parallelUntarToFileSystem(
.header_buf = header_buf,
};

const batch = ThreadPool.Batch.from(&task_ptr.task);
thread_pool.schedule(batch);
thread_pool.schedule(task_ptr.asBatch());
},
.global_extended_header, .extended_header => {
return error.TarUnsupportedFileType;
Expand Down
4 changes: 4 additions & 0 deletions src/utils/thread.zig
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ pub fn ThreadPoolTask(comptime Entry: type) type {
self.result = self.entry.callback();
}

/// Wraps this task's `task` field in a `Batch` by calling `Batch.from`
/// on a pointer to it, and returns the resulting `Batch`.
pub inline fn asBatch(self: *Self) Batch {
    const batch = Batch.from(&self.task);
    return batch;
}

/// Waits for any of the tasks in the slice to become available. Once one does,
/// it is atomically set to be unavailable, and its index is returned.
pub fn awaitAndAcquireFirstAvailableTask(tasks: []Self, start_index: usize) usize {
Expand Down

0 comments on commit f321947

Please sign in to comment.