Skip to content

Commit

Permalink
chore: added rollover logic to forester(once per slot) and test
Browse files Browse the repository at this point in the history
  • Loading branch information
ananas-block committed Sep 1, 2024
1 parent f0a8a4e commit 78e3bfe
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 67 deletions.
40 changes: 18 additions & 22 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
let epoch_info_clone = epoch_info.clone();
let self_clone = self.clone();
let tree = tree.clone();
// TODO: consider passing global shutdown signal (might be overkill since we have timeouts)
// TODO: consider passing global shutdown signal (might be overkill
// since we have timeouts)
tokio::spawn(async move {
if let Err(e) = self_clone
.process_queue(
Expand All @@ -486,24 +487,6 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
estimated_slot, active_phase_end
);

// TODO: move (Jorrit low prio)
// Should be called every multiple times per epoch for every tree. It is
// tricky because we need to fetch both the Merkle tree and the queue
// (by default we just fetch the queue account).
info!("Checking for rollover eligibility");
for tree in &epoch_info.trees {
let mut rpc = self.rpc_pool.get_connection().await?;
if is_tree_ready_for_rollover(
&mut *rpc,
tree.tree_accounts.merkle_tree,
tree.tree_accounts.tree_type,
)
.await?
{
self.perform_rollover(&tree.tree_accounts).await?;
}
}

info!("Completed active work");
Ok(())
}
Expand Down Expand Up @@ -582,9 +565,22 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
tree.tree_accounts,
&transaction_builder,
epoch_pda.epoch,
);
// Check whether the tree is ready for rollover once per slot.
// Check in parallel with sending transactions.
if is_tree_ready_for_rollover(
&mut *rpc,
tree.tree_accounts.merkle_tree,
tree.tree_accounts.tree_type,
)
.await?;

.await?
{
info!("Starting {} rollover.", tree.tree_accounts.merkle_tree);
self.perform_rollover(&tree.tree_accounts).await?;
}
// Await the result of the batch transactions after the
// potential rollover.
let num_tx_sent = num_tx_sent.await?;
// Prometheus metrics
let chunk_duration = start_time.elapsed();
queue_metric_update(epoch_info.epoch, num_tx_sent, chunk_duration).await;
Expand Down Expand Up @@ -723,7 +719,7 @@ pub async fn run_service<R: RpcConnection, I: Indexer<R>>(
let rpc = rpc_pool.get_connection().await?;
fetch_trees(&*rpc).await
};

info!("Fetched trees: {:?}", trees);
while retry_count < config.max_retries {
debug!("Creating EpochManager (attempt {})", retry_count + 1);
match EpochManager::new(
Expand Down
41 changes: 31 additions & 10 deletions forester/src/rollover/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ pub async fn is_tree_ready_for_rollover<R: RpcConnection>(
.get_anchor_account::<StateMerkleTreeAccount>(&tree_pubkey)
.await?
.unwrap();
info!("Account: {:?}", account);
// let account_info = rpc.get_account(tree_pubkey).await?.unwrap();

let is_already_rolled_over =
account.metadata.rollover_metadata.rolledover_slot != u64::MAX;
if is_already_rolled_over {
Expand All @@ -68,14 +69,24 @@ pub async fn is_tree_ready_for_rollover<R: RpcConnection>(
let threshold = ((1 << height) * account.metadata.rollover_metadata.rollover_threshold
/ 100) as usize;

// TODO: (fix) check to avoid processing Merkle trees with rollover threshold 0 which haven't processed any transactions
// let lamports_in_account_are_sufficient_for_rollover = account_info.lamports
// > account.metadata.rollover_metadata.rollover_fee * (1 << height);
Ok(merkle_tree.next_index() >= threshold)
}
TreeType::Address => {
let account = rpc
.get_anchor_account::<AddressMerkleTreeAccount>(&tree_pubkey)
.await?
.unwrap();
info!("Account: {:?}", account);
let queue_account = rpc
.get_anchor_account::<QueueAccount>(&account.metadata.associated_queue)
.await?
.unwrap();
// let account_info = rpc
// .get_account(account.metadata.associated_queue)
// .await?
// .unwrap();
let is_already_rolled_over =
account.metadata.rollover_metadata.rolledover_slot != u64::MAX;
if is_already_rolled_over {
Expand All @@ -90,15 +101,19 @@ pub async fn is_tree_ready_for_rollover<R: RpcConnection>(
.await;

let height = 26;
let threshold = ((1 << height) * account.metadata.rollover_metadata.rollover_threshold
let threshold = ((1 << height)
* queue_account.metadata.rollover_metadata.rollover_threshold
/ 100) as usize;

// TODO: (fix) check to avoid processing Merkle trees with rollover threshold 0 which haven't processed any transactions
// current implementation is returns always true
// let lamports_in_account_are_sufficient_for_rollover = account_info.lamports
// > account.metadata.rollover_metadata.rollover_fee * (1 << height);
Ok(merkle_tree.next_index() >= threshold)
}
}
}

#[allow(dead_code)]
pub async fn rollover_state_merkle_tree<R: RpcConnection, I: Indexer<R>>(
config: Arc<ForesterConfig>,
rpc: &mut R,
Expand All @@ -109,7 +124,7 @@ pub async fn rollover_state_merkle_tree<R: RpcConnection, I: Indexer<R>>(
let new_merkle_tree_keypair = Keypair::new();
let new_cpi_signature_keypair = Keypair::new();

let rollover_signature = perform_state_merkle_tree_roll_over_forester(
let rollover_signature = perform_state_merkle_tree_rollover_forester(
&config.payer_keypair,
rpc,
&new_nullifier_queue_keypair,
Expand All @@ -120,7 +135,7 @@ pub async fn rollover_state_merkle_tree<R: RpcConnection, I: Indexer<R>>(
&Pubkey::default(),
)
.await?;
println!("Rollover signature: {:?}", rollover_signature);
info!("State rollover signature: {:?}", rollover_signature);

let state_bundle = StateMerkleTreeBundle {
// TODO: fetch correct fee when this property is used
Expand All @@ -140,7 +155,7 @@ pub async fn rollover_state_merkle_tree<R: RpcConnection, I: Indexer<R>>(
}

#[allow(clippy::too_many_arguments)]
pub async fn perform_state_merkle_tree_roll_over_forester<R: RpcConnection>(
pub async fn perform_state_merkle_tree_rollover_forester<R: RpcConnection>(
payer: &Keypair,
context: &mut R,
new_queue_keypair: &Keypair,
Expand All @@ -165,7 +180,12 @@ pub async fn perform_state_merkle_tree_roll_over_forester<R: RpcConnection>(
let transaction = Transaction::new_signed_with_payer(
&instructions,
Some(&payer.pubkey()),
&vec![&payer, &new_queue_keypair, &new_address_merkle_tree_keypair],
&vec![
&payer,
&new_queue_keypair,
&new_address_merkle_tree_keypair,
&new_cpi_context_keypair,
],
blockhash,
);
context.process_transaction(transaction).await
Expand All @@ -179,7 +199,7 @@ pub async fn rollover_address_merkle_tree<R: RpcConnection, I: Indexer<R>>(
) -> Result<(), ForesterError> {
let new_nullifier_queue_keypair = Keypair::new();
let new_merkle_tree_keypair = Keypair::new();
perform_address_merkle_tree_roll_over(
let rollover_signature = perform_address_merkle_tree_rollover(
&config.payer_keypair,
rpc,
&new_nullifier_queue_keypair,
Expand All @@ -188,6 +208,7 @@ pub async fn rollover_address_merkle_tree<R: RpcConnection, I: Indexer<R>>(
&tree_data.queue,
)
.await?;
info!("Address rollover signature: {:?}", rollover_signature);

indexer.lock().await.add_address_merkle_tree_accounts(
&new_merkle_tree_keypair,
Expand All @@ -197,7 +218,7 @@ pub async fn rollover_address_merkle_tree<R: RpcConnection, I: Indexer<R>>(
Ok(())
}

pub async fn perform_address_merkle_tree_roll_over<R: RpcConnection>(
pub async fn perform_address_merkle_tree_rollover<R: RpcConnection>(
payer: &Keypair,
context: &mut R,
new_queue_keypair: &Keypair,
Expand Down
106 changes: 83 additions & 23 deletions forester/tests/e2e_test.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use account_compression::{AddressMerkleTreeAccount, StateMerkleTreeAccount};
use forester::queue_helpers::fetch_queue_item_data;
use forester::rpc_pool::SolanaRpcPool;
use forester::run_pipeline;
Expand All @@ -11,6 +12,7 @@ use light_registry::{EpochPda, ForesterEpochPda};
use light_test_utils::e2e_test_env::E2ETestEnv;
use light_test_utils::indexer::TestIndexer;
use light_test_utils::test_env::EnvAccounts;
use log::info;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::native_token::LAMPORTS_PER_SOL;
use solana_sdk::pubkey::Pubkey;
Expand Down Expand Up @@ -116,7 +118,7 @@ async fn test_epoch_monitor_with_test_indexer_and_1_forester() {
{
for _ in 0..iterations {
env.transfer_sol(user_index).await;
env.create_address(None).await;
env.create_address(None, None).await;
}

// Asserting non-empty because transfer sol is not deterministic.
Expand Down Expand Up @@ -305,6 +307,17 @@ async fn test_epoch_monitor_with_2_foresters() {
.await
.unwrap();
env.compress_sol(user_index, balance).await;
// Create state and address trees which can be rolled over
env.create_address_tree(Some(0)).await;
env.create_state_tree(Some(0)).await;
let state_tree_with_rollover_threshold_0 = env.indexer.state_merkle_trees[1]
.accounts
.merkle_tree
.clone();
let address_tree_with_rollover_threshold_0 = env.indexer.address_merkle_trees[1]
.accounts
.merkle_tree
.clone();

let state_trees: Vec<StateMerkleTreeAccounts> = env
.indexer
Expand All @@ -318,14 +331,23 @@ async fn test_epoch_monitor_with_2_foresters() {
.iter()
.map(|x| x.accounts)
.collect();
let mut total_expected_work = 0;

// Two rollovers plus other work
let mut total_expected_work = 2;
{
let iterations = 5;
for i in 0..iterations {
println!("Round {} of {}", i, iterations);
env.transfer_sol(user_index).await;
let user_keypair = env.users[0].keypair.insecure_clone();
env.transfer_sol_deterministic(&user_keypair, &user_keypair.pubkey(), Some(1))
.await
.unwrap();
env.transfer_sol_deterministic(&user_keypair, &user_keypair.pubkey().clone(), Some(0))
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
env.create_address(None).await;
env.create_address(None, Some(1)).await;
env.create_address(None, Some(0)).await;
}
assert_queue_len(
&pool,
Expand Down Expand Up @@ -364,18 +386,19 @@ async fn test_epoch_monitor_with_2_foresters() {
// Wait for both foresters to report work for epoch 1
const TIMEOUT_DURATION: Duration = Duration::from_secs(360);
let mut total_processed = 0;
let result = timeout(TIMEOUT_DURATION, async {
let finish_epoch = 1;
let result: Result<usize, tokio::time::error::Elapsed> = timeout(TIMEOUT_DURATION, async {
loop {
tokio::select! {
Some(report) = work_report_receiver1.recv(), if !forester1_reported_work_for_epoch1 => {
total_processed += report.processed_items;
if report.epoch == 1 {
if report.epoch == finish_epoch {
forester1_reported_work_for_epoch1 = true;
}
}
Some(report) = work_report_receiver2.recv(), if !forester2_reported_work_for_epoch1 => {
total_processed += report.processed_items;
if report.epoch == 1 {
if report.epoch == finish_epoch {
forester2_reported_work_for_epoch1 = true;
}
}
Expand All @@ -401,34 +424,40 @@ async fn test_epoch_monitor_with_2_foresters() {
}
}

assert_trees_are_rollledover(
&pool,
&state_tree_with_rollover_threshold_0,
&address_tree_with_rollover_threshold_0,
)
.await;
// assert queues have been emptied
assert_queue_len(&pool, &state_trees, &address_trees, &mut 0, 0, false).await;
let mut rpc = pool.get_connection().await.unwrap();
let forester_pubkeys = [
config1.payer_keypair.pubkey(),
config2.payer_keypair.pubkey(),
];
// assert epoch 0
{
let total_processed_work = assert_foresters_registered(&forester_pubkeys[..], &mut rpc, 0)
.await
.unwrap();
assert_eq!(
total_processed_work, total_expected_work,
"Not all items were processed."
);
}

// assert that foresters registered for epoch 1 and 2 (no new work is emitted after epoch 0)
for epoch in 1..=2 {
// Assert that foresters have registered all processed epochs and the next epoch (+1)
for epoch in 0..=finish_epoch + 1 {
let total_processed_work =
assert_foresters_registered(&forester_pubkeys[..], &mut rpc, epoch)
.await
.unwrap();
assert_eq!(
total_processed_work, 0,
"Not all items were processed in prior epoch."
);
if epoch == 0 {
assert_eq!(
total_processed_work, total_expected_work,
"Not all items were processed."
);
} else {
assert_eq!(
total_processed_work, 0,
"Not all items were processed in prior epoch."
);
}
}

shutdown_sender1
.send(())
.expect("Failed to send shutdown signal to forester 1");
Expand All @@ -438,7 +467,38 @@ async fn test_epoch_monitor_with_2_foresters() {
service_handle1.await.unwrap().unwrap();
service_handle2.await.unwrap().unwrap();
}

pub async fn assert_trees_are_rollledover(
pool: &SolanaRpcPool<SolanaRpcConnection>,
state_tree_with_rollover_threshold_0: &Pubkey,
address_tree_with_rollover_threshold_0: &Pubkey,
) {
let mut rpc = pool.get_connection().await.unwrap();
let address_merkle_tree = rpc
.get_anchor_account::<AddressMerkleTreeAccount>(&address_tree_with_rollover_threshold_0)
.await
.unwrap()
.unwrap();
assert_ne!(
address_merkle_tree
.metadata
.rollover_metadata
.rolledover_slot,
u64::MAX,
"address_merkle_tree: {:?}",
address_merkle_tree
);
let state_merkle_tree = rpc
.get_anchor_account::<AddressMerkleTreeAccount>(&state_tree_with_rollover_threshold_0)
.await
.unwrap()
.unwrap();
assert_ne!(
state_merkle_tree.metadata.rollover_metadata.rolledover_slot,
u64::MAX,
"state_merkle_tree: {:?}",
state_merkle_tree
);
}
async fn assert_foresters_registered(
foresters: &[Pubkey],
rpc: &mut SolanaRpcConnection,
Expand Down
2 changes: 1 addition & 1 deletion forester/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub fn keypair_action_config() -> KeypairActionConfig {
mint_spl: Some(1.0),
transfer_spl: Some(1.0),
max_output_accounts: Some(3),
fee_assert: true,
fee_assert: false,
approve_spl: None,
revoke_spl: None,
freeze_spl: None,
Expand Down
Loading

0 comments on commit 78e3bfe

Please sign in to comment.