Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding BufferConfig and interface for specifying CustomAllocator(s) #79

Merged
merged 7 commits into from
Sep 29, 2020
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 68 additions & 5 deletions core/src/component/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;

use crate::prelude::{BufferConfig, ChunkAllocator};
use std::task::Poll;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -320,19 +321,81 @@ where
pub fn suicide(&self) -> () {
self.component().enqueue_control(ControlEvent::Kill);
}

pub(crate) fn with_buffer<R>(&self, f: impl FnOnce(&mut EncodeBuffer) -> R) -> R {
{
// Scoping the borrow
if let Some(buffer) = self.buffer.borrow_mut().as_mut() {
return f(buffer);
}
}
self.init_buffers(None, None);
self.with_buffer(f)
}

/// May be used for manual initialization of a components local `EncodeBuffer`.
/// If this method is never called explicitly the actor will implicitly call it when it tries
/// to serialise its first message.
/// If buffers have already been initialized, explicitly or implicitly, the method does nothing.
///
/// A custom `BufferConfig` may be specified, if `None` is given the actor will first try
/// to parse the configuration from the system wide Configuration,
/// If that fails, it will use the default config.
///
/// A custom `ChunkAllocator` may also be specified.
/// if `None` is given the actor will use the default allocator
///
/// Note: The NetworkConfig never affects the Actors Buffers.
pub fn init_buffers(
&self,
buffer_config: Option<BufferConfig>,
custom_allocator: Option<Arc<dyn ChunkAllocator>>,
) -> () {
let mut buffer_location = self.buffer.borrow_mut();
match buffer_location.as_mut() {
Some(buffer) => f(buffer),
Some(_) => return, // already initialized, do nothing
None => {
let mut buffer = EncodeBuffer::with_dispatcher_ref(self.dispatcher_ref());
let res = f(&mut buffer);
// Need to create a new Buffer, fetch BufferConfig
let cfg = {
if let Some(cfg) = buffer_config {
cfg
} else {
self.get_buffer_config()
}
};

if custom_allocator.is_some() {
debug!(
self.log(),
"init_buffers with custom allocator, config {:?}.", &cfg
);
} else {
debug!(
self.log(),
"init_buffers with default allocator, config {:?}.", &cfg
);
}

let buffer = EncodeBuffer::with_dispatcher_ref(
self.dispatcher_ref(),
&cfg,
custom_allocator,
);
*buffer_location = Some(buffer);
res
}
}
}

/// Returns a BufferConfig from the global configuration or the default configuration.
fn get_buffer_config(&self) -> BufferConfig {
BufferConfig::from_config(self.config())
}

/// We use this method for assertions in tests
#[allow(dead_code)]
pub(crate) fn get_buffer_location(&self) -> &RefCell<Option<EncodeBuffer>> {
&self.buffer
}
}

impl<CD> ActorRefFactory for ComponentContext<CD>
Expand Down
86 changes: 59 additions & 27 deletions core/src/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ use crate::{
SerialisedFrame,
},
net::{
buffer::{BufferEncoder, EncodeBuffer},
buffer::{BufferChunk, BufferConfig, BufferEncoder, EncodeBuffer},
events::NetworkEvent,
ConnectionState,
NetworkBridgeErr,
},
prelude::ChunkAllocator,
timer::timer_manager::Timer,
};
use arc_swap::ArcSwap;
Expand All @@ -35,9 +36,7 @@ use futures::{
task::{Context, Poll},
};
use rustc_hash::FxHashMap;
use std::{io::ErrorKind, time::Duration};
use std::collections::VecDeque;
use crate::net::buffer::BufferChunk;
use std::{collections::VecDeque, io::ErrorKind, time::Duration};

pub mod lookup;
pub mod queue_manager;
Expand All @@ -61,18 +60,51 @@ type NetHashMap<K, V> = FxHashMap<K, V>;
/// let system = conf.build().expect("system");
/// # system.shutdown().expect("shutdown");
/// ```
#[derive(Clone, PartialEq, Debug)]
#[derive(Clone, Debug)]
pub struct NetworkConfig {
addr: SocketAddr,
transport: Transport,
buffer_config: BufferConfig,
custom_allocator: Option<Arc<dyn ChunkAllocator>>,
}

impl NetworkConfig {
/// Create a new config with `addr` and protocol [TCP](Transport::TCP)
/// NetworkDispatcher and NetworkThread will use the default `BufferConfig`
pub fn new(addr: SocketAddr) -> Self {
NetworkConfig {
addr,
transport: Transport::TCP,
buffer_config: BufferConfig::default(),
custom_allocator: None,
}
}

/// Create a new config with `addr` and protocol [TCP](Transport::TCP)
/// Note: Only the NetworkThread and NetworkDispatcher will use the `BufferConfig`, not Actors
pub fn with_buffer_config(addr: SocketAddr, buffer_config: BufferConfig) -> Self {
buffer_config.validate();
NetworkConfig {
addr,
transport: Transport::TCP,
buffer_config,
custom_allocator: None,
}
}

/// Create a new config with `addr` and protocol [TCP](Transport::TCP)
/// Note: Only the NetworkThread and NetworkDispatcher will use the `BufferConfig`, not Actors
pub fn with_custom_allocator(
addr: SocketAddr,
buffer_config: BufferConfig,
custom_allocator: Arc<dyn ChunkAllocator>,
) -> Self {
buffer_config.validate();
NetworkConfig {
addr,
transport: Transport::TCP,
buffer_config,
custom_allocator: Some(custom_allocator),
}
}

Expand All @@ -89,6 +121,16 @@ impl NetworkConfig {
pub fn build(self) -> impl Fn(KPromise<()>) -> NetworkDispatcher {
move |notify_ready| NetworkDispatcher::with_config(self.clone(), notify_ready)
}

/// Returns a pointer to the configurations `BufferConfig`
pub fn get_buffer_config(&self) -> &BufferConfig {
&self.buffer_config
}

/// Returns a pointer to the CustomAllocator option so that it can be cloned by the caller.
pub fn get_custom_allocator(&self) -> &Option<Arc<dyn ChunkAllocator>> {
return &self.custom_allocator;
}
}

/// Socket defaults to `127.0.0.1:0` (i.e. a random local port) and protocol is [TCP](Transport::TCP)
Expand All @@ -97,6 +139,8 @@ impl Default for NetworkConfig {
NetworkConfig {
addr: "127.0.0.1:0".parse().unwrap(),
transport: Transport::TCP,
buffer_config: BufferConfig::default(),
custom_allocator: None,
}
}
}
Expand Down Expand Up @@ -167,7 +211,10 @@ impl NetworkDispatcher {
pub fn with_config(cfg: NetworkConfig, notify_ready: KPromise<()>) -> Self {
let lookup = Arc::new(ArcSwap::from_pointee(ActorStore::new()));
let reaper = lookup::gc::ActorRefReaper::default();
let encode_buffer = crate::net::buffer::EncodeBuffer::new();
let encode_buffer = crate::net::buffer::EncodeBuffer::with_config(
&cfg.buffer_config,
&cfg.custom_allocator,
);

NetworkDispatcher {
ctx: ComponentContext::uninitialised(),
Expand Down Expand Up @@ -220,6 +267,7 @@ impl NetworkDispatcher {
bridge_logger,
self.cfg.addr,
dispatcher.clone(),
&self.cfg,
);

let deadletter: DynActorRef = self.ctx.system().deadletter_ref().dyn_ref();
Expand Down Expand Up @@ -276,7 +324,9 @@ impl NetworkDispatcher {

let mut retry_queue = VecDeque::new();
for mut trash in self.garbage_buffers.drain(..) {
if !trash.free() { retry_queue.push_back(trash); }
if !trash.free() {
retry_queue.push_back(trash);
}
}
// info!(self.ctx().log(), "tried to clean {} buffer(s)", retry_queue.len()); // manual verification in testing
self.garbage_buffers.append(&mut retry_queue);
Expand Down Expand Up @@ -807,23 +857,6 @@ mod dispatch_tests {
));
println!("Got path: {}", named_path);
}
/*
#[test]
fn tokio_cleanup() {
use tokio::net::TcpListener;

let addr = "127.0.0.1:0"
.parse::<SocketAddr>()
.expect("Could not parse address!");
let listener = TcpListener::bind(&addr).expect("Could not bind socket!");
let actual_addr = listener.local_addr().expect("Could not get real address!");
println!("Bound on {}", actual_addr);
drop(listener);
let listener2 = TcpListener::bind(&actual_addr).expect("Could not re-bind socket!");
let actual_addr2 = listener2.local_addr().expect("Could not get real address!");
println!("Bound again on {}", actual_addr2);
assert_eq!(actual_addr, actual_addr2);
} */

#[test]
fn network_cleanup() {
Expand Down Expand Up @@ -1576,7 +1609,7 @@ mod dispatch_tests {
match sc.downcast::<CustomComponents<DeadletterBox, NetworkDispatcher>>() {
Some(cc) => {
garbage_len = cc.dispatcher.on_definition(|nd| nd.garbage_buffers.len());
},
}
_ => {}
}
assert_ne!(0, garbage_len);
Expand All @@ -1599,7 +1632,7 @@ mod dispatch_tests {
match sc.downcast::<CustomComponents<DeadletterBox, NetworkDispatcher>>() {
Some(cc) => {
garbage_len = cc.dispatcher.on_definition(|nd| nd.garbage_buffers.len());
},
}
_ => {}
}
assert_eq!(0, garbage_len);
Expand All @@ -1613,7 +1646,6 @@ mod dispatch_tests {
}

const PING_COUNT: u64 = 10;

#[test]
fn local_delivery() {
let mut cfg = KompactConfig::new();
Expand Down
6 changes: 4 additions & 2 deletions core/src/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use std::{any::Any, convert::TryFrom};
use uuid::Uuid;

use crate::{
net::{buffer::ChunkLease, frames::FRAME_HEAD_LEN},
net::{
buffer::{BufferChunk, ChunkLease},
frames::FRAME_HEAD_LEN,
},
serialisation::ser_helpers::deserialise_msg,
};
use std::ops::Deref;
use crate::net::buffer::BufferChunk;

pub mod framing;

Expand Down
Loading