Skip to content

Commit

Permalink
Merge branch 'development' of github.com:tari-project/sha-p2pool into…
Browse files Browse the repository at this point in the history
… development
  • Loading branch information
stringhandler committed Oct 13, 2024
2 parents 862f2b5 + f607c25 commit 66bae41
Show file tree
Hide file tree
Showing 20 changed files with 617 additions and 495 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ minotari_node_grpc_client = {git = "https://github.com/tari-project/tari.git"}
tari_common = {git = "https://github.com/tari-project/tari.git"}
tari_common_types = {git = "https://github.com/tari-project/tari.git"}
tari_core = {git = "https://github.com/tari-project/tari.git"}
tari_crypto = "0.20.1"
# tari_crypto = "0.20.1"
tari_shutdown = {git = "https://github.com/tari-project/tari.git"}
tari_utilities = {version = "0.7", features = ["borsh"]}

Expand Down
3 changes: 1 addition & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause
#![feature(assert_matches)]
use clap::Parser;
use tari_shutdown::Shutdown;

Expand All @@ -10,7 +9,7 @@ mod cli;
mod server;
mod sharechain;

#[tokio::main]
#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
Cli::parse().handle_command(Shutdown::new().to_signal()).await?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/server/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pub mod error;
pub mod p2pool;
pub mod util;

pub(crate) const MAX_ACCEPTABLE_GRPC_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);
pub(crate) const MAX_ACCEPTABLE_GRPC_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1000);
139 changes: 77 additions & 62 deletions src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
time::{Duration, Instant},
};

use digest::typenum::Diff;
use log::{debug, error, info, warn};
use minotari_app_grpc::tari_rpc::{
base_node_client::BaseNodeClient,
Expand Down Expand Up @@ -37,16 +38,18 @@ use tonic::{Request, Response, Status};
use crate::{
server::{
grpc::{error::Error, util, MAX_ACCEPTABLE_GRPC_TIMEOUT},
http::stats::{
algo_stat_key,
MINER_STAT_ACCEPTED_BLOCKS_COUNT,
MINER_STAT_REJECTED_BLOCKS_COUNT,
P2POOL_STAT_ACCEPTED_BLOCKS_COUNT,
P2POOL_STAT_REJECTED_BLOCKS_COUNT,
http::{
stats::{
self,
algo_stat_key,
MINER_STAT_ACCEPTED_BLOCKS_COUNT,
MINER_STAT_REJECTED_BLOCKS_COUNT,
P2POOL_STAT_ACCEPTED_BLOCKS_COUNT,
P2POOL_STAT_REJECTED_BLOCKS_COUNT,
},
stats_collector::StatsBroadcastClient,
},
p2p,
p2p::Squad,
stats_store::StatsStore,
p2p::{self, Squad},
},
sharechain::{block::Block, BlockValidationParams, ShareChain, SHARE_COUNT},
};
Expand Down Expand Up @@ -77,8 +80,7 @@ where S: ShareChain
share_chain_sha3x: Arc<S>,
/// RandomX share chain
share_chain_random_x: Arc<S>,
/// Stats store
stats_store: Arc<StatsStore>,
stats_broadcast: StatsBroadcastClient,
/// Block validation params to be used when checking block difficulty.
block_validation_params: BlockValidationParams,
sha3_block_height_difficulty_cache: Arc<RwLock<HashMap<u64, Difficulty>>>,
Expand All @@ -99,11 +101,11 @@ where S: ShareChain
p2p_client: p2p::ServiceClient,
share_chain_sha3x: Arc<S>,
share_chain_random_x: Arc<S>,
stats_store: Arc<StatsStore>,
shutdown_signal: ShutdownSignal,
random_x_factory: RandomXFactory,
consensus_manager: ConsensusManager,
genesis_block_hash: FixedHash,
stats_broadcast: StatsBroadcastClient,
squad: Squad,
coinbase_extras_sha3x: Arc<RwLock<HashMap<String, Vec<u8>>>>,
coinbase_extras_random_x: Arc<RwLock<HashMap<String, Vec<u8>>>>,
Expand All @@ -112,10 +114,10 @@ where S: ShareChain
client: Arc::new(RwLock::new(
util::connect_base_node(base_node_address, shutdown_signal).await?,
)),
stats_broadcast,
p2p_client,
share_chain_sha3x,
share_chain_random_x,
stats_store,
block_validation_params: BlockValidationParams::new(
random_x_factory,
consensus_manager.clone(),
Expand All @@ -139,11 +141,13 @@ where S: ShareChain
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};
let timer = Instant::now();
match share_chain.submit_block(block).await {
Ok(_) => {
self.stats_store
.inc(&algo_stat_key(pow_algo, MINER_STAT_ACCEPTED_BLOCKS_COUNT), 1)
.await;
let _ = self.stats_broadcast.send_miner_block_accepted(pow_algo);
// self.stats_store
// .inc(&algo_stat_key(pow_algo, MINER_STAT_ACCEPTED_BLOCKS_COUNT), 1)
// .await;
let res = self
.p2p_client
.broadcast_block(block)
Expand All @@ -156,9 +160,10 @@ where S: ShareChain
},
Err(error) => {
warn!(target: LOG_TARGET, "Failed to add new block: {error:?}");
self.stats_store
.inc(&algo_stat_key(pow_algo, MINER_STAT_REJECTED_BLOCKS_COUNT), 1)
.await;
let _ = self.stats_broadcast.send_miner_block_rejected(pow_algo);
// self.stats_store
// .inc(&algo_stat_key(pow_algo, MINER_STAT_REJECTED_BLOCKS_COUNT), 1)
// .await;
Ok(())
},
}
Expand All @@ -176,7 +181,7 @@ where S: ShareChain
&self,
request: Request<GetNewBlockRequest>,
) -> Result<Response<GetNewBlockResponse>, Status> {
let timeout_duration = MAX_ACCEPTABLE_GRPC_TIMEOUT + Duration::from_secs(5);
let timeout_duration = MAX_ACCEPTABLE_GRPC_TIMEOUT;

let result = timeout(timeout_duration, async {
let timer = Instant::now();
Expand Down Expand Up @@ -272,20 +277,29 @@ where S: ShareChain
.insert(height, actual_diff),
};
let min_difficulty = min_difficulty(&self.consensus_manager, pow_algo, height);
let mut target_difficulty = Difficulty::from_u64(
miner_data
.target_difficulty
.checked_div(SHARE_COUNT)
.expect("Should only fail on div by 0"),
)
.map_err(|error| {
error!(target: LOG_TARGET, "Failed to get target difficulty: {error:?}");
Status::internal(format!("Failed to get target difficulty: {}", error))
})?;
let chain = match pow_algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};
let mut target_difficulty = chain
.get_target_difficulty()
.await
.map_err(|e| Status::internal("Could not get target difficutly"))?;

if target_difficulty < min_difficulty {
target_difficulty = min_difficulty;
}

if target_difficulty > actual_diff {
warn!(
target: LOG_TARGET,
"Target difficulty is higher than actual difficulty. Target: {}, Actual: {}",
target_difficulty,
actual_diff
);
// Never go higher than the network.
target_difficulty = actual_diff;
}
if let Some(miner_data) = response.miner_data.as_mut() {
miner_data.target_difficulty = target_difficulty.as_u64();
}
Expand Down Expand Up @@ -318,15 +332,17 @@ where S: ShareChain
&self,
request: Request<SubmitBlockRequest>,
) -> Result<Response<SubmitBlockResponse>, Status> {
let timeout_duration = MAX_ACCEPTABLE_GRPC_TIMEOUT + Duration::from_secs(5);
let timeout_duration = MAX_ACCEPTABLE_GRPC_TIMEOUT;

let result = timeout(timeout_duration, async {
let timer = Instant::now();
// Only one submit at a time
let _permit = self.submit_block_semaphore.acquire().await;
// if self.submit_block_semaphore.available_permits() == 0 {
// return Err(Status::resource_exhausted("submit_block semaphore is full"));
// }
// // Only one submit at a time
// let _permit = self.submit_block_semaphore.acquire().await;
debug!(target: LOG_TARGET, "submit_block permit acquired: {}", timer.elapsed().as_millis());

debug!("Trace - getting grpc fields");
// get all grpc request related data
let grpc_block = request.get_ref();
let grpc_request_payload = grpc_block
Expand Down Expand Up @@ -361,7 +377,7 @@ where S: ShareChain
.await
.map_err(|error| Status::internal(error.to_string()))?;

let origin_block_header = &&block.original_block_header.clone();
let origin_block_header = &&block.original_block_header.clone();

debug!(target: LOG_TARGET, "Trace - getting block difficulty: {}", timer.elapsed().as_millis());
// Check block's difficulty compared to the latest network one to increase the probability
Expand Down Expand Up @@ -441,9 +457,10 @@ where S: ShareChain
match self.client.write().await.submit_block(grpc_request).await {
Ok(_resp) => {
*max_difficulty = Difficulty::min();
self.stats_store
.inc(&algo_stat_key(pow_algo, P2POOL_STAT_ACCEPTED_BLOCKS_COUNT), 1)
.await;
self.stats_broadcast.send_pool_block_accepted(pow_algo);
// self.stats_store
// .inc(&algo_stat_key(pow_algo, P2POOL_STAT_ACCEPTED_BLOCKS_COUNT), 1)
// .await;
info!(
target: LOG_TARGET,
"💰 New matching block found and sent to network! Block hash: {}",
Expand All @@ -458,9 +475,11 @@ where S: ShareChain
"Failed to submit block {} to Tari network: {error:?}",
origin_block_header.hash()
);
self.stats_store
.inc(&algo_stat_key(pow_algo, P2POOL_STAT_REJECTED_BLOCKS_COUNT), 1)
.await;
warn!(target: LOG_TARGET, "here 1: {}", timer.elapsed().as_millis());
// self.stats_store
// .inc(&algo_stat_key(pow_algo, P2POOL_STAT_REJECTED_BLOCKS_COUNT), 1)
// .await;
self.stats_broadcast.send_pool_block_rejected(pow_algo);
block.sent_to_main_chain = false;
self.submit_share_chain_block(&block).await?;

Expand All @@ -471,7 +490,7 @@ where S: ShareChain
},
}
} else {
debug!(target: LOG_TARGET, "Trace - submitting to share chain: {}", timer.elapsed().as_millis());
debug!(target: LOG_TARGET, "Trace - submitting to share chain: {}", timer.elapsed().as_millis());
block.sent_to_main_chain = false;
// Don't error if we can't submit it.
match self.submit_share_chain_block(&block).await {
Expand All @@ -485,25 +504,15 @@ where S: ShareChain
};
}

debug!(target: LOG_TARGET, "Trace - getting stats:{} ", timer.elapsed().as_millis());
let stats = self
.stats_store
.get_many(&[
algo_stat_key(pow_algo, MINER_STAT_ACCEPTED_BLOCKS_COUNT),
algo_stat_key(pow_algo, MINER_STAT_REJECTED_BLOCKS_COUNT),
algo_stat_key(pow_algo, P2POOL_STAT_ACCEPTED_BLOCKS_COUNT),
algo_stat_key(pow_algo, P2POOL_STAT_REJECTED_BLOCKS_COUNT),
])
.await;
info!(target: LOG_TARGET,
"========= Max difficulty: {}. Network difficulty {}. Miner(A/R): {}/{}. Pool(A/R) {}/{}. ==== ",
max_difficulty.as_u64().to_formatted_string(&Locale::en),
network_difficulty.as_u64().to_formatted_string(&Locale::en),
stats[0],
stats[1],
stats[2],
stats[3]
);
// info!(target: LOG_TARGET,
// "========= Max difficulty: {}. Network difficulty {}. Miner(A/R): {}/{}. Pool(A/R) {}/{}. ==== ",
// max_difficulty.as_u64().to_formatted_string(&Locale::en),
// network_difficulty.as_u64().to_formatted_string(&Locale::en),
// stats[0],
// stats[1],
// stats[2],
// stats[3]
// );

if timer.elapsed() > MAX_ACCEPTABLE_GRPC_TIMEOUT {
warn!(target: LOG_TARGET, "submit_block took {}ms", timer.elapsed().as_millis());
Expand All @@ -515,7 +524,13 @@ where S: ShareChain
}).await;

match result {
Ok(response) => response,
Ok(response) => match response {
Ok(response) => Ok(response),
Err(e) => {
error!(target: LOG_TARGET, "submit_block failed: {e:?}");
Err(Status::internal("submit_block failed"))
},
},
Err(e) => {
error!(target: LOG_TARGET, "submit_block timed out: {e:?}");
Err(Status::deadline_exceeded("submit_block timed out"))
Expand Down
1 change: 1 addition & 0 deletions src/server/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
mod health;
pub mod server;
pub mod stats;
pub mod stats_collector;
mod version;
Loading

0 comments on commit 66bae41

Please sign in to comment.