Skip to content

Commit

Permalink
Add IPC leaking example; code comments to explain what happens
Browse files Browse the repository at this point in the history
  • Loading branch information
gnunicorn committed Jun 5, 2018
1 parent 7adb1ae commit 911ea0d
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 0 deletions.
1 change: 1 addition & 0 deletions ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tokio-service = "0.1"
jsonrpc-core = { version = "8.0", path = "../core" }
jsonrpc-server-utils = { version = "8.0", path = "../server-utils" }
parity-tokio-ipc = { git = "https://github.com/nikvolf/parity-tokio-ipc" }
jsonrpc-pubsub = { path = "../pubsub" }

[dev-dependencies]
env_logger = "0.5"
Expand Down
97 changes: 97 additions & 0 deletions ipc/examples/with-pubsubsession.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#[macro_use]
extern crate log;
extern crate env_logger;

extern crate jsonrpc_ipc_server;
extern crate jsonrpc_pubsub;
extern crate jsonrpc_core;

use std::sync::Arc;
use jsonrpc_ipc_server::{ServerBuilder, MetaExtractor, RequestContext};
use jsonrpc_ipc_server::jsonrpc_core::*;
use jsonrpc_pubsub::{Session, PubSubMetadata};

use std::time::Instant;
// use jsonrpc_core::*;
use jsonrpc_core::futures::Future;


//
//
// HERE IS THE MEAT
//

#[derive(Debug, Clone)]
/// RPC methods metadata.
pub struct Metadata {
/// Request PubSub Session
pub session: Arc<Session>
// ^^^^^^^^^^^^^^^^^^^^^ --- Holding a PubSubSession
// internally holds reference to _sender_
}

pub struct RpcExtractor;

impl MetaExtractor<Metadata> for RpcExtractor {
fn extract(&self, req: &RequestContext) -> Metadata {
trace!(target:"ipc", "ext");
Metadata {
session: Arc::new(Session::new(req.sender.clone()))
// ^^^^^^^^^^^^^^^^^^ --- holding reference to sender
}
}
}





//
//
// THIS IS BOILERPLATE. IGNORE
//
//



impl jsonrpc_core::Metadata for Metadata {}
impl PubSubMetadata for Metadata {
fn session(&self) -> Option<Arc<Session>> {
Some(self.session.clone())
}
}

#[derive(Default)]
struct MyMiddleware {}
impl Middleware<Metadata> for MyMiddleware {
type Future = FutureResponse;

fn on_request<F, X>(&self, request: Request, meta: Metadata, next: F) -> FutureResponse where
F: FnOnce(Request, Metadata) -> X + Send,
X: Future<Item=Option<Response>, Error=()> + Send + 'static,
{
let start = Instant::now();

Box::new(next(request, meta).map(move |res| {
println!("Processing took: {:?}", start.elapsed());
res
}))
}
}




fn main() {

env_logger::init();
let mut io = MetaIoHandler::with_middleware(MyMiddleware::default());
io.add_method("say_hello", |_params| {
Ok(Value::String("hello".into()))
});

ServerBuilder::with_meta_extractor(io, RpcExtractor)
.start("/tmp/json-ipc-test.ipc")
.expect("Couldn't open socket")
.wait()
}
15 changes: 15 additions & 0 deletions ipc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,21 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
endpoint_addr: &remote_id,
session_id,
sender,
// ^^^^^^ passing in sender meanse meta holds `sender`
});
let service = Service::new(rpc_handler.clone(), meta);
// ^^^^^^^ service now holds meta, which holds `sender`
let (writer, reader) = io_stream.framed(
codecs::StreamCodec::new(
incoming_separator.clone(),
outgoing_separator.clone(),
)
).split();
let responses = reader.and_then(move |req| {
// ^^^^^^ reader becomes our main holder of everything
service.call(req).then(move |response| match response {
// ^^^^^^^ service moved here in the fn
// means `responses` now holds a reference to service, which holds `sender`
Err(e) => {
warn!(target: "ipc", "Error while processing request: {:?}", e);
future::ok(None)
Expand All @@ -173,16 +178,26 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
})
.filter_map(|x| x)
.select(receiver.map_err(|e| {
// ^^^^^^^^ now responses also holds a reference to `receiver`, too
// thus `responsens` internally circles by holding receiver and sender
warn!(target: "ipc", "Notification error: {:?}", e);
std::io::ErrorKind::Other.into()
}));

let writer = writer.send_all(responses).then(move |_| {
// ^^^^^^^^^ passing all this to `writer`
// means `writer` now holds the entire circle internally
//
// however, not reader.and_then nor receiver ever fires, as receiver would fire on drop
// of `sender`, which doesn't happen as it is bound to `responses` already, which isn't being
// dropped, as it will be referenced by writer for
trace!(target: "ipc", "Peer: service finished");
session_stats.as_ref().map(|stats| stats.close_session(session_id));
Ok(())
});

// now spawning writer and its internal circle of futures that never fire but because of
// the circle are kept around. leaking connections as they are never internally dropped
remote.spawn(|_| writer);

Ok(())
Expand Down

0 comments on commit 911ea0d

Please sign in to comment.