Skip to content

Commit

Permalink
storage-bigtable: Upload entries (solana-labs#34099)
Browse files Browse the repository at this point in the history
* Add entries table to bt init

* Add entries to storage-proto

* Use new Blockstore method in bigtable_upload

* Add LedgerStorage::upload_confirmed_block_with_entries and use in bigtable_upload

* Upload entries to bigtable
  • Loading branch information
Tyera authored Nov 28, 2023
1 parent b9ef204 commit 573ec81
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 9 deletions.
9 changes: 5 additions & 4 deletions ledger/src/bigtable_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ pub async fn upload_confirmed_blocks(
break;
}

let _ = match blockstore.get_rooted_block(slot, true) {
Ok(confirmed_block) => {
let _ = match blockstore.get_rooted_block_with_entries(slot, true) {
Ok(confirmed_block_with_entries) => {
num_blocks_read += 1;
sender.send((slot, Some(confirmed_block)))
sender.send((slot, Some(confirmed_block_with_entries)))
}
Err(err) => {
warn!(
Expand Down Expand Up @@ -227,7 +227,8 @@ pub async fn upload_confirmed_blocks(
Some(confirmed_block) => {
let bt = bigtable.clone();
Some(tokio::spawn(async move {
bt.upload_confirmed_block(slot, confirmed_block).await
bt.upload_confirmed_block_with_entries(slot, confirmed_block)
.await
}))
}
});
Expand Down
2 changes: 1 addition & 1 deletion storage-bigtable/init-bigtable.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then
cbt+=(-project emulator)
fi

for table in blocks tx tx-by-addr; do
for table in blocks entries tx tx-by-addr; do
(
set -x
"${cbt[@]}" createtable $table
Expand Down
49 changes: 47 additions & 2 deletions storage-bigtable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ use {
timing::AtomicInterval,
transaction::{TransactionError, VersionedTransaction},
},
solana_storage_proto::convert::{generated, tx_by_addr},
solana_storage_proto::convert::{entries, generated, tx_by_addr},
solana_transaction_status::{
extract_and_fmt_memos, ConfirmedBlock, ConfirmedTransactionStatusWithSignature,
ConfirmedTransactionWithStatusMeta, Reward, TransactionByAddrInfo,
TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta,
TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedConfirmedBlockWithEntries,
VersionedTransactionWithStatusMeta,
},
std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -91,6 +92,10 @@ fn slot_to_blocks_key(slot: Slot) -> String {
slot_to_key(slot)
}

fn slot_to_entries_key(slot: Slot) -> String {
slot_to_key(slot)
}

fn slot_to_tx_by_addr_key(slot: Slot) -> String {
slot_to_key(!slot)
}
Expand Down Expand Up @@ -883,7 +888,30 @@ impl LedgerStorage {
"LedgerStorage::upload_confirmed_block request received: {:?}",
slot
);
self.upload_confirmed_block_with_entries(
slot,
VersionedConfirmedBlockWithEntries {
block: confirmed_block,
entries: vec![],
},
)
.await
}

pub async fn upload_confirmed_block_with_entries(
&self,
slot: Slot,
confirmed_block: VersionedConfirmedBlockWithEntries,
) -> Result<()> {
trace!(
"LedgerStorage::upload_confirmed_block_with_entries request received: {:?}",
slot
);
let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
let VersionedConfirmedBlockWithEntries {
block: confirmed_block,
entries,
} = confirmed_block;

let mut tx_cells = Vec::with_capacity(confirmed_block.transactions.len());
for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
Expand Down Expand Up @@ -934,6 +962,14 @@ impl LedgerStorage {
})
.collect();

let num_entries = entries.len();
let entry_cell = (
slot_to_entries_key(slot),
entries::Entries {
entries: entries.into_iter().enumerate().map(Into::into).collect(),
},
);

let mut tasks = vec![];

if !tx_cells.is_empty() {
Expand All @@ -955,6 +991,14 @@ impl LedgerStorage {
}));
}

if num_entries > 0 {
let conn = self.connection.clone();
tasks.push(tokio::spawn(async move {
conn.put_protobuf_cells_with_retry::<entries::Entries>("entries", &[entry_cell])
.await
}));
}

let mut bytes_written = 0;
let mut maybe_first_err: Option<Error> = None;

Expand Down Expand Up @@ -995,6 +1039,7 @@ impl LedgerStorage {
"storage-bigtable-upload-block",
("slot", slot, i64),
("transactions", num_transactions, i64),
("entries", num_entries, i64),
("bytes", bytes_written, i64),
);
Ok(())
Expand Down
6 changes: 5 additions & 1 deletion storage-proto/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ fn main() -> Result<(), std::io::Error> {
}

let proto_base_path = std::path::PathBuf::from("proto");
let proto_files = ["confirmed_block.proto", "transaction_by_addr.proto"];
let proto_files = [
"confirmed_block.proto",
"entries.proto",
"transaction_by_addr.proto",
];
let mut protos = Vec::new();
for proto_file in &proto_files {
let proto = proto_base_path.join(proto_file);
Expand Down
15 changes: 15 additions & 0 deletions storage-proto/proto/entries.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

package solana.storage.Entries;

message Entries {
repeated Entry entries = 1;
}

message Entry {
uint32 index = 1;
uint64 num_hashes = 2;
bytes hash = 3;
uint64 num_transactions = 4;
uint32 starting_transaction_index = 5;
}
19 changes: 18 additions & 1 deletion storage-proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use {
transaction_context::TransactionReturnData,
},
solana_transaction_status::{
ConfirmedBlock, InnerInstruction, InnerInstructions, Reward, RewardType,
ConfirmedBlock, EntrySummary, InnerInstruction, InnerInstructions, Reward, RewardType,
TransactionByAddrInfo, TransactionStatusMeta, TransactionTokenBalance,
TransactionWithStatusMeta, VersionedConfirmedBlock, VersionedTransactionWithStatusMeta,
},
Expand All @@ -41,6 +41,11 @@ pub mod tx_by_addr {
));
}

#[allow(clippy::derive_partial_eq_without_eq)]
pub mod entries {
include!(concat!(env!("OUT_DIR"), "/solana.storage.entries.rs"));
}

impl From<Vec<Reward>> for generated::Rewards {
fn from(rewards: Vec<Reward>) -> Self {
Self {
Expand Down Expand Up @@ -1189,6 +1194,18 @@ impl TryFrom<tx_by_addr::TransactionByAddr> for Vec<TransactionByAddrInfo> {
}
}

impl From<(usize, EntrySummary)> for entries::Entry {
fn from((index, entry_summary): (usize, EntrySummary)) -> Self {
entries::Entry {
index: index as u32,
num_hashes: entry_summary.num_hashes,
hash: entry_summary.hash.as_ref().into(),
num_transactions: entry_summary.num_transactions,
starting_transaction_index: entry_summary.starting_transaction_index as u32,
}
}
}

#[cfg(test)]
mod test {
use {super::*, enum_iterator::all};
Expand Down

0 comments on commit 573ec81

Please sign in to comment.