Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions clippy-on-all-workspaces.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/sh

WORKSPACES="benches/Cargo.toml common/Cargo.toml protocols/Cargo.toml roles/Cargo.toml
utils/Cargo.toml"

for workspace in $WORKSPACES; do
echo "Executing clippy on: $workspace"
cargo clippy --manifest-path="$workspace" -- -D warnings -A dead-code
if [ $? -ne 0 ]; then
echo "Clippy found some errors in: $workspace"
exit 1
fi
done

echo "Clippy success!"

9 changes: 5 additions & 4 deletions protocols/v2/roles-logic-sv2/src/handlers/job_declaration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ where
Ok(JobDeclaration::SubmitSolution(message)) => {
info!("Received SubmitSolution");
debug!("SubmitSolution: {:?}", message);
self_
.safe_lock(|x| x.handle_submit_solution(message))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?
Self::handle_submit_solution(self_, message)
}

Ok(_) => todo!(),
Expand All @@ -184,5 +182,8 @@ where
&mut self,
message: ProvideMissingTransactionsSuccess,
) -> Result<SendTo, Error>;
fn handle_submit_solution(&mut self, message: SubmitSolutionJd) -> Result<SendTo, Error>;
fn handle_submit_solution(
self_: Arc<Mutex<Self>>,
message: SubmitSolutionJd,
) -> Result<SendTo, Error>;
}
8 changes: 6 additions & 2 deletions roles/jd-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ binary_sv2 = { version = "^0.1.6", path = "../../protocols/v2/binary-sv2/binary-
buffer_sv2 = { version = "^0.1.2", path = "../../utils/buffer" }
codec_sv2 = { version = "*", path = "../../protocols/v2/codec-sv2", features = ["noise_sv2"] }
const_sv2 = { version = "^0.1.2", path = "../../protocols/v2/const-sv2" }
jsonrpc = { version = "^0.16.0", path = "vendored/rust-jsonrpc"}
network_helpers = { version = "0.1", path = "../../utils/network-helpers", features = ["with_tokio"] }
noise_sv2 = { version = "*", path = "../../protocols/v2/noise-sv2" }
rand = "0.8.4"
Expand All @@ -31,4 +30,9 @@ key-utils = { version = "^1.0.0", path = "../../utils/key-utils" }
secp256k1 = { version = "0.27.0", default-features = false, features =["bitcoin_hashes","alloc","rand","rand-std"] }
siphasher = "1"
hex = "0.4.3"

#hyper = { version = "0.14.28", default-features = false, features = ["client"]}
base64 = "0.21.5"
hyper = { version = "1.1.0", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
http-body-util = "0.1"
bytes = "1"
131 changes: 70 additions & 61 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{convert::TryInto, io::Cursor};
use stratum_common::bitcoin::{hashes::Hash, psbt::serialize::Deserialize, Block, Transaction};
use std::{convert::TryInto, io::Cursor, sync::Arc};

use stratum_common::bitcoin::Transaction;

use binary_sv2::ShortTxId;
use roles_logic_sv2::{
Expand All @@ -10,15 +11,14 @@ use roles_logic_sv2::{
ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd,
},
parsers::JobDeclaration,
utils::{merkle_root_from_path, u256_to_block_hash},
utils::Mutex,
};
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
use stratum_common::bitcoin::consensus::Decodable;
use tracing::warn;
//use tracing::warn;

use crate::lib::job_declarator::signed_token;
use stratum_common::bitcoin::consensus::encode::serialize;

use super::JobDeclaratorDownstream;

Expand Down Expand Up @@ -180,62 +180,71 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
}
}

fn handle_submit_solution(&mut self, message: SubmitSolutionJd) -> Result<SendTo, Error> {
fn handle_submit_solution(
_self: Arc<Mutex<Self>>,
message: SubmitSolutionJd<'_>,
) -> Result<SendTo, Error> {
//TODO: implement logic for success or error
let (last_declare, mut tx_list, _) = match self.declared_mining_job.take() {
Some((last_declare, tx_list, _x)) => (last_declare, tx_list, _x),
None => {
warn!("Received solution but no job available");
return Ok(SendTo::None(None));
}
};
let coinbase_pre = last_declare.coinbase_prefix.to_vec();
let extranonce = message.extranonce.to_vec();
let coinbase_suf = last_declare.coinbase_suffix.to_vec();
let mut path: Vec<Vec<u8>> = vec![];
for tx in &tx_list {
let id = tx.txid();
let id = id.as_ref().to_vec();
path.push(id);
}
let merkle_root =
merkle_root_from_path(&coinbase_pre[..], &coinbase_suf[..], &extranonce[..], &path)
.expect("Invalid coinbase");
let merkle_root = Hash::from_inner(merkle_root.try_into().unwrap());

let prev_blockhash = u256_to_block_hash(message.prev_hash.into_static());
let header = stratum_common::bitcoin::blockdata::block::BlockHeader {
version: last_declare.version as i32,
prev_blockhash,
merkle_root,
time: message.ntime,
bits: message.nbits,
nonce: message.nonce,
};

let coinbase = [coinbase_pre, extranonce, coinbase_suf].concat();
let coinbase = Transaction::deserialize(&coinbase[..]).unwrap();
tx_list.insert(0, coinbase);

let mut block = Block {
header,
txdata: tx_list.clone(),
};

block.header.merkle_root = block.compute_merkle_root().unwrap();

let serialized_block = serialize(&block);
let hexdata = hex::encode(serialized_block);

// TODO This line blok everything!!
self.mempool
.safe_lock(|x| {
if let Some(client) = x.get_client() {
client.submit_block(hexdata).unwrap();
}
})
.unwrap();

Ok(SendTo::None(None))
//let (last_declare, mut tx_list, _) = match self_.safe_lock(|x| x.declared_mining_job.take()).unwrap() {
// Some((last_declare, tx_list, _x)) => (last_declare, tx_list, _x),
// None => {
// warn!("Received solution but no job available");
// return Ok(SendTo::None(None));
// }
//};
//let coinbase_pre = last_declare.coinbase_prefix.to_vec();
//let extranonce = message.extranonce.to_vec();
//let coinbase_suf = last_declare.coinbase_suffix.to_vec();
//let mut path: Vec<Vec<u8>> = vec![];
//for tx in &tx_list {
// let id = tx.txid();
// let id = id.as_ref().to_vec();
// path.push(id);
//}
//let merkle_root =
// merkle_root_from_path(&coinbase_pre[..], &coinbase_suf[..], &extranonce[..], &path)
// .expect("Invalid coinbase");
//let merkle_root = Hash::from_inner(merkle_root.try_into().unwrap());

//let prev_blockhash = u256_to_block_hash(message.prev_hash.into_static());
//let header = stratum_common::bitcoin::blockdata::block::BlockHeader {
// version: last_declare.version as i32,
// prev_blockhash,
// merkle_root,
// time: message.ntime,
// bits: message.nbits,
// nonce: message.nonce,
//};

//let coinbase = [coinbase_pre, extranonce, coinbase_suf].concat();
//let coinbase = Transaction::deserialize(&coinbase[..]).unwrap();
//tx_list.insert(0, coinbase);

//let mut block = Block {
// header,
// txdata: tx_list.clone(),
//};

//block.header.merkle_root = block.compute_merkle_root().unwrap();

//let serialized_block = serialize(&block);
//let hexdata = hex::encode(serialized_block);

//// TODO This line blok everything!!
//let client = self_.safe_lock(|y|
// y
// .mempool
// .safe_lock(|x| {
// if let Some(client) = x.get_client() {
// client//.submit_block(hexdata).await;
// } else {
// todo!()
// }
// })
// .unwrap()).unwrap();
//client.submit_block(hexdata).await;
let m = JobDeclaration::SubmitSolution(message.clone().into_static());

Ok(SendTo::RelayNewMessage(m))
}
}
89 changes: 83 additions & 6 deletions roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@ use roles_logic_sv2::{
common_messages_sv2::SetupConnectionSuccess,
handlers::job_declaration::{ParseClientJobDeclarationMessages, SendTo},
job_declaration_sv2::DeclareMiningJob,
parsers::PoolMessages as JdsMessages,
utils::{Id, Mutex},
parsers::{JobDeclaration, PoolMessages as JdsMessages},
utils::{merkle_root_from_path, u256_to_block_hash, Id, Mutex},
};
use secp256k1::{KeyPair, Message as SecpMessage, Secp256k1};
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tokio::net::TcpListener;
use tracing::{error, info};

use stratum_common::bitcoin::{consensus::Encodable, Transaction};
use stratum_common::bitcoin::{
consensus::{encode::serialize, Encodable},
hashes::Hash,
psbt::serialize::Deserialize,
Block, Transaction,
};

#[derive(Debug)]
pub struct JobDeclaratorDownstream {
Expand Down Expand Up @@ -78,7 +83,11 @@ impl JobDeclaratorDownstream {
sender.send(sv2_frame.into()).await.map_err(|_| ())?;
Ok(())
}
pub fn start(self_mutex: Arc<Mutex<Self>>, tx_status: status::Sender) {
pub fn start(
self_mutex: Arc<Mutex<Self>>,
tx_status: status::Sender,
submit_solution_sender: Sender<String>,
) {
let recv = self_mutex.safe_lock(|s| s.receiver.clone()).unwrap();
tokio::spawn(async move {
loop {
Expand All @@ -102,6 +111,68 @@ impl JobDeclaratorDownstream {
Self::send(self_mutex.clone(), message).await.unwrap();
}
Ok(SendTo::None(_)) => (),
Ok(SendTo::RelayNewMessage(JobDeclaration::SubmitSolution(
message,
))) => {
//TODO: implement logic for success or error
let (last_declare, mut tx_list, _) = match self_mutex
.safe_lock(|x| x.declared_mining_job.take())
.unwrap()
{
Some((last_declare, tx_list, _x)) => {
(last_declare, tx_list, _x)
}
None => {
//warn!("Received solution but no job available");
todo!()
}
};
let coinbase_pre = last_declare.coinbase_prefix.to_vec();
let extranonce = message.extranonce.to_vec();
let coinbase_suf = last_declare.coinbase_suffix.to_vec();
let mut path: Vec<Vec<u8>> = vec![];
for tx in &tx_list {
let id = tx.txid();
let id = id.as_ref().to_vec();
path.push(id);
}
let merkle_root = merkle_root_from_path(
&coinbase_pre[..],
&coinbase_suf[..],
&extranonce[..],
&path,
)
.expect("Invalid coinbase");
let merkle_root = Hash::from_inner(merkle_root.try_into().unwrap());

let prev_blockhash =
u256_to_block_hash(message.prev_hash.into_static());
let header =
stratum_common::bitcoin::blockdata::block::BlockHeader {
version: last_declare.version as i32,
prev_blockhash,
merkle_root,
time: message.ntime,
bits: message.nbits,
nonce: message.nonce,
};

let coinbase = [coinbase_pre, extranonce, coinbase_suf].concat();
let coinbase = Transaction::deserialize(&coinbase[..]).unwrap();
tx_list.insert(0, coinbase);

let mut block = Block {
header,
txdata: tx_list.clone(),
};

block.header.merkle_root = block.compute_merkle_root().unwrap();

let serialized_block = serialize(&block);
let hexdata = hex::encode(serialized_block);

let _ = submit_solution_sender.send(hexdata).await;
}
Err(e) => {
error!("{:?}", e);
handle_result!(
Expand Down Expand Up @@ -155,16 +226,18 @@ impl JobDeclarator {
config: Configuration,
status_tx: crate::status::Sender,
mempool: Arc<Mutex<JDsMempool>>,
sender: Sender<String>,
) {
let self_ = Arc::new(Mutex::new(Self {}));
info!("JD INITIALIZED");
Self::accept_incoming_connection(self_, config, status_tx, mempool).await;
Self::accept_incoming_connection(self_, config, status_tx, mempool, sender).await;
}
async fn accept_incoming_connection(
_self_: Arc<Mutex<JobDeclarator>>,
config: Configuration,
status_tx: crate::status::Sender,
mempool: Arc<Mutex<JDsMempool>>,
submit_solution_sender: Sender<String>,
) {
let listner = TcpListener::bind(&config.listen_jd_address).await.unwrap();
while let Ok((stream, _)) = listner.accept().await {
Expand Down Expand Up @@ -205,7 +278,11 @@ impl JobDeclarator {
mempool.clone(),
)));

JobDeclaratorDownstream::start(jddownstream, status_tx.clone());
JobDeclaratorDownstream::start(
jddownstream,
status_tx.clone(),
submit_solution_sender.clone(),
);
} else {
error!("Can not connect {:?}", addr);
}
Expand Down
38 changes: 38 additions & 0 deletions roles/jd-server/src/lib/mempool/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use super::RpcError;
use tokio::task::JoinError;

// TODO this should be includede in JdsError

#[derive(Debug)]
pub enum JdsMempoolError {
EmptyMempool,
NoClient,
Rpc(RpcError),
PoisonLock(String),
TokioJoinError(JoinError),
}

pub fn handle_error(error: JdsMempoolError) {
match error {
JdsMempoolError::EmptyMempool => println!("Empty mempool!"),
JdsMempoolError::NoClient => println!("RPC Client not found"),
JdsMempoolError::Rpc(a) => match a {
RpcError::JsonRpc(m) => {
let id = m.id;
let error = m.error.unwrap();
let code = error.code;
let message = error.message;
println!(
"RPC error: id {:?}, code {:?}, message {:?}",
id, code, message
);
}
RpcError::Deserialization(e) => println!("Deserialization error: {:?}", e),
RpcError::Serialization(e) => println!("Serialization error: {:?}", e),
RpcError::Http(e) => println!("Http error: {:?}", e),
RpcError::Other(e) => println!("Other error: {:?}", e),
},
JdsMempoolError::PoisonLock(e) => println!("Poison lock error: {:?}", e),
JdsMempoolError::TokioJoinError(e) => println!("Tokio Join error: {:?}", e),
}
}
Loading