From 4bd9aa1de5f494b1e2e978c125dcb97e6cc22cb0 Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Mon, 10 Jun 2024 15:19:41 -0400 Subject: [PATCH] fix(shred-collector): error handling in shred processor --- src/shred_collector/service.zig | 2 +- src/shred_collector/shred.zig | 4 +- src/shred_collector/shred_processor.zig | 65 ++++++++++++++++--------- src/shred_collector/shred_tracker.zig | 20 ++++---- src/utils/collections.zig | 2 +- 5 files changed, 58 insertions(+), 35 deletions(-) diff --git a/src/shred_collector/service.zig b/src/shred_collector/service.zig index 3838f5506..8d03ef020 100644 --- a/src/shred_collector/service.zig +++ b/src/shred_collector/service.zig @@ -103,7 +103,7 @@ pub fn start( try service_manager.spawn( .{ .name = "Shred Processor" }, shred_collector.shred_processor.runShredProcessor, - .{ deps.allocator, verified_shred_channel, shred_tracker }, + .{ deps.allocator, deps.logger, verified_shred_channel, shred_tracker }, ); // repair (thread) diff --git a/src/shred_collector/shred.zig b/src/shred_collector/shred.zig index 7dec5faaf..c7b5604be 100644 --- a/src/shred_collector/shred.zig +++ b/src/shred_collector/shred.zig @@ -48,7 +48,7 @@ pub const Shred = union(ShredType) { } pub fn fromPayload(allocator: Allocator, payload: []const u8) !Self { - const variant = layout.getShredVariant(payload) orelse return error.uygugj; + const variant = layout.getShredVariant(payload) orelse return error.InvalidShredVariant; return switch (variant.shred_type) { .Code => .{ .Code = .{ .fields = try CodingShred.Fields.fromPayload(allocator, payload) } }, .Data => .{ .Data = .{ .fields = try DataShred.Fields.fromPayload(allocator, payload) } }, @@ -154,7 +154,7 @@ pub const DataShred = struct { return self.payload[consts.headers_size..size]; } - pub fn parent(self: *const Self) !Slot { + pub fn parent(self: *const Self) error{InvalidParentOffset}!Slot { const slot = self.fields.common.slot; if (self.fields.custom.parent_offset == 0 and slot != 0) { return error.InvalidParentOffset; diff --git a/src/shred_collector/shred_processor.zig b/src/shred_collector/shred_processor.zig index 5c1445da6..26eeeaea1 100644 --- a/src/shred_collector/shred_processor.zig +++ b/src/shred_collector/shred_processor.zig @@ -9,18 +9,20 @@ const ArrayList = std.ArrayList; const BasicShredTracker = shred_collector.shred_tracker.BasicShredTracker; const Channel = sig.sync.Channel; +const Logger = sig.trace.Logger; const Packet = sig.net.Packet; const Shred = shred_collector.shred.Shred; /// Analogous to [WindowService](https://github.com/anza-xyz/agave/blob/aa2f078836434965e1a5a03af7f95c6640fe6e1e/core/src/window_service.rs#L395) pub fn runShredProcessor( allocator: Allocator, + logger: Logger, // shred verifier --> me verified_shred_receiver: *Channel(ArrayList(Packet)), tracker: *BasicShredTracker, ) !void { - var processed_count: usize = 0; var buf = ArrayList(ArrayList(Packet)).init(allocator); + var error_context = ErrorContext{}; while (true) { try verified_shred_receiver.tryDrainRecycle(&buf); if (buf.items.len == 0) { @@ -29,29 +31,48 @@ pub fn runShredProcessor( } for (buf.items) |packet_batch| { for (packet_batch.items) |*packet| if (!packet.flags.isSet(.discard)) { - const shred_payload = layout.getShred(packet) orelse continue; - const slot = layout.getSlot(shred_payload) orelse continue; - const index = layout.getIndex(shred_payload) orelse continue; - tracker.registerShred(slot, index) catch |err| switch (err) { - error.SlotUnderflow, error.SlotOverflow => continue, - else => return err, + processShred(allocator, tracker, packet, &error_context) catch |e| { + logger.errf( + "failed to process verified shred {?}.{?}: {}", + .{ error_context.slot, error_context.index, e }, + ); + error_context = .{}; }; - var shred = try Shred.fromPayload(allocator, shred_payload); - if (shred == Shred.Data) { - const parent = try shred.Data.parent(); - if (parent + 1 != slot) { - try tracker.skipSlots(parent, slot); - } - } - defer shred.deinit(); - if (shred.isLastInSlot()) { - tracker.setLastShred(slot, index) catch |err| switch (err) { - error.SlotUnderflow, error.SlotOverflow => continue, - else => return err, - }; - } - processed_count += 1; }; } } } + +const ErrorContext = struct { slot: ?u64 = null, index: ?u32 = null }; + +fn processShred( + allocator: Allocator, + tracker: *BasicShredTracker, + packet: *const Packet, + error_context: *ErrorContext, +) !void { + const shred_payload = layout.getShred(packet) orelse return error.InvalidPayload; + const slot = layout.getSlot(shred_payload) orelse return error.InvalidSlot; + errdefer error_context.slot = slot; + const index = layout.getIndex(shred_payload) orelse return error.InvalidIndex; + errdefer error_context.index = index; + + tracker.registerShred(slot, index) catch |err| switch (err) { + error.SlotUnderflow, error.SlotOverflow => return, + }; + var shred = try Shred.fromPayload(allocator, shred_payload); + defer shred.deinit(); + if (shred == Shred.Data) { + const parent = try shred.Data.parent(); + if (parent + 1 != slot) { + tracker.skipSlots(parent, slot) catch |err| switch (err) { + error.SlotUnderflow, error.SlotOverflow => {}, + }; + } + } + if (shred.isLastInSlot()) { + tracker.setLastShred(slot, index) catch |err| switch (err) { + error.SlotUnderflow, error.SlotOverflow => return, + }; + } +} diff --git a/src/shred_collector/shred_tracker.zig b/src/shred_collector/shred_tracker.zig index 159508263..54508caa2 100644 --- a/src/shred_collector/shred_tracker.zig +++ b/src/shred_collector/shred_tracker.zig @@ -57,7 +57,7 @@ pub const BasicShredTracker = struct { self: *Self, start_inclusive: Slot, end_exclusive: Slot, - ) !void { + ) SlotOutOfBounds!void { self.mux.lock(); defer self.mux.unlock(); @@ -75,17 +75,17 @@ pub const BasicShredTracker = struct { self: *Self, slot: Slot, shred_index: u64, - ) !void { + ) SlotOutOfBounds!void { self.mux.lock(); defer self.mux.unlock(); const monitored_slot = try self.observeSlot(slot); - const new = try monitored_slot.record(shred_index); + const new = monitored_slot.record(shred_index); if (new) self.logger.debugf("new slot: {}", .{slot}); self.max_slot_processed = @max(self.max_slot_processed, slot); } - pub fn setLastShred(self: *Self, slot: Slot, index: usize) !void { + pub fn setLastShred(self: *Self, slot: Slot, index: usize) SlotOutOfBounds!void { self.mux.lock(); defer self.mux.unlock(); @@ -98,7 +98,7 @@ pub const BasicShredTracker = struct { } /// returns whether it makes sense to send any repair requests - pub fn identifyMissing(self: *Self, slot_reports: *MultiSlotReport) !bool { + pub fn identifyMissing(self: *Self, slot_reports: *MultiSlotReport) (Allocator.Error || SlotOutOfBounds)!bool { if (self.start_slot == null) return false; self.mux.lock(); defer self.mux.unlock(); @@ -131,14 +131,14 @@ pub const BasicShredTracker = struct { /// - Record that a slot has been observed. /// - Acquire the slot's status for mutation. - fn observeSlot(self: *Self, slot: Slot) !*MonitoredSlot { + fn observeSlot(self: *Self, slot: Slot) SlotOutOfBounds!*MonitoredSlot { self.maybeSetStart(slot); self.max_slot_seen = @max(self.max_slot_seen, slot); const monitored_slot = try self.getMonitoredSlot(slot); return monitored_slot; } - fn getMonitoredSlot(self: *Self, slot: Slot) error{ SlotUnderflow, SlotOverflow }!*MonitoredSlot { + fn getMonitoredSlot(self: *Self, slot: Slot) SlotOutOfBounds!*MonitoredSlot { if (slot > self.current_bottom_slot + num_slots - 1) { return error.SlotOverflow; } @@ -179,6 +179,8 @@ pub const SlotReport = struct { const ShredSet = std.bit_set.ArrayBitSet(usize, MAX_SHREDS_PER_SLOT / 10); +pub const SlotOutOfBounds = error{ SlotUnderflow, SlotOverflow }; + const MonitoredSlot = struct { shreds: ShredSet = ShredSet.initEmpty(), max_seen: ?usize = null, @@ -189,7 +191,7 @@ const MonitoredSlot = struct { const Self = @This(); /// returns whether this is the first shred received for the slot - pub fn record(self: *Self, shred_index: usize) !bool { + pub fn record(self: *Self, shred_index: usize) bool { if (self.is_complete) return false; self.shreds.set(shred_index); if (self.max_seen == null) { @@ -201,7 +203,7 @@ const MonitoredSlot = struct { return false; } - pub fn identifyMissing(self: *Self, missing_shreds: *ArrayList(Range)) !void { + pub fn identifyMissing(self: *Self, missing_shreds: *ArrayList(Range)) Allocator.Error!void { missing_shreds.clearRetainingCapacity(); if (self.is_complete) return; const highest_shred_to_check = self.last_shred orelse self.max_seen orelse 0; diff --git a/src/utils/collections.zig b/src/utils/collections.zig index 7af83e02c..a25672925 100644 --- a/src/utils/collections.zig +++ b/src/utils/collections.zig @@ -43,7 +43,7 @@ pub fn RecyclingList( self.len = 0; } - pub fn addOne(self: *Self) !*T { + pub fn addOne(self: *Self) Allocator.Error!*T { if (self.len < self.private.items.len) { const item = &self.private.items[self.len]; resetItem(item);