chore: added rollover logic to forester (once per slot) and test #1134

Merged
merged 3 commits on Sep 1, 2024
40 changes: 18 additions & 22 deletions forester/src/epoch_manager.rs
@@ -461,7 +461,8 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
let epoch_info_clone = epoch_info.clone();
let self_clone = self.clone();
let tree = tree.clone();
// TODO: consider passing global shutdown signal (might be overkill
// since we have timeouts)
tokio::spawn(async move {
if let Err(e) = self_clone
.process_queue(
@@ -486,24 +487,6 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
estimated_slot, active_phase_end
);

info!("Completed active work");
Ok(())
}
@@ -582,9 +565,22 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
tree.tree_accounts,
&transaction_builder,
epoch_pda.epoch,
);
// Check whether the tree is ready for rollover once per slot,
// in parallel with sending transactions.
if is_tree_ready_for_rollover(
&mut *rpc,
tree.tree_accounts.merkle_tree,
tree.tree_accounts.tree_type,
)
.await?
{
info!("Starting {} rollover.", tree.tree_accounts.merkle_tree);
self.perform_rollover(&tree.tree_accounts).await?;
}
// Await the result of the batch transactions after the
// potential rollover.
let num_tx_sent = num_tx_sent.await?;
// Prometheus metrics
let chunk_duration = start_time.elapsed();
queue_metric_update(epoch_info.epoch, num_tx_sent, chunk_duration).await;
@@ -723,7 +719,7 @@ pub async fn run_service<R: RpcConnection, I: Indexer<R>>(
let rpc = rpc_pool.get_connection().await?;
fetch_trees(&*rpc).await
};

info!("Fetched trees: {:?}", trees);
while retry_count < config.max_retries {
debug!("Creating EpochManager (attempt {})", retry_count + 1);
match EpochManager::new(
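In one self-contained sketch, the concurrency pattern this hunk introduces: spawn the batch send, run the once-per-slot rollover check while it is in flight, then await the send result. This assumes the send is spawned as a task, as the comment in the diff implies; all names below are illustrative stand-ins, not the forester's real API.

use tokio::task::JoinHandle;

// Illustrative stand-ins; the real calls take an RPC handle and tree accounts.
async fn send_batched_transactions() -> usize { 42 }
async fn is_tree_ready_for_rollover() -> bool { true }
async fn perform_rollover() { println!("rolling over"); }

#[tokio::main]
async fn main() {
    // Spawn the batch send so it runs concurrently with the rollover check.
    let num_tx_sent: JoinHandle<usize> = tokio::spawn(send_batched_transactions());

    // Once-per-slot rollover check, overlapping with the in-flight send.
    if is_tree_ready_for_rollover().await {
        perform_rollover().await;
    }

    // Only now await the number of transactions the spawned task sent.
    let num_tx_sent = num_tx_sent.await.expect("send task panicked");
    println!("sent {num_tx_sent} transactions");
}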
41 changes: 31 additions & 10 deletions forester/src/rollover/operations.rs
@@ -52,7 +52,8 @@ pub async fn is_tree_ready_for_rollover<R: RpcConnection>(
.get_anchor_account::<StateMerkleTreeAccount>(&tree_pubkey)
.await?
.unwrap();
info!("Account: {:?}", account);
// let account_info = rpc.get_account(tree_pubkey).await?.unwrap();

let is_already_rolled_over =
account.metadata.rollover_metadata.rolledover_slot != u64::MAX;
if is_already_rolled_over {
@@ -68,14 +69,24 @@
let threshold = ((1 << height) * account.metadata.rollover_metadata.rollover_threshold
/ 100) as usize;

// TODO: (fix) add a check to avoid processing Merkle trees with rollover threshold 0 that haven't processed any transactions
// let lamports_in_account_are_sufficient_for_rollover = account_info.lamports
// > account.metadata.rollover_metadata.rollover_fee * (1 << height);
Ok(merkle_tree.next_index() >= threshold)
}
TreeType::Address => {
let account = rpc
.get_anchor_account::<AddressMerkleTreeAccount>(&tree_pubkey)
.await?
.unwrap();
info!("Account: {:?}", account);
let queue_account = rpc
.get_anchor_account::<QueueAccount>(&account.metadata.associated_queue)
.await?
.unwrap();
// let account_info = rpc
// .get_account(account.metadata.associated_queue)
// .await?
// .unwrap();
Comment on lines +86 to +89

Contributor:

can we remove it?

Contributor Author:

I left it on purpose so that we know that we need to take the queue account info, not the Merkle tree. This is different for state and address Merkle trees: for address Merkle trees, users only interact with the queue; the tree is only updated by the forester.
let is_already_rolled_over =
account.metadata.rollover_metadata.rolledover_slot != u64::MAX;
if is_already_rolled_over {
@@ -90,15 +101,19 @@
.await;

let height = 26;
let threshold = ((1 << height)
* queue_account.metadata.rollover_metadata.rollover_threshold
/ 100) as usize;

// TODO: (fix) add a check to avoid processing Merkle trees with rollover threshold 0 that haven't processed any transactions;
// the current implementation always returns true for them
// let lamports_in_account_are_sufficient_for_rollover = account_info.lamports
// > account.metadata.rollover_metadata.rollover_fee * (1 << height);
Ok(merkle_tree.next_index() >= threshold)
}
}
}
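To make the distinction from the review thread above concrete, here is a minimal sketch of the readiness arithmetic, assuming only the fields visible in this diff; the helper name and signature are illustrative, not the forester's API. For state trees the rollover metadata is read from the tree account itself; for address trees the relevant threshold comes from the associated queue account, since users only touch the queue.

// Sketch, not the real implementation: readiness means "the tree is at least
// rollover_threshold percent full". For state trees, `rollover_threshold`
// comes from the tree account's metadata; for address trees, from the
// associated queue account's metadata (see the review thread above).
fn is_ready_for_rollover(next_index: usize, height: u32, rollover_threshold: u64) -> bool {
    // A tree of depth `height` holds 2^height leaves; the threshold is a
    // percentage of that capacity, matching the arithmetic in the diff.
    let threshold = ((1u64 << height) * rollover_threshold / 100) as usize;
    next_index >= threshold
}

// Illustrative usage with the hard-coded height of 26 from the diff and a
// hypothetical 95% threshold:
// let ready = is_ready_for_rollover(merkle_tree.next_index(), 26, 95);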

#[allow(dead_code)]
pub async fn rollover_state_merkle_tree<R: RpcConnection, I: Indexer<R>>(
config: Arc<ForesterConfig>,
rpc: &mut R,
@@ -109,7 +124,7 @@ pub async fn rollover_state_merkle_tree<R: RpcConnection, I: Indexer<R>>(
let new_merkle_tree_keypair = Keypair::new();
let new_cpi_signature_keypair = Keypair::new();

let rollover_signature = perform_state_merkle_tree_rollover_forester(
&config.payer_keypair,
rpc,
&new_nullifier_queue_keypair,
@@ -120,7 +135,7 @@ pub async fn rollover_state_merkle_tree<R: RpcConnection, I: Indexer<R>>(
&Pubkey::default(),
)
.await?;
println!("Rollover signature: {:?}", rollover_signature);
info!("State rollover signature: {:?}", rollover_signature);

let state_bundle = StateMerkleTreeBundle {
// TODO: fetch correct fee when this property is used
@@ -140,7 +155,7 @@ pub async fn rollover_state_merkle_tree<R: RpcConnection, I: Indexer<R>>(
}

#[allow(clippy::too_many_arguments)]
pub async fn perform_state_merkle_tree_rollover_forester<R: RpcConnection>(
payer: &Keypair,
context: &mut R,
new_queue_keypair: &Keypair,
@@ -165,7 +180,12 @@ pub async fn perform_state_merkle_tree_roll_over_forester<R: RpcConnection>(
let transaction = Transaction::new_signed_with_payer(
&instructions,
Some(&payer.pubkey()),
&vec![&payer, &new_queue_keypair, &new_address_merkle_tree_keypair],
&vec![
&payer,
&new_queue_keypair,
&new_address_merkle_tree_keypair,
&new_cpi_context_keypair,
],
blockhash,
);
context.process_transaction(transaction).await
@@ -179,7 +199,7 @@ pub async fn rollover_address_merkle_tree<R: RpcConnection, I: Indexer<R>>(
) -> Result<(), ForesterError> {
let new_nullifier_queue_keypair = Keypair::new();
let new_merkle_tree_keypair = Keypair::new();
let rollover_signature = perform_address_merkle_tree_rollover(
&config.payer_keypair,
rpc,
&new_nullifier_queue_keypair,
@@ -188,6 +208,7 @@ pub async fn rollover_address_merkle_tree<R: RpcConnection, I: Indexer<R>>(
&tree_data.queue,
)
.await?;
info!("Address rollover signature: {:?}", rollover_signature);

indexer.lock().await.add_address_merkle_tree_accounts(
&new_merkle_tree_keypair,
@@ -197,7 +218,7 @@ pub async fn rollover_address_merkle_tree<R: RpcConnection, I: Indexer<R>>(
Ok(())
}

pub async fn perform_address_merkle_tree_rollover<R: RpcConnection>(
payer: &Keypair,
context: &mut R,
new_queue_keypair: &Keypair,
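The expanded signer list above reflects a general Solana rule: every keypair whose account is created inside the transaction must co-sign it, which is why `new_cpi_context_keypair` now signs alongside the payer, queue, and tree keypairs. A minimal sketch of that rule using solana_sdk; the builder function is illustrative, not part of the forester:

use solana_sdk::{
    hash::Hash,
    instruction::Instruction,
    signature::{Keypair, Signer},
    transaction::Transaction,
};

// Sketch: the payer plus every keypair whose account the transaction creates
// (queue, Merkle tree, cpi context) must appear in the signer list.
fn build_rollover_tx(
    payer: &Keypair,
    new_account_keypairs: &[&Keypair],
    instructions: &[Instruction],
    blockhash: Hash,
) -> Transaction {
    let mut signers: Vec<&Keypair> = vec![payer];
    signers.extend_from_slice(new_account_keypairs);
    Transaction::new_signed_with_payer(
        instructions,
        Some(&payer.pubkey()),
        &signers,
        blockhash,
    )
}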
105 changes: 82 additions & 23 deletions forester/tests/e2e_test.rs
@@ -1,3 +1,4 @@
use account_compression::{AddressMerkleTreeAccount, StateMerkleTreeAccount};
use forester::queue_helpers::fetch_queue_item_data;
use forester::rpc_pool::SolanaRpcPool;
use forester::run_pipeline;
@@ -116,7 +117,7 @@ async fn test_epoch_monitor_with_test_indexer_and_1_forester() {
{
for _ in 0..iterations {
env.transfer_sol(user_index).await;
env.create_address(None, None).await;
}

// Asserting non-empty because transfer sol is not deterministic.
@@ -305,6 +306,17 @@ async fn test_epoch_monitor_with_2_foresters() {
.await
.unwrap();
env.compress_sol(user_index, balance).await;
// Create state and address trees which can be rolled over
env.create_address_tree(Some(0)).await;
env.create_state_tree(Some(0)).await;
let state_tree_with_rollover_threshold_0 = env.indexer.state_merkle_trees[1]
.accounts
.merkle_tree
.clone();
let address_tree_with_rollover_threshold_0 = env.indexer.address_merkle_trees[1]
.accounts
.merkle_tree
.clone();

let state_trees: Vec<StateMerkleTreeAccounts> = env
.indexer
@@ -318,14 +330,23 @@ async fn test_epoch_monitor_with_2_foresters() {
.iter()
.map(|x| x.accounts)
.collect();

// Two rollovers plus other work
let mut total_expected_work = 2;
{
let iterations = 5;
for i in 0..iterations {
println!("Round {} of {}", i, iterations);
let user_keypair = env.users[0].keypair.insecure_clone();
env.transfer_sol_deterministic(&user_keypair, &user_keypair.pubkey(), Some(1))
.await
.unwrap();
env.transfer_sol_deterministic(&user_keypair, &user_keypair.pubkey().clone(), Some(0))
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
env.create_address(None, Some(1)).await;
env.create_address(None, Some(0)).await;
}
assert_queue_len(
&pool,
@@ -364,18 +385,19 @@ async fn test_epoch_monitor_with_2_foresters() {
// Wait for both foresters to report work for epoch 1
const TIMEOUT_DURATION: Duration = Duration::from_secs(360);
let mut total_processed = 0;
let finish_epoch = 1;
let result: Result<usize, tokio::time::error::Elapsed> = timeout(TIMEOUT_DURATION, async {
loop {
tokio::select! {
Some(report) = work_report_receiver1.recv(), if !forester1_reported_work_for_epoch1 => {
total_processed += report.processed_items;
if report.epoch == finish_epoch {
forester1_reported_work_for_epoch1 = true;
}
}
Some(report) = work_report_receiver2.recv(), if !forester2_reported_work_for_epoch1 => {
total_processed += report.processed_items;
if report.epoch == finish_epoch {
forester2_reported_work_for_epoch1 = true;
}
}
@@ -401,34 +423,40 @@
}
}

assert_trees_are_rolledover(
&pool,
&state_tree_with_rollover_threshold_0,
&address_tree_with_rollover_threshold_0,
)
.await;
// assert queues have been emptied
assert_queue_len(&pool, &state_trees, &address_trees, &mut 0, 0, false).await;
let mut rpc = pool.get_connection().await.unwrap();
let forester_pubkeys = [
config1.payer_keypair.pubkey(),
config2.payer_keypair.pubkey(),
];
// Assert that foresters have registered all processed epochs and the next epoch (+1)
for epoch in 0..=finish_epoch + 1 {
let total_processed_work =
assert_foresters_registered(&forester_pubkeys[..], &mut rpc, epoch)
.await
.unwrap();
if epoch == 0 {
assert_eq!(
total_processed_work, total_expected_work,
"Not all items were processed."
);
} else {
assert_eq!(
total_processed_work, 0,
"Not all items were processed in prior epoch."
);
}
}

shutdown_sender1
.send(())
.expect("Failed to send shutdown signal to forester 1");
Expand All @@ -438,7 +466,38 @@ async fn test_epoch_monitor_with_2_foresters() {
service_handle1.await.unwrap().unwrap();
service_handle2.await.unwrap().unwrap();
}

pub async fn assert_trees_are_rolledover(
pool: &SolanaRpcPool<SolanaRpcConnection>,
state_tree_with_rollover_threshold_0: &Pubkey,
address_tree_with_rollover_threshold_0: &Pubkey,
) {
let mut rpc = pool.get_connection().await.unwrap();
let address_merkle_tree = rpc
.get_anchor_account::<AddressMerkleTreeAccount>(&address_tree_with_rollover_threshold_0)
.await
.unwrap()
.unwrap();
assert_ne!(
address_merkle_tree
.metadata
.rollover_metadata
.rolledover_slot,
u64::MAX,
"address_merkle_tree: {:?}",
address_merkle_tree
);
let state_merkle_tree = rpc
.get_anchor_account::<AddressMerkleTreeAccount>(&state_tree_with_rollover_threshold_0)
.await
.unwrap()
.unwrap();
assert_ne!(
state_merkle_tree.metadata.rollover_metadata.rolledover_slot,
u64::MAX,
"state_merkle_tree: {:?}",
state_merkle_tree
);
}
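The assertions above rely on the sentinel convention visible throughout this diff: `rolledover_slot` starts at `u64::MAX` and is presumably set to the slot of the rollover, so any other value means the tree was rolled over. A tiny sketch of that convention; the constant and helper are illustrative:

// Sentinel meaning "this tree has never been rolled over" (as in the diff).
const NOT_ROLLED_OVER: u64 = u64::MAX;

fn assert_rolled_over(rolledover_slot: u64) {
    // A successful rollover replaces the sentinel with a real slot number;
    // the e2e test asserts exactly this transition with assert_ne!.
    assert_ne!(rolledover_slot, NOT_ROLLED_OVER, "tree was never rolled over");
}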
async fn assert_foresters_registered(
foresters: &[Pubkey],
rpc: &mut SolanaRpcConnection,
2 changes: 1 addition & 1 deletion forester/tests/test_utils.rs
@@ -47,7 +47,7 @@ pub fn keypair_action_config() -> KeypairActionConfig {
mint_spl: Some(1.0),
transfer_spl: Some(1.0),
max_output_accounts: Some(3),
fee_assert: false,
approve_spl: None,
revoke_spl: None,
freeze_spl: None,
2 changes: 1 addition & 1 deletion test-programs/registry-test/tests/tests.rs
@@ -652,7 +652,7 @@ async fn test_register_and_update_forester_pda() {
// create work 1 item in address and nullifier queue each
let (mut state_merkle_tree_bundle, mut address_merkle_tree, mut rpc) = {
let mut e2e_env = init_program_test_env(rpc, &env).await;
e2e_env.create_address(None, None).await;
e2e_env
.compress_sol_deterministic(&forester_keypair, 1_000_000, None)
.await;