Skip to content

Commit

Permalink
fix: many debugging fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Oct 11, 2024
1 parent 15fda94 commit 05b2443
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 43 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ moka = {version = "0.12.7", features = ["future"]}
num = {version = "0.4.3", features = ["default", "num-bigint", "serde"]}
num-format = "*"
rand = "0.8.0"
ringbuffer = "0.15.0"
serde = "1.0.203"
serde_cbor = "0.11.2"
serde_json = "1.0.122"
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod cli;
mod server;
mod sharechain;

#[tokio::main]
#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
Cli::parse().handle_command(Shutdown::new().to_signal()).await?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/server/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pub mod error;
pub mod p2pool;
pub mod util;

pub(crate) const MAX_ACCEPTABLE_GRPC_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
pub(crate) const MAX_ACCEPTABLE_GRPC_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(20000);
72 changes: 55 additions & 17 deletions src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,21 @@ where S: ShareChain
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};
let timer = Instant::now();
warn!(target: LOG_TARGET, "dbg 2. {}", timer.elapsed().as_millis());
match share_chain.submit_block(block).await {
Ok(_) => {
warn!(target: LOG_TARGET, "dbg 2. {}", timer.elapsed().as_millis());
self.stats_store
.inc(&algo_stat_key(pow_algo, MINER_STAT_ACCEPTED_BLOCKS_COUNT), 1)
.await;
warn!(target: LOG_TARGET, "dbg 2. {}", timer.elapsed().as_millis());
let res = self
.p2p_client
.broadcast_block(block)
.await
.map_err(|error| Status::internal(error.to_string()));
warn!(target: LOG_TARGET, "dbg 2. {}", timer.elapsed().as_millis());
if res.is_ok() {
info!(target: LOG_TARGET, "Broadcast new block: {:?}", block.hash.to_hex());
}
Expand Down Expand Up @@ -176,7 +181,7 @@ where S: ShareChain
&self,
request: Request<GetNewBlockRequest>,
) -> Result<Response<GetNewBlockResponse>, Status> {
let timeout_duration = MAX_ACCEPTABLE_GRPC_TIMEOUT + Duration::from_secs(5);
let timeout_duration = MAX_ACCEPTABLE_GRPC_TIMEOUT;

let result = timeout(timeout_duration, async {
let timer = Instant::now();
Expand Down Expand Up @@ -272,20 +277,29 @@ where S: ShareChain
.insert(height, actual_diff),
};
let min_difficulty = min_difficulty(&self.consensus_manager, pow_algo, height);
let mut target_difficulty = Difficulty::from_u64(
miner_data
.target_difficulty
.checked_div(SHARE_COUNT)
.expect("Should only fail on div by 0"),
)
.map_err(|error| {
error!(target: LOG_TARGET, "Failed to get target difficulty: {error:?}");
Status::internal(format!("Failed to get target difficulty: {}", error))
})?;
let chain = match pow_algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};
let mut target_difficulty = chain
.get_target_difficulty()
.await
.map_err(|e| Status::internal("Could not get target difficutly"))?;

if target_difficulty < min_difficulty {
target_difficulty = min_difficulty;
}

if target_difficulty > actual_diff {
warn!(
target: LOG_TARGET,
"Target difficulty is higher than actual difficulty. Target: {}, Actual: {}",
target_difficulty,
actual_diff
);
// Never go higher than the network.
target_difficulty = actual_diff;
}
if let Some(miner_data) = response.miner_data.as_mut() {
miner_data.target_difficulty = target_difficulty.as_u64();
}
Expand Down Expand Up @@ -318,14 +332,18 @@ where S: ShareChain
&self,
request: Request<SubmitBlockRequest>,
) -> Result<Response<SubmitBlockResponse>, Status> {
let timeout_duration = MAX_ACCEPTABLE_GRPC_TIMEOUT + Duration::from_secs(5);
let timeout_duration = MAX_ACCEPTABLE_GRPC_TIMEOUT;

let result = timeout(timeout_duration, async {
let timer = Instant::now();
// Only one submit at a time
let _permit = self.submit_block_semaphore.acquire().await;
// if self.submit_block_semaphore.available_permits() == 0 {
// return Err(Status::resource_exhausted("submit_block semaphore is full"));
// }
// // Only one submit at a time
// let _permit = self.submit_block_semaphore.acquire().await;
debug!(target: LOG_TARGET, "submit_block permit acquired: {}", timer.elapsed().as_millis());

warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
debug!("Trace - getting grpc fields");
// get all grpc request related data
let grpc_block = request.get_ref();
Expand All @@ -346,6 +364,7 @@ where S: ShareChain
})?)
.ok_or_else(|| Status::internal("invalid block header pow algo in request"))?;

warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
debug!(target: LOG_TARGET, "Trace - getting new block from share chain: {}", timer.elapsed().as_millis());
// get new share chain block
let pow_algo = match grpc_pow_algo {
Expand All @@ -361,7 +380,8 @@ where S: ShareChain
.await
.map_err(|error| Status::internal(error.to_string()))?;

let origin_block_header = &&block.original_block_header.clone();
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
let origin_block_header = &&block.original_block_header.clone();

debug!(target: LOG_TARGET, "Trace - getting block difficulty: {}", timer.elapsed().as_millis());
// Check block's difficulty compared to the latest network one to increase the probability
Expand All @@ -383,6 +403,7 @@ where S: ShareChain
"Submitted {} block difficulty: {}",
origin_block_header.pow.pow_algo, request_block_difficulty
);
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
// TODO: Cache this so that we don't ask each time. If we have a block we should not
// waste time before submitting it, or we might lose a share
// let mut network_difficulty_stream = self
Expand All @@ -406,6 +427,7 @@ where S: ShareChain
// }
// }
debug!(target: LOG_TARGET, "Trace - getting network difficulty: {}", timer.elapsed().as_millis());
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
let network_difficulty = match origin_block_header.pow.pow_algo {
PowAlgorithm::Sha3x => self
.sha3_block_height_difficulty_cache
Expand All @@ -422,6 +444,7 @@ where S: ShareChain
.copied()
.unwrap_or_else(Difficulty::min),
};
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
let network_difficulty_matches = request_block_difficulty >= network_difficulty;
debug!(target: LOG_TARGET, "Trace - saving max difficulty: {}", timer.elapsed().as_millis());
let mut max_difficulty = self.stats_max_difficulty_since_last_success.write().await;
Expand All @@ -432,15 +455,19 @@ where S: ShareChain
block.achieved_difficulty = request_block_difficulty;

debug!(target: LOG_TARGET, "Trace - checking if can submit to main chain: {}", timer.elapsed().as_millis());
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
if network_difficulty_matches {
// submit block to base node
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
let (metadata, extensions, _inner) = request.into_parts();
info!(target: LOG_TARGET, "🔗 Submitting block {} to base node...", origin_block_header.hash());

let grpc_request = Request::from_parts(metadata, extensions, grpc_request_payload);
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
match self.client.write().await.submit_block(grpc_request).await {
Ok(_resp) => {
*max_difficulty = Difficulty::min();
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
self.stats_store
.inc(&algo_stat_key(pow_algo, P2POOL_STAT_ACCEPTED_BLOCKS_COUNT), 1)
.await;
Expand All @@ -458,6 +485,7 @@ where S: ShareChain
"Failed to submit block {} to Tari network: {error:?}",
origin_block_header.hash()
);
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
self.stats_store
.inc(&algo_stat_key(pow_algo, P2POOL_STAT_REJECTED_BLOCKS_COUNT), 1)
.await;
Expand All @@ -471,7 +499,8 @@ where S: ShareChain
},
}
} else {
debug!(target: LOG_TARGET, "Trace - submitting to share chain: {}", timer.elapsed().as_millis());
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
debug!(target: LOG_TARGET, "Trace - submitting to share chain: {}", timer.elapsed().as_millis());
block.sent_to_main_chain = false;
// Don't error if we can't submit it.
match self.submit_share_chain_block(&block).await {
Expand All @@ -485,6 +514,7 @@ where S: ShareChain
};
}

warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
debug!(target: LOG_TARGET, "Trace - getting stats:{} ", timer.elapsed().as_millis());
let stats = self
.stats_store
Expand All @@ -495,6 +525,7 @@ where S: ShareChain
algo_stat_key(pow_algo, P2POOL_STAT_REJECTED_BLOCKS_COUNT),
])
.await;
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
info!(target: LOG_TARGET,
"========= Max difficulty: {}. Network difficulty {}. Miner(A/R): {}/{}. Pool(A/R) {}/{}. ==== ",
max_difficulty.as_u64().to_formatted_string(&Locale::en),
Expand All @@ -505,6 +536,7 @@ where S: ShareChain
stats[3]
);

warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
if timer.elapsed() > MAX_ACCEPTABLE_GRPC_TIMEOUT {
warn!(target: LOG_TARGET, "submit_block took {}ms", timer.elapsed().as_millis());
}
Expand All @@ -515,7 +547,13 @@ where S: ShareChain
}).await;

match result {
Ok(response) => response,
Ok(response) => match response {
Ok(response) => Ok(response),
Err(e) => {
error!(target: LOG_TARGET, "submit_block failed: {e:?}");
Err(Status::internal("submit_block failed"))
},
},
Err(e) => {
error!(target: LOG_TARGET, "submit_block timed out: {e:?}");
Err(Status::deadline_exceeded("submit_block timed out"))
Expand Down
3 changes: 2 additions & 1 deletion src/server/http/stats/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::{collections::HashMap, sync::Arc};

use axum::{extract::State, http::StatusCode, Json};
use log::error;
use log::{error, info};
use serde::Serialize;
use tari_core::{proof_of_work::PowAlgorithm, transactions::tari_amount::MicroMinotari};
use tari_utilities::{epoch_time::EpochTime, hex::Hex};
Expand Down Expand Up @@ -137,6 +137,7 @@ pub(crate) async fn handle_miners_with_shares(

pub(crate) async fn handle_get_stats(State(state): State<AppState>) -> Result<Json<Stats>, StatusCode> {
let timer = std::time::Instant::now();
info!(target: LOG_TARGET, "handle_get_stats");

let sha3x_stats = get_chain_stats(state.clone(), PowAlgorithm::Sha3x).await?;
let randomx_stats = get_chain_stats(state.clone(), PowAlgorithm::RandomX).await?;
Expand Down
5 changes: 5 additions & 0 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ where S: ShareChain
}

async fn add_peer_and_try_sync(&mut self, payload: PeerInfo, peer: PeerId) -> ControlFlow<()> {
dbg!("Try to sync");
let current_randomx_height = payload.current_random_x_height;
let current_sha3x_height = payload.current_sha3x_height;
for addr in &payload.public_addresses {
Expand Down Expand Up @@ -722,6 +723,7 @@ where S: ShareChain
channel: ResponseChannel<ShareChainSyncResponse>,
request: ShareChainSyncRequest,
) {
dbg!("Trying to sync from user request");
debug!(target: MESSAGE_LOGGING_LOG_TARGET, "Share chain sync request: {request:?}");

debug!(target: LOG_TARGET, squad = &self.config.squad; "Incoming Share chain sync request: {request:?}");
Expand Down Expand Up @@ -1118,6 +1120,7 @@ where S: ShareChain
}
_ = publish_peer_info_interval.tick() => {
dbg!("pub peer");
info!(target: LOG_TARGET, "pub peer info");
// handle case when we have some peers removed
let expired_peers = self.network_peer_store.cleanup().await;
for exp_peer in expired_peers {
Expand All @@ -1140,6 +1143,7 @@ where S: ShareChain
}
},
res = self.snooze_block_rx.recv() => {
info!(target: LOG_TARGET, "snooze block");
dbg!("snooze");
if let Some((snoozes_left, block)) = res {
let snooze_sender = self.snooze_block_tx.clone();
Expand Down Expand Up @@ -1182,6 +1186,7 @@ where S: ShareChain
// },
_ = kademlia_bootstrap_interval.tick() => {
dbg!("kad boot");
info!(target: LOG_TARGET, "kad bootstrap event");
if let Err(error) = self.bootstrap_kademlia() {
warn!(target: LOG_TARGET, squad = &self.config.squad; "Failed to do kademlia bootstrap: {error:?}");
}
Expand Down
1 change: 1 addition & 0 deletions src/server/p2p/peer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl PeerStore {
/// Add a new peer to store.
/// If a peer already exists, just replaces it.
pub async fn add(&self, peer_id: PeerId, peer_info: PeerInfo) {
dbg!(&peer_info);
// if self.banned_peers.contains_key(&peer_id) {
// return;
// }
Expand Down
1 change: 1 addition & 0 deletions src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ where S: ShareChain
info!(target: LOG_TARGET, "Starting gRPC server on port {}!", &grpc_port);

tonic::transport::Server::builder()
.concurrency_limit_per_connection(1)
.add_service(base_node_service)
.add_service(p2pool_service)
.serve_with_shutdown(
Expand Down
2 changes: 2 additions & 0 deletions src/sharechain/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub enum Error {
TariAddress(#[from] TariAddressError),
#[error("Invalid block: {0:?}")]
InvalidBlock(Block),
#[error("Too many blocks in this level")]
TooManyBlocksInThisLevel,
#[error("Number conversion error: {0}")]
FromIntConversion(#[from] TryFromIntError),
#[error("Difficulty calculation error: {0}")]
Expand Down
Loading

0 comments on commit 05b2443

Please sign in to comment.