Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Fix compiler warnings in util/io and upgrade to edition 2018 Upgrade mio to latest #10953

Merged
merged 6 commits into from
Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions util/io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
[package]
name = "ethcore-io"
description = "Ethcore IO library"
version = "1.12.0"
homepage = "http://parity.io"
license = "GPL-3.0"
name = "ethcore-io"
version = "1.12.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"

[dependencies]
fnv = "1.0"
mio = { version = "0.6.8", optional = true }
mio = { version = "0.6.19", optional = true }
crossbeam-deque = "0.6"
parking_lot = "0.8"
log = "0.4"
Expand All @@ -18,3 +19,4 @@ timer = "0.2"
time = "0.1"
tokio = "0.1"
futures = "0.1"

27 changes: 7 additions & 20 deletions util/io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,6 @@
//TODO: use Poll from mio
#![allow(deprecated)]

#[cfg(feature = "mio")]
extern crate mio;
#[macro_use]
extern crate log as rlog;
extern crate slab;
extern crate crossbeam_deque as deque;
extern crate parking_lot;
extern crate num_cpus;
extern crate timer;
extern crate fnv;
extern crate time;
extern crate tokio;
extern crate futures;

#[cfg(feature = "mio")]
mod service_mio;
#[cfg(not(feature = "mio"))]
Expand Down Expand Up @@ -170,22 +156,23 @@ pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + 'static {
/// Re-register a stream with the event loop
#[cfg(feature = "mio")]
fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
/// Deregister a stream. Called whenstream is removed from event loop
/// Deregister a stream. Called when a stream is removed from the event loop
#[cfg(feature = "mio")]
fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop<IoManager<Message>>) {}
}

#[cfg(feature = "mio")]
pub use service_mio::{TimerToken, StreamToken, IoContext, IoService, IoChannel, IoManager, TOKENS_PER_HANDLER};
#[cfg(not(feature = "mio"))]
pub use service_non_mio::{TimerToken, IoContext, IoService, IoChannel, TOKENS_PER_HANDLER};
pub use crate::service_non_mio::{TimerToken, IoContext, IoService, IoChannel, TOKENS_PER_HANDLER};

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic;
use std::thread;
use std::time::Duration;
use std::{
sync::{Arc, atomic},
thread,
time::Duration,
};
use super::*;

// Mio's behaviour is too unstable for this test. Sometimes we have to wait a few milliseconds,
Expand Down
75 changes: 40 additions & 35 deletions util/io/src/service_mio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,23 @@
// You should have received a copy of the GNU General Public License
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::thread::{self, JoinHandle};
use std::collections::HashMap;
use std::time::Duration;

use crossbeam_deque as deque;
use log::{trace, debug, warn};
use mio::*;
use mio::timer::{Timeout};
use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder};
use deque;
use mio::deprecated::{EventLoop, EventLoopBuilder, Handler, Sender};
use mio::timer::Timeout;
use parking_lot::{Condvar, Mutex, RwLock};
use slab::Slab;
use {IoError, IoHandler};
use worker::{Worker, Work, WorkType};
use parking_lot::{Condvar, RwLock, Mutex};
use std::time::Duration;

use crate::{
IoError, IoHandler,
worker::{Work, Worker, WorkType}
};

/// Timer ID
pub type TimerToken = usize;
Expand All @@ -45,7 +50,7 @@ pub enum IoMessage<Message> where Message: Send + Sized {
Shutdown,
/// Register a new protocol handler.
AddHandler {
handler: Arc<IoHandler<Message>+Send>,
handler: Arc<dyn IoHandler<Message>+Send>,
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
},
RemoveHandler {
handler_id: HandlerId,
Expand Down Expand Up @@ -182,7 +187,7 @@ struct UserTimer {
/// Root IO handler. Manages user handlers, messages and IO timers.
pub struct IoManager<Message> where Message: Send + Sync {
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
handlers: Arc<RwLock<Slab<Arc<dyn IoHandler<Message>>>>>,
workers: Vec<Worker>,
worker_channel: deque::Worker<Work<Message>>,
work_ready: Arc<Condvar>,
Expand All @@ -192,7 +197,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + 'static {
/// Creates a new instance and registers it with the event loop.
pub fn start(
event_loop: &mut EventLoop<IoManager<Message>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>
handlers: Arc<RwLock<Slab<Arc<dyn IoHandler<Message>>>>>
) -> Result<(), IoError> {
let (worker, stealer) = deque::fifo();
let num_workers = 4;
Expand Down Expand Up @@ -243,24 +248,6 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Sync + 'stati
}
}

fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
let handler_index = token.0 / TOKENS_PER_HANDLER;
let token_id = token.0 % TOKENS_PER_HANDLER;
if let Some(handler) = self.handlers.read().get(handler_index) {
let maybe_timer = self.timers.read().get(&token.0).cloned();
if let Some(timer) = maybe_timer {
if timer.once {
self.timers.write().remove(&token_id);
event_loop.clear_timeout(&timer.timeout);
} else {
event_loop.timeout(token, timer.delay).expect("Error re-registering user timer");
}
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
self.work_ready.notify_all();
}
}
}

fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Self::Message) {
match msg {
IoMessage::Shutdown => {
Expand Down Expand Up @@ -331,11 +318,29 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Sync + 'stati
}
}
}

fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
let handler_index = token.0 / TOKENS_PER_HANDLER;
let token_id = token.0 % TOKENS_PER_HANDLER;
if let Some(handler) = self.handlers.read().get(handler_index) {
let maybe_timer = self.timers.read().get(&token.0).cloned();
if let Some(timer) = maybe_timer {
if timer.once {
self.timers.write().remove(&token_id);
event_loop.clear_timeout(&timer.timeout);
} else {
event_loop.timeout(token, timer.delay).expect("Error re-registering user timer");
}
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
self.work_ready.notify_all();
}
}
}
}

enum Handlers<Message> where Message: Send {
SharedCollection(Weak<RwLock<Slab<Arc<IoHandler<Message>>>>>),
Single(Weak<IoHandler<Message>>),
SharedCollection(Weak<RwLock<Slab<Arc<dyn IoHandler<Message>>>>>),
Single(Weak<dyn IoHandler<Message>>),
}

impl<Message: Send> Clone for Handlers<Message> {
Expand Down Expand Up @@ -413,13 +418,13 @@ impl<Message> IoChannel<Message> where Message: Send + Sync + 'static {
}

/// Create a new synchronous channel to a given handler.
pub fn to_handler(handler: Weak<IoHandler<Message>>) -> IoChannel<Message> {
pub fn to_handler(handler: Weak<dyn IoHandler<Message>>) -> IoChannel<Message> {
IoChannel {
channel: None,
handlers: Handlers::Single(handler),
}
}
fn new(channel: Sender<IoMessage<Message>>, handlers: Weak<RwLock<Slab<Arc<IoHandler<Message>>>>>) -> IoChannel<Message> {
fn new(channel: Sender<IoMessage<Message>>, handlers: Weak<RwLock<Slab<Arc<dyn IoHandler<Message>>>>>) -> IoChannel<Message> {
IoChannel {
channel: Some(channel),
handlers: Handlers::SharedCollection(handlers),
Expand All @@ -432,7 +437,7 @@ impl<Message> IoChannel<Message> where Message: Send + Sync + 'static {
pub struct IoService<Message> where Message: Send + Sync + 'static {
thread: Option<JoinHandle<()>>,
host_channel: Mutex<Sender<IoMessage<Message>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
handlers: Arc<RwLock<Slab<Arc<dyn IoHandler<Message>>>>>,
}

impl<Message> IoService<Message> where Message: Send + Sync + 'static {
Expand Down Expand Up @@ -469,7 +474,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + 'static {
}

/// Regiter an IO handler with the event loop.
pub fn register_handler(&self, handler: Arc<IoHandler<Message>+Send>) -> Result<(), IoError> {
pub fn register_handler(&self, handler: Arc<dyn IoHandler<Message>+Send>) -> Result<(), IoError> {
self.host_channel.lock().send(IoMessage::AddHandler {
handler: handler,
})?;
Expand Down
Loading