Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Stratum IPC service #1959

Merged
merged 4 commits into from
Aug 24, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ dapps = ["ethcore-dapps"]
ipc = ["ethcore/ipc"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethcore-dapps/dev", "ethcore-signer/dev"]
json-tests = ["ethcore/json-tests"]
stratum = ["ipc"]

[[bin]]
path = "parity/main.rs"
Expand Down
123 changes: 123 additions & 0 deletions parity/boot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

//! Parity micro-service helpers

use nanoipc;
use ipc;
use std;
use std::sync::Arc;
use hypervisor::{HypervisorServiceClient, HYPERVISOR_IPC_URL};
use hypervisor::service::IpcModuleId;
use ctrlc::CtrlC;
use std::sync::atomic::{AtomicBool, Ordering};
use nanoipc::{IpcInterface, GuardedSocket, NanoSocket};
use ipc::WithSocket;
use ethcore_logger::{Config as LogConfig, setup_log};
use docopt::Docopt;

#[derive(Debug)]
pub enum BootError {
ReadArgs(std::io::Error),
DecodeArgs(ipc::binary::BinaryError),
DependencyConnect(nanoipc::SocketError),
}

pub fn host_service<T: ?Sized + Send + Sync + 'static>(addr: &str, stop_guard: Arc<AtomicBool>, service: Arc<T>) where T: IpcInterface {
let socket_url = addr.to_owned();
std::thread::spawn(move || {
let mut worker = nanoipc::Worker::<T>::new(&service);
worker.add_reqrep(&socket_url).unwrap();

while !stop_guard.load(Ordering::Relaxed) {
worker.poll();
}
});
}

pub fn payload<B: ipc::BinaryConvertable>() -> Result<B, BootError> {
use std::io;
use std::io::Read;

let mut buffer = Vec::new();
try!(
io::stdin().read_to_end(&mut buffer)
.map_err(|io_err| BootError::ReadArgs(io_err))
);

ipc::binary::deserialize::<B>(&buffer)
.map_err(|binary_error| BootError::DecodeArgs(binary_error))
}

pub fn register(module_id: IpcModuleId) -> GuardedSocket<HypervisorServiceClient<NanoSocket>>{
let hypervisor_client = nanoipc::init_client::<HypervisorServiceClient<_>>(HYPERVISOR_IPC_URL).unwrap();
hypervisor_client.handshake().unwrap();
hypervisor_client.module_ready(module_id);

hypervisor_client
}

pub fn dependency<C: WithSocket<NanoSocket>>(url: &str)
-> Result<GuardedSocket<C>, BootError>
{
nanoipc::init_client::<C>(url).map_err(|socket_err| BootError::DependencyConnect(socket_err))
}

pub fn main_thread() -> Arc<AtomicBool> {
let stop = Arc::new(AtomicBool::new(false));
let ctrc_stop = stop.clone();
CtrlC::set_handler(move || {
ctrc_stop.store(true, Ordering::Relaxed);
});
stop
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be a condvar so you don't have check the flag in a loop. probably for another PR.


pub fn setup_cli_logger(svc_name: &str) {
let usage = format!("
Ethcore {} service
Usage:
parity {} [options]

Options:
-l --logging LOGGING Specify the logging level. Must conform to the same
format as RUST_LOG.
Copy link
Contributor

@gavofyork gavofyork Aug 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tabs used in usage format string - should be spaces to ensure proper display on terminals.

--log-file FILENAME Specify a filename into which logging should be
directed.
--no-color Don't use terminal color codes in output.
", svc_name, svc_name);

#[derive(Debug, RustcDecodable)]
struct Args {
flag_logging: Option<String>,
flag_log_file: Option<String>,
flag_no_color: bool,
}

impl Args {
pub fn log_settings(&self) -> LogConfig {
LogConfig {
color: self.flag_no_color || cfg!(windows),
mode: self.flag_logging.clone(),
file: self.flag_log_file.clone(),
}
}
}

let args: Args = Docopt::new(usage)
.and_then(|d| d.decode())
.unwrap_or_else(|e| e.exit());
setup_log(&args.log_settings()).expect("Log initialization failure");
}
40 changes: 40 additions & 0 deletions parity/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,24 @@ extern crate lazy_static;
extern crate regex;
extern crate isatty;

#[cfg(feature="stratum")]
extern crate ethcore_stratum;

#[cfg(feature = "dapps")]
extern crate ethcore_dapps;

macro_rules! dependency {
($dep_ty:ident, $url:expr) => {
{
let dep = boot::dependency::<$dep_ty<_>>($url)
.unwrap_or_else(|e| panic!("Fatal: error connecting service ({:?})", e));
dep.handshake()
.unwrap_or_else(|e| panic!("Fatal: error in connected service ({:?})", e));
dep
}
}
}

mod cache;
mod upgrade;
mod rpc;
Expand All @@ -83,6 +98,10 @@ mod presale;
mod run;
mod sync;
mod snapshot;
mod boot;

#[cfg(feature="stratum")]
mod stratum;

use std::{process, env};
use cli::print_version;
Expand Down Expand Up @@ -116,13 +135,34 @@ fn start() -> Result<String, String> {
execute(cmd)
}

#[cfg(feature="stratum")]
mod stratum_optional {
pub fn probably_run() -> bool {
// just redirect to the stratum::main()
if ::std::env::args().nth(1).map_or(false, |arg| arg == "stratum") {
super::stratum::main();
true
}
else { false }
}
}

#[cfg(not(feature="stratum"))]
mod stratum_optional {
pub fn probably_run() -> bool {
false
}
}

fn main() {
// just redirect to the sync::main()
if std::env::args().nth(1).map_or(false, |arg| arg == "sync") {
sync::main();
return;
}

if stratum_optional::probably_run() { return; }

match start() {
Ok(result) => {
println!("{}", result);
Expand Down
4 changes: 4 additions & 0 deletions parity/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub mod service_urls {
pub const SYNC: &'static str = "ipc:///tmp/parity-sync.ipc";
pub const SYNC_NOTIFY: &'static str = "ipc:///tmp/parity-sync-notify.ipc";
pub const NETWORK_MANAGER: &'static str = "ipc:///tmp/parity-manage-net.ipc";
#[cfg(feature="stratum")]
pub const STRATUM: &'static str = "ipc:///tmp/parity-stratum.ipc";
#[cfg(feature="stratum")]
pub const MINING_JOB_DISPATCHER: &'static str = "ipc:///tmp/parity-mining-jobs.ipc";
}

#[cfg(not(feature="ipc"))]
Expand Down
57 changes: 57 additions & 0 deletions parity/stratum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

//! Parity sync service

use std;
use std::sync::Arc;
use ethcore_stratum::{Stratum as StratumServer, PushWorkHandler, RemoteJobDispatcher, ServiceConfiguration};
use std::thread;
use modules::service_urls;
use boot;
use hypervisor::service::IpcModuleId;
use std::net::SocketAddr;
use std::str::FromStr;

const STRATUM_MODULE_ID: IpcModuleId = 8000;

pub fn main() {
boot::setup_cli_logger("stratum");

let service_config: ServiceConfiguration = boot::payload()
.unwrap_or_else(|e| panic!("Fatal: error reading boot arguments ({:?})", e));

let job_dispatcher = dependency!(RemoteJobDispatcher, service_urls::MINING_JOB_DISPATCHER);

let stop = boot::main_thread();
let server =
StratumServer::start(
&SocketAddr::from_str(&service_config.listen_addr)
.unwrap_or_else(|e| panic!("Fatal: invalid listen address ({:?})", e)),
job_dispatcher.service().clone(),
service_config.secret
).unwrap_or_else(
|e| panic!("Fatal: cannot start stratum server({:?})", e)
);

boot::host_service(service_urls::STRATUM, stop.clone(), server.clone() as Arc<PushWorkHandler>);

let _ = boot::register(STRATUM_MODULE_ID);

while !stop.load(::std::sync::atomic::Ordering::Relaxed) {
thread::park_timeout(std::time::Duration::from_millis(1000));
}
}
89 changes: 12 additions & 77 deletions parity/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,97 +16,32 @@

//! Parity sync service

use nanoipc;
use ipc;
use std;
use std::sync::Arc;
use hypervisor::{HypervisorServiceClient, SYNC_MODULE_ID, HYPERVISOR_IPC_URL};
use ctrlc::CtrlC;
use std::sync::atomic::{AtomicBool, Ordering};
use docopt::Docopt;
use hypervisor::SYNC_MODULE_ID;
use ethcore::client::{RemoteClient, ChainNotify};
use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration};
use std::thread;
use nanoipc::IpcInterface;
use modules::service_urls;
use ethcore_logger::{Config as LogConfig, setup_log};

const USAGE: &'static str = "
Ethcore sync service
Usage:
parity sync [options]

Options:
-l --logging LOGGING Specify the logging level. Must conform to the same
format as RUST_LOG.
--log-file FILENAME Specify a filename into which logging should be
directed.
--no-color Don't use terminal color codes in output.
";

#[derive(Debug, RustcDecodable)]
struct Args {
flag_logging: Option<String>,
flag_log_file: Option<String>,
flag_no_color: bool,
}

impl Args {
pub fn log_settings(&self) -> LogConfig {
LogConfig {
color: self.flag_no_color || cfg!(windows),
mode: self.flag_logging.clone(),
file: self.flag_log_file.clone(),
}
}
}

fn run_service<T: ?Sized + Send + Sync + 'static>(addr: &str, stop_guard: Arc<AtomicBool>, service: Arc<T>) where T: IpcInterface {
let socket_url = addr.to_owned();
std::thread::spawn(move || {
let mut worker = nanoipc::Worker::<T>::new(&service);
worker.add_reqrep(&socket_url).unwrap();

while !stop_guard.load(Ordering::Relaxed) {
worker.poll();
}
});
}
use boot;

pub fn main() {
use std::io::{self, Read};

let args: Args = Docopt::new(USAGE)
.and_then(|d| d.decode())
.unwrap_or_else(|e| e.exit());

setup_log(&args.log_settings()).expect("Log initialization failure");
boot::setup_cli_logger("sync");

let mut buffer = Vec::new();
io::stdin().read_to_end(&mut buffer).expect("Failed to read initialisation payload");
let service_config = ipc::binary::deserialize::<ServiceConfiguration>(&buffer).expect("Failed deserializing initialisation payload");
let service_config: ServiceConfiguration = boot::payload()
.unwrap_or_else(|e| panic!("Fatal: error reading boot arguments ({:?})", e));

let remote_client = nanoipc::init_client::<RemoteClient<_>>(service_urls::CLIENT).unwrap();
let remote_client = dependency!(RemoteClient, service_urls::CLIENT);

remote_client.handshake().unwrap();

let stop = Arc::new(AtomicBool::new(false));
let stop = boot::main_thread();
let sync = EthSync::new(service_config.sync, remote_client.service().clone(), service_config.net).unwrap();
boot::host_service(service_urls::SYNC, stop.clone(), sync.clone() as Arc<SyncProvider>);
boot::host_service(service_urls::NETWORK_MANAGER, stop.clone(), sync.clone() as Arc<ManageNetwork>);
boot::host_service(service_urls::SYNC_NOTIFY, stop.clone(), sync.clone() as Arc<ChainNotify>);

run_service(service_urls::SYNC, stop.clone(), sync.clone() as Arc<SyncProvider>);
run_service(service_urls::NETWORK_MANAGER, stop.clone(), sync.clone() as Arc<ManageNetwork>);
run_service(service_urls::SYNC_NOTIFY, stop.clone(), sync.clone() as Arc<ChainNotify>);

let hypervisor_client = nanoipc::init_client::<HypervisorServiceClient<_>>(HYPERVISOR_IPC_URL).unwrap();
hypervisor_client.handshake().unwrap();
hypervisor_client.module_ready(SYNC_MODULE_ID);

let terminate_stop = stop.clone();
CtrlC::set_handler(move || {
terminate_stop.store(true, Ordering::Relaxed);
});
let _ = boot::register(SYNC_MODULE_ID);

while !stop.load(Ordering::Relaxed) {
while !stop.load(::std::sync::atomic::Ordering::Relaxed) {
thread::park_timeout(std::time::Duration::from_millis(1000));
}
}
Loading