Skip to content

Commit 8ed4be7

Browse files
committed
Prefer using tokio_compat everywhere for now
1 parent 224c4a0 commit 8ed4be7

File tree

7 files changed

+21
-38
lines changed

7 files changed

+21
-38
lines changed

http/src/lib.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub use crate::handler::ServerHandler;
5353
pub use crate::response::Response;
5454
pub use crate::server_utils::cors::{self, AccessControlAllowOrigin, AllowCors, Origin};
5555
pub use crate::server_utils::hosts::{DomainsValidation, Host};
56-
pub use crate::server_utils::{tokio, SuspendableStream};
56+
pub use crate::server_utils::{tokio, tokio_compat, SuspendableStream};
5757
pub use crate::utils::{cors_allow_headers, cors_allow_origin, is_host_allowed};
5858

5959
/// Action undertaken by a middleware.
@@ -300,7 +300,7 @@ where
300300
/// Utilize existing event loop executor to poll RPC results.
301301
///
302302
/// Applies only to 1 of the threads. Other threads will spawn their own Event Loops.
303-
pub fn event_loop_executor(mut self, executor: tokio::runtime::TaskExecutor) -> Self {
303+
pub fn event_loop_executor(mut self, executor: tokio_compat::runtime::TaskExecutor) -> Self {
304304
self.executor = UninitializedExecutor::Shared(executor);
305305
self
306306
}
@@ -519,7 +519,7 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
519519
mpsc::Sender<io::Result<SocketAddr>>,
520520
oneshot::Sender<()>,
521521
),
522-
executor: tokio::runtime::TaskExecutor,
522+
executor: tokio_compat::runtime::TaskExecutor,
523523
addr: SocketAddr,
524524
cors_domains: CorsDomains,
525525
cors_max_age: Option<u32>,

ipc/src/server.rs

+2-10
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use tokio_service::{self, Service as _};
1111

1212
use crate::server_utils::{
1313
codecs, reactor, session,
14-
tokio::{reactor::Handle, runtime::TaskExecutor},
15-
tokio_compat,
14+
tokio::reactor::Handle,
15+
tokio_compat::runtime::TaskExecutor,
1616
tokio_util,
1717
};
1818

@@ -107,12 +107,6 @@ where
107107
self
108108
}
109109

110-
/// Sets shared different event loop executor.
111-
pub fn event_loop_executor_compat(mut self, executor: tokio_compat::runtime::TaskExecutor) -> Self {
112-
self.executor = reactor::UninitializedExecutor::Compat(executor);
113-
self
114-
}
115-
116110
/// Sets different event loop I/O reactor.
117111
pub fn event_loop_reactor(mut self, reactor: Handle) -> Self {
118112
self.reactor = Some(reactor);
@@ -565,9 +559,7 @@ mod tests {
565559
};
566560

567561
let io = MetaIoHandler::<Arc<SessionEndMeta>>::default();
568-
let mut rt = tokio_compat::runtime::Builder::new().build().unwrap();
569562
let builder = ServerBuilder::with_meta_extractor(io, session_metadata_extractor);
570-
let builder = builder.event_loop_executor_compat(rt.executor());
571563
let server = builder.start(path).expect("Server must run with no issues");
572564
{
573565
let _ = UnixStream::connect(path).wait().expect("Socket should connect");

server-utils/src/reactor.rs

+11-20
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,14 @@
66
//! that `tokio::runtime` can be multi-threaded.
77
88
use std::io;
9-
use tokio;
109

1110
use futures01::Future;
1211

1312
/// Possibly uninitialized event loop executor.
1413
#[derive(Debug)]
1514
pub enum UninitializedExecutor {
1615
/// Shared instance of executor.
17-
Shared(tokio::runtime::TaskExecutor),
18-
/// Shared instance of executor.
19-
Compat(tokio_compat::runtime::TaskExecutor),
16+
Shared(tokio_compat::runtime::TaskExecutor),
2017
/// Event Loop should be spawned by the transport.
2118
Unspawned,
2219
}
@@ -35,7 +32,6 @@ impl UninitializedExecutor {
3532
pub fn init_with_name<T: Into<String>>(self, name: T) -> io::Result<Executor> {
3633
match self {
3734
UninitializedExecutor::Shared(executor) => Ok(Executor::Shared(executor)),
38-
UninitializedExecutor::Compat(executor) => Ok(Executor::Compat(executor)),
3935
UninitializedExecutor::Unspawned => RpcEventLoop::with_name(Some(name.into())).map(Executor::Spawned),
4036
}
4137
}
@@ -45,19 +41,16 @@ impl UninitializedExecutor {
4541
#[derive(Debug)]
4642
pub enum Executor {
4743
/// Shared instance
48-
Shared(tokio::runtime::TaskExecutor),
49-
/// Shared instance
50-
Compat(tokio_compat::runtime::TaskExecutor),
44+
Shared(tokio_compat::runtime::TaskExecutor),
5145
/// Spawned Event Loop
5246
Spawned(RpcEventLoop),
5347
}
5448

5549
impl Executor {
5650
/// Get tokio executor associated with this event loop.
57-
pub fn executor(&self) -> tokio::runtime::TaskExecutor {
58-
match *self {
51+
pub fn executor(&self) -> tokio_compat::runtime::TaskExecutor {
52+
match self {
5953
Executor::Shared(ref executor) => executor.clone(),
60-
Executor::Compat(..) => panic!(),
6154
Executor::Spawned(ref eloop) => eloop.executor(),
6255
}
6356
}
@@ -69,7 +62,6 @@ impl Executor {
6962
{
7063
match self {
7164
Executor::Shared(exe) => exe.spawn(future),
72-
Executor::Compat(exe) => exe.spawn(future),
7365
Executor::Spawned(eloop) => eloop.executor().spawn(future),
7466
}
7567
}
@@ -92,9 +84,9 @@ impl Executor {
9284
/// A handle to running event loop. Dropping the handle will cause event loop to finish.
9385
#[derive(Debug)]
9486
pub struct RpcEventLoop {
95-
executor: tokio::runtime::TaskExecutor,
87+
executor: tokio_compat::runtime::TaskExecutor,
9688
close: Option<futures01::Complete<()>>,
97-
handle: Option<tokio::runtime::Shutdown>,
89+
runtime: Option<tokio_compat::runtime::Runtime>,
9890
}
9991

10092
impl Drop for RpcEventLoop {
@@ -113,34 +105,33 @@ impl RpcEventLoop {
113105
pub fn with_name(name: Option<String>) -> io::Result<Self> {
114106
let (stop, stopped) = futures01::oneshot();
115107

116-
let mut tb = tokio::runtime::Builder::new();
108+
let mut tb = tokio_compat::runtime::Builder::new();
117109
tb.core_threads(1);
118110

119111
if let Some(name) = name {
120112
tb.name_prefix(name);
121113
}
122114

123-
let mut runtime = tb.build()?;
115+
let runtime = tb.build()?;
124116
let executor = runtime.executor();
125117
let terminate = futures01::empty().select(stopped).map(|_| ()).map_err(|_| ());
126118
runtime.spawn(terminate);
127-
let handle = runtime.shutdown_on_idle();
128119

129120
Ok(RpcEventLoop {
130121
executor,
131122
close: Some(stop),
132-
handle: Some(handle),
123+
runtime: Some(runtime),
133124
})
134125
}
135126

136127
/// Get executor for this event loop.
137-
pub fn executor(&self) -> tokio::runtime::TaskExecutor {
128+
pub fn executor(&self) -> tokio_compat::runtime::TaskExecutor {
138129
self.executor.clone()
139130
}
140131

141132
/// Blocks current thread and waits until the event loop is finished.
142133
pub fn wait(mut self) -> Result<(), ()> {
143-
self.handle.take().ok_or(())?.wait()
134+
self.runtime.take().ok_or(())?.shutdown_on_idle().wait()
144135
}
145136

146137
/// Finishes this event loop.

tcp/src/server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use futures01::sync::oneshot;
88
use futures01::{future, Future, Sink, Stream};
99

1010
use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
11-
use crate::server_utils::{codecs, reactor, tokio, tokio_codec::Framed, SuspendableStream};
11+
use crate::server_utils::{codecs, reactor, tokio, tokio_compat, tokio_codec::Framed, SuspendableStream};
1212

1313
use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels};
1414
use crate::meta::{MetaExtractor, NoopExtractor, RequestContext};
@@ -60,7 +60,7 @@ where
6060
}
6161

6262
/// Utilize existing event loop executor.
63-
pub fn event_loop_executor(mut self, handle: tokio::runtime::TaskExecutor) -> Self {
63+
pub fn event_loop_executor(mut self, handle: tokio_compat::runtime::TaskExecutor) -> Self {
6464
self.executor = reactor::UninitializedExecutor::Shared(handle);
6565
self
6666
}

ws/src/metadata.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::{atomic, Arc};
33

44
use crate::core;
55
use crate::core::futures::channel::mpsc;
6-
use crate::server_utils::{session, tokio::runtime::TaskExecutor};
6+
use crate::server_utils::{session, tokio_compat::runtime::TaskExecutor};
77
use crate::ws;
88

99
use crate::error;

ws/src/server_builder.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ where
6969
}
7070

7171
/// Utilize existing event loop executor to poll RPC results.
72-
pub fn event_loop_executor(mut self, executor: server_utils::tokio::runtime::TaskExecutor) -> Self {
72+
pub fn event_loop_executor(mut self, executor: server_utils::tokio_compat::runtime::TaskExecutor) -> Self {
7373
self.executor = UninitializedExecutor::Shared(executor);
7474
self
7575
}

ws/src/session.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use slab::Slab;
1111
use crate::server_utils::cors::Origin;
1212
use crate::server_utils::hosts::Host;
1313
use crate::server_utils::session::{SessionId, SessionStats};
14-
use crate::server_utils::tokio::runtime::TaskExecutor;
14+
use crate::server_utils::tokio_compat::runtime::TaskExecutor;
1515
use crate::server_utils::Pattern;
1616
use crate::ws;
1717

0 commit comments

Comments
 (0)