Skip to content

Commit

Permalink
Refine the schedule of the cleaning task
Browse files Browse the repository at this point in the history
  • Loading branch information
tiram88 committed Sep 14, 2023
1 parent b63b586 commit 55273ab
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 9 deletions.
5 changes: 3 additions & 2 deletions mining/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ impl MiningManager {

pub fn expire_low_priority_transactions(&self, consensus: &dyn ConsensusApi) {
// very fine-grained write locks on mempool
debug!("<> Expiring low priority transactions...");

// orphan pool
if let Err(err) = self.mempool.write().expire_orphan_low_priority_transactions(consensus) {
Expand Down Expand Up @@ -494,9 +495,9 @@ impl MiningManager {
// Prepare a vector with clones of high priority transactions found in the mempool
let mempool = self.mempool.read();
if mempool.has_transactions_with_priority(Priority::High) {
info!("Revalidating high priority transactions...");
debug!("<> Revalidating high priority transactions...");
} else {
debug!("Revalidating high priority transactions found no transactions");
debug!("<> Revalidating high priority transactions found no transactions");
return;
}
let transactions = mempool.all_transactions_with_priority(Priority::High);
Expand Down
8 changes: 7 additions & 1 deletion protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ impl FlowContext {
let mining_manager = self.mining_manager().clone();
let consensus_clone = consensus.clone();
let context = self.clone();
debug!("<> Starting cleaning task #{}...", self.cleaning_count().await);
tokio::spawn(async move {
mining_manager.clone().expire_low_priority_transactions(&consensus_clone).await;
if context.should_rebroadcast().await {
Expand All @@ -396,6 +397,7 @@ impl FlowContext {
}
}
context.cleaning_is_done().await;
debug!("<> Cleaning task is done");
});
}

Expand Down Expand Up @@ -447,7 +449,11 @@ impl FlowContext {

/// Returns true if the time has come for a rebroadcast of the mempool high priority transactions.
async fn should_rebroadcast(&self) -> bool {
self.transactions_spread.write().await.should_rebroadcast()
self.transactions_spread.read().await.should_rebroadcast()
}

async fn cleaning_count(&self) -> u64 {
self.transactions_spread.read().await.cleaning_count()
}

async fn cleaning_is_done(&self) {
Expand Down
17 changes: 11 additions & 6 deletions protocol/flows/src/flowcontext/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,27 @@ impl TransactionsSpread {
if self.cleaning_task_running || Instant::now() < self.last_cleaning_time + CLEANING_TASK_INTERVAL {
return false;
}
// Keep the launching times aligned to exact intervals
let call_time = Instant::now();
while self.last_cleaning_time + CLEANING_TASK_INTERVAL < call_time {
self.last_cleaning_time += CLEANING_TASK_INTERVAL;
}
self.cleaning_count += 1;
self.cleaning_task_running = true;
true
}

/// Returns true if the time for a rebroadcast of the mempool high priority transactions has come.
pub fn should_rebroadcast(&self) -> bool {
Instant::now() >= self.last_cleaning_time + CLEANING_TASK_INTERVAL && self.cleaning_count % REBROADCAST_FREQUENCY == 0
self.cleaning_count % REBROADCAST_FREQUENCY == 0
}

pub fn cleaning_count(&self) -> u64 {
self.cleaning_count
}

pub fn cleaning_is_done(&mut self) {
assert!(self.cleaning_task_running, "no stop without a matching start");
// Keep launching the cleaning task respecting the exact intervals
while self.last_cleaning_time <= Instant::now() {
self.last_cleaning_time += CLEANING_TASK_INTERVAL;
}
self.cleaning_count += 1;
self.cleaning_task_running = false;
}

Expand Down

0 comments on commit 55273ab

Please sign in to comment.