Skip to content

Commit

Permalink
Fix add_new_tree
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeytimoshin committed Sep 11, 2024
1 parent fc6dc98 commit d983f1d
Showing 1 changed file with 40 additions and 36 deletions.
76 changes: 40 additions & 36 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,53 +194,57 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
}

async fn add_new_tree(&self, new_tree: TreeAccounts) -> Result<()> {
info!("Adding new tree: {:?}", new_tree);
let mut trees = self.trees.lock().await;
trees.push(new_tree);
drop(trees);

info!("New tree added to the list of trees");

let (current_slot, current_epoch) = self.get_current_slot_and_epoch().await?;
let phases = get_epoch_phases(&self.protocol_config, current_epoch);

// Check if we're currently in the active phase
if current_slot >= phases.active.start && current_slot < phases.active.end {
info!("Currently in active phase. Attempting to process the new tree immediately.");
info!("Recovering regitration info...");
if let Ok(mut epoch_info) = self.recover_registration_info(current_epoch).await {
info!("Recovered registration info for current epoch");
let tree_schedule = TreeForesterSchedule::new_with_schedule(
&new_tree,
current_slot,
&epoch_info.forester_epoch_pda,
&epoch_info.epoch_pda,
);
epoch_info.trees.push(tree_schedule.clone());

// Check if we're currently processing an epoch
if let Some(processing_flag) = self.processing_epochs.get(&current_epoch) {
if processing_flag.load(Ordering::SeqCst) {
let phases = get_epoch_phases(&self.protocol_config, current_epoch);
let self_clone = Arc::new(self.clone());

if current_slot >= phases.active.start && current_slot < phases.active.end {
if let Ok(mut epoch_info) = self.recover_registration_info(current_epoch).await
{
let tree_schedule = TreeForesterSchedule::new_with_schedule(
&new_tree,
current_slot,
info!("Spawning task to process new tree in current epoch");
tokio::spawn(async move {
if let Err(e) = self_clone
.process_queue(
&epoch_info.epoch,
&epoch_info.forester_epoch_pda,
&epoch_info.epoch_pda,
);
epoch_info.trees.push(tree_schedule.clone());

let self_clone = Arc::new(self.clone());
tokio::spawn(async move {
if let Err(e) = self_clone
.process_queue(
&epoch_info.epoch,
&epoch_info.forester_epoch_pda,
tree_schedule,
)
.await
{
error!("Error processing queue for new tree: {:?}", e);
}
});

info!(
"Injected new tree into current epoch {}: {:?}",
current_epoch, new_tree
);
tree_schedule,
)
.await
{
error!("Error processing queue for new tree: {:?}", e);
} else {
warn!("Failed to retrieve current epoch info for injecting new tree");
info!("Successfully processed new tree in current epoch");
}
} else {
info!("New tree will be included in the next epoch");
}
});

info!(
"Injected new tree into current epoch {}: {:?}",
current_epoch, new_tree
);
} else {
warn!("Failed to retrieve current epoch info for processing new tree");
}
} else {
info!("Not in active phase. New tree will be processed in the next active phase");
}

Ok(())
Expand Down

0 comments on commit d983f1d

Please sign in to comment.