diff --git a/src/ledger/insert_shred.zig b/src/ledger/insert_shred.zig
index 5a696c29..31fefa8a 100644
--- a/src/ledger/insert_shred.zig
+++ b/src/ledger/insert_shred.zig
@@ -4,7 +4,6 @@ const sig = @import("../sig.zig");
 const ledger = sig.ledger;
 const meta = ledger.meta;
 const schema = ledger.schema.schema;
-const shred_mod = sig.ledger.shred;

 const Allocator = std.mem.Allocator;
 const ArrayList = std.ArrayList;
@@ -29,7 +28,11 @@ const SortedMap = sig.utils.collections.SortedMap;
 const Timer = sig.time.Timer;

 const BlockstoreDB = ledger.blockstore.BlockstoreDB;
-const ColumnFamily = ledger.database.ColumnFamily;
+const IndexMetaWorkingSetEntry = ledger.insert_shreds_working_state.IndexMetaWorkingSetEntry;
+const InsertShredsWorkingState = ledger.insert_shreds_working_state.InsertShredsWorkingState;
+const PossibleDuplicateShred = ledger.insert_shreds_working_state.PossibleDuplicateShred;
+const WorkingEntry = ledger.insert_shreds_working_state.WorkingEntry;
+const WorkingShredStore = ledger.insert_shreds_working_state.WorkingShredStore;
 const WriteBatch = BlockstoreDB.WriteBatch;

 const ErasureMeta = meta.ErasureMeta;
@@ -139,24 +142,23 @@ pub const ShredInserter = struct {
         retransmit_sender: ?PointerClosure([]const []const u8, void),
     ) !InsertShredsResult {
         ///////////////////////////
-        // setup state for the rest of function
+        // check inputs for validity and edge cases
         //
         if (shreds.len == 0) return .{
             .completed_data_set_infos = ArrayList(CompletedDataSetInfo).init(self.allocator),
             .duplicate_shreds = ArrayList(PossibleDuplicateShred).init(self.allocator),
         };
-
+        std.debug.assert(shreds.len == is_repaired.len);
         self.metrics.num_shreds.add(shreds.len);
+
+        ///////////////////////////
+        // prepare state to insert shreds
+        //
         const allocator = self.allocator;
-        var reed_solomon_cache = try ReedSolomonCache.init(allocator);
-        defer reed_solomon_cache.deinit();
-        std.debug.assert(shreds.len == is_repaired.len);
         var total_timer = try Timer.start();
-        var write_batch = try self.db.initWriteBatch();
-        defer write_batch.deinit();
-
-        var state = InsertShredsState.init(self.allocator);
+        var state = try InsertShredsWorkingState.init(self.allocator, self.logger, &self.db);
         defer state.deinit();
+        var write_batch = state.write_batch;

         var get_lock_timer = try Timer.start();
         self.lock.lock();
@@ -215,17 +217,19 @@ pub const ShredInserter = struct {
         }
         self.metrics.insert_shreds_elapsed_us.add(shred_insertion_timer.read().asMicros());

-        /////////////////////
-        // recover shreds
+        /////////////////////////////////////
+        // recover shreds and insert them
         //
         var shred_recovery_timer = try Timer.start();
         var valid_recovered_shreds = ArrayList([]const u8).init(allocator);
         defer valid_recovered_shreds.deinit();
         if (leader_schedule) |slot_leader_provider| {
+            var reed_solomon_cache = try ReedSolomonCache.init(allocator);
+            defer reed_solomon_cache.deinit();
             const recovered_shreds = try self.tryShredRecovery(
                 &state.erasure_metas,
                 &state.index_working_set,
-                &state.just_inserted_shreds,
+                state.shredStore(),
                 &reed_solomon_cache,
             );
             defer {
@@ -283,11 +287,11 @@
         self.metrics.shred_recovery_elapsed_us.add(shred_recovery_timer.read().asMicros());

         ///////////////////////////
-        // slot meta: chain and persist
+        // chain slot metas
         //
-        var chaining_timer = try Timer.start();
         // Handle chaining for the members of the slot_meta_working_set that were inserted into,
         // drop the others
+        var chaining_timer = try Timer.start();
         try ledger.slot_chaining.handleChaining(
             allocator,
             &self.db,
@@ -296,18 +300,11 @@
         );
         self.metrics.chaining_elapsed_us.add(chaining_timer.read().asMicros());

-        var commit_timer = try Timer.start();
-        _ = try commitSlotMetaWorkingSet(
-            allocator,
-            &state.slot_meta_working_set,
-            &.{}, // TODO senders
-            &write_batch,
-        );
-        // TODO return value
-
-        ///////////////////////////
-        // process erasure metas
+        //////////////////////////////////////////////////////
+        // check forward chaining for each erasure set
         //
+        var merkle_chaining_timer = try Timer.start();
+
         const em0_keys, const em0_values = state.erasure_metas.items();
         for (em0_keys, em0_values) |erasure_set, working_em| if (working_em == .dirty) {
             const slot = erasure_set.slot;
@@ -330,14 +327,14 @@
                 &self.db,
                 shred.code,
                 erasure_meta,
-                &state.just_inserted_shreds,
+                state.shredStore(),
                 &state.merkle_root_metas,
                 &state.duplicate_shreds,
             );
         };

-        ///////////////////////////
-        // process merkle root metas
+        //////////////////////////////////////////////////////
+        // check backward chaining for each merkle root
         //
         var merkle_root_metas_iter = state.merkle_root_metas.iterator();
         while (merkle_root_metas_iter.next()) |mrm_entry| {
@@ -363,34 +360,18 @@
                 self.logger,
                 &self.db,
                 shred,
-                &state.just_inserted_shreds,
+                state.shredStore(),
                 &state.erasure_metas,
                 &state.duplicate_shreds,
             );
         }

-        ///////////////////////////
-        // put working entries items in write batch
-        //
-        try persistWorkingEntries(&write_batch, schema.erasure_meta, &state.erasure_metas);
-        try persistWorkingEntries(&write_batch, schema.merkle_root_meta, &state.merkle_root_metas);
-
-        var index_working_set_iterator = state.index_working_set.iterator();
-        while (index_working_set_iterator.next()) |entry| {
-            const working_entry = entry.value_ptr;
-            if (working_entry.did_insert_occur) {
-                try write_batch.put(schema.index, entry.key_ptr.*, working_entry.index);
-            }
-        }
-
-        self.metrics.commit_working_sets_elapsed_us.add(commit_timer.read().asMicros());
+        self.metrics.merkle_chaining_elapsed_us.add(merkle_chaining_timer.read().asMicros());

         ///////////////////////////
-        // commit batch and return
+        // commit and return
         //
-        var write_timer = try Timer.start();
-        try self.db.commit(write_batch);
-        self.metrics.write_batch_elapsed_us.add(write_timer.read().asMicros());
+        try state.commit();

         // TODO send signals
@@ -402,28 +383,12 @@
         };
     }

-    fn persistWorkingEntries(
-        write_batch: *WriteBatch,
-        cf: ColumnFamily,
-        /// Map(cf.Key, WorkingEntry(cf.Value))
-        working_entries_map: anytype,
-    ) !void {
-        var iterator = working_entries_map.iterator();
-        while (iterator.next()) |entry| {
-            const key = entry.key_ptr.*;
-            const value = entry.value_ptr;
-            if (value.* == .dirty) {
-                try write_batch.put(cf, key, value.asRef().*);
-            }
-        }
-    }
-
     /// agave: check_insert_coding_shred
     /// TODO: break this up
     fn checkInsertCodeShred(
         self: *Self,
         shred: CodeShred,
-        state: *InsertShredsState,
+        state: *InsertShredsWorkingState,
         write_batch: *WriteBatch,
         is_trusted: bool,
         shred_source: ShredSource,
@@ -431,17 +396,11 @@
         const slot = shred.fields.common.slot;
         const shred_index: u64 = @intCast(shred.fields.common.index);

-        const index_meta_working_set_entry =
-            try self.getIndexMetaEntry(self.allocator, slot, &state.index_working_set);
+        const index_meta_working_set_entry = try state.getIndexMetaEntry(slot);
         const index_meta = &index_meta_working_set_entry.index;

         const erasure_set_id =
             shred.fields.common.erasureSetId();
-        // TODO: redundant get or put pattern
-        if (!state.merkle_root_metas.contains(erasure_set_id)) {
-            if (try self.db.get(self.allocator, schema.merkle_root_meta, erasure_set_id)) |meta_| {
-                try state.merkle_root_metas.put(erasure_set_id, .{ .clean = meta_ });
-            }
-        }
+        try state.loadMerkleRootMeta(erasure_set_id);

         // This gives the index of first code shred in this FEC block
         // So, all code shreds in a given FEC block will have the same set index
@@ -462,7 +421,7 @@
             // Compare our current shred against the previous shred for potential
             // conflicts
             if (!try self.checkMerkleRootConsistency(
-                &state.just_inserted_shreds,
+                state.shredStore(),
                 slot,
                 merkle_root_meta.asRef(),
                 &.{ .code = shred },
@@ -473,18 +432,7 @@
             }
         }

-        // TODO: redundant get or put pattern
-        const erasure_meta_entry = try state.erasure_metas.getOrPut(erasure_set_id);
-        if (!erasure_meta_entry.found_existing) {
-            if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| {
-                erasure_meta_entry.value_ptr.* = .{ .clean = meta_ };
-            } else {
-                erasure_meta_entry.value_ptr.* = .{
-                    .dirty = ErasureMeta.fromCodeShred(shred) orelse return error.Unwrap,
-                };
-            }
-        }
-        const erasure_meta = erasure_meta_entry.value_ptr.asRef();
+        const erasure_meta = try state.getOrPutErasureMeta(erasure_set_id, shred);

         // NOTE perf: maybe this can be skipped for trusted shreds.
         // agave runs this regardless of trust, but we can check if it has
@@ -492,11 +440,11 @@
         if (!erasure_meta.checkCodeShred(shred)) {
             self.metrics.num_code_shreds_invalid_erasure_config.inc();
             if (!try self.hasDuplicateShredsInSlot(slot)) {
-                if (try self.findConflictingCodeShred(
+                if (try findConflictingCodeShred(
+                    state.shredStore(),
                     shred,
                     slot,
                     erasure_meta,
-                    &state.just_inserted_shreds,
                 )) |conflicting_shred| {
                     // TODO: reduce nesting
                     self.db.put(schema.duplicate_slots, slot, .{
@@ -547,12 +495,9 @@
         const result = if (insertCodeShred(index_meta, shred, write_batch)) |_| blk: {
             index_meta_working_set_entry.did_insert_occur = true;
             self.metrics.num_inserted.inc();
-            const entry = try state.merkle_root_metas.getOrPut(erasure_set_id);
-            if (!entry.found_existing) {
-                entry.value_ptr.* = .{ .dirty = MerkleRootMeta.fromFirstReceivedShred(shred) };
-            }
+            try state.initMerkleRootMetaIfMissing(erasure_set_id, shred);
             break :blk true;
-        } else |_| false;
+        } else |_| false; // TODO: should this return immediately on error?

         const shred_entry = try state.just_inserted_shreds.getOrPut(shred.fields.id());
         if (!shred_entry.found_existing) {
@@ -571,17 +516,16 @@
     /// agave: find_conflicting_coding_shred
     fn findConflictingCodeShred(
-        self: *Self,
-        _: CodeShred,
+        shred_store: WorkingShredStore,
+        _: CodeShred, // TODO: figure out why this is here. delete it or add what is missing.
         slot: Slot,
         erasure_meta: *const ErasureMeta,
-        just_inserted_shreds: *const AutoHashMap(ShredId, Shred),
     ) !?[]const u8 { // TODO consider lifetime
         // Search for the shred which set the initial erasure config, either inserted,
         // or in the current batch in just_inserted_shreds.
         const index: u32 = @intCast(erasure_meta.first_received_code_index);
         const shred_id = ShredId{ .slot = slot, .index = index, .shred_type = .code };
-        const maybe_shred = try getShredFromJustInsertedOrDb(&self.db, just_inserted_shreds, shred_id);
+        const maybe_shred = try shred_store.get(shred_id);

         if (index != 0 or maybe_shred != null) {
             return maybe_shred;
@@ -594,7 +538,7 @@
     fn checkInsertDataShred(
         self: *Self,
         shred: DataShred,
-        state: *InsertShredsState,
+        state: *InsertShredsWorkingState,
         write_batch: *WriteBatch,
         is_trusted: bool,
         leader_schedule: ?SlotLeaderProvider,
@@ -604,26 +548,13 @@
         const shred_index: u64 = @intCast(shred.fields.common.index);
         const shred_union = Shred{ .data = shred };

-        const index_meta_working_set_entry = try self.getIndexMetaEntry(
-            self.allocator,
-            slot,
-            &state.index_working_set,
-        );
+        const index_meta_working_set_entry = try state.getIndexMetaEntry(slot);
         const index_meta = &index_meta_working_set_entry.index;
-        const slot_meta_entry = try self.getSlotMetaEntry(
-            &state.slot_meta_working_set,
-            slot,
-            try shred.parent(),
-        );
+        const slot_meta_entry = try state.getSlotMetaEntry(slot, try shred.parent());
         const slot_meta = &slot_meta_entry.new_slot_meta;
         const erasure_set_id = shred.fields.common.erasureSetId();
-        // TODO: redundant get or put pattern
-        if (!state.merkle_root_metas.contains(erasure_set_id)) {
-            if (try self.db.get(self.allocator, schema.merkle_root_meta, erasure_set_id)) |meta_| {
-                try state.merkle_root_metas.put(erasure_set_id, .{ .clean = meta_ });
-            }
-        }
+        try state.loadMerkleRootMeta(erasure_set_id);

         if (!is_trusted) {
             if (isDataShredPresent(shred, slot_meta, &index_meta.data_index)) {
@@ -653,7 +584,7 @@
             if (!try self.shouldInsertDataShred(
                 shred,
                 slot_meta,
-                &state.just_inserted_shreds,
+                state.shredStore(),
                 self.max_root.load(.unordered),
                 leader_schedule,
                 shred_source,
@@ -667,7 +598,7 @@
                 // Compare our current shred against the previous shred for potential
                 // conflicts
                 if (!try self.checkMerkleRootConsistency(
-                    &state.just_inserted_shreds,
+                    state.shredStore(),
                     slot,
                     merkle_root_meta.asRef(),
                     &shred_union,
@@ -685,77 +616,16 @@
             write_batch,
             shred_source,
         );
-        const entry = try state.merkle_root_metas.getOrPut(erasure_set_id);
-        if (!entry.found_existing) {
-            entry.value_ptr.* = .{ .dirty = MerkleRootMeta.fromFirstReceivedShred(shred) };
-        }
+        try state.initMerkleRootMetaIfMissing(erasure_set_id, shred);

         try state.just_inserted_shreds.put(shred.fields.id(), shred_union); // TODO check first?
         index_meta_working_set_entry.did_insert_occur = true;
         slot_meta_entry.did_insert_occur = true;

-        // TODO: redundant get or put pattern
-        if (!state.erasure_metas.contains(erasure_set_id)) {
-            if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| {
-                try state.erasure_metas.put(erasure_set_id, .{ .clean = meta_ });
-            }
-        }
+        try state.loadErasureMeta(erasure_set_id);

         return newly_completed_data_sets;
     }

-    /// agave: get_index_meta_entry
-    fn getIndexMetaEntry(
-        self: *Self,
-        allocator: std.mem.Allocator,
-        slot: Slot,
-        working_set: *AutoHashMap(Slot, IndexMetaWorkingSetEntry),
-    ) !*IndexMetaWorkingSetEntry {
-        // TODO: redundant get or put pattern
-        var timer = try Timer.start();
-        const entry = try working_set.getOrPut(slot);
-        if (!entry.found_existing) {
-            if (try self.db.get(self.allocator, schema.index, slot)) |item| {
-                entry.value_ptr.* = .{ .index = item };
-            } else {
-                entry.value_ptr.* = IndexMetaWorkingSetEntry.init(allocator, slot);
-            }
-        }
-        self.metrics.index_meta_time_us.add(timer.read().asMicros());
-        return entry.value_ptr;
-    }
-
-    /// agave: get_slot_meta_entry
-    fn getSlotMetaEntry(
-        self: *Self,
-        working_set: *AutoHashMap(u64, SlotMetaWorkingSetEntry),
-        slot: Slot,
-        parent_slot: Slot,
-    ) !*SlotMetaWorkingSetEntry {
-        // TODO: redundant get or put pattern
-        const entry = try working_set.getOrPut(slot);
-        if (!entry.found_existing) {
-            if (try self.db.get(self.allocator, schema.slot_meta, slot)) |backup| {
-                var slot_meta: SlotMeta = try backup.clone(self.allocator);
-                // If parent_slot == None, then this is one of the orphans inserted
-                // during the chaining process, see the function find_slot_meta_in_cached_state()
-                // for details. Slots that are orphans are missing a parent_slot, so we should
-                // fill in the parent now that we know it.
-                if (slot_meta.isOrphan()) {
-                    slot_meta.parent_slot = parent_slot;
-                }
-                entry.value_ptr.* = .{
-                    .new_slot_meta = slot_meta,
-                    .old_slot_meta = backup,
-                };
-            } else {
-                entry.value_ptr.* = .{
-                    .new_slot_meta = SlotMeta.init(self.allocator, slot, parent_slot),
-                };
-            }
-        }
-        return entry.value_ptr;
-    }
-
     /// agave: insert_coding_shred
     fn insertCodeShred(
         index_meta: *meta.Index,
@@ -788,7 +658,7 @@
         self: *Self,
         shred: DataShred,
         slot_meta: *const SlotMeta,
-        just_inserted_shreds: *AutoHashMap(ShredId, Shred),
+        shred_store: WorkingShredStore,
         max_root: Slot,
         leader_schedule: ?SlotLeaderProvider,
         shred_source: ShredSource,
@@ -813,7 +683,7 @@
             .shred_type = .data,
         };
         // FIXME: leak - decide how to free shred
-        const maybe_shred = try getShredFromJustInsertedOrDb(&self.db, just_inserted_shreds, shred_id);
+        const maybe_shred = try shred_store.get(shred_id);
         const ending_shred = if (maybe_shred) |s| s else {
             self.logger.errf(&newlinesToSpaces(
                 \\Last received data shred {any} indicated by slot meta \
@@ -854,15 +724,6 @@
             false;
     }

-    /// agave: get_data_shred
-    fn getDataShred(self: *Self, slot: Slot, index: u64) !?[]const u8 {
-        if (try self.db.getBytes(schema.data_shred, .{ slot, index })) |shred| {
-            const payload = shred.payload();
-            std.debug.assert(payload.len == shred_mod.data_shred_constants.payload_size);
-            return payload;
-        }
-    }
-
     /// agave: has_duplicate_shreds_in_slot
     fn hasDuplicateShredsInSlot(self: *Self, slot: Slot) !bool {
         return try self.db.contains(schema.duplicate_slots, slot);
     }
@@ -871,7 +732,7 @@
     /// agave: check_merkle_root_consistency
     fn checkMerkleRootConsistency(
         self: *Self,
-        just_inserted_shreds: *const AutoHashMap(ShredId, Shred),
+        shred_store: WorkingShredStore,
         slot: Slot,
         merkle_root_meta: *const meta.MerkleRootMeta,
         shred: *const Shred,
@@ -906,7 +767,7 @@
             .index = merkle_root_meta.first_received_shred_index,
             .shred_type = merkle_root_meta.first_received_shred_type,
         };
-        if (try getShredFromJustInsertedOrDb(&self.db, just_inserted_shreds, shred_id)) |conflicting_shred| {
+        if (try shred_store.get(shred_id)) |conflicting_shred| {
             try duplicate_shreds.append(.{ .MerkleRootConflict = .{
                 .original = shred.*, // TODO lifetimes (cloned in rust)
@@ -992,7 +853,7 @@
         self: *Self,
         erasure_metas: *SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)),
         index_working_set: *AutoHashMap(u64, IndexMetaWorkingSetEntry),
-        prev_inserted_shreds: *const AutoHashMap(ShredId, Shred),
+        shred_store: WorkingShredStore,
         reed_solomon_cache: *ReedSolomonCache,
     ) !ArrayList(Shred) {
         // Recovery rules:
@@ -1011,7 +872,7 @@
             .can_recover => return try self.recoverShreds(
                 &index_meta_entry.index,
                 erasure_meta,
-                prev_inserted_shreds,
+                shred_store,
                 reed_solomon_cache,
             ),
             .data_full => {
@@ -1030,28 +891,28 @@
         self: *Self,
         index: *const Index,
         erasure_meta: *const ErasureMeta,
-        prev_inserted_shreds: *const AutoHashMap(ShredId, Shred),
+        shred_store: WorkingShredStore,
         reed_solomon_cache: *ReedSolomonCache,
     ) !std.ArrayList(Shred) {
         var available_shreds = ArrayList(Shred).init(self.allocator);
         defer available_shreds.deinit();

         try getRecoveryShreds(
-            self,
+            self.allocator,
             .data,
             &index.data_index,
             index.slot,
             erasure_meta.dataShredsIndices(),
-            prev_inserted_shreds,
+            shred_store,
             &available_shreds,
         );
         try
 getRecoveryShreds(
-            self,
+            self.allocator,
             .code,
             &index.code_index,
             index.slot,
             erasure_meta.codeShredsIndices(),
-            prev_inserted_shreds,
+            shred_store,
             &available_shreds,
         );
@@ -1071,167 +932,22 @@
     // agave: get_recovery_data_shreds and get_recovery_coding_shreds
     fn getRecoveryShreds(
-        self: *Self,
+        allocator: Allocator,
         comptime shred_type: sig.ledger.shred.ShredType,
         index: *const ShredIndex,
         slot: Slot,
         shred_indices: [2]u64,
-        prev_inserted_shreds: *const AutoHashMap(ShredId, Shred),
+        shred_store: WorkingShredStore,
         available_shreds: *ArrayList(Shred),
     ) !void {
-        const column_family = switch (shred_type) {
-            .data => schema.data_shred,
-            .code => schema.code_shred,
-        };
         for (shred_indices[0]..shred_indices[1]) |i| {
-            const key = ShredId{ .slot = slot, .index = @intCast(i), .shred_type = shred_type };
-            if (prev_inserted_shreds.get(key)) |shred| {
+            if (try shred_store.getWithIndex(allocator, index, shred_type, slot, i)) |shred| {
                 try available_shreds.append(shred);
-            } else if (index.contains(i)) {
-                const shred = try self.db.getBytes(column_family, .{ slot, i }) orelse {
-                    self.logger.errf(&newlinesToSpaces(
-                        \\Unable to read the {s} with slot {}, index {} for shred
-                        \\recovery. The shred is marked present in the slot's {s} index,
-                        \\but the shred could not be found in the {s} column.
-                    ), .{ column_family.name, slot, i, column_family.name, column_family.name });
-                    continue;
-                };
-                defer shred.deinit();
-                try available_shreds.append(try Shred.fromPayload(self.allocator, shred.data));
-            }
             }
         }
     }
-
-    /// For each slot in the slot_meta_working_set which has any change, include
-    /// corresponding updates to schema.slot_meta via the specified `write_batch`.
-    /// The `write_batch` will later be atomically committed to the blockstore.
-    ///
-    /// Arguments:
-    /// - `slot_meta_working_set`: a map that maintains slot-id to its `SlotMeta`
-    ///   mapping.
-    /// - `completed_slot_senders`: the units which are responsible for sending
-    ///   signals for completed slots.
-    /// - `write_batch`: the write batch which includes all the updates of
-    ///   the the current write and ensures their atomicity.
-    ///
-    /// On success, the function returns an Ok result with pair where:
-    /// - `should_signal`: a boolean flag indicating whether to send signal.
-    /// - `newly_completed_slots`: a subset of slot_meta_working_set which are
-    ///   newly completed.
-    ///
-    /// agave: commit_slot_meta_working_set
-    fn commitSlotMetaWorkingSet(
-        allocator: Allocator,
-        slot_meta_working_set: *const AutoHashMap(u64, SlotMetaWorkingSetEntry),
-        completed_slots_senders: []const void, // TODO
-        write_batch: *WriteBatch,
-    ) !struct { bool, ArrayList(u64) } {
-        var should_signal = false;
-        var newly_completed_slots = ArrayList(u64).init(allocator);
-
-        // Check if any metadata was changed, if so, insert the new version of the
-        // metadata into the write batch
-        var iter = slot_meta_working_set.iterator();
-        while (iter.next()) |entry| {
-            // Any slot that wasn't written to should have been filtered out by now.
-            std.debug.assert(entry.value_ptr.did_insert_occur);
-            const slot_meta = &entry.value_ptr.new_slot_meta;
-            const backup = &entry.value_ptr.old_slot_meta;
-            if (completed_slots_senders.len > 0 and isNewlyCompletedSlot(slot_meta, backup)) {
-                try newly_completed_slots.append(entry.key_ptr.*);
-            }
-            // Check if the working copy of the metadata has changed
-            if (backup.* == null or !(&backup.*.?).eql(slot_meta)) {
-                should_signal = should_signal or slotHasUpdates(slot_meta, backup);
-                try write_batch.put(schema.slot_meta, entry.key_ptr.*, slot_meta.*);
-            }
-        }
-
-        return .{ should_signal, newly_completed_slots };
-    }
 };

-/// State that lives for a single call to insertShreds.
-/// Tracks some items that may be inserted by insertShreds.
-const InsertShredsState = struct {
-    just_inserted_shreds: AutoHashMap(ShredId, Shred),
-    erasure_metas: SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)),
-    merkle_root_metas: AutoHashMap(ErasureSetId, WorkingEntry(MerkleRootMeta)),
-    slot_meta_working_set: AutoHashMap(u64, SlotMetaWorkingSetEntry),
-    index_working_set: AutoHashMap(u64, IndexMetaWorkingSetEntry),
-    duplicate_shreds: ArrayList(PossibleDuplicateShred),
-
-    // TODO unmanaged
-
-    fn init(allocator: Allocator) @This() {
-        return .{
-            .just_inserted_shreds = AutoHashMap(ShredId, Shred).init(allocator), // TODO capacity = shreds.len
-            .erasure_metas = SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)).init(allocator),
-            .merkle_root_metas = AutoHashMap(ErasureSetId, WorkingEntry(MerkleRootMeta)).init(allocator),
-            .slot_meta_working_set = AutoHashMap(u64, SlotMetaWorkingSetEntry).init(allocator),
-            .index_working_set = AutoHashMap(u64, IndexMetaWorkingSetEntry).init(allocator),
-            .duplicate_shreds = ArrayList(PossibleDuplicateShred).init(allocator),
-        };
-    }
-
-    fn deinit(self: *@This()) void {
-        self.just_inserted_shreds.deinit();
-        self.erasure_metas.deinit();
-        self.merkle_root_metas.deinit();
-        deinitMapRecursive(&self.slot_meta_working_set);
-        deinitMapRecursive(&self.index_working_set);
-        self.duplicate_shreds.deinit();
-    }
-};
-
-// FIXME: the return may be owned by either the hashmap or the database
-/// Finds the corresponding shred at `shred_id` in the just inserted
-/// shreds or the backing store. Returns None if there is no shred.
-/// agave: get_shred_from_just_inserted_or_db
-pub fn getShredFromJustInsertedOrDb(
-    db: *BlockstoreDB,
-    just_inserted_shreds: *const AutoHashMap(ShredId, Shred),
-    id: ShredId,
-) !?[]const u8 { // TODO consider lifetime -> return must inform a conditional deinit
-    if (just_inserted_shreds.get(id)) |shred| {
-        return shred.payload(); // owned by map
-    }
-    return switch (id.shred_type) {
-        // owned by database
-        .data => if (try db.getBytes(schema.data_shred, .{ id.slot, @intCast(id.index) })) |s| s.data else null,
-        .code => if (try db.getBytes(schema.code_shred, .{ id.slot, @intCast(id.index) })) |s| s.data else null,
-    };
-}
-
-/// agave: is_newly_completed_slot
-pub fn isNewlyCompletedSlot(slot_meta: *const SlotMeta, backup_slot_meta: *const ?SlotMeta) bool {
-    return slot_meta.isFull() and ( //
-        backup_slot_meta.* == null or
-        slot_meta.consecutive_received_from_0 !=
-        (backup_slot_meta.* orelse unreachable).consecutive_received_from_0);
-    // TODO unreachable: explain or fix
-}
-
-/// Returns a boolean indicating whether a slot has received additional shreds
-/// that can be replayed since the previous update to the slot's SlotMeta.
-/// agave: slot_has_updates
-fn slotHasUpdates(slot_meta: *const SlotMeta, slot_meta_backup: *const ?SlotMeta) bool {
-    // First, this slot's parent must be connected in order to even consider
-    // starting replay; otherwise, the replayed results may not be valid.
-    return slot_meta.isParentConnected() and
-        // Then,
-        // If the slot didn't exist in the db before, any consecutive shreds
-        // at the start of the slot are ready to be replayed.
-        ((slot_meta_backup.* == null and slot_meta.consecutive_received_from_0 != 0) or
-        // Or,
-        // If the slot has more consecutive shreds than it last did from the
-        // last update, those shreds are new and also ready to be replayed.
-        (slot_meta_backup.* != null and
-        slot_meta_backup.*.?.consecutive_received_from_0 !=
-        slot_meta.consecutive_received_from_0));
-}
-
 fn verifyShredSlots(slot: Slot, parent: Slot, root: Slot) bool {
     if (slot == 0 and parent == 0 and root == 0) {
         return true; // valid write to slot zero.
@@ -1324,22 +1040,6 @@ fn updateCompletedDataIndexes(
     return ret;
 }

-pub fn WorkingEntry(comptime T: type) type {
-    return union(enum) {
-        // Value has been modified with respect to the blockstore column
-        dirty: T,
-        // Value matches what is currently in the blockstore column
-        clean: T,
-
-        pub fn asRef(self: *const @This()) *const T {
-            return switch (self.*) {
-                .dirty => &self.dirty,
-                .clean => &self.clean,
-            };
-        }
-    };
-}
-
 const ShredSource = enum {
     turbine,
     repaired,
@@ -1359,40 +1059,13 @@ pub const CompletedDataSetInfo = struct {
     end_index: u32,
 };

-pub const PossibleDuplicateShred = union(enum) {
-    Exists: Shred, // Blockstore has another shred in its spot
-    LastIndexConflict: ShredConflict, // The index of this shred conflicts with `slot_meta.last_index`
-    ErasureConflict: ShredConflict, // The code shred has a conflict in the erasure_meta
-    MerkleRootConflict: ShredConflict, // Merkle root conflict in the same fec set
-    ChainedMerkleRootConflict: ShredConflict, // Merkle root chaining conflict with previous fec set
-};
-
-const ShredConflict = struct {
-    original: Shred,
-    conflict: []const u8,
-};
-
-pub const IndexMetaWorkingSetEntry = struct {
-    index: meta.Index,
-    // true only if at least one shred for this Index was inserted since the time this
-    // struct was created
-    did_insert_occur: bool = false,
-
-    pub fn init(allocator: std.mem.Allocator, slot: Slot) IndexMetaWorkingSetEntry {
-        return .{ .index = meta.Index.init(allocator, slot) };
-    }
-
-    pub fn deinit(self: *IndexMetaWorkingSetEntry) void {
-        self.index.deinit();
-    }
-};
-
 pub const BlockstoreInsertionMetrics = struct {
     insert_lock_elapsed_us: *Counter, // u64
     insert_shreds_elapsed_us: *Counter, // u64
     shred_recovery_elapsed_us: *Counter, // u64
     chaining_elapsed_us: *Counter, // u64
-    commit_working_sets_elapsed_us: *Counter, // u64
+    merkle_chaining_elapsed_us: *Counter, // u64
+    insert_working_sets_elapsed_us: *Counter, // u64
     write_batch_elapsed_us: *Counter, // u64
     total_elapsed_us: *Counter, // u64
     index_meta_time_us: *Counter, // u64
@@ -1424,34 +1097,6 @@
     }
 };

-/// The in-memory data structure for updating entries in the column family
-/// [`SlotMeta`].
-pub const SlotMetaWorkingSetEntry = struct {
-    /// The dirty version of the `SlotMeta` which might not be persisted
-    /// to the blockstore yet.
-    new_slot_meta: SlotMeta,
-    /// The latest version of the `SlotMeta` that was persisted in the
-    /// blockstore. If None, it means the current slot is new to the
-    /// blockstore.
-    old_slot_meta: ?SlotMeta = null,
-    /// True only if at least one shred for this SlotMeta was inserted since
-    /// this struct was created.
-    did_insert_occur: bool = false,
-
-    pub fn deinit(self: *@This()) void {
-        self.new_slot_meta.deinit();
-        if (self.old_slot_meta) |*old| old.deinit();
-    }
-};
-
-pub fn deinitMapRecursive(map: anytype) void {
-    var iter = map.iterator();
-    while (iter.next()) |entry| {
-        entry.value_ptr.deinit();
-    }
-    map.deinit();
-}
-
 //////////
 // Tests

@@ -1513,7 +1158,7 @@ const ShredInserterTestState = struct {
     fn checkInsertCodeShred(
         self: *ShredInserterTestState,
         shred: Shred,
-        state: *InsertShredsState,
+        state: *InsertShredsWorkingState,
         write_batch: *WriteBatch,
     ) !bool {
         return try self.inserter.checkInsertCodeShred(
@@ -1673,7 +1318,7 @@ test "merkle root metas coding" {
     var write_batch = try state.db.initWriteBatch();
     defer write_batch.deinit();
     const this_shred = shreds[0];
-    var insert_state = InsertShredsState.init(state.allocator());
+    var insert_state = try InsertShredsWorkingState.init(state.allocator(), .noop, &state.db);
     defer insert_state.deinit();
     const merkle_root_metas = &insert_state.merkle_root_metas;
@@ -1704,7 +1349,7 @@
         try state.db.commit(write_batch);
     }

-    var insert_state = InsertShredsState.init(state.allocator());
+    var insert_state = try InsertShredsWorkingState.init(state.allocator(), .noop, &state.db);
     defer insert_state.deinit();
     {
         // second shred (same index as first, should conflict with merkle root)
diff --git a/src/ledger/insert_shreds_working_state.zig b/src/ledger/insert_shreds_working_state.zig
new file mode 100644
index 00000000..b2fd5fa4
--- /dev/null
+++ b/src/ledger/insert_shreds_working_state.zig
@@ -0,0 +1,440 @@
+const std = @import("std");
+const sig = @import("../sig.zig");
+
+const ledger = sig.ledger;
+const meta = ledger.meta;
+const schema = ledger.schema.schema;
+
+const Allocator = std.mem.Allocator;
+const ArrayList = std.ArrayList;
+const AutoHashMap = std.AutoHashMap;
+
+const Slot = sig.core.Slot;
+const SortedMap = sig.utils.collections.SortedMap;
+const Timer = sig.time.Timer;
+
+const BlockstoreDB = ledger.blockstore.BlockstoreDB;
+const BlockstoreInsertionMetrics = ledger.insert_shred.BlockstoreInsertionMetrics;
+const CodeShred = ledger.shred.CodeShred;
+const ColumnFamily = ledger.database.ColumnFamily;
+const ErasureSetId = ledger.shred.ErasureSetId;
+const Shred = ledger.shred.Shred;
+const ShredId = ledger.shred.ShredId;
+const WriteBatch = BlockstoreDB.WriteBatch;
+
+const ErasureMeta = meta.ErasureMeta;
+const Index = meta.Index;
+const MerkleRootMeta = meta.MerkleRootMeta;
+const ShredIndex = meta.ShredIndex;
+const SlotMeta = meta.SlotMeta;
+
+const newlinesToSpaces = sig.utils.fmt.newlinesToSpaces;
+
+/// Working state that lives for a single call to ShredInserter.insertShreds.
+///
+/// This struct is responsible for tracking working entries for items that are loaded
+/// from the database when present there, or otherwise initialized to a default value.
+/// The entries are then used, and possibly mutated, throughout the lifetime of the
+/// insertShreds call, and at the end they are persisted into the database.
+///
+/// This struct should not contain any business logic, such as how to verify shreds.
+/// It is only used for negotiating state between the database and the working sets.
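+///
+/// A minimal lifecycle sketch (hypothetical caller; `db`, `logger`, `slot`, and
+/// `parent_slot` are assumed to already exist at the call site):
+///
+///     var state = try InsertShredsWorkingState.init(allocator, logger, &db);
+///     defer state.deinit();
+///     const slot_meta_entry = try state.getSlotMetaEntry(slot, parent_slot);
+///     slot_meta_entry.did_insert_occur = true; // mark the working copy as changed
+///     try state.commit(); // persists dirty entries and commits the write batch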
+///
+/// Only intended for use within a single thread.
+pub const InsertShredsWorkingState = struct {
+    allocator: Allocator,
+    logger: sig.trace.Logger,
+    db: *BlockstoreDB,
+    write_batch: WriteBatch,
+    just_inserted_shreds: AutoHashMap(ShredId, Shred),
+    erasure_metas: SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)),
+    merkle_root_metas: AutoHashMap(ErasureSetId, WorkingEntry(MerkleRootMeta)),
+    slot_meta_working_set: AutoHashMap(u64, SlotMetaWorkingSetEntry),
+    index_working_set: AutoHashMap(u64, IndexMetaWorkingSetEntry),
+    duplicate_shreds: ArrayList(PossibleDuplicateShred),
+    metrics: BlockstoreInsertionMetrics,
+
+    // TODO unmanaged
+
+    const Self = @This();
+
+    // TODO add param for metrics
+    pub fn init(allocator: Allocator, logger: sig.trace.Logger, db: *BlockstoreDB) !Self {
+        return .{
+            .allocator = allocator,
+            .db = db,
+            .logger = logger,
+            .write_batch = try db.initWriteBatch(),
+            .just_inserted_shreds = AutoHashMap(ShredId, Shred).init(allocator), // TODO capacity = shreds.len
+            .erasure_metas = SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)).init(allocator),
+            .merkle_root_metas = AutoHashMap(ErasureSetId, WorkingEntry(MerkleRootMeta)).init(allocator),
+            .slot_meta_working_set = AutoHashMap(u64, SlotMetaWorkingSetEntry).init(allocator),
+            .index_working_set = AutoHashMap(u64, IndexMetaWorkingSetEntry).init(allocator),
+            .duplicate_shreds = ArrayList(PossibleDuplicateShred).init(allocator),
+            .metrics = try BlockstoreInsertionMetrics.init(sig.prometheus.globalRegistry()),
+        };
+    }
+
+    pub fn deinit(self: *Self) void {
+        self.just_inserted_shreds.deinit();
+        self.erasure_metas.deinit();
+        self.merkle_root_metas.deinit();
+        deinitMapRecursive(&self.slot_meta_working_set);
+        deinitMapRecursive(&self.index_working_set);
+        self.duplicate_shreds.deinit();
+        self.write_batch.deinit();
+    }
+
+    pub fn getOrPutErasureMeta(
+        self: *Self,
+        erasure_set_id: ErasureSetId,
+        code_shred: CodeShred,
+    ) !*const ErasureMeta {
+        const erasure_meta_entry = try self.erasure_metas.getOrPut(erasure_set_id);
+        if (!erasure_meta_entry.found_existing) {
+            if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| {
+                erasure_meta_entry.value_ptr.* = .{ .clean = meta_ };
+            } else {
+                erasure_meta_entry.value_ptr.* = .{
+                    .dirty = ErasureMeta.fromCodeShred(code_shred) orelse return error.Unwrap,
+                };
+            }
+        }
+        return erasure_meta_entry.value_ptr.asRef();
+    }
+
+    /// agave: get_index_meta_entry
+    pub fn getIndexMetaEntry(self: *Self, slot: Slot) !*IndexMetaWorkingSetEntry {
+        var timer = try Timer.start();
+        const entry = try self.index_working_set.getOrPut(slot);
+        if (!entry.found_existing) {
+            // TODO lifetimes (conflicting?)
+            if (try self.db.get(self.allocator, schema.index, slot)) |item| {
+                entry.value_ptr.* = .{ .index = item };
+            } else {
+                entry.value_ptr.* = IndexMetaWorkingSetEntry.init(self.allocator, slot);
+            }
+        }
+        self.metrics.index_meta_time_us.add(timer.read().asMicros());
+        return entry.value_ptr;
+    }
+
+    /// agave: get_slot_meta_entry
+    pub fn getSlotMetaEntry(
+        self: *Self,
+        slot: Slot,
+        parent_slot: Slot,
+    ) !*SlotMetaWorkingSetEntry {
+        const entry = try self.slot_meta_working_set.getOrPut(slot);
+        if (!entry.found_existing) {
+            if (try self.db.get(self.allocator, schema.slot_meta, slot)) |backup| {
+                var slot_meta: SlotMeta = try backup.clone(self.allocator);
+                // If parent_slot == None, then this is one of the orphans inserted
+                // during the chaining process, see the function find_slot_meta_in_cached_state()
+                // for details.
+                // Slots that are orphans are missing a parent_slot, so we should
+                // fill in the parent now that we know it.
+                if (slot_meta.isOrphan()) {
+                    slot_meta.parent_slot = parent_slot;
+                }
+                entry.value_ptr.* = .{
+                    .new_slot_meta = slot_meta,
+                    .old_slot_meta = backup,
+                };
+            } else {
+                entry.value_ptr.* = .{
+                    .new_slot_meta = SlotMeta.init(self.allocator, slot, parent_slot),
+                };
+            }
+        }
+        return entry.value_ptr;
+    }
+
+    pub fn shredStore(self: *Self) WorkingShredStore {
+        return .{
+            .logger = self.logger,
+            .db = self.db,
+            .just_inserted_shreds = &self.just_inserted_shreds,
+        };
+    }
+
+    // TODO: should this actually be called externally?
+    // consider moving this logic into a getOrPut-style method
+    pub fn loadErasureMeta(self: *Self, erasure_set_id: ErasureSetId) !void {
+        if (!self.erasure_metas.contains(erasure_set_id)) {
+            if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| {
+                try self.erasure_metas.put(erasure_set_id, .{ .clean = meta_ });
+            }
+        }
+    }
+
+    // TODO: should this actually be called externally?
+    // consider moving this logic into a getOrPut-style method
+    pub fn loadMerkleRootMeta(self: *Self, erasure_set_id: ErasureSetId) !void {
+        if (!self.merkle_root_metas.contains(erasure_set_id)) {
+            if (try self.db.get(self.allocator, schema.merkle_root_meta, erasure_set_id)) |meta_| {
+                try self.merkle_root_metas.put(erasure_set_id, .{ .clean = meta_ });
+            }
+        }
+    }
+
+    // TODO: should this actually be called externally?
+    pub fn initMerkleRootMetaIfMissing(
+        self: *Self,
+        erasure_set_id: ErasureSetId,
+        shred: anytype,
+    ) !void {
+        const entry = try self.merkle_root_metas.getOrPut(erasure_set_id);
+        if (!entry.found_existing) {
+            entry.value_ptr.* = .{ .dirty = MerkleRootMeta.fromFirstReceivedShred(shred) };
+        }
+    }
+
+    pub fn commit(self: *Self) !void {
+        var commit_working_sets_timer = try Timer.start();
+
+        // TODO: inputs and outputs of this function may need to be fleshed out
+        // as the blockstore is used more throughout the codebase.
+        _, const newly_completed_slots = try self.commitSlotMetaWorkingSet(self.allocator, &.{});
+        newly_completed_slots.deinit();
+
+        try persistWorkingEntries(&self.write_batch, schema.erasure_meta, &self.erasure_metas);
+        try persistWorkingEntries(&self.write_batch, schema.merkle_root_meta, &self.merkle_root_metas);
+
+        var index_working_set_iterator = self.index_working_set.iterator();
+        while (index_working_set_iterator.next()) |entry| {
+            const working_entry = entry.value_ptr;
+            if (working_entry.did_insert_occur) {
+                try self.write_batch.put(schema.index, entry.key_ptr.*, working_entry.index);
+            }
+        }
+
+        self.metrics.insert_working_sets_elapsed_us.add(commit_working_sets_timer.read().asMicros());
+
+        var commit_timer = try Timer.start();
+        try self.db.commit(self.write_batch);
+        self.metrics.write_batch_elapsed_us.add(commit_timer.read().asMicros());
+    }
+
+    /// For each slot in the slot_meta_working_set which has any change, include
+    /// corresponding updates to schema.slot_meta via the state's `write_batch`.
+    /// The `write_batch` will later be atomically committed to the blockstore.
+    ///
+    /// Arguments:
+    /// - `slot_meta_working_set`: a map that maintains slot-id to its `SlotMeta`
+    ///   mapping.
+    /// - `completed_slots_senders`: the units which are responsible for sending
+    ///   signals for completed slots.
+    /// - `write_batch`: the write batch which includes all the updates of
+    ///   the current write and ensures their atomicity.
+    ///
+    /// On success, the function returns a pair where:
+    /// - `should_signal`: a boolean flag indicating whether to send signal.
+    /// - `newly_completed_slots`: a subset of slot_meta_working_set which are
+    ///   newly completed.
+    ///
+    /// agave: commit_slot_meta_working_set
+    fn commitSlotMetaWorkingSet(
+        self: *Self,
+        allocator: Allocator,
+        completed_slots_senders: []const void, // TODO
+    ) !struct { bool, ArrayList(u64) } {
+        var should_signal = false;
+        var newly_completed_slots = ArrayList(u64).init(allocator);
+
+        // Check if any metadata was changed, if so, insert the new version of the
+        // metadata into the write batch
+        var iter = self.slot_meta_working_set.iterator();
+        while (iter.next()) |entry| {
+            // Any slot that wasn't written to should have been filtered out by now.
+            std.debug.assert(entry.value_ptr.did_insert_occur);
+            const slot_meta = &entry.value_ptr.new_slot_meta;
+            const backup = &entry.value_ptr.old_slot_meta;
+            if (completed_slots_senders.len > 0 and isNewlyCompletedSlot(slot_meta, backup)) {
+                try newly_completed_slots.append(entry.key_ptr.*);
+            }
+            // Check if the working copy of the metadata has changed
+            if (backup.* == null or !(&backup.*.?).eql(slot_meta)) {
+                should_signal = should_signal or slotHasUpdates(slot_meta, backup);
+                try self.write_batch.put(schema.slot_meta, entry.key_ptr.*, slot_meta.*);
+            }
+        }
+
+        return .{ should_signal, newly_completed_slots };
+    }
+
+    fn persistWorkingEntries(
+        write_batch: *WriteBatch,
+        cf: ColumnFamily,
+        /// Map(cf.Key, WorkingEntry(cf.Value))
+        working_entries_map: anytype,
+    ) !void {
+        var iterator = working_entries_map.iterator();
+        while (iterator.next()) |entry| {
+            const key = entry.key_ptr.*;
+            const value = entry.value_ptr;
+            if (value.* == .dirty) {
+                try write_batch.put(cf, key, value.asRef().*);
+            }
+        }
+    }
+};
+
+pub fn WorkingEntry(comptime T: type) type {
+    return union(enum) {
+        // Value has been modified with respect to the blockstore column
+        dirty: T,
+        // Value matches what is currently in the blockstore column
+        clean: T,
+
+        pub fn asRef(self: *const @This()) *const T {
+            return switch (self.*) {
+                .dirty => &self.dirty,
+                .clean => &self.clean,
+            };
+        }
+    };
+}
+
+pub const IndexMetaWorkingSetEntry = struct {
+    index: meta.Index,
+    // true only if at least one shred for this Index was inserted since the time this
+    // struct was created
+    did_insert_occur: bool = false,
+
+    pub fn init(allocator: std.mem.Allocator, slot: Slot) IndexMetaWorkingSetEntry {
+        return .{ .index = meta.Index.init(allocator, slot) };
+    }
+
+    pub fn deinit(self: *IndexMetaWorkingSetEntry) void {
+        self.index.deinit();
+    }
+};
+
+/// The in-memory data structure for updating entries in the column family
+/// [`SlotMeta`].
+pub const SlotMetaWorkingSetEntry = struct {
+    /// The dirty version of the `SlotMeta` which might not be persisted
+    /// to the blockstore yet.
+    new_slot_meta: SlotMeta,
+    /// The latest version of the `SlotMeta` that was persisted in the
+    /// blockstore. If None, it means the current slot is new to the
+    /// blockstore.
+    old_slot_meta: ?SlotMeta = null,
+    /// True only if at least one shred for this SlotMeta was inserted since
+    /// this struct was created.
+    did_insert_occur: bool = false,
+
+    pub fn deinit(self: *@This()) void {
+        self.new_slot_meta.deinit();
+        if (self.old_slot_meta) |*old| old.deinit();
+    }
+};
+
+pub const PossibleDuplicateShred = union(enum) {
+    Exists: Shred, // Blockstore has another shred in its spot
+    LastIndexConflict: ShredConflict, // The index of this shred conflicts with `slot_meta.last_index`
+    ErasureConflict: ShredConflict, // The code shred has a conflict in the erasure_meta
+    MerkleRootConflict: ShredConflict, // Merkle root conflict in the same fec set
+    ChainedMerkleRootConflict: ShredConflict, // Merkle root chaining conflict with previous fec set
+};
+
+const ShredConflict = struct {
+    original: Shred,
+    conflict: []const u8,
+};
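+/// Read-through view over the shreds relevant to one insertShreds call: lookups
+/// check `just_inserted_shreds` first and fall back to the database columns.
+///
+/// A minimal lookup sketch (hypothetical values; a real `ShredId` would come from
+/// the shred or meta entry being checked):
+///
+///     const shred_store = state.shredStore();
+///     const id = ShredId{ .slot = 100, .index = 0, .shred_type = .code };
+///     if (try shred_store.get(id)) |payload| {
+///         // payload is owned by the map or the database; see the lifetime TODOs below
+///     }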
+pub const WorkingShredStore = struct {
+    logger: sig.trace.Logger,
+    db: *BlockstoreDB,
+    just_inserted_shreds: *const AutoHashMap(ShredId, Shred),
+
+    const Self = @This();
+
+    // TODO consider lifetime -> return must inform a conditional deinit
+    pub fn get(self: Self, id: ShredId) !?[]const u8 {
+        if (self.just_inserted_shreds.get(id)) |shred| {
+            return shred.payload(); // owned by map
+        }
+        return switch (id.shred_type) {
+            // owned by database
+            .data => self.getFromDb(schema.data_shred, id),
+            .code => self.getFromDb(schema.code_shred, id),
+        };
+    }
+
+    // TODO consider lifetime -> return may be owned by different contexts
+    /// This does almost the same thing as `get` and may not actually be necessary.
+    /// This just adds a check on the index and evaluates the cf at comptime instead of runtime.
+    pub fn getWithIndex(
+        self: Self,
+        allocator: Allocator,
+        index: *const ShredIndex,
+        comptime shred_type: sig.ledger.shred.ShredType,
+        slot: Slot,
+        shred_index: u64,
+    ) !?Shred {
+        const cf = switch (shred_type) {
+            .data => schema.data_shred,
+            .code => schema.code_shred,
+        };
+        const id = ShredId{ .slot = slot, .index = @intCast(shred_index), .shred_type = shred_type };
+        return if (self.just_inserted_shreds.get(id)) |shred|
+            shred
+        else if (index.contains(shred_index)) blk: {
+            const shred = try self.db.getBytes(cf, .{ slot, @intCast(id.index) }) orelse {
+                self.logger.errf(&newlinesToSpaces(
+                    \\Unable to read the {s} with slot {}, index {} for shred
+                    \\recovery. The shred is marked present in the slot's index,
+                    \\but the shred could not be found in the column.
+                ), .{ cf.name, slot, shred_index });
+                return null;
+            };
+            defer shred.deinit();
+            break :blk try Shred.fromPayload(allocator, shred.data);
+        } else null;
+    }
+
+    fn getFromDb(self: Self, comptime cf: ColumnFamily, id: ShredId) !?[]const u8 {
+        return if (try self.db.getBytes(cf, .{ id.slot, @intCast(id.index) })) |s|
+            s.data
+        else
+            null;
+    }
+};
+
+pub fn deinitMapRecursive(map: anytype) void {
+    var iter = map.iterator();
+    while (iter.next()) |entry| {
+        entry.value_ptr.deinit();
+    }
+    map.deinit();
+}
+
+/// agave: is_newly_completed_slot
+pub fn isNewlyCompletedSlot(slot_meta: *const SlotMeta, backup_slot_meta: *const ?SlotMeta) bool {
+    return slot_meta.isFull() and ( //
+        backup_slot_meta.* == null or
+        slot_meta.consecutive_received_from_0 !=
+        (backup_slot_meta.* orelse unreachable).consecutive_received_from_0);
+    // TODO unreachable: explain or fix
+}
+
+/// Returns a boolean indicating whether a slot has received additional shreds
+/// that can be replayed since the previous update to the slot's SlotMeta.
+/// agave: slot_has_updates
+fn slotHasUpdates(slot_meta: *const SlotMeta, slot_meta_backup: *const ?SlotMeta) bool {
+    // First, this slot's parent must be connected in order to even consider
+    // starting replay; otherwise, the replayed results may not be valid.
+    return slot_meta.isParentConnected() and
+        // Then,
+        // If the slot didn't exist in the db before, any consecutive shreds
+        // at the start of the slot are ready to be replayed.
+        ((slot_meta_backup.* == null and slot_meta.consecutive_received_from_0 != 0) or
+        // Or,
+        // If the slot has more consecutive shreds than it last did from the
+        // last update, those shreds are new and also ready to be replayed.
+        (slot_meta_backup.* != null and
+        slot_meta_backup.*.?.consecutive_received_from_0 !=
+        slot_meta.consecutive_received_from_0));
+}
diff --git a/src/ledger/lib.zig b/src/ledger/lib.zig
index d2fa7464..3008ab95 100644
--- a/src/ledger/lib.zig
+++ b/src/ledger/lib.zig
@@ -3,6 +3,7 @@ pub const cleanup_service = @import("cleanup_service.zig");
 pub const database = @import("database.zig");
 pub const hashmap_db = @import("hashmap_db.zig");
 pub const insert_shred = @import("insert_shred.zig");
+pub const insert_shreds_working_state = @import("insert_shreds_working_state.zig");
 pub const merkle_chaining = @import("merkle_chaining.zig");
 pub const meta = @import("meta.zig");
 pub const reader = @import("reader.zig");
diff --git a/src/ledger/merkle_chaining.zig b/src/ledger/merkle_chaining.zig
index d3027716..9812eedb 100644
--- a/src/ledger/merkle_chaining.zig
+++ b/src/ledger/merkle_chaining.zig
@@ -17,15 +17,15 @@ const BlockstoreDB = ledger.blockstore.BlockstoreDB;
 const CodeShred = ledger.shred.CodeShred;
 const ErasureMeta = ledger.meta.ErasureMeta;
 const MerkleRootMeta = ledger.meta.MerkleRootMeta;
+const PossibleDuplicateShred = ledger.insert_shreds_working_state.PossibleDuplicateShred;
 const Shred = ledger.shred.Shred;
 const ShredId = ledger.shred.ShredId;
-const WorkingEntry = ledger.insert_shred.WorkingEntry;
-const PossibleDuplicateShred = ledger.insert_shred.PossibleDuplicateShred;
+const WorkingEntry = ledger.insert_shreds_working_state.WorkingEntry;
+const WorkingShredStore = ledger.insert_shreds_working_state.WorkingShredStore;

 const key_serializer = sig.ledger.database.key_serializer;
 const value_serializer = sig.ledger.database.value_serializer;

-const getShredFromJustInsertedOrDb = ledger.insert_shred.getShredFromJustInsertedOrDb;
 const newlinesToSpaces = sig.utils.fmt.newlinesToSpaces;

 /// Returns true if there is no chaining conflict between
@@ -45,7 +45,7 @@ pub fn checkForwardChainedMerkleRootConsistency(
     db: *BlockstoreDB,
     shred: CodeShred,
     erasure_meta: ErasureMeta,
-    just_inserted_shreds: *const AutoHashMap(ShredId, Shred),
+    shred_store: WorkingShredStore,
     merkle_root_metas: *AutoHashMap(ErasureSetId, WorkingEntry(MerkleRootMeta)),
     duplicate_shreds: *std.ArrayList(PossibleDuplicateShred),
 ) !bool {
@@ -75,7 +75,7 @@ pub fn checkForwardChainedMerkleRootConsistency(
         .index = next_merkle_root_meta.first_received_shred_index,
         .shred_type = next_merkle_root_meta.first_received_shred_type,
     };
-    const next_shred = if (try getShredFromJustInsertedOrDb(db, just_inserted_shreds, next_shred_id)) |ns|
+    const next_shred = if (try shred_store.get(next_shred_id)) |ns|
         ns
     else {
         logger.errf(&newlinesToSpaces(
@@ -133,7 +133,7 @@ pub fn checkBackwardsChainedMerkleRootConsistency(
     logger: Logger,
     db: *BlockstoreDB,
     shred: Shred,
-    just_inserted_shreds: *const AutoHashMap(ShredId, Shred),
+    shred_store: WorkingShredStore,
     erasure_metas: *SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)), // BTreeMap in agave
     duplicate_shreds: *std.ArrayList(PossibleDuplicateShred),
 ) !bool {
@@ -167,7 +167,7 @@
         .shred_type = .code,
     };
     const prev_shred =
-        if (try getShredFromJustInsertedOrDb(db, just_inserted_shreds, prev_shred_id)) |ps| ps else {
+        if (try shred_store.get(prev_shred_id)) |ps| ps else {
         logger.warnf(&newlinesToSpaces(
             \\Shred {any} indicated by the erasure meta {any} \
             \\is missing from blockstore. This can happen if you have recently upgraded \
diff --git a/src/ledger/slot_chaining.zig b/src/ledger/slot_chaining.zig
index 9ef53340..2ef5673c 100644
--- a/src/ledger/slot_chaining.zig
+++ b/src/ledger/slot_chaining.zig
@@ -11,11 +11,11 @@ const Slot = sig.core.Slot;

 const BlockstoreDB = ledger.blockstore.BlockstoreDB;
 const SlotMeta = ledger.meta.SlotMeta;
-const SlotMetaWorkingSetEntry = ledger.insert_shred.SlotMetaWorkingSetEntry;
+const SlotMetaWorkingSetEntry = ledger.insert_shreds_working_state.SlotMetaWorkingSetEntry;
 const WriteBatch = BlockstoreDB.WriteBatch;

-const deinitMapRecursive = ledger.insert_shred.deinitMapRecursive;
-const isNewlyCompletedSlot = ledger.insert_shred.isNewlyCompletedSlot;
+const deinitMapRecursive = ledger.insert_shreds_working_state.deinitMapRecursive;
+const isNewlyCompletedSlot = ledger.insert_shreds_working_state.isNewlyCompletedSlot;

 /// agave: handle_chaining
 pub fn handleChaining(