Commit

Merge pull request #171 from Syndica/dnut/fix/shred-processor-error-handling

fix(shred-collector): SlotUnderflow bug, and generally improve error handling in shred processor
0xNineteen authored Jun 12, 2024
2 parents 7ecf82f + 4bd9aa1 commit 10082ad
Showing 5 changed files with 58 additions and 35 deletions.
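Note: the substance of this change is where shred-processing errors get handled. Previously a single bad shred could escape runShredProcessor through a try (for example, a shred whose slot fell below the tracker's window and triggered SlotUnderflow), killing the processing thread; now each shred's errors are caught, logged with context, and the loop keeps draining. A minimal sketch of the before/after control flow, with hypothetical names rather than code from this repository:

// Hypothetical stand-in for BasicShredTracker's bounds errors.
const TrackerError = error{ SlotUnderflow, SlotOverflow };

fn registerShred(slot: u64, bottom: u64) TrackerError!void {
    if (slot < bottom) return error.SlotUnderflow;
    if (slot >= bottom + 1024) return error.SlotOverflow;
}

// Before: try registerShred(...) inside the loop returned the error to the
// caller, ending the loop (and the thread) for every subsequent shred.
// After: the error is handled per shred and draining continues.
fn drain(slots: []const u64, bottom: u64) void {
    for (slots) |slot| {
        registerShred(slot, bottom) catch |err| switch (err) {
            error.SlotUnderflow, error.SlotOverflow => continue,
        };
    }
}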
2 changes: 1 addition & 1 deletion src/shred_collector/service.zig
@@ -103,7 +103,7 @@ pub fn start(
    try service_manager.spawn(
        .{ .name = "Shred Processor" },
        shred_collector.shred_processor.runShredProcessor,
-        .{ deps.allocator, verified_shred_channel, shred_tracker },
+        .{ deps.allocator, deps.logger, verified_shred_channel, shred_tracker },
    );

    // repair (thread)
4 changes: 2 additions & 2 deletions src/shred_collector/shred.zig
@@ -48,7 +48,7 @@ pub const Shred = union(ShredType) {
    }

    pub fn fromPayload(allocator: Allocator, payload: []const u8) !Self {
-        const variant = layout.getShredVariant(payload) orelse return error.uygugj;
+        const variant = layout.getShredVariant(payload) orelse return error.InvalidShredVariant;
        return switch (variant.shred_type) {
            .Code => .{ .Code = .{ .fields = try CodingShred.Fields.fromPayload(allocator, payload) } },
            .Data => .{ .Data = .{ .fields = try DataShred.Fields.fromPayload(allocator, payload) } },
@@ -154,7 +154,7 @@ pub const DataShred = struct {
        return self.payload[consts.headers_size..size];
    }

-    pub fn parent(self: *const Self) !Slot {
+    pub fn parent(self: *const Self) error{InvalidParentOffset}!Slot {
        const slot = self.fields.common.slot;
        if (self.fields.custom.parent_offset == 0 and slot != 0) {
            return error.InvalidParentOffset;
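Note: the second hunk above narrows parent's return type from an inferred !Slot to the explicit set error{InvalidParentOffset}!Slot, documenting the only way the call can fail and letting callers handle it exhaustively. A small illustration of the payoff, using assumed names rather than the real DataShred layout:

// A data shred stores its parent as a backwards offset from its own slot;
// a zero offset is only legal at slot zero.
fn parentSlot(parent_offset: u16, slot: u64) error{InvalidParentOffset}!u64 {
    if (parent_offset == 0 and slot != 0) return error.InvalidParentOffset;
    return slot - parent_offset;
}

fn parentOrSelf(slot: u64, offset: u16) u64 {
    // The closed error set makes this switch exhaustive with no else branch;
    // if the set ever grows, this call site stops compiling until updated.
    return parentSlot(offset, slot) catch |err| switch (err) {
        error.InvalidParentOffset => slot,
    };
}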
65 changes: 43 additions & 22 deletions src/shred_collector/shred_processor.zig
@@ -9,18 +9,20 @@ const ArrayList = std.ArrayList;

const BasicShredTracker = shred_collector.shred_tracker.BasicShredTracker;
const Channel = sig.sync.Channel;
+const Logger = sig.trace.Logger;
const Packet = sig.net.Packet;
const Shred = shred_collector.shred.Shred;

/// Analogous to [WindowService](https://github.com/anza-xyz/agave/blob/aa2f078836434965e1a5a03af7f95c6640fe6e1e/core/src/window_service.rs#L395)
pub fn runShredProcessor(
    allocator: Allocator,
+    logger: Logger,
    // shred verifier --> me
    verified_shred_receiver: *Channel(ArrayList(Packet)),
    tracker: *BasicShredTracker,
) !void {
-    var processed_count: usize = 0;
    var buf = ArrayList(ArrayList(Packet)).init(allocator);
+    var error_context = ErrorContext{};
    while (true) {
        try verified_shred_receiver.tryDrainRecycle(&buf);
        if (buf.items.len == 0) {
@@ -29,29 +31,48 @@ pub fn runShredProcessor(
        }
        for (buf.items) |packet_batch| {
            for (packet_batch.items) |*packet| if (!packet.flags.isSet(.discard)) {
-                const shred_payload = layout.getShred(packet) orelse continue;
-                const slot = layout.getSlot(shred_payload) orelse continue;
-                const index = layout.getIndex(shred_payload) orelse continue;
-                tracker.registerShred(slot, index) catch |err| switch (err) {
-                    error.SlotUnderflow, error.SlotOverflow => continue,
-                    else => return err,
+                processShred(allocator, tracker, packet, &error_context) catch |e| {
+                    logger.errf(
+                        "failed to process verified shred {?}.{?}: {}",
+                        .{ error_context.slot, error_context.index, e },
+                    );
+                    error_context = .{};
                };
-                var shred = try Shred.fromPayload(allocator, shred_payload);
-                if (shred == Shred.Data) {
-                    const parent = try shred.Data.parent();
-                    if (parent + 1 != slot) {
-                        try tracker.skipSlots(parent, slot);
-                    }
-                }
-                defer shred.deinit();
-                if (shred.isLastInSlot()) {
-                    tracker.setLastShred(slot, index) catch |err| switch (err) {
-                        error.SlotUnderflow, error.SlotOverflow => continue,
-                        else => return err,
-                    };
-                }
-                processed_count += 1;
            };
        }
    }
}
+
+const ErrorContext = struct { slot: ?u64 = null, index: ?u32 = null };
+
+fn processShred(
+    allocator: Allocator,
+    tracker: *BasicShredTracker,
+    packet: *const Packet,
+    error_context: *ErrorContext,
+) !void {
+    const shred_payload = layout.getShred(packet) orelse return error.InvalidPayload;
+    const slot = layout.getSlot(shred_payload) orelse return error.InvalidSlot;
+    errdefer error_context.slot = slot;
+    const index = layout.getIndex(shred_payload) orelse return error.InvalidIndex;
+    errdefer error_context.index = index;
+
+    tracker.registerShred(slot, index) catch |err| switch (err) {
+        error.SlotUnderflow, error.SlotOverflow => return,
+    };
+    var shred = try Shred.fromPayload(allocator, shred_payload);
+    defer shred.deinit();
+    if (shred == Shred.Data) {
+        const parent = try shred.Data.parent();
+        if (parent + 1 != slot) {
+            tracker.skipSlots(parent, slot) catch |err| switch (err) {
+                error.SlotUnderflow, error.SlotOverflow => {},
+            };
+        }
+    }
+    if (shred.isLastInSlot()) {
+        tracker.setLastShred(slot, index) catch |err| switch (err) {
+            error.SlotUnderflow, error.SlotOverflow => return,
+        };
+    }
+}
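Note: processShred reports failures through an ErrorContext out-parameter instead of logging at every failure site. Since errdefer runs only when the function returns an error, each errdefer records exactly how far parsing got before the failure, and the caller emits one log line and resets the context. A stripped-down sketch of the same pattern, with illustrative names rather than this repository's API:

const std = @import("std");

const ErrCtx = struct { slot: ?u64 = null, index: ?u32 = null };

fn parseStep(slot_opt: ?u64, index_opt: ?u32, ctx: *ErrCtx) !void {
    const slot = slot_opt orelse return error.InvalidSlot; // ctx stays empty
    errdefer ctx.slot = slot; // runs only if a later step errors
    const index = index_opt orelse return error.InvalidIndex; // ctx gets slot
    errdefer ctx.index = index;
    if (index > 32_768) return error.IndexOutOfRange; // ctx gets slot and index
}

// The caller owns logging and reset, mirroring runShredProcessor above.
fn caller(ctx: *ErrCtx) void {
    parseStep(42, 99_999, ctx) catch |e| {
        std.log.err("failed to process {?}.{?}: {}", .{ ctx.slot, ctx.index, e });
        ctx.* = .{};
    };
}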
20 changes: 11 additions & 9 deletions src/shred_collector/shred_tracker.zig
@@ -57,7 +57,7 @@ pub const BasicShredTracker = struct {
        self: *Self,
        start_inclusive: Slot,
        end_exclusive: Slot,
-    ) !void {
+    ) SlotOutOfBounds!void {
        self.mux.lock();
        defer self.mux.unlock();

@@ -75,17 +75,17 @@
        self: *Self,
        slot: Slot,
        shred_index: u64,
-    ) !void {
+    ) SlotOutOfBounds!void {
        self.mux.lock();
        defer self.mux.unlock();

        const monitored_slot = try self.observeSlot(slot);
-        const new = try monitored_slot.record(shred_index);
+        const new = monitored_slot.record(shred_index);
        if (new) self.logger.debugf("new slot: {}", .{slot});
        self.max_slot_processed = @max(self.max_slot_processed, slot);
    }

-    pub fn setLastShred(self: *Self, slot: Slot, index: usize) !void {
+    pub fn setLastShred(self: *Self, slot: Slot, index: usize) SlotOutOfBounds!void {
        self.mux.lock();
        defer self.mux.unlock();

@@ -98,7 +98,7 @@
    }

    /// returns whether it makes sense to send any repair requests
-    pub fn identifyMissing(self: *Self, slot_reports: *MultiSlotReport) !bool {
+    pub fn identifyMissing(self: *Self, slot_reports: *MultiSlotReport) (Allocator.Error || SlotOutOfBounds)!bool {
        if (self.start_slot == null) return false;
        self.mux.lock();
        defer self.mux.unlock();
@@ -131,14 +131,14 @@

    /// - Record that a slot has been observed.
    /// - Acquire the slot's status for mutation.
-    fn observeSlot(self: *Self, slot: Slot) !*MonitoredSlot {
+    fn observeSlot(self: *Self, slot: Slot) SlotOutOfBounds!*MonitoredSlot {
        self.maybeSetStart(slot);
        self.max_slot_seen = @max(self.max_slot_seen, slot);
        const monitored_slot = try self.getMonitoredSlot(slot);
        return monitored_slot;
    }

-    fn getMonitoredSlot(self: *Self, slot: Slot) error{ SlotUnderflow, SlotOverflow }!*MonitoredSlot {
+    fn getMonitoredSlot(self: *Self, slot: Slot) SlotOutOfBounds!*MonitoredSlot {
        if (slot > self.current_bottom_slot + num_slots - 1) {
            return error.SlotOverflow;
        }
@@ -179,6 +179,8 @@ pub const SlotReport = struct {

const ShredSet = std.bit_set.ArrayBitSet(usize, MAX_SHREDS_PER_SLOT / 10);

+pub const SlotOutOfBounds = error{ SlotUnderflow, SlotOverflow };
+
const MonitoredSlot = struct {
    shreds: ShredSet = ShredSet.initEmpty(),
    max_seen: ?usize = null,
@@ -189,7 +191,7 @@ const MonitoredSlot = struct {
    const Self = @This();

    /// returns whether this is the first shred received for the slot
-    pub fn record(self: *Self, shred_index: usize) !bool {
+    pub fn record(self: *Self, shred_index: usize) bool {
        if (self.is_complete) return false;
        self.shreds.set(shred_index);
        if (self.max_seen == null) {
@@ -201,7 +203,7 @@ const MonitoredSlot = struct {
        return false;
    }

-    pub fn identifyMissing(self: *Self, missing_shreds: *ArrayList(Range)) !void {
+    pub fn identifyMissing(self: *Self, missing_shreds: *ArrayList(Range)) Allocator.Error!void {
        missing_shreds.clearRetainingCapacity();
        if (self.is_complete) return;
        const highest_shred_to_check = self.last_shred orelse self.max_seen orelse 0;
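Note: naming the error set pays off twice in this file: call sites such as processShred can catch-switch over SlotOutOfBounds without an else branch (the compiler re-checks exhaustiveness whenever the set changes), and || merges named sets into precise composite signatures like identifyMissing's (Allocator.Error || SlotOutOfBounds)!bool. A minimal sketch of both ideas, under assumed names:

const std = @import("std");
const Allocator = std.mem.Allocator;

pub const SlotOutOfBounds = error{ SlotUnderflow, SlotOverflow };

fn checkSlot(slot: u64, bottom: u64, capacity: u64) SlotOutOfBounds!void {
    if (slot < bottom) return error.SlotUnderflow;
    if (slot >= bottom + capacity) return error.SlotOverflow;
}

// The merged set tells callers precisely: allocation or bounds, nothing else.
fn collectValid(
    allocator: Allocator,
    slots: []const u64,
    bottom: u64,
) (Allocator.Error || SlotOutOfBounds)![]u64 {
    var list = std.ArrayList(u64).init(allocator);
    errdefer list.deinit();
    for (slots) |slot| {
        try checkSlot(slot, bottom, 1024); // SlotOutOfBounds propagates
        try list.append(slot); // Allocator.Error propagates
    }
    return list.toOwnedSlice();
}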
2 changes: 1 addition & 1 deletion src/utils/collections.zig
@@ -43,7 +43,7 @@ pub fn RecyclingList(
        self.len = 0;
    }

-    pub fn addOne(self: *Self) !*T {
+    pub fn addOne(self: *Self) Allocator.Error!*T {
        if (self.len < self.private.items.len) {
            const item = &self.private.items[self.len];
            resetItem(item);
