From e1165aaf00575cb2860cfa128d6ecd57bbf0b4eb Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 29 Nov 2023 11:14:24 -0500 Subject: [PATCH] blockstore: populate merkle root metas column (#34097) --- ledger/src/blockstore.rs | 435 ++++++++++++++++++++++++++++++++++++++- ledger/src/shred.rs | 4 + 2 files changed, 430 insertions(+), 9 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 70af2decd0a88c..984932a0e08ab7 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -467,6 +467,10 @@ impl Blockstore { self.erasure_meta_cf.get(erasure_set.store_key()) } + fn merkle_root_meta(&self, erasure_set: ErasureSetId) -> Result> { + self.merkle_root_meta_cf.get(erasure_set.key()) + } + /// Check whether the specified slot is an orphan slot which does not /// have a parent slot. /// @@ -801,6 +805,9 @@ impl Blockstore { /// - [`cf::ErasureMeta`]: the associated ErasureMeta of the coding and data /// shreds inside `shreds` will be updated and committed to /// `cf::ErasureMeta`. + /// - [`cf::MerkleRootMeta`]: the associated MerkleRootMeta of the coding and data + /// shreds inside `shreds` will be updated and committed to + /// `cf::MerkleRootMeta`. /// - [`cf::Index`]: stores (slot id, index to the index_working_set_entry) /// pair to the `cf::Index` column family for each index_working_set_entry /// which insert did occur in this function call. @@ -843,6 +850,7 @@ impl Blockstore { let mut just_inserted_shreds = HashMap::with_capacity(shreds.len()); let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); let mut duplicate_shreds = vec![]; @@ -862,6 +870,7 @@ impl Blockstore { match self.check_insert_data_shred( shred, &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, @@ -899,6 +908,7 @@ impl Blockstore { self.check_insert_coding_shred( shred, &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_inserted_shreds, @@ -945,6 +955,7 @@ impl Blockstore { match self.check_insert_data_shred( shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut slot_meta_working_set, &mut write_batch, @@ -1010,6 +1021,10 @@ impl Blockstore { write_batch.put::(erasure_set.store_key(), &erasure_meta)?; } + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch.put::(erasure_set.key(), &merkle_root_meta)?; + } + for (&slot, index_working_set_entry) in index_working_set.iter() { if index_working_set_entry.did_insert_occur { write_batch.put::(slot, &index_working_set_entry.index)?; @@ -1167,6 +1182,7 @@ impl Blockstore { &self, shred: Shred, erasure_metas: &mut HashMap, + merkle_root_metas: &mut HashMap, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, just_received_shreds: &mut HashMap, @@ -1183,10 +1199,16 @@ impl Blockstore { self.get_index_meta_entry(slot, index_working_set, index_meta_time_us); let index_meta = &mut index_meta_working_set_entry.index; + let erasure_set = shred.erasure_set(); + + if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) { + if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() { + entry.insert(meta); + } + } // This gives the index of first coding shred in this FEC block // So, all coding shreds in a given FEC block will have the same set index - if !is_trusted { if index_meta.coding().contains(shred_index) { metrics.num_coding_shreds_exists += 1; @@ -1200,7 +1222,6 @@ impl Blockstore { } } - let erasure_set = shred.erasure_set(); let erasure_meta = erasure_metas.entry(erasure_set).or_insert_with(|| { self.erasure_meta(erasure_set) .expect("Expect database get to succeed") @@ -1263,6 +1284,10 @@ impl Blockstore { if result { index_meta_working_set_entry.did_insert_occur = true; metrics.num_inserted += 1; + + merkle_root_metas + .entry(erasure_set) + .or_insert(MerkleRootMeta::from_shred(&shred)); } if let HashMapEntry::Vacant(entry) = just_received_shreds.entry(shred.id()) { @@ -1311,8 +1336,8 @@ impl Blockstore { /// /// The resulting `write_batch` may include updates to [`cf::DeadSlots`] /// and [`cf::ShredData`]. Note that it will also update the in-memory copy - /// of `erasure_metas` and `index_working_set`, which will later be - /// used to update other column families such as [`cf::ErasureMeta`] and + /// of `erasure_metas`, `merkle_root_metas`, and `index_working_set`, which will + /// later be used to update other column families such as [`cf::ErasureMeta`] and /// [`cf::Index`]. /// /// Arguments: @@ -1320,6 +1345,9 @@ impl Blockstore { /// - `erasure_metas`: the in-memory hash-map that maintains the dirty /// copy of the erasure meta. It will later be written to /// `cf::ErasureMeta` in insert_shreds_handle_duplicate(). + /// - `merkle_root_metas`: the in-memory hash-map that maintains the dirty + /// copy of the merkle root meta. It will later be written to + /// `cf::MerkleRootMeta` in `insert_shreds_handle_duplicate()`. /// - `index_working_set`: the in-memory hash-map that maintains the /// dirty copy of the index meta. It will later be written to /// `cf::Index` in insert_shreds_handle_duplicate(). @@ -1343,6 +1371,7 @@ impl Blockstore { &self, shred: Shred, erasure_metas: &mut HashMap, + merkle_root_metas: &mut HashMap, index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, write_batch: &mut WriteBatch, @@ -1368,6 +1397,12 @@ impl Blockstore { ); let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut(); + let erasure_set = shred.erasure_set(); + if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(erasure_set) { + if let Some(meta) = self.merkle_root_meta(erasure_set).unwrap() { + entry.insert(meta); + } + } if !is_trusted { if Self::is_data_shred_present(&shred, slot_meta, index_meta.data()) { @@ -1402,7 +1437,6 @@ impl Blockstore { } } - let erasure_set = shred.erasure_set(); let newly_completed_data_sets = self.insert_data_shred( slot_meta, index_meta.data_mut(), @@ -1410,6 +1444,9 @@ impl Blockstore { write_batch, shred_source, )?; + merkle_root_metas + .entry(erasure_set) + .or_insert(MerkleRootMeta::from_shred(&shred)); just_inserted_shreds.insert(shred.id(), shred); index_meta_working_set_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true; @@ -6735,6 +6772,374 @@ pub mod tests { ),); } + #[test] + fn test_merkle_root_metas_coding() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let parent_slot = 0; + let slot = 1; + let index = 0; + let (_, coding_shreds, _) = setup_erasure_shreds(slot, parent_slot, 10); + let coding_shred = coding_shreds[index as usize].clone(); + + let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); + let mut index_working_set = HashMap::new(); + let mut just_received_shreds = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut index_meta_time_us = 0; + assert!(blockstore.check_insert_coding_shred( + coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut vec![], + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().ok(), + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_type(), + ShredType::Code, + ); + + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch + .put::(erasure_set.key(), &merkle_root_meta) + .unwrap(); + } + blockstore.db.write(write_batch).unwrap(); + + // Add a shred with different merkle root and index + let (_, coding_shreds, _) = setup_erasure_shreds(slot, parent_slot, 10); + let new_coding_shred = coding_shreds[(index + 1) as usize].clone(); + + erasure_metas.clear(); + index_working_set.clear(); + just_received_shreds.clear(); + let mut merkle_root_metas = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut duplicates = vec![]; + + assert!(blockstore.check_insert_coding_shred( + new_coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut duplicates, + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + // Verify that we still have the merkle root meta from the original shred + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().ok() + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + + // Blockstore should also have the merkle root meta of the original shred + assert_eq!( + blockstore + .merkle_root_meta(coding_shred.erasure_set()) + .unwrap() + .unwrap() + .merkle_root(), + coding_shred.merkle_root().ok() + ); + assert_eq!( + blockstore + .merkle_root_meta(coding_shred.erasure_set()) + .unwrap() + .unwrap() + .first_received_shred_index(), + index + ); + + // Add a shred from different fec set + let new_index = index + 31; + let (_, coding_shreds, _) = + setup_erasure_shreds_with_index(slot, parent_slot, 10, new_index); + let new_coding_shred = coding_shreds[0].clone(); + + assert!(blockstore.check_insert_coding_shred( + new_coding_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + &mut vec![], + false, + ShredSource::Turbine, + &mut BlockstoreInsertionMetrics::default(), + )); + + // Verify that we still have the merkle root meta for the original shred + // and the new shred + assert_eq!(merkle_root_metas.len(), 2); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + coding_shred.merkle_root().ok() + ); + assert_eq!( + merkle_root_metas + .get(&coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + assert_eq!( + merkle_root_metas + .get(&new_coding_shred.erasure_set()) + .unwrap() + .merkle_root(), + new_coding_shred.merkle_root().ok() + ); + assert_eq!( + merkle_root_metas + .get(&new_coding_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + new_index + ); + } + + #[test] + fn test_merkle_root_metas_data() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let parent_slot = 0; + let slot = 1; + let index = 11; + let fec_set_index = 11; + let (data_shreds, _, _) = + setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index); + let data_shred = data_shreds[0].clone(); + + let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); + let mut index_working_set = HashMap::new(); + let mut just_received_shreds = HashMap::new(); + let mut slot_meta_working_set = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut index_meta_time_us = 0; + blockstore + .check_insert_data_shred( + data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut vec![], + None, + ShredSource::Turbine, + ) + .unwrap(); + + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().ok() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_type(), + ShredType::Data, + ); + + for (erasure_set, merkle_root_meta) in merkle_root_metas { + write_batch + .put::(erasure_set.key(), &merkle_root_meta) + .unwrap(); + } + blockstore.db.write(write_batch).unwrap(); + + // Add a shred with different merkle root and index + let (data_shreds, _, _) = + setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index); + let new_data_shred = data_shreds[1].clone(); + + erasure_metas.clear(); + index_working_set.clear(); + just_received_shreds.clear(); + let mut merkle_root_metas = HashMap::new(); + let mut write_batch = blockstore.db.batch().unwrap(); + let mut duplicates = vec![]; + + assert!(blockstore + .check_insert_data_shred( + new_data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut duplicates, + None, + ShredSource::Turbine, + ) + .is_ok()); + + // Verify that we still have the merkle root meta from the original shred + assert_eq!(merkle_root_metas.len(), 1); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().ok() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + + // Blockstore should also have the merkle root meta of the original shred + assert_eq!( + blockstore + .merkle_root_meta(data_shred.erasure_set()) + .unwrap() + .unwrap() + .merkle_root(), + data_shred.merkle_root().ok() + ); + assert_eq!( + blockstore + .merkle_root_meta(data_shred.erasure_set()) + .unwrap() + .unwrap() + .first_received_shred_index(), + index + ); + + // Add a shred from different fec set + let new_index = fec_set_index + 31; + let new_data_shred = Shred::new_from_data( + slot, + new_index, + 1, // parent_offset + &[3, 3, 3], // data + ShredFlags::empty(), + 0, // reference_tick, + 0, // version + fec_set_index + 30, + ); + + blockstore + .check_insert_data_shred( + new_data_shred.clone(), + &mut erasure_metas, + &mut merkle_root_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_received_shreds, + &mut index_meta_time_us, + false, + &mut vec![], + None, + ShredSource::Turbine, + ) + .unwrap(); + + // Verify that we still have the merkle root meta for the original shred + // and the new shred + assert_eq!(merkle_root_metas.len(), 2); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .merkle_root(), + data_shred.merkle_root().ok() + ); + assert_eq!( + merkle_root_metas + .get(&data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + index + ); + assert_eq!( + merkle_root_metas + .get(&new_data_shred.erasure_set()) + .unwrap() + .merkle_root(), + new_data_shred.merkle_root().ok() + ); + assert_eq!( + merkle_root_metas + .get(&new_data_shred.erasure_set()) + .unwrap() + .first_received_shred_index(), + new_index + ); + } + #[test] fn test_check_insert_coding_shred() { let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -6753,6 +7158,7 @@ pub mod tests { ); let mut erasure_metas = HashMap::new(); + let mut merkle_root_metas = HashMap::new(); let mut index_working_set = HashMap::new(); let mut just_received_shreds = HashMap::new(); let mut write_batch = blockstore.db.batch().unwrap(); @@ -6760,6 +7166,7 @@ pub mod tests { assert!(blockstore.check_insert_coding_shred( coding_shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, @@ -6775,6 +7182,7 @@ pub mod tests { assert!(!blockstore.check_insert_coding_shred( coding_shred.clone(), &mut erasure_metas, + &mut merkle_root_metas, &mut index_working_set, &mut write_batch, &mut just_received_shreds, @@ -9262,6 +9670,15 @@ pub mod tests { slot: u64, parent_slot: u64, num_entries: u64, + ) -> (Vec, Vec, Arc) { + setup_erasure_shreds_with_index(slot, parent_slot, num_entries, 0) + } + + fn setup_erasure_shreds_with_index( + slot: u64, + parent_slot: u64, + num_entries: u64, + fec_set_index: u32, ) -> (Vec, Vec, Arc) { let entries = make_slot_entries_with_transactions(num_entries); let leader_keypair = Arc::new(Keypair::new()); @@ -9269,10 +9686,10 @@ pub mod tests { let (data_shreds, coding_shreds) = shredder.entries_to_shreds( &leader_keypair, &entries, - true, // is_last_in_slot - 0, // next_shred_index - 0, // next_code_index - true, // merkle_variant + true, // is_last_in_slot + fec_set_index, // next_shred_index + fec_set_index, // next_code_index + true, // merkle_variant &ReedSolomonCache::default(), &mut ProcessShredsStats::default(), ); diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 4cd4f8b85b918a..b3000d69b773c3 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -287,6 +287,10 @@ impl ErasureSetId { pub(crate) fn store_key(&self) -> (Slot, /*fec_set_index:*/ u64) { (self.0, u64::from(self.1)) } + + pub(crate) fn key(&self) -> (Slot, /*fec_set_index:*/ u32) { + (self.0, self.1) + } } macro_rules! dispatch {