Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Cleanup stratum a bit #11161

Merged
merged 3 commits into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions ethcore/src/miner/stratum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,6 @@ impl NotifyWork for Stratum {

self.service.push_work_all(
self.dispatcher.payload(pow_hash, difficulty, number)
).unwrap_or_else(
|e| warn!(target: "stratum", "Error while pushing work: {:?}", e)
);
}
}
Expand All @@ -239,16 +237,13 @@ impl Stratum {

let dispatcher = Arc::new(StratumJobDispatcher::new(miner, client));

let stratum_svc = StratumService::start(
let service = StratumService::start(
&SocketAddr::new(options.listen_addr.parse::<IpAddr>()?, options.port),
dispatcher.clone(),
options.secret.clone(),
)?;

Ok(Stratum {
dispatcher: dispatcher,
service: stratum_svc,
})
Ok(Stratum { dispatcher, service })
}

/// Start STRATUM job dispatcher and register it in the miner
Expand Down
87 changes: 30 additions & 57 deletions miner/stratum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl Stratum {

let implementation = Arc::new(StratumImpl {
subscribers: RwLock::default(),
job_que: RwLock::default(),
job_queue: RwLock::default(),
dispatcher,
workers: Arc::new(RwLock::default()),
secret,
Expand Down Expand Up @@ -106,13 +106,9 @@ impl Stratum {
}

impl PushWorkHandler for Stratum {
fn push_work_all(&self, payload: String) -> Result<(), Error> {
fn push_work_all(&self, payload: String) {
self.implementation.push_work_all(payload, &self.tcp_dispatcher)
}

fn push_work(&self, payloads: Vec<String>) -> Result<(), Error> {
self.implementation.push_work(payloads, &self.tcp_dispatcher)
}
}

impl Drop for Stratum {
Expand All @@ -126,14 +122,14 @@ struct StratumImpl {
/// Subscribed clients
subscribers: RwLock<Vec<SocketAddr>>,
/// List of workers supposed to receive job update
job_que: RwLock<HashSet<SocketAddr>>,
job_queue: RwLock<HashSet<SocketAddr>>,
/// Payload manager
dispatcher: Arc<dyn JobDispatcher>,
/// Authorized workers (socket - worker_id)
workers: Arc<RwLock<HashMap<SocketAddr, String>>>,
/// Secret if any
secret: Option<H256>,
/// Dispatch notify couinter
/// Dispatch notify counter
notify_counter: RwLock<u32>,
}

Expand All @@ -143,7 +139,7 @@ impl StratumImpl {
use std::str::FromStr;

self.subscribers.write().push(meta.addr().clone());
self.job_que.write().insert(meta.addr().clone());
self.job_queue.write().insert(meta.addr().clone());
trace!(target: "stratum", "Subscription request from {:?}", meta.addr());

Ok(match self.dispatcher.initial() {
Expand All @@ -160,7 +156,7 @@ impl StratumImpl {

/// rpc method `mining.authorize`
fn authorize(&self, params: Params, meta: SocketMetadata) -> RpcResult {
params.parse::<(String, String)>().map(|(worker_id, secret)|{
params.parse::<(String, String)>().map(|(worker_id, secret)| {
if let Some(valid_secret) = self.secret {
let hash = keccak(secret);
if hash != valid_secret {
Expand All @@ -184,15 +180,15 @@ impl StratumImpl {
_ => None
})
.collect::<Vec<String>>()) {
Ok(()) => {
self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed"));
to_value(true)
},
Err(submit_err) => {
warn!("Error while submitting share: {:?}", submit_err);
to_value(false)
}
Ok(()) => {
self.update_peers(&meta.tcp_dispatcher.expect("tcp_dispatcher is always initialized; qed"));
to_value(true)
},
Err(submit_err) => {
warn!("Error while submitting share: {:?}", submit_err);
to_value(false)
}
}
},
_ => {
trace!(target: "stratum", "Invalid submit work format {:?}", params);
Expand All @@ -204,70 +200,48 @@ impl StratumImpl {
/// Helper method
fn update_peers(&self, tcp_dispatcher: &Dispatcher) {
if let Some(job) = self.dispatcher.job() {
if let Err(e) = self.push_work_all(job, tcp_dispatcher) {
warn!("Failed to update some of the peers: {:?}", e);
}
self.push_work_all(job, tcp_dispatcher)
}
}

fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) -> Result<(), Error> {
fn push_work_all(&self, payload: String, tcp_dispatcher: &Dispatcher) {
let hup_peers = {
let workers = self.workers.read();
let next_request_id = {
let mut counter = self.notify_counter.write();
if *counter == ::std::u32::MAX { *counter = NOTIFY_COUNTER_INITIAL; }
else { *counter = *counter + 1 }
if *counter == ::std::u32::MAX {
*counter = NOTIFY_COUNTER_INITIAL;
} else {
*counter = *counter + 1
}
*counter
};

let mut hup_peers = HashSet::with_capacity(0); // most of the cases won't be needed, hence avoid allocation
let mut hup_peers = HashSet::new();
let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload);
trace!(target: "stratum", "pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
for (ref addr, _) in workers.iter() {
for (addr, _) in workers.iter() {
trace!(target: "stratum", "pusing work to {}", addr);
match tcp_dispatcher.push_message(addr, workers_msg.clone()) {
Err(PushMessageError::NoSuchPeer) => {
trace!(target: "stratum", "Worker no longer connected: {}", &addr);
hup_peers.insert(*addr.clone());
trace!(target: "stratum", "Worker no longer connected: {}", addr);
hup_peers.insert(addr.clone());
},
Err(e) => {
warn!(target: "stratum", "Unexpected transport error: {:?}", e);
},
Ok(_) => { },
Ok(_) => {},
}
}
hup_peers
};

if !hup_peers.is_empty() {
let mut workers = self.workers.write();
for hup_peer in hup_peers { workers.remove(&hup_peer); }
}

Ok(())
}

fn push_work(&self, payloads: Vec<String>, tcp_dispatcher: &Dispatcher) -> Result<(), Error> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if !payloads.len() > 0 {
return Err(Error::NoWork);
}
let workers = self.workers.read();
let addrs = workers.keys().collect::<Vec<&SocketAddr>>();
if !workers.len() > 0 {
return Err(Error::NoWorkers);
}
let mut que = payloads;
let mut addr_index = 0;
while que.len() > 0 {
let next_worker = addrs[addr_index];
let mut next_payload = que.drain(0..1);
tcp_dispatcher.push_message(
next_worker,
next_payload.nth(0).expect("drained successfully of 0..1, so 0-th element should exist")
)?;
addr_index = addr_index + 1;
for hup_peer in hup_peers {
workers.remove(&hup_peer);
}
}
Ok(())
}
}

Expand Down Expand Up @@ -475,8 +449,7 @@ mod tests {
.map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
.and_then(move |stream| {
trace!(target: "stratum", "Pusing work to peers");
stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned())
.expect("Pushing work should produce no errors");
stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned());
Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100))
})
.map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
Expand Down
5 changes: 1 addition & 4 deletions miner/stratum/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ pub trait JobDispatcher: Send + Sync {
/// Interface that can handle requests to push job for workers
pub trait PushWorkHandler: Send + Sync {
/// push the same work package for all workers (`payload`: json of pow-specific set of work specification)
fn push_work_all(&self, payload: String) -> Result<(), Error>;
Copy link
Collaborator

@niklasad1 niklasad1 Oct 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trait looks a bit weird after this change because now push_work_all is infallible and push_work can fail (based on the return types only)

I do realize it is because we don't want to short-circuit if one message fails when we need to send x more messages but still I guess it would be possible to keep all failed messages in a buffer and return Err if !buffer.is_empty()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trait looks a bit weird

It does, but it's better to get the weirdness "in your face" than hidden behind a comfy Result that is always Ok.

it would be possible to keep all failed messages in a buffer

Yeah, and while that would be more satisfying, I still wonder what use calling code can make of such an error? Honest question: would it be useful?

Copy link
Collaborator

@niklasad1 niklasad1 Oct 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, and while that would be more satisfying I still wonder what use calling code can make with such an error? Honest question: would it be useful?

I don't think so; currently it is only called from here AFAIU, and it would require more changes then.

Also, seems like push_work is not used except in the tests.

We can merge this and investigate my points above separately


/// push the work packages worker-wise (`payload`: json of pow-specific set of work specification)
fn push_work(&self, payloads: Vec<String>) -> Result<(), Error>;
fn push_work_all(&self, payload: String);
}

pub struct ServiceConfiguration {
Expand Down