Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e7a7dab
mempool management in jds
GitGab19 Feb 23, 2024
1b66452
cleanup + fmt
GitGab19 Feb 23, 2024
e736e6a
mempool txdata insertion after ProvideMissingTx.Success
GitGab19 Feb 23, 2024
42101ae
println! removal
GitGab19 Feb 23, 2024
6507359
fmt fix
GitGab19 Feb 23, 2024
e911536
last updates after reivews
GitGab19 Feb 26, 2024
4ddc9b6
clippy fix
GitGab19 Feb 26, 2024
b2a808b
self_ cloned as suggested in review
GitGab19 Feb 26, 2024
90d20ba
fix unclosed bracket
GitGab19 Mar 5, 2024
b0788a6
add_tx_data_to_job_and_mempool to asynchronously update job and jds m…
GitGab19 Mar 5, 2024
affb60b
fmt fix
GitGab19 Mar 5, 2024
f42cac0
cleanup
GitGab19 Mar 5, 2024
c878872
check before inserting new tx in declared_mining_job
GitGab19 Mar 5, 2024
ce78421
info! removal
GitGab19 Mar 5, 2024
d88f59b
check before adding provided_missing_txs in declared_mining_job txs list
GitGab19 Mar 5, 2024
ea5c94f
fmt
GitGab19 Mar 5, 2024
7d03248
clippy fix
GitGab19 Mar 5, 2024
f490fe5
rebased on main
GitGab19 Mar 6, 2024
c11b5a0
error propagation
GitGab19 Mar 6, 2024
4244cb6
Renamed rpc method
lorbax Mar 6, 2024
9f63505
JobDeclaratordownstream modified
lorbax Mar 9, 2024
970cd43
error management
lorbax Mar 11, 2024
371d4d4
index used in transactions_with_state fix
GitGab19 Mar 12, 2024
3207c7a
double for cycle removal, missing_indexes reintroduction
GitGab19 Mar 13, 2024
68887cb
fmt and cleanup
GitGab19 Mar 13, 2024
096b30b
error management for transactions decoding
GitGab19 Mar 13, 2024
f5bd5e0
transactions data removed from JobDeclaratorDownstream data structure…
GitGab19 Mar 15, 2024
5768589
fmt
GitGab19 Mar 15, 2024
9fdd74a
Move task on main
lorbax Mar 17, 2024
208e4f5
Verify that all txs in job are present
lorbax Mar 19, 2024
4081324
Error management
lorbax Mar 19, 2024
e577d7a
Fix bug in SendTo output from message_handler
lorbax Mar 21, 2024
c6647cd
Check if a tx is already present in mempool
lorbax Mar 21, 2024
e967617
fix in add_tx_data_to_mempool
GitGab19 Mar 21, 2024
76aa4bd
fmt
GitGab19 Mar 21, 2024
928d475
debug logging
GitGab19 Mar 21, 2024
3793b14
fmt
GitGab19 Mar 21, 2024
909f865
renaming fn from are_all_job_transactions_present to collect_txs_in_job
GitGab19 Mar 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions protocols/v2/roles-logic-sv2/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum Error {
VersionTooBig,
TxVersionTooBig,
TxVersionTooLow,
TxDecodingError(String),
NotFoundChannelId,
NoValidJob,
NoValidTranslatorJob,
Expand All @@ -59,6 +60,7 @@ pub enum Error {
TargetError(InputError),
HashrateError(InputError),
LogicErrorMessage(std::boxed::Box<AllMessages<'static>>),
JDSMissingTransactions,
}

impl From<BinarySv2Error> for Error {
Expand Down Expand Up @@ -138,6 +140,7 @@ impl Display for Error {
VersionTooBig => write!(f, "We are trying to construct a block header with version bigger than i32::MAX"),
TxVersionTooBig => write!(f, "Tx version can not be greater than i32::MAX"),
TxVersionTooLow => write!(f, "Tx version can not be lower than 1"),
TxDecodingError(e) => write!(f, "Impossible to decode tx: {:?}", e),
NotFoundChannelId => write!(f, "No downstream has been registred for this channel id"),
NoValidJob => write!(f, "Impossible to create a standard job for channelA cause no valid job has been received from upstream yet"),
NoValidTranslatorJob => write!(f, "Impossible to create a extended job for channel cause no valid job has been received from upstream yet"),
Expand All @@ -149,6 +152,7 @@ impl Display for Error {
TargetError(e) => write!(f, "Impossible to get Target: {:?}", e),
HashrateError(e) => write!(f, "Impossible to get Hashrate: {:?}", e),
LogicErrorMessage(e) => write!(f, "Message is well formatted but can not be handled: {:?}", e),
JDSMissingTransactions => write!(f, "JD server cannot propagate the block: missing transactions"),
}
}
}
6 changes: 6 additions & 0 deletions roles/jd-server/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub enum JdsError {
Custom(String),
Sv2ProtocolError((u32, Mining<'static>)),
MempoolError(JdsMempoolError),
ImpossibleToReconstructBlock(String),
NoLastDeclaredJob,
}

impl std::fmt::Display for JdsError {
Expand All @@ -42,6 +44,10 @@ impl std::fmt::Display for JdsError {
write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e)
}
MempoolError(ref e) => write!(f, "Mempool error: `{:?}`", e),
ImpossibleToReconstructBlock(e) => {
write!(f, "Error in reconstructing the block: {:?}", e)
}
NoLastDeclaredJob => write!(f, "Last declared job not found"),
}
}
}
Expand Down
120 changes: 70 additions & 50 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use roles_logic_sv2::{
parsers::JobDeclaration,
};
use std::{convert::TryInto, io::Cursor};
use stratum_common::bitcoin::Transaction;
use stratum_common::bitcoin::{Transaction, Txid};
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use super::{signed_token, TransactionState};
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
use stratum_common::bitcoin::consensus::Decodable;

use super::signed_token;
use tracing::info;

use super::JobDeclaratorDownstream;

Expand Down Expand Up @@ -53,14 +53,19 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
coinbase_output: self.coinbase_output.clone().try_into().unwrap(),
};
let message_enum = JobDeclaration::AllocateMiningJobTokenSuccess(message_success);
println!(
info!(
"Sending AllocateMiningJobTokenSuccess to proxy {:?}",
message_enum
);
Ok(SendTo::Respond(message_enum))
}

fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result<SendTo, Error> {
// the transactions that are present in the mempool are stored here, that is sent to the
// mempool which use the rpc client to retrieve the whole data for each transaction.
// The unknown transactions is a vector that contains the transactions that are not in the
// jds mempool, and will be non-empty in the ProvideMissingTransactionsSuccess message
let mut known_transactions: Vec<Txid> = vec![];
self.tx_hash_list_hash = Some(message.tx_hash_list_hash.clone().into_static());
if self.verify_job(&message) {
let short_hash_list: Vec<ShortTxId> = message
Expand All @@ -76,22 +81,34 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
.safe_lock(|x| x.to_short_ids(nonce))
.unwrap()
.unwrap();
let mut txs_in_job = vec![];
let mut missing_txs = vec![];
let mut transactions_with_state =
vec![TransactionState::Missing; short_hash_list.len()];
let mut missing_txs: Vec<u16> = Vec::new();

for (i, sid) in short_hash_list.iter().enumerate() {
let sid_: [u8; 6] = sid.to_vec().try_into().unwrap();
if let Some(tx_data) = short_id_mempool.get(&sid_) {
txs_in_job.push(tx_data.clone());
} else {
missing_txs.push(i as u16);
match short_id_mempool.get(&sid_) {
Some(tx_data) => {
transactions_with_state[i] = TransactionState::PresentInMempool(tx_data.id);
known_transactions.push(tx_data.id);
}
None => {
transactions_with_state[i] = TransactionState::Missing;
missing_txs.push(i as u16);
}
}
}
self.declared_mining_job = Some((
message.clone().into_static(),
txs_in_job,
self.declared_mining_job = (
Some(message.clone().into_static()),
transactions_with_state,
missing_txs.clone(),
));
);
// here we send the transactions that we want to be stored in jds mempool with full data

self.add_txs_to_mempool
.add_txs_to_mempool_inner
.known_transactions
.append(&mut known_transactions);

if missing_txs.is_empty() {
let message_success = DeclareMiningJobSuccess {
Expand All @@ -113,7 +130,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
JobDeclaration::ProvideMissingTransactions(
message_provide_missing_transactions,
);
Ok(SendTo_::Respond(message_enum_provide_missing_transactions))
Ok(SendTo::Respond(message_enum_provide_missing_transactions))
}
} else {
let message_error = DeclareMiningJobError {
Expand All @@ -137,43 +154,46 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
&mut self,
message: ProvideMissingTransactionsSuccess,
) -> Result<SendTo, Error> {
match &mut self.declared_mining_job {
Some((_, ref mut transactions, missing_indexes)) => {
for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() {
let mut cursor = Cursor::new(tx);
let tx = Transaction::consensus_decode_from_finite_reader(&mut cursor)
.expect("Invalid tx data from downstream");
let index =
*missing_indexes
.get(i)
.ok_or(Error::LogicErrorMessage(Box::new(
AllMessages::JobDeclaration(
JobDeclaration::ProvideMissingTransactionsSuccess(
message.clone().into_static(),
),
),
)))? as usize;
transactions.insert(index, tx);
}
// TODO check it
let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static();
let message_success = DeclareMiningJobSuccess {
request_id: message.request_id,
new_mining_job_token: signed_token(
tx_hash_list_hash,
&self.public_key.clone(),
&self.private_key.clone(),
),
};
let message_enum_success = JobDeclaration::DeclareMiningJobSuccess(message_success);
Ok(SendTo::Respond(message_enum_success))
let (_, ref mut transactions_with_state, missing_indexes) = &mut self.declared_mining_job;
let mut unknown_transactions: Vec<Transaction> = vec![];
for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() {
let mut cursor = Cursor::new(tx);
let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor)
.map_err(|e| Error::TxDecodingError(e.to_string()))?;
Vec::push(&mut unknown_transactions, transaction.clone());
let index = *missing_indexes
.get(i)
.ok_or(Error::LogicErrorMessage(Box::new(
AllMessages::JobDeclaration(JobDeclaration::ProvideMissingTransactionsSuccess(
message.clone().into_static(),
)),
)))? as usize;
// insert the missing transactions in the mempool
transactions_with_state[index] = TransactionState::PresentInMempool(transaction.txid());
}
self.add_txs_to_mempool
.add_txs_to_mempool_inner
.unknown_transactions
.append(&mut unknown_transactions);
// if there still a missing transaction return an error
for tx_with_state in transactions_with_state {
match tx_with_state {
TransactionState::PresentInMempool(_) => continue,
TransactionState::Missing => return Err(Error::JDSMissingTransactions),
}
None => Err(Error::LogicErrorMessage(Box::new(
AllMessages::JobDeclaration(JobDeclaration::ProvideMissingTransactionsSuccess(
message.clone().into_static(),
)),
))),
}
// TODO check it
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this TODO?

let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static();
let message_success = DeclareMiningJobSuccess {
request_id: message.request_id,
new_mining_job_token: signed_token(
tx_hash_list_hash,
&self.public_key.clone(),
&self.private_key.clone(),
),
};
let message_enum_success = JobDeclaration::DeclareMiningJobSuccess(message_success);
Ok(SendTo::Respond(message_enum_success))
}

fn handle_submit_solution(&mut self, message: SubmitSolutionJd<'_>) -> Result<SendTo, Error> {
Expand Down
Loading