Skip to content

Commit

Permalink
Use a single connection per read batch
Browse files Browse the repository at this point in the history
  • Loading branch information
moshababo committed Jul 20, 2024
1 parent 5a1a259 commit a8779b5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 85 deletions.
4 changes: 1 addition & 3 deletions core/node/consensus/src/storage/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ async fn test_vm_reader() {
let mut reader =
super::vm_reader::VMReader::new(pool.clone(), tx_sender.clone(), registry_address);

let validators = reader.read_validator_committee(ctx, block_id).await;
let (validators, attesters) = reader.read_committees(ctx, block_id).await;
assert_eq!(validators.len(), num_nodes);
let attesters = reader.read_attester_committee(ctx, block_id).await;
assert_eq!(attesters.len(), num_nodes);

for i in 0..nodes.len() {
assert_eq!(
nodes[i][0].clone().into_address().unwrap(),
Expand Down
123 changes: 41 additions & 82 deletions core/node/consensus/src/storage/vm_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::time::Duration;

use zksync_concurrency::ctx::Ctx;
use zksync_contracts::load_contract;
use zksync_node_api_server::{execution_sandbox::BlockStartInfo, tx_sender::TxSender};
use zksync_node_api_server::{
execution_sandbox::{BlockArgs, BlockStartInfo},
tx_sender::TxSender,
};
use zksync_system_constants::DEFAULT_L2_TX_GAS_PER_PUBDATA_BYTE;
use zksync_types::{
api::BlockId,
Expand Down Expand Up @@ -33,35 +36,46 @@ impl VMReader {
}
}

pub async fn read_validator_committee(
&mut self,
pub async fn read_committees(
&self,
ctx: &Ctx,
block_id: BlockId,
) -> Vec<CommitteeValidator> {
) -> (Vec<CommitteeValidator>, Vec<CommitteeAttester>) {
let mut conn = self.pool.connection(ctx).await.unwrap().0;
let start_info = BlockStartInfo::new(&mut conn, Duration::from_secs(10))
.await
.unwrap();
let block_args = BlockArgs::new(&mut conn, block_id, &start_info)
.await
.unwrap();

let validator_committee = self.read_validator_committee(block_args).await;
let attester_committee = self.read_attester_committee(block_args).await;

(validator_committee, attester_committee)
}

pub async fn read_validator_committee(&self, block_args: BlockArgs) -> Vec<CommitteeValidator> {
let mut committee = vec![];
let validator_committee_size = self.read_validator_committee_size(ctx, block_id).await;
let validator_committee_size = self.read_validator_committee_size(block_args).await;
for i in 0..validator_committee_size {
let committee_validator = self.read_committee_validator(ctx, block_id, i).await;
let committee_validator = self.read_committee_validator(block_args, i).await;
committee.push(committee_validator)
}
committee
}

pub async fn read_attester_committee(
&mut self,
ctx: &Ctx,
block_id: BlockId,
) -> Vec<CommitteeAttester> {
pub async fn read_attester_committee(&self, block_args: BlockArgs) -> Vec<CommitteeAttester> {
let mut committee = vec![];
let attester_committee_size = self.read_attester_committee_size(ctx, block_id).await;
let attester_committee_size = self.read_attester_committee_size(block_args).await;
for i in 0..attester_committee_size {
let committee_validator = self.read_committee_attester(ctx, block_id, i).await;
let committee_validator = self.read_committee_attester(block_args, i).await;
committee.push(committee_validator)
}
committee
}

async fn read_validator_committee_size(&mut self, ctx: &Ctx, block_id: BlockId) -> usize {
async fn read_validator_committee_size(&self, block_args: BlockArgs) -> usize {
let func = self
.registry_contract
.function("validatorCommitteeSize")
Expand All @@ -70,7 +84,7 @@ impl VMReader {

let tx = self.gen_l2_call_tx(self.registry_address, func.short_signature().to_vec());

let res = self.eth_call(ctx, block_id, tx).await;
let res = self.eth_call(block_args, tx).await;

func.decode_output(&res).unwrap()[0]
.clone()
Expand All @@ -79,50 +93,25 @@ impl VMReader {
.as_usize()
}

async fn read_attester_committee_size(&mut self, ctx: &Ctx, block_id: BlockId) -> usize {
async fn read_attester_committee_size(&self, block_args: BlockArgs) -> usize {
let func = self
.registry_contract
.function("attesterCommitteeSize")
.unwrap()
.clone();
let tx = self.gen_l2_call_tx(self.registry_address, func.short_signature().to_vec());

let res = self.eth_call(ctx, block_id, tx).await;
let res = self.eth_call(block_args, tx).await;
func.decode_output(&res).unwrap()[0]
.clone()
.into_uint()
.unwrap()
.as_usize()
}

async fn read_attester(
&mut self,
ctx: &Ctx,
block_id: BlockId,
node_owner: Address,
) -> (usize, Vec<u8>, bool) {
let func = self
.registry_contract
.function("attesters")
.unwrap()
.clone();
let tx = self.gen_l2_call_tx(
self.registry_address,
func.encode_input(&[Token::Address(node_owner)]).unwrap(),
);
let res = self.eth_call(ctx, block_id, tx).await;
let tokens = func.decode_output(&res).unwrap();
(
tokens[0].clone().into_uint().unwrap().as_usize(),
tokens[1].clone().into_bytes().unwrap(),
tokens[2].clone().into_bool().unwrap(),
)
}

async fn read_committee_validator(
&mut self,
ctx: &Ctx,
block_id: BlockId,
&self,
block_args: BlockArgs,
idx: usize,
) -> CommitteeValidator {
let func = self
Expand All @@ -136,7 +125,7 @@ impl VMReader {
.unwrap(),
);

let res = self.eth_call(ctx, block_id, tx).await;
let res = self.eth_call(block_args, tx).await;
let tokens = func.decode_output(&res).unwrap();
CommitteeValidator {
node_owner: tokens[0].clone().into_address().unwrap(),
Expand All @@ -147,9 +136,8 @@ impl VMReader {
}

async fn read_committee_attester(
&mut self,
ctx: &Ctx,
block_id: BlockId,
&self,
block_args: BlockArgs,
idx: usize,
) -> CommitteeAttester {
let func = self
Expand All @@ -163,7 +151,7 @@ impl VMReader {
.unwrap(),
);

let res = self.eth_call(ctx, block_id, tx).await;
let res = self.eth_call(block_args, tx).await;
let tokens = func.decode_output(&res).unwrap();
CommitteeAttester {
weight: tokens[0].clone().into_uint().unwrap().as_usize(),
Expand All @@ -172,46 +160,17 @@ impl VMReader {
}
}

async fn read_address(
&mut self,
ctx: &Ctx,
block_id: BlockId,
contract_address: Address,
func: Function,
) -> Address {
let tx = self.gen_l2_call_tx(contract_address, func.encode_input(&vec![]).unwrap());

let res = self.eth_call(ctx, block_id, tx).await;
let tokens = func.decode_output(&res).unwrap();
tokens[0].clone().into_address().unwrap()
}

async fn eth_call(&mut self, ctx: &Ctx, block_id: BlockId, tx: L2Tx) -> Vec<u8> {
let mut conn = self.pool.connection(ctx).await.unwrap().0;
let start_info = BlockStartInfo::new(&mut conn, Duration::from_secs(10))
.await
.unwrap();
let block_args = zksync_node_api_server::execution_sandbox::BlockArgs::new(
&mut conn,
block_id,
&start_info,
)
.await
.unwrap();
async fn eth_call(&self, block_args: BlockArgs, tx: L2Tx) -> Vec<u8> {
let call_overrides = CallOverrides {
enforced_base_fee: None,
};

let res = self
.tx_sender
self.tx_sender
.eth_call(block_args, call_overrides, tx)
.await
.unwrap();

res
.unwrap()
}

fn gen_l2_call_tx(&mut self, contract_address: Address, calldata: Vec<u8>) -> L2Tx {
fn gen_l2_call_tx(&self, contract_address: Address, calldata: Vec<u8>) -> L2Tx {
L2Tx::new(
contract_address,
calldata,
Expand Down

0 comments on commit a8779b5

Please sign in to comment.