Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
mio version bump (#2982)
Browse files Browse the repository at this point in the history
  • Loading branch information
arkpar authored and gavofyork committed Oct 30, 2016
1 parent 70f87ea commit bccc56b
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 64 deletions.
34 changes: 32 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub struct Client {
miner: Arc<Miner>,
sleep_state: Mutex<SleepState>,
liveness: AtomicBool,
io_channel: IoChannel<ClientIoMessage>,
io_channel: Mutex<IoChannel<ClientIoMessage>>,
notify: RwLock<Vec<Weak<ChainNotify>>>,
queue_transactions: AtomicUsize,
last_hashes: RwLock<VecDeque<H256>>,
Expand Down Expand Up @@ -235,7 +235,7 @@ impl Client {
import_lock: Mutex::new(()),
panic_handler: panic_handler,
miner: miner,
io_channel: message_channel,
io_channel: Mutex::new(message_channel),
notify: RwLock::new(Vec::new()),
queue_transactions: AtomicUsize::new(0),
last_hashes: RwLock::new(VecDeque::new()),
Expand Down Expand Up @@ -1139,7 +1139,7 @@ impl BlockChainClient for Client {
debug!("Ignoring {} transactions: queue is full", transactions.len());
} else {
let len = transactions.len();
match self.io_channel.send(ClientIoMessage::NewTransactions(transactions)) {
match self.io_channel.lock().send(ClientIoMessage::NewTransactions(transactions)) {
Ok(_) => {
self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
}
Expand Down
10 changes: 5 additions & 5 deletions ethcore/src/snapshot/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ pub struct Service {
restoration: Mutex<Option<Restoration>>,
snapshot_root: PathBuf,
db_config: DatabaseConfig,
io_channel: Channel,
io_channel: Mutex<Channel>,
pruning: Algorithm,
status: Mutex<RestorationStatus>,
reader: RwLock<Option<LooseReader>>,
Expand All @@ -233,7 +233,7 @@ impl Service {
restoration: Mutex::new(None),
snapshot_root: params.snapshot_root,
db_config: params.db_config,
io_channel: params.channel,
io_channel: Mutex::new(params.channel),
pruning: params.pruning,
status: Mutex::new(RestorationStatus::Inactive),
reader: RwLock::new(None),
Expand Down Expand Up @@ -567,7 +567,7 @@ impl SnapshotService for Service {
}

fn begin_restore(&self, manifest: ManifestData) {
if let Err(e) = self.io_channel.send(ClientIoMessage::BeginRestoration(manifest)) {
if let Err(e) = self.io_channel.lock().send(ClientIoMessage::BeginRestoration(manifest)) {
trace!("Error sending snapshot service message: {:?}", e);
}
}
Expand All @@ -578,13 +578,13 @@ impl SnapshotService for Service {
}

fn restore_state_chunk(&self, hash: H256, chunk: Bytes) {
if let Err(e) = self.io_channel.send(ClientIoMessage::FeedStateChunk(hash, chunk)) {
if let Err(e) = self.io_channel.lock().send(ClientIoMessage::FeedStateChunk(hash, chunk)) {
trace!("Error sending snapshot service message: {:?}", e);
}
}

fn restore_block_chunk(&self, hash: H256, chunk: Bytes) {
if let Err(e) = self.io_channel.send(ClientIoMessage::FeedBlockChunk(hash, chunk)) {
if let Err(e) = self.io_channel.lock().send(ClientIoMessage::FeedBlockChunk(hash, chunk)) {
trace!("Error sending snapshot service message: {:?}", e);
}
}
Expand Down
7 changes: 4 additions & 3 deletions ethcore/src/snapshot/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

//! Watcher for snapshot-related chain events.
use util::Mutex;
use client::{BlockChainClient, Client, ChainNotify};
use ids::BlockID;
use service::ClientIoMessage;
Expand Down Expand Up @@ -55,7 +56,7 @@ trait Broadcast: Send + Sync {
fn take_at(&self, num: Option<u64>);
}

impl Broadcast for IoChannel<ClientIoMessage> {
impl Broadcast for Mutex<IoChannel<ClientIoMessage>> {
fn take_at(&self, num: Option<u64>) {
let num = match num {
Some(n) => n,
Expand All @@ -64,7 +65,7 @@ impl Broadcast for IoChannel<ClientIoMessage> {

trace!(target: "snapshot_watcher", "broadcast: {}", num);

if let Err(e) = self.send(ClientIoMessage::TakeSnapshot(num)) {
if let Err(e) = self.lock().send(ClientIoMessage::TakeSnapshot(num)) {
warn!("Snapshot watcher disconnected from IoService: {}", e);
}
}
Expand All @@ -91,7 +92,7 @@ impl Watcher {
client: client,
sync_status: sync_status,
}),
broadcast: Box::new(channel),
broadcast: Box::new(Mutex::new(channel)),
period: period,
history: history,
}
Expand Down
10 changes: 6 additions & 4 deletions ethcore/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub struct VerificationQueue<K: Kind> {
struct QueueSignal {
deleting: Arc<AtomicBool>,
signalled: AtomicBool,
message_channel: IoChannel<ClientIoMessage>,
message_channel: Mutex<IoChannel<ClientIoMessage>>,
}

impl QueueSignal {
Expand All @@ -121,7 +121,8 @@ impl QueueSignal {
}

if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false {
if let Err(e) = self.message_channel.send_sync(ClientIoMessage::BlockVerified) {
let channel = self.message_channel.lock().clone();
if let Err(e) = channel.send_sync(ClientIoMessage::BlockVerified) {
debug!("Error sending BlockVerified message: {:?}", e);
}
}
Expand All @@ -135,7 +136,8 @@ impl QueueSignal {
}

if self.signalled.compare_and_swap(false, true, AtomicOrdering::Relaxed) == false {
if let Err(e) = self.message_channel.send(ClientIoMessage::BlockVerified) {
let channel = self.message_channel.lock().clone();
if let Err(e) = channel.send(ClientIoMessage::BlockVerified) {
debug!("Error sending BlockVerified message: {:?}", e);
}
}
Expand Down Expand Up @@ -178,7 +180,7 @@ impl<K: Kind> VerificationQueue<K> {
let ready_signal = Arc::new(QueueSignal {
deleting: deleting.clone(),
signalled: AtomicBool::new(false),
message_channel: message_channel
message_channel: Mutex::new(message_channel),
});
let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc();
Expand Down
2 changes: 1 addition & 1 deletion util/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ version = "1.4.0"
authors = ["Ethcore <admin@ethcore.io>"]

[dependencies]
mio = { git = "https://github.com/ethcore/mio", branch = "v0.5.x" }
mio = { git = "https://github.com/carllerche/mio" }
crossbeam = "0.2"
parking_lot = "0.3"
log = "0.3"
Expand Down
7 changes: 4 additions & 3 deletions util/io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ mod service;
mod worker;
mod panics;

use mio::{EventLoop, Token};
use mio::{Token};
use mio::deprecated::{EventLoop, NotifyError};
use std::fmt;

pub use worker::LOCAL_STACK_SIZE;
Expand Down Expand Up @@ -96,8 +97,8 @@ impl From<::std::io::Error> for IoError {
}
}

impl<Message> From<::mio::NotifyError<service::IoMessage<Message>>> for IoError where Message: Send + Clone {
fn from(_err: ::mio::NotifyError<service::IoMessage<Message>>) -> IoError {
impl<Message> From<NotifyError<service::IoMessage<Message>>> for IoError where Message: Send + Clone {
fn from(_err: NotifyError<service::IoMessage<Message>>) -> IoError {
IoError::Mio(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error"))
}
}
Expand Down
Loading

0 comments on commit bccc56b

Please sign in to comment.