From 016f56d8710d07c53f6ee571f6c0718a957ab6d9 Mon Sep 17 00:00:00 2001 From: pauldelucia Date: Mon, 25 Nov 2024 18:23:09 +0700 Subject: [PATCH 1/3] fix: limit 100 in dpns end time query --- .../contested_names/query_ending_times.rs | 60 ++++++++++--------- src/logging.rs | 2 +- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/backend_task/contested_names/query_ending_times.rs b/src/backend_task/contested_names/query_ending_times.rs index 6c927ba..8263809 100644 --- a/src/backend_task/contested_names/query_ending_times.rs +++ b/src/backend_task/contested_names/query_ending_times.rs @@ -1,7 +1,6 @@ use crate::context::AppContext; use crate::model::proof_log_item::{ProofLogItem, RequestType}; use chrono::{DateTime, Duration, Utc}; -use dash_sdk::dpp::data_contract::accessors::v0::DataContractV0Getters; use dash_sdk::dpp::voting::vote_polls::contested_document_resource_vote_poll::ContestedDocumentResourceVotePoll; use dash_sdk::dpp::voting::vote_polls::VotePoll; use dash_sdk::drive::query::VotePollsByEndDateDriveQuery; @@ -22,54 +21,44 @@ impl AppContext { let now: DateTime = Utc::now(); let start_time_dt = now - Duration::weeks(2); let end_time_dt = now + Duration::weeks(2); - let start_time = Some((start_time_dt.timestamp_millis() as u64, true)); + let mut start_time = Some((start_time_dt.timestamp_millis() as u64, true)); let end_time = Some((end_time_dt.timestamp_millis() as u64, true)); - let end_time_query = VotePollsByEndDateDriveQuery { - start_time, - end_time, - limit: None, - offset: None, - order_ascending: true, - }; + let mut contests_end_times = BTreeMap::new(); const MAX_RETRIES: usize = 3; let mut retries = 0; - let mut contests_end_times = BTreeMap::new(); - loop { - match VotePoll::fetch_many(&sdk, end_time_query.clone()).await { + let end_time_query = VotePollsByEndDateDriveQuery { + start_time, + end_time, + limit: Some(100), + offset: None, + order_ascending: true, + }; + + let new_end_times = match VotePoll::fetch_many(&sdk, end_time_query.clone()).await { Ok(vote_polls) => { + let mut end_times = BTreeMap::new(); for (timestamp, vote_poll_list) in vote_polls { let contests = vote_poll_list.into_iter().filter_map(|vote_poll| { let VotePoll::ContestedDocumentResourceVotePoll( ContestedDocumentResourceVotePoll { - contract_id, - document_type_name, - index_name, + contract_id: _, + document_type_name: _, + index_name: _, index_values, }, ) = vote_poll; - if contract_id != self.dpns_contract.id() { - return None; - } - if document_type_name != "domain" { - return None; - } - if index_name != "parentNameAndLabel" { - return None; - } - if index_values.len() != 2 { - return None; - } + index_values .get(1) .and_then(|a| a.to_str().ok().map(|a| (a.to_string(), timestamp))) }); - contests_end_times.extend(contests); + end_times.extend(contests); } - break; + end_times } Err(e) => { tracing::error!("Error fetching vote polls: {}", e); @@ -140,6 +129,19 @@ impl AppContext { return Err(format!("Error fetching vote polls: {}", e)); } } + }; + + contests_end_times.extend(new_end_times.clone()); + + if new_end_times.len() == 0 { + break; + } + + let last_found_ending_time = new_end_times.values().max(); + if let Some(last_found_ending_time) = last_found_ending_time { + start_time = Some((*last_found_ending_time, false)); + } else { + break; } } diff --git a/src/logging.rs b/src/logging.rs index 37d6409..3fe2f55 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -13,7 +13,7 @@ pub fn initialize_logger() { }; let filter = EnvFilter::try_new( - "error,dash_sdk=debug,tenderdash_abci=debug,drive=debug,drive_proof_verifier=debug,rs_dapi_client=debug", + "error,info,dash_sdk=debug,tenderdash_abci=debug,drive=debug,drive_proof_verifier=debug,rs_dapi_client=debug", ) .unwrap_or_else(|e| panic!("Failed to create EnvFilter: {:?}", e)); From 67e368d082aff7647d29fed63b6af9a6ac2e97b5 Mon Sep 17 00:00:00 2001 From: pauldelucia Date: Mon, 25 Nov 2024 18:24:31 +0700 Subject: [PATCH 2/3] remove info log --- src/logging.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/logging.rs b/src/logging.rs index 3fe2f55..37d6409 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -13,7 +13,7 @@ pub fn initialize_logger() { }; let filter = EnvFilter::try_new( - "error,info,dash_sdk=debug,tenderdash_abci=debug,drive=debug,drive_proof_verifier=debug,rs_dapi_client=debug", + "error,dash_sdk=debug,tenderdash_abci=debug,drive=debug,drive_proof_verifier=debug,rs_dapi_client=debug", ) .unwrap_or_else(|e| panic!("Failed to create EnvFilter: {:?}", e)); From 00d41179db84f78465131ff966796dc645c01818 Mon Sep 17 00:00:00 2001 From: pauldelucia Date: Tue, 26 Nov 2024 19:03:07 +0700 Subject: [PATCH 3/3] fix: we were repeatedly querying dpns end times and contenders --- .../query_dpns_contested_resources.rs | 158 +++++++++--------- 1 file changed, 79 insertions(+), 79 deletions(-) diff --git a/src/backend_task/contested_names/query_dpns_contested_resources.rs b/src/backend_task/contested_names/query_dpns_contested_resources.rs index 3a66b32..a1659aa 100644 --- a/src/backend_task/contested_names/query_dpns_contested_resources.rs +++ b/src/backend_task/contested_names/query_dpns_contested_resources.rs @@ -29,6 +29,7 @@ impl AppContext { }; const MAX_RETRIES: usize = 3; let mut start_at_value = None; + let mut names_to_be_updated = Vec::new(); loop { let query = VotePollsByDocumentTypeQuery { contract_id: data_contract.id(), @@ -140,11 +141,13 @@ impl AppContext { let last_found_name = contested_resources_as_strings.last().unwrap().clone(); - let names_to_be_updated = self + let new_names_to_be_updated = self .db .insert_name_contests_as_normalized_names(contested_resources_as_strings, &self) .map_err(|e| format!("Contested resource query failed. Failed to insert name contests into database: {}", e.to_string()))?; + names_to_be_updated.extend(new_names_to_be_updated); + sender.send(TaskResult::Refresh).await.map_err(|e| { format!( "Contested resource query failed. Sender failed to send TaskResult: {}", @@ -152,94 +155,91 @@ impl AppContext { ) })?; - // Create a semaphore with 15 permits - let semaphore = Arc::new(Semaphore::new(24)); + if contested_resources_len < 100 { + break; + } + start_at_value = Some((Value::Text(last_found_name), false)) + } - let mut handles = Vec::new(); + // Create a semaphore with 15 permits + let semaphore = Arc::new(Semaphore::new(24)); - let handle = { - let semaphore = semaphore.clone(); - let sdk = sdk.clone(); - let sender = sender.clone(); - let self_ref = self.clone(); + let mut handles = Vec::new(); - tokio::spawn(async move { - // Acquire a permit from the semaphore - let _permit: OwnedSemaphorePermit = semaphore.acquire_owned().await.unwrap(); + let handle = { + let semaphore = semaphore.clone(); + let sdk = sdk.clone(); + let sender = sender.clone(); + let self_ref = self.clone(); - match self_ref.query_dpns_ending_times(sdk, sender.clone()).await { - Ok(_) => { - // Send a refresh message if the query succeeded - sender - .send(TaskResult::Refresh) - .await - .expect("expected to send refresh"); - } - Err(e) => { - tracing::error!("Error querying dpns end times: {}", e); - sender - .send(TaskResult::Error(e)) - .await - .expect("expected to send error"); - } - } - }) - }; + tokio::spawn(async move { + // Acquire a permit from the semaphore + let _permit: OwnedSemaphorePermit = semaphore.acquire_owned().await.unwrap(); - handles.push(handle); + match self_ref.query_dpns_ending_times(sdk, sender.clone()).await { + Ok(_) => { + // Send a refresh message if the query succeeded + sender + .send(TaskResult::Refresh) + .await + .expect("expected to send refresh"); + } + Err(e) => { + tracing::error!("Error querying dpns end times: {}", e); + sender + .send(TaskResult::Error(e)) + .await + .expect("expected to send error"); + } + } + }) + }; - for name in names_to_be_updated { - // Clone the semaphore, sdk, and sender for each task - let semaphore = semaphore.clone(); - let sdk = sdk.clone(); - let sender = sender.clone(); - let self_ref = self.clone(); // Assuming self is cloneable - - // Spawn each task with a permit from the semaphore - let handle = tokio::spawn(async move { - // Acquire a permit from the semaphore - let _permit: OwnedSemaphorePermit = semaphore.acquire_owned().await.unwrap(); - - // Perform the query - match self_ref - .query_dpns_vote_contenders(&name, &sdk, sender.clone()) - .await - { - Ok(_) => { - // Send a refresh message if the query succeeded - sender - .send(TaskResult::Refresh) - .await - .expect("expected to send refresh"); - } - Err(e) => { - tracing::error!( - "Error querying dpns vote contenders for {}: {}", - name, - e - ); - sender - .send(TaskResult::Error(e)) - .await - .expect("expected to send error"); - } + handles.push(handle); + + for name in names_to_be_updated { + // Clone the semaphore, sdk, and sender for each task + let semaphore = semaphore.clone(); + let sdk = sdk.clone(); + let sender = sender.clone(); + let self_ref = self.clone(); // Assuming self is cloneable + + // Spawn each task with a permit from the semaphore + let handle = tokio::spawn(async move { + // Acquire a permit from the semaphore + let _permit: OwnedSemaphorePermit = semaphore.acquire_owned().await.unwrap(); + + // Perform the query + match self_ref + .query_dpns_vote_contenders(&name, &sdk, sender.clone()) + .await + { + Ok(_) => { + // Send a refresh message if the query succeeded + sender + .send(TaskResult::Refresh) + .await + .expect("expected to send refresh"); + } + Err(e) => { + tracing::error!("Error querying dpns vote contenders for {}: {}", name, e); + sender + .send(TaskResult::Error(e)) + .await + .expect("expected to send error"); } - }); + } + }); - // Collect all task handles - handles.push(handle); - } + // Collect all task handles + handles.push(handle); + } - // Await all tasks - for handle in handles { - if let Err(e) = handle.await { - tracing::error!("Task failed: {:?}", e); - } + // Await all tasks + for handle in handles { + if let Err(e) = handle.await { + tracing::error!("Task failed: {:?}", e); } - if contested_resources_len < 100 { - break; - } - start_at_value = Some((Value::Text(last_found_name), false)) } sender