From e92218c381d7d7882c17dde77b9f9b2ae52d1d67 Mon Sep 17 00:00:00 2001 From: adamhass Date: Wed, 16 Sep 2020 16:44:42 +0200 Subject: [PATCH 1/6] Adding BufferConfig and adding the ability to specify CustomAllocator for the network components and actors. --- Cargo.lock | 4 +- core/Cargo.toml | 2 +- core/src/component/context.rs | 73 +++++- core/src/dispatch/mod.rs | 86 ++++--- core/src/messaging/mod.rs | 6 +- core/src/net/buffer.rs | 417 +++++++++++++++++++++++++++++---- core/src/net/buffer_pool.rs | 91 ++++--- core/src/net/mod.rs | 3 + core/src/net/network_thread.rs | 52 +++- core/src/runtime/system.rs | 7 +- macros/macro-test/src/main.rs | 1 - rustfmt.toml | 2 +- 12 files changed, 604 insertions(+), 140 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88f0ea5f..3dd1fe6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,9 +811,9 @@ dependencies = [ [[package]] name = "hocon" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cfc0d901c68845ba7f01691089d48b49d3527134e3a94bd7cab9c0a78c79f74" +checksum = "39554b1a6dc84511d3f8c0287ad63a78de60444b4032aec529d9e303ceaf0641" dependencies = [ "failure", "java-properties", diff --git a/core/Cargo.toml b/core/Cargo.toml index a9c3aeb0..b156ca96 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -54,7 +54,7 @@ slog = "2" slog-async = "2" slog-term = "2" rustc-hash = "1.1" -hocon = {version = "0.3", default-features = false} +hocon = {version = "0.3.5", default-features = false} hierarchical_hash_wheel_timer = "1.0" owning_ref = "0.4" futures = "0.3" diff --git a/core/src/component/context.rs b/core/src/component/context.rs index d6f564d9..3c8f6cc1 100644 --- a/core/src/component/context.rs +++ b/core/src/component/context.rs @@ -1,5 +1,6 @@ use super::*; +use crate::prelude::{BufferConfig, ChunkAllocator}; use std::task::Poll; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -320,19 +321,81 @@ where pub fn suicide(&self) -> () { self.component().enqueue_control(ControlEvent::Kill); } - + pub(crate) fn with_buffer(&self, f: impl FnOnce(&mut EncodeBuffer) -> R) -> R { + { + // Scoping the borrow + if let Some(buffer) = self.buffer.borrow_mut().as_mut() { + return f(buffer); + } + } + self.init_buffers(None, None); + self.with_buffer(f) + } + + /// May be used for manual initialization of a components local `EncodeBuffer`. + /// If this method is never called explicitly the actor will implicitly call it when it tries + /// to serialise its first message. + /// If buffers have already been initialized, explicitly or implicitly, the method does nothing. + /// + /// A custom `BufferConfig` may be specified, if `None` is given the actor will first try + /// to parse the configuration from the system wide Configuration, + /// If that fails, it will use the default config. + /// + /// A custom `ChunkAllocator` may also be specified. + /// if `None` is given the actor will use the default allocator + /// + /// Note: The NetworkConfig never affects the Actors Buffers. + pub fn init_buffers( + &self, + buffer_config: Option, + custom_allocator: Option>, + ) -> () { let mut buffer_location = self.buffer.borrow_mut(); match buffer_location.as_mut() { - Some(buffer) => f(buffer), + Some(_) => return, // already initialized, do nothing None => { - let mut buffer = EncodeBuffer::with_dispatcher_ref(self.dispatcher_ref()); - let res = f(&mut buffer); + // Need to create a new Buffer, fetch BufferConfig + let cfg = { + if let Some(cfg) = buffer_config { + cfg + } else { + self.get_buffer_config() + } + }; + + if custom_allocator.is_some() { + debug!( + self.log(), + "init_buffers with custom allocator, config {:?}.", &cfg + ); + } else { + debug!( + self.log(), + "init_buffers with default allocator, config {:?}.", &cfg + ); + } + + let buffer = EncodeBuffer::with_dispatcher_ref( + self.dispatcher_ref(), + &cfg, + custom_allocator, + ); *buffer_location = Some(buffer); - res } } } + + /// Returns a BufferConfig from the global configuration or the default configuration. + fn get_buffer_config(&self) -> BufferConfig { + BufferConfig::from_config(self.config()) + } + + /// We use this method for assertions in tests + #[allow(dead_code)] + pub(crate) fn get_buffer_location(&self) -> &RefCell> { + &self.buffer + } } impl ActorRefFactory for ComponentContext diff --git a/core/src/dispatch/mod.rs b/core/src/dispatch/mod.rs index db510529..7aeeb439 100644 --- a/core/src/dispatch/mod.rs +++ b/core/src/dispatch/mod.rs @@ -22,11 +22,12 @@ use crate::{ SerialisedFrame, }, net::{ - buffer::{BufferEncoder, EncodeBuffer}, + buffer::{BufferChunk, BufferConfig, BufferEncoder, EncodeBuffer}, events::NetworkEvent, ConnectionState, NetworkBridgeErr, }, + prelude::ChunkAllocator, timer::timer_manager::Timer, }; use arc_swap::ArcSwap; @@ -35,9 +36,7 @@ use futures::{ task::{Context, Poll}, }; use rustc_hash::FxHashMap; -use std::{io::ErrorKind, time::Duration}; -use std::collections::VecDeque; -use crate::net::buffer::BufferChunk; +use std::{collections::VecDeque, io::ErrorKind, time::Duration}; pub mod lookup; pub mod queue_manager; @@ -61,18 +60,51 @@ type NetHashMap = FxHashMap; /// let system = conf.build().expect("system"); /// # system.shutdown().expect("shutdown"); /// ``` -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, Debug)] pub struct NetworkConfig { addr: SocketAddr, transport: Transport, + buffer_config: BufferConfig, + custom_allocator: Option>, } impl NetworkConfig { /// Create a new config with `addr` and protocol [TCP](Transport::TCP) + /// NetworkDispatcher and NetworkThread will use the default `BufferConfig` pub fn new(addr: SocketAddr) -> Self { NetworkConfig { addr, transport: Transport::TCP, + buffer_config: BufferConfig::default(), + custom_allocator: None, + } + } + + /// Create a new config with `addr` and protocol [TCP](Transport::TCP) + /// Note: Only the NetworkThread and NetworkDispatcher will use the `BufferConfig`, not Actors + pub fn with_buffer_config(addr: SocketAddr, buffer_config: BufferConfig) -> Self { + buffer_config.validate(); + NetworkConfig { + addr, + transport: Transport::TCP, + buffer_config, + custom_allocator: None, + } + } + + /// Create a new config with `addr` and protocol [TCP](Transport::TCP) + /// Note: Only the NetworkThread and NetworkDispatcher will use the `BufferConfig`, not Actors + pub fn with_custom_allocator( + addr: SocketAddr, + buffer_config: BufferConfig, + custom_allocator: Arc, + ) -> Self { + buffer_config.validate(); + NetworkConfig { + addr, + transport: Transport::TCP, + buffer_config, + custom_allocator: Some(custom_allocator), } } @@ -89,6 +121,16 @@ impl NetworkConfig { pub fn build(self) -> impl Fn(KPromise<()>) -> NetworkDispatcher { move |notify_ready| NetworkDispatcher::with_config(self.clone(), notify_ready) } + + /// Returns a pointer to the configurations `BufferConfig` + pub fn get_buffer_config(&self) -> &BufferConfig { + &self.buffer_config + } + + /// Returns a pointer to the CustomAllocator option so that it can be cloned by the caller. + pub fn get_custom_allocator(&self) -> &Option> { + return &self.custom_allocator; + } } /// Socket defaults to `127.0.0.1:0` (i.e. a random local port) and protocol is [TCP](Transport::TCP) @@ -97,6 +139,8 @@ impl Default for NetworkConfig { NetworkConfig { addr: "127.0.0.1:0".parse().unwrap(), transport: Transport::TCP, + buffer_config: BufferConfig::default(), + custom_allocator: None, } } } @@ -167,7 +211,10 @@ impl NetworkDispatcher { pub fn with_config(cfg: NetworkConfig, notify_ready: KPromise<()>) -> Self { let lookup = Arc::new(ArcSwap::from_pointee(ActorStore::new())); let reaper = lookup::gc::ActorRefReaper::default(); - let encode_buffer = crate::net::buffer::EncodeBuffer::new(); + let encode_buffer = crate::net::buffer::EncodeBuffer::with_config( + &cfg.buffer_config, + &cfg.custom_allocator, + ); NetworkDispatcher { ctx: ComponentContext::uninitialised(), @@ -220,6 +267,7 @@ impl NetworkDispatcher { bridge_logger, self.cfg.addr, dispatcher.clone(), + &self.cfg, ); let deadletter: DynActorRef = self.ctx.system().deadletter_ref().dyn_ref(); @@ -276,7 +324,9 @@ impl NetworkDispatcher { let mut retry_queue = VecDeque::new(); for mut trash in self.garbage_buffers.drain(..) { - if !trash.free() { retry_queue.push_back(trash); } + if !trash.free() { + retry_queue.push_back(trash); + } } // info!(self.ctx().log(), "tried to clean {} buffer(s)", retry_queue.len()); // manual verification in testing self.garbage_buffers.append(&mut retry_queue); @@ -807,23 +857,6 @@ mod dispatch_tests { )); println!("Got path: {}", named_path); } - /* - #[test] - fn tokio_cleanup() { - use tokio::net::TcpListener; - - let addr = "127.0.0.1:0" - .parse::() - .expect("Could not parse address!"); - let listener = TcpListener::bind(&addr).expect("Could not bind socket!"); - let actual_addr = listener.local_addr().expect("Could not get real address!"); - println!("Bound on {}", actual_addr); - drop(listener); - let listener2 = TcpListener::bind(&actual_addr).expect("Could not re-bind socket!"); - let actual_addr2 = listener2.local_addr().expect("Could not get real address!"); - println!("Bound again on {}", actual_addr2); - assert_eq!(actual_addr, actual_addr2); - } */ #[test] fn network_cleanup() { @@ -1576,7 +1609,7 @@ mod dispatch_tests { match sc.downcast::>() { Some(cc) => { garbage_len = cc.dispatcher.on_definition(|nd| nd.garbage_buffers.len()); - }, + } _ => {} } assert_ne!(0, garbage_len); @@ -1599,7 +1632,7 @@ mod dispatch_tests { match sc.downcast::>() { Some(cc) => { garbage_len = cc.dispatcher.on_definition(|nd| nd.garbage_buffers.len()); - }, + } _ => {} } assert_eq!(0, garbage_len); @@ -1613,7 +1646,6 @@ mod dispatch_tests { } const PING_COUNT: u64 = 10; - #[test] fn local_delivery() { let mut cfg = KompactConfig::new(); diff --git a/core/src/messaging/mod.rs b/core/src/messaging/mod.rs index d7772173..035bff93 100644 --- a/core/src/messaging/mod.rs +++ b/core/src/messaging/mod.rs @@ -11,11 +11,13 @@ use std::{any::Any, convert::TryFrom}; use uuid::Uuid; use crate::{ - net::{buffer::ChunkLease, frames::FRAME_HEAD_LEN}, + net::{ + buffer::{BufferChunk, ChunkLease}, + frames::FRAME_HEAD_LEN, + }, serialisation::ser_helpers::deserialise_msg, }; use std::ops::Deref; -use crate::net::buffer::BufferChunk; pub mod framing; diff --git a/core/src/net/buffer.rs b/core/src/net/buffer.rs index 7ba8eeab..96d3fe6e 100644 --- a/core/src/net/buffer.rs +++ b/core/src/net/buffer.rs @@ -1,16 +1,98 @@ use super::*; -use crate::net::{buffer_pool::BufferPool, frames}; +use crate::{ + messaging::DispatchEnvelope, + net::{ + buffer_pool::{BufferPool, ChunkAllocator}, + frames, + }, +}; use bytes::{Buf, BufMut}; use core::{cmp, ptr}; -use std::{io::Cursor, mem::MaybeUninit, sync::Arc}; -use std::fmt::{Debug, Formatter}; -use crate::messaging::DispatchEnvelope; +use hocon::Hocon; +use std::{ + fmt::{Debug, Formatter}, + io::Cursor, + mem::MaybeUninit, + sync::Arc, +}; const FRAME_HEAD_LEN: usize = frames::FRAME_HEAD_LEN as usize; -const BUFFER_SIZE: usize = ENCODEBUFFER_MIN_REMAINING * 2000; -// Assume 64 byte cache lines -> 2000 cache lines per chunk -// (1000 are too small for a UDP datagram) -const ENCODEBUFFER_MIN_REMAINING: usize = 64; // Always have at least a full cache line available + +/// The configuration for the network buffers +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct BufferConfig { + /// Specifies the length of `BufferChunk`s + pub(crate) chunk_size: usize, + /// Specifies the amount of `BufferChunk` a `BufferPool` will pre-allocate on creation + pub(crate) initial_pool_count: usize, + /// Specifies the maximum amount of `BufferChunk`s individual `BufferPool`s will allocate + pub(crate) max_pool_count: usize, + /// Minimum number of bytes an `EncodeBuffer` must have available before serialisation into it + pub(crate) encode_min_remaining: usize, +} + +impl BufferConfig { + /// Create a new BufferConfig with reasonable values + pub fn default() -> Self { + BufferConfig { + chunk_size: 128 * 1000, // 128Kb chunks + initial_pool_count: 2, // 256Kb initial/minimum BufferPools + max_pool_count: 1000, // 128Mb maximum BufferPools + encode_min_remaining: 64, // typical L1 cache line size + } + } + + /// Creates a new BufferConfig with specified values and performs validation + pub fn new( + chunk_size: usize, + initial_pool_count: usize, + max_pool_count: usize, + encode_min_remaining: usize, + ) -> Self { + let buffer_config = BufferConfig { + chunk_size, + initial_pool_count, + max_pool_count, + encode_min_remaining, + }; + buffer_config.validate(); + buffer_config + } + + /// Tries to deserialize a BufferConfig from the specified in the given `config`. + /// Returns a default BufferConfig if it fails to read from the config. + pub fn from_config(config: &Hocon) -> BufferConfig { + let mut buffer_config = BufferConfig::default(); + if let Some(chunk_size) = config["buffer_config"]["chunk_size"].as_i64() { + buffer_config.chunk_size = chunk_size as usize; + } + if let Some(initial_pool_count) = config["buffer_config"]["initial_pool_count"].as_i64() { + buffer_config.initial_pool_count = initial_pool_count as usize; + } + if let Some(max_pool_count) = config["buffer_config"]["max_pool_count"].as_i64() { + buffer_config.max_pool_count = max_pool_count as usize; + } + if let Some(encode_min_remaining) = config["buffer_config"]["encode_min_remaining"].as_i64() + { + buffer_config.encode_min_remaining = encode_min_remaining as usize; + } + buffer_config.validate(); + buffer_config + } + + /// Performs basic sanity checks on the config parameters + pub fn validate(&self) { + if self.initial_pool_count > self.max_pool_count { + panic!("initial_pool_count may not be greater than max_pool_count") + } + if self.chunk_size <= self.encode_min_remaining { + panic!("chunk_size must be greater than encode_min_remaining") + } + if self.chunk_size < 128 { + panic!("chunk_size smaller than 128 is not allowed") + } + } +} /// Required methods for a Chunk pub trait Chunk { @@ -30,8 +112,8 @@ pub(crate) struct DefaultChunk { } impl DefaultChunk { - pub fn new() -> DefaultChunk { - let v = vec![0u8; BUFFER_SIZE]; + pub fn new(size: usize) -> DefaultChunk { + let v = vec![0u8; size]; let slice = v.into_boxed_slice(); DefaultChunk { chunk: slice } } @@ -63,7 +145,7 @@ impl Debug for BufferChunk { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("BufferChunk") .field("Lock", &self.locked) -// .field("Length", &self.chunk.len()) + .field("Length", &self.len()) .field("RefCount", &Arc::strong_count(&self.ref_count)) .finish() } @@ -82,9 +164,9 @@ unsafe impl Send for BufferChunk {} impl BufferChunk { /// Allocate a new Default BufferChunk - pub fn new() -> Self { + pub fn new(size: usize) -> Self { BufferChunk { - chunk: Box::into_raw(Box::new(DefaultChunk::new())), + chunk: Box::into_raw(Box::new(DefaultChunk::new(size))), ref_count: Arc::new(0), locked: false, } @@ -207,8 +289,11 @@ impl<'a> BufMut for BufferEncoder<'a> { unsafe fn advance_mut(&mut self, cnt: usize) { self.encode_buffer.write_offset += cnt; - if self.encode_buffer.remaining() < ENCODEBUFFER_MIN_REMAINING { - self.encode_buffer.swap_buffer(); + if self.encode_buffer.write_offset > self.encode_buffer.buffer.len() { + panic!( + "Fatal error in buffers, write-pointer exceeding buffer length,\ + this should not happen" + ); } } @@ -227,7 +312,10 @@ impl<'a> BufMut for BufferEncoder<'a> { where Self: Sized, { - assert!(src.remaining() <= BUFFER_SIZE, "src too big for buffering"); + assert!( + src.remaining() <= self.encode_buffer.buffer.len(), + "src too big for buffering" + ); if self.remaining_mut() < src.remaining() { // Not enough space in current chunk, need to swap it self.encode_buffer.swap_buffer(); @@ -252,7 +340,10 @@ impl<'a> BufMut for BufferEncoder<'a> { /// Override default impl to allow for large slices to be put in at a time fn put_slice(&mut self, src: &[u8]) { - assert!(src.remaining() <= BUFFER_SIZE, "src too big for buffering"); + assert!( + src.remaining() <= self.encode_buffer.buffer.len(), + "src too big for buffering" + ); if self.remaining_mut() < src.len() { // Not enough space in current chunk, need to swap it self.encode_buffer.swap_buffer(); @@ -260,8 +351,10 @@ impl<'a> BufMut for BufferEncoder<'a> { assert!( self.remaining_mut() >= src.len(), - "EncodeBuffer trying to write too big of a slice, len: {}", - src.len() + "EncodeBuffer trying to write too big, slice len: {}, Chunk len: {}, Remaining: {}", + src.len(), + self.encode_buffer.buffer.len(), + self.remaining_mut() ); unsafe { let dst = self.bytes_mut(); @@ -280,12 +373,16 @@ pub struct EncodeBuffer { write_offset: usize, read_offset: usize, dispatcher_ref: Option, + min_remaining: usize, } impl EncodeBuffer { /// Creates a new EncodeBuffer, allocates a new BufferPool. - pub fn new() -> Self { - let mut buffer_pool = BufferPool::new(); + pub fn with_config( + config: &BufferConfig, + custom_allocator: &Option>, + ) -> Self { + let mut buffer_pool = BufferPool::with_config(config, custom_allocator); if let Some(buffer) = buffer_pool.get_buffer() { EncodeBuffer { buffer, @@ -293,6 +390,7 @@ impl EncodeBuffer { write_offset: 0, read_offset: 0, dispatcher_ref: None, + min_remaining: config.encode_min_remaining, } } else { panic!("Couldn't initialize EncodeBuffer"); @@ -300,8 +398,12 @@ impl EncodeBuffer { } /// Creates a new EncodeBuffer with a DispatcherRef used for safe destruction of the buffers. - pub fn with_dispatcher_ref(dispatcher_ref: DispatcherRef) -> Self { - let mut buffer_pool = BufferPool::new(); + pub fn with_dispatcher_ref( + dispatcher_ref: DispatcherRef, + config: &BufferConfig, + custom_allocator: Option>, + ) -> Self { + let mut buffer_pool = BufferPool::with_config(config, &custom_allocator); if let Some(buffer) = buffer_pool.get_buffer() { EncodeBuffer { buffer, @@ -309,6 +411,7 @@ impl EncodeBuffer { write_offset: 0, read_offset: 0, dispatcher_ref: Some(dispatcher_ref), + min_remaining: config.encode_min_remaining, } } else { panic!("Couldn't initialize EncodeBuffer"); @@ -338,7 +441,7 @@ impl EncodeBuffer { } /// Extracts the bytes between the read-pointer and the write-pointer and advances the read-pointer - /// Ensures there's a minimum of `ENCODEBUFFER_MIN_REMAINING` left in the current buffer which minimizes overflow during swap + /// Ensures there's a configurable-minimum left in the current buffer, minimizes overflow during swap pub(crate) fn get_chunk_lease(&mut self) -> Option { let cnt = self.write_offset - self.read_offset; if cnt > 0 { @@ -347,7 +450,7 @@ impl EncodeBuffer { .buffer .get_lease(self.write_offset - cnt, self.write_offset); - if self.remaining() < ENCODEBUFFER_MIN_REMAINING { + if self.remaining() < self.min_remaining { self.swap_buffer(); } @@ -695,23 +798,123 @@ unsafe impl Send for ChunkLease {} #[cfg(test)] mod tests { - // This is very non-exhaustive testing, just a + // This is very non-exhaustive testing, just basic sanity checks use super::*; + use crate::prelude::NetMessage; + use hocon::HoconLoader; + use std::{borrow::Borrow, time::Duration}; + + #[test] + #[should_panic(expected = "initial_pool_count may not be greater than max_pool_count")] + fn invalid_pool_counts_config_validation() { + let hocon = HoconLoader::new() + .load_str( + r#"{ + buffer_config { + chunk_size: 64, + initial_pool_count: 3, + max_pool_count: 2, + encode_min_remaining: 2, + } + }"#, + ) + .unwrap() + .hocon(); + // This should succeed + let cfg = hocon.unwrap(); + + // Validation should panic + let _ = BufferConfig::from_config(&cfg); + } + + #[test] + #[should_panic(expected = "chunk_size must be greater than encode_min_remaining")] + fn invalid_encode_min_remaining_validation() { + // The BufferConfig should panic because encode_min_remain too high + let _ = BufferConfig::new(128, 10, 10, 128); + } // This test instantiates an EncodeBuffer and writes the same string into it enough times that - // the EncodeBuffer should overload multiple times and will have to try to free at least 1 Chunk + // the EncodeBuffer should overload multiple times and will have to succeed in reusing >=1 Chunk + #[test] + fn encode_buffer_overload_reuse_default_config() { + let buffer_config = BufferConfig::default(); + let encode_buffer = encode_buffer_overload_reuse(&buffer_config); + // Check the buffer pool sizes + assert_eq!( + encode_buffer.buffer_pool.get_pool_sizes(), + ( + buffer_config.initial_pool_count, // No additional has been allocated + buffer_config.initial_pool_count - 1 // Currently in the pool + ) + ); + } + + // As above, except we create a much larger BufferPool with larger Chunks manually configured + #[test] + fn encode_buffer_overload_reuse_manually_configured_large_buffers() { + let buffer_config = BufferConfig::new( + 50000000, // 50 MB chunk_size + 10, // 500 MB pool init value + 20, // 1 GB pool max size + 256, // 256 B min_remaining + ); + let encode_buffer = encode_buffer_overload_reuse(&buffer_config); + assert_eq!(encode_buffer.buffer.len(), 50000000); + // Check the buffer pool sizes + assert_eq!( + encode_buffer.buffer_pool.get_pool_sizes(), + ( + 10, // No additional has been allocated + 10 - 1 // Currently in the pool + ) + ); + } + + // As above, except we create small buffers from a Hocon config. #[test] - fn encode_buffer_overload() { + fn encode_buffer_overload_reuse_hocon_configured_small_buffers() { + let hocon = HoconLoader::new() + .load_str( + r#"{ + buffer_config { + chunk_size: 128, + initial_pool_count: 2, + max_pool_count: 2, + encode_min_remaining: 2, + } + }"#, + ) + .unwrap() + .hocon(); + let buffer_config = BufferConfig::from_config(&hocon.unwrap()); + // Ensure we successfully parsed the Config + assert_eq!(buffer_config.encode_min_remaining, 2 as usize); + let encode_buffer = encode_buffer_overload_reuse(&buffer_config); + // Ensure that the + assert_eq!(encode_buffer.buffer.len(), 128); + // Check the buffer pool sizes + assert_eq!( + encode_buffer.buffer_pool.get_pool_sizes(), + ( + 2, // No additional has been allocated + 2 - 1 // Currently in the pool + ) + ); + } + + // Creates an EncodeBuffer from the given config and uses the configured values to ensure + // The pool is exhausted (or would be if no reuse) and reuses buffers at least once. + fn encode_buffer_overload_reuse(buffer_config: &BufferConfig) -> EncodeBuffer { // Instantiate an encode buffer with default values - let mut encode_buffer = EncodeBuffer::new(); + let mut encode_buffer = EncodeBuffer::with_config(&buffer_config, &None); { let buffer_encoder = &mut EncodeBuffer::get_buffer_encoder(&mut encode_buffer); // Create a string that's bigger than the ENCODEBUFFER_MIN_REMAINING - use std::string::ToString; let mut test_string = "".to_string(); - for i in 0..=ENCODEBUFFER_MIN_REMAINING + 1 { + for i in 0..=&buffer_config.encode_min_remaining + 1 { // Make sure the string isn't just the same char over and over again, // Means we test for some more correctness in buffers test_string.push((i.to_string()).chars().next().unwrap()); @@ -719,9 +922,9 @@ mod tests { let mut cnt = 0; // Make sure we churn through all the initial buffers. - for _ in 0..=crate::net::buffer_pool::INITIAL_BUFFER_LEN + 1 { + for _ in 0..=&buffer_config.initial_pool_count + 1 { // Make sure we fill up a chunk per iteration: - for _ in 0..=(BUFFER_SIZE / ENCODEBUFFER_MIN_REMAINING) + 1 { + for _ in 0..=(&buffer_config.chunk_size / &buffer_config.encode_min_remaining) + 1 { buffer_encoder.put_slice(test_string.clone().as_bytes()); let chunk = buffer_encoder.get_chunk_lease(); assert_eq!( @@ -734,25 +937,19 @@ mod tests { } } } - // Check the buffer pool sizes - assert_eq!( - encode_buffer.buffer_pool.get_pool_sizes(), - ( - crate::net::buffer_pool::INITIAL_BUFFER_LEN, // Total allocated - 0, // Available - crate::net::buffer_pool::INITIAL_BUFFER_LEN - 1 - ) - ); // Returned + encode_buffer } #[test] #[should_panic(expected = "src too big for buffering")] fn encode_buffer_panic() { - let mut encode_buffer = EncodeBuffer::new(); + // Instantiate an encode buffer with default values + let buffer_config = BufferConfig::default(); + let mut encode_buffer = EncodeBuffer::with_config(&buffer_config, &None); let buffer_encoder = &mut EncodeBuffer::get_buffer_encoder(&mut encode_buffer); use std::string::ToString; let mut test_string = "".to_string(); - for i in 0..=BUFFER_SIZE + 10 { + for i in 0..=buffer_config.chunk_size + 10 { test_string.push((i.to_string()).chars().next().unwrap()); } buffer_encoder.put(test_string.as_bytes()); @@ -760,13 +957,141 @@ mod tests { #[test] fn aligned_after_drop() { - let mut encode_buffer = EncodeBuffer::new(); + // Instantiate an encode buffer with default values + let buffer_config = BufferConfig::default(); + let mut encode_buffer = EncodeBuffer::with_config(&buffer_config, &None); { let buffer_encoder = &mut EncodeBuffer::get_buffer_encoder(&mut encode_buffer); buffer_encoder.put_i32(16); } - // Check the buffer pool sizes + // Check the read and write pointers are aligned despite no read call assert_eq!(encode_buffer.write_offset, encode_buffer.read_offset); assert_eq!(encode_buffer.write_offset, 4); } + + #[derive(ComponentDefinition)] + struct BufferTestActor { + ctx: ComponentContext, + custom_buf: bool, + } + impl BufferTestActor { + fn with_custom_buffer() -> BufferTestActor { + BufferTestActor { + ctx: ComponentContext::uninitialised(), + custom_buf: true, + } + } + + fn without_custom_buffer() -> BufferTestActor { + BufferTestActor { + ctx: ComponentContext::uninitialised(), + custom_buf: false, + } + } + } + impl Actor for BufferTestActor { + type Message = (); + + fn receive_local(&mut self, _: Self::Message) -> Handled { + Handled::Ok + } + + fn receive_network(&mut self, _: NetMessage) -> Handled { + Handled::Ok + } + } + impl ComponentLifecycle for BufferTestActor { + fn on_start(&mut self) -> Handled { + if self.custom_buf { + // Initialize the buffers + self.ctx + .borrow() + .init_buffers(Some(BufferConfig::new(128, 4, 5, 30)), None); + } + // Use the Buffer + let _ = self.ctx.actor_path().clone().tell_serialised(120, self); + Handled::Ok + } + + fn on_stop(&mut self) -> Handled { + Handled::Ok + } + + fn on_kill(&mut self) -> Handled { + Handled::Ok + } + } + fn buffer_config_testing_system() -> KompactSystem { + let mut cfg = KompactConfig::new(); + cfg.load_config_str( + r#"{ + buffer_config { + chunk_size: 256, + initial_pool_count: 3, + max_pool_count: 4, + encode_min_remaining: 20, + } + }"#, + ); + cfg.system_components(DeadletterBox::new, { + NetworkConfig::with_buffer_config( + "127.0.0.1:0".parse().expect("Address should work"), + BufferConfig::new(512, 2, 3, 10), + ) + .build() + }); + cfg.build().expect("KompactSystem") + } + // This integration test sets up a KompactSystem with a Hocon BufferConfig, + // then runs init_buffers on an Actor with different settings (in the on_start method of dummy), + // and finally asserts that the actors buffers were set up using the on_start parameters. + #[test] + fn buffer_config_init_buffers_overrides_hocon_and_default() { + let system = buffer_config_testing_system(); + let (dummy, _df) = system.create_and_register(BufferTestActor::with_custom_buffer); + let dummy_f = system.register_by_alias(&dummy, "dummy"); + let _ = dummy_f.wait_expect(Duration::from_millis(1000), "dummy failed"); + system.start(&dummy); + // TODO maybe we could do this a bit more reliable? + thread::sleep(Duration::from_millis(100)); + + // Read the buffer_len + let mut buffer_len = 0; + let mut buffer_write_pointer = 0; + dummy.on_definition(|c| { + if let Some(encode_buffer) = c.ctx.get_buffer_location().borrow().as_ref() { + buffer_len = encode_buffer.buffer.len(); + buffer_write_pointer = encode_buffer.write_offset; + } + }); + // Assert that the buffer was initialized with parameter in the actors on_start method + assert_eq!(buffer_len, 128); + // Check that the buffer was used + assert_ne!(buffer_write_pointer, 0); + } + + #[test] + fn buffer_config_hocon_overrides_default() { + let system = buffer_config_testing_system(); + let (dummy, _df) = system.create_and_register(BufferTestActor::without_custom_buffer); + let dummy_f = system.register_by_alias(&dummy, "dummy"); + let _ = dummy_f.wait_expect(Duration::from_millis(1000), "dummy failed"); + system.start(&dummy); + // TODO maybe we could do this a bit more reliable? + thread::sleep(Duration::from_millis(100)); + + // Read the buffer_len + let mut buffer_len = 0; + let mut buffer_write_pointer = 0; + dummy.on_definition(|c| { + if let Some(encode_buffer) = c.ctx.get_buffer_location().borrow().as_ref() { + buffer_len = encode_buffer.buffer.len(); + buffer_write_pointer = encode_buffer.write_offset; + } + }); + // Assert that the buffer was initialized with parameters in the hocon config string + assert_eq!(buffer_len, 256); + // Check that the buffer was used + assert_ne!(buffer_write_pointer, 0); + } } diff --git a/core/src/net/buffer_pool.rs b/core/src/net/buffer_pool.rs index d0470620..0610bd41 100644 --- a/core/src/net/buffer_pool.rs +++ b/core/src/net/buffer_pool.rs @@ -1,13 +1,12 @@ -use crate::net::buffer::{BufferChunk, Chunk, DefaultChunk}; -use std::collections::VecDeque; -use std::collections::vec_deque::Drain; - -/// The number of Buffers each pool will Pre-allocate -pub const INITIAL_BUFFER_LEN: usize = 5; -const MAX_POOL_SIZE: usize = 10000; +use crate::net::buffer::{BufferChunk, BufferConfig, Chunk, DefaultChunk}; +use std::{ + collections::{vec_deque::Drain, VecDeque}, + fmt::Debug, + sync::Arc, +}; /// Methods required by a ChunkAllocator -pub trait ChunkAllocator: Send + 'static { +pub trait ChunkAllocator: Send + Sync + Debug + 'static { /// ChunkAllocators deliver Chunk by raw pointers fn get_chunk(&self) -> *mut dyn Chunk; /// This method tells the allocator that the [Chunk](Chunk) may be deallocated @@ -22,12 +21,14 @@ pub trait ChunkAllocator: Send + 'static { /// A default allocator for Kompact /// /// Heap allocates chunks through the normal Rust system allocator -#[derive(Default)] -pub(crate) struct DefaultAllocator {} +#[derive(Default, Debug)] +pub(crate) struct DefaultAllocator { + chunk_size: usize, +} impl ChunkAllocator for DefaultAllocator { fn get_chunk(&self) -> *mut dyn Chunk { - Box::into_raw(Box::new(DefaultChunk::new())) + Box::into_raw(Box::new(DefaultChunk::new(self.chunk_size))) } unsafe fn release(&self, ptr: *mut dyn Chunk) -> () { @@ -37,38 +38,36 @@ impl ChunkAllocator for DefaultAllocator { pub(crate) struct BufferPool { pool: VecDeque, - returned: VecDeque, + /// Counts the number of BufferChunks allocated by this pool pool_size: usize, - chunk_allocator: Box, + chunk_allocator: Arc, + max_pool_size: usize, } impl BufferPool { - pub fn new() -> Self { - let mut pool = VecDeque::::new(); - let chunk_allocator = Box::new(DefaultAllocator::default()); - for _ in 0..INITIAL_BUFFER_LEN { - pool.push_front(BufferChunk::from_chunk(chunk_allocator.get_chunk())); - } - BufferPool { - pool, - returned: VecDeque::new(), - pool_size: INITIAL_BUFFER_LEN, - chunk_allocator, - } - } - - /// Creates a BufferPool with a custom ChunkAllocator - #[allow(dead_code)] - pub fn with_allocator(chunk_allocator: Box) -> Self { + /// Creates a BufferPool from a BufferConfig + pub fn with_config( + config: &BufferConfig, + custom_allocator: &Option>, + ) -> Self { + let chunk_allocator = { + if let Some(allocator) = custom_allocator { + allocator.clone() + } else { + Arc::new(DefaultAllocator { + chunk_size: config.chunk_size, + }) + } + }; let mut pool = VecDeque::::new(); - for _ in 0..INITIAL_BUFFER_LEN { + for _ in 0..config.initial_pool_count { pool.push_front(BufferChunk::from_chunk(chunk_allocator.get_chunk())); } BufferPool { pool, - returned: VecDeque::new(), - pool_size: INITIAL_BUFFER_LEN, + pool_size: config.initial_pool_count, chunk_allocator, + max_pool_size: config.max_pool_count, } } @@ -77,30 +76,28 @@ impl BufferPool { } pub fn get_buffer(&mut self) -> Option { - if let Some(new_buffer) = self.pool.pop_front() { - return Some(new_buffer); - } self.try_reclaim() } - pub fn return_buffer(&mut self, buffer: BufferChunk) -> () { - self.returned.push_back(buffer); + pub fn return_buffer(&mut self, mut buffer: BufferChunk) -> () { + buffer.lock(); + self.pool.push_back(buffer); } pub fn drain_returned(&mut self) -> Drain { - self.returned.drain(0..) + self.pool.drain(0..) } /// Iterates of returned buffers from oldest to newest trying to reclaim /// until it successfully finds an available one /// If it fails it will attempt to create a new buffer instead fn try_reclaim(&mut self) -> Option { - for _i in 0..self.returned.len() { - if let Some(mut returned_buffer) = self.returned.pop_front() { - if returned_buffer.free() { - return Some(returned_buffer); + for _i in 0..self.pool.len() { + if let Some(mut buffer) = self.pool.pop_front() { + if buffer.free() { + return Some(buffer); } else { - self.returned.push_back(returned_buffer); + self.pool.push_back(buffer); } } } @@ -108,7 +105,7 @@ impl BufferPool { } fn increase_pool(&mut self) -> Option { - if self.pool_size >= MAX_POOL_SIZE { + if self.pool_size >= self.max_pool_size { return None; }; self.pool_size += 1; @@ -117,7 +114,7 @@ impl BufferPool { /// We use this method for assertions in tests #[allow(dead_code)] - pub(crate) fn get_pool_sizes(&self) -> (usize, usize, usize) { - (self.pool_size, self.pool.len(), self.returned.len()) + pub(crate) fn get_pool_sizes(&self) -> (usize, usize) { + (self.pool_size, self.pool.len()) } } diff --git a/core/src/net/mod.rs b/core/src/net/mod.rs index af52a68a..876a23ba 100644 --- a/core/src/net/mod.rs +++ b/core/src/net/mod.rs @@ -9,6 +9,7 @@ use std::{io, net::SocketAddr, sync::Arc, thread}; use crate::{ messaging::SerialisedFrame, net::{events::DispatchEvent, frames::*, network_thread::NetworkThread}, + prelude::NetworkConfig, }; use bytes::{Buf, BufMut, BytesMut}; use crossbeam_channel::{unbounded as channel, RecvError, SendError, Sender}; @@ -168,6 +169,7 @@ impl Bridge { bridge_log: KompactLogger, addr: SocketAddr, dispatcher_ref: DispatcherRef, + network_config: &NetworkConfig, ) -> (Self, SocketAddr) { let (sender, receiver) = channel(); let (shutdown_p, shutdown_f) = promise(); @@ -178,6 +180,7 @@ impl Bridge { receiver, shutdown_p, dispatcher_ref.clone(), + network_config.clone(), ); let bound_addr = network_thread.addr; let bridge = Bridge { diff --git a/core/src/net/network_thread.rs b/core/src/net/network_thread.rs index 86cc0ed3..6354c8c1 100644 --- a/core/src/net/network_thread.rs +++ b/core/src/net/network_thread.rs @@ -1,5 +1,6 @@ use super::*; use crate::{ + dispatch::NetworkConfig, messaging::{DispatchEnvelope, EventEnvelope}, net::{ buffer_pool::BufferPool, @@ -87,6 +88,7 @@ impl NetworkThread { input_queue: Recv, shutdown_promise: KPromise<()>, dispatcher_ref: DispatcherRef, + config: NetworkConfig, ) -> (NetworkThread, Waker) { // Set-up the Listener debug!( @@ -120,7 +122,10 @@ impl NetworkThread { let waker = Waker::new(poll.registry(), DISPATCHER) .expect("failed to create Waker for DISPATCHER"); - let mut buffer_pool = BufferPool::new(); + let mut buffer_pool = BufferPool::with_config( + &config.get_buffer_config(), + &config.get_custom_allocator(), + ); let udp_buffer = buffer_pool .get_buffer() @@ -154,7 +159,10 @@ impl NetworkThread { ) } Err(e) => { - panic!("NetworkThread failed to bind to address: {:?}", e); + panic!( + "NetworkThread failed to bind to address: {:?}, addr {:?}", + e, &addr + ); } } } @@ -793,7 +801,7 @@ fn bind_with_retries( #[allow(unused_must_use)] mod tests { use super::*; - use crate::dispatch::NetworkConfig; + use crate::{dispatch::NetworkConfig, prelude::BufferConfig}; use std::net::{IpAddr, Ipv4Addr}; // Cleaner test-cases for manually running the thread @@ -839,6 +847,7 @@ mod tests { input_queue_1_receiver, dispatch_shutdown_sender1, dispatcher_ref.clone(), + NetworkConfig::default(), ); let (network_thread2, _) = NetworkThread::new( @@ -848,6 +857,7 @@ mod tests { input_queue_2_receiver, dispatch_shutdown_sender2, dispatcher_ref, + NetworkConfig::default(), ); ( @@ -1003,6 +1013,42 @@ mod tests { .unwrap() ); } + + #[test] + fn network_thread_custom_buffer_config() -> () { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9788); + let buffer_config = BufferConfig::new(128, 13, 14, 10); + let network_config = NetworkConfig::with_buffer_config(addr, buffer_config); + let mut cfg = KompactConfig::new(); + cfg.system_components(DeadletterBox::new, NetworkConfig::default().build()); + let system = cfg.build().expect("KompactSystem"); + + // Set-up the the threads arguments + // TODO: Mock this properly instead + let lookup = Arc::new(ArcSwap::from_pointee(ActorStore::new())); + //network_thread_registration.set_readiness(Interest::empty()); + let (_, input_queue_1_receiver) = channel(); + let (dispatch_shutdown_sender1, _) = promise(); + let logger = system.logger().clone(); + let dispatcher_ref = system.dispatcher_ref(); + + // Set up the two network threads + let (mut network_thread, _) = NetworkThread::new( + logger.clone(), + addr, + lookup.clone(), + input_queue_1_receiver, + dispatch_shutdown_sender1, + dispatcher_ref.clone(), + network_config, + ); + // Assert that the buffer_pool is created correctly + let (pool_size, _) = network_thread.buffer_pool.get_pool_sizes(); + assert_eq!(pool_size, 13); // initial_pool_size + assert_eq!(network_thread.buffer_pool.get_buffer().unwrap().len(), 128); + network_thread.stop(); + } + /* #[test] fn graceful_network_shutdown() -> () { diff --git a/core/src/runtime/system.rs b/core/src/runtime/system.rs index 8f67fbdd..caaf1aa7 100644 --- a/core/src/runtime/system.rs +++ b/core/src/runtime/system.rs @@ -15,8 +15,7 @@ use crate::{ }; use hocon::{Hocon, HoconLoader}; use oncemutex::{OnceMutex, OnceMutexGuard}; -use std::{fmt, sync::Mutex}; -use std::any::TypeId; +use std::{any::TypeId, fmt, sync::Mutex}; /// A Kompact system is a collection of components and services /// @@ -1558,9 +1557,7 @@ impl KompactRuntime { pub(crate) fn get_internal_components(&self) -> &InternalComponents { match *self.internal_components { - Some(ref ic) => { - ic - } + Some(ref ic) => ic, None => panic!("KompactRuntime was not properly initialised!"), } } diff --git a/macros/macro-test/src/main.rs b/macros/macro-test/src/main.rs index 5af09ab8..8c92233e 100644 --- a/macros/macro-test/src/main.rs +++ b/macros/macro-test/src/main.rs @@ -15,7 +15,6 @@ impl Port for PingPongPort { type Request = Ping; } - #[derive(ComponentDefinition, Actor)] struct Pinger { ctx: ComponentContext, diff --git a/rustfmt.toml b/rustfmt.toml index 80b19873..60c673b4 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -43,7 +43,7 @@ use_field_init_shorthand = false force_explicit_abi = true condense_wildcard_suffixes = false color = "Auto" -required_version = "1.4.18" +required_version = "1.4.21" unstable_features = false disable_all_formatting = false skip_children = false From 8e9815aef1ddc6f94849b35f400be9b0fd49f2e0 Mon Sep 17 00:00:00 2001 From: adamhass Date: Wed, 23 Sep 2020 14:38:40 +0200 Subject: [PATCH 2/6] downgrade travis nightly version to avoid the external library bug --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index d9fa8978..946b2e57 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,7 @@ jobs: name: "Build and test root: nightly" rust: nightly script: + - rustup default nightly-2020-09-21 # remove this line when current external bug is fixed - cargo build --verbose --all - cargo test --verbose --all - stage: test-features From 56a616a2283d2c663f2e2b25936101ce56cd7f88 Mon Sep 17 00:00:00 2001 From: adamhass Date: Wed, 23 Sep 2020 15:41:07 +0200 Subject: [PATCH 3/6] changed BufferConfig creation and other minor changes according PR to comments --- .travis.yml | 15 ++++++- core/Cargo.toml | 2 +- core/src/dispatch/mod.rs | 4 +- core/src/net/buffer.rs | 75 +++++++++++++++++++--------------- core/src/net/buffer_pool.rs | 1 + core/src/net/network_thread.rs | 17 ++++---- 6 files changed, 71 insertions(+), 43 deletions(-) diff --git a/.travis.yml b/.travis.yml index 946b2e57..d2ac3327 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,8 @@ os: linux dist: xenial language: rust jobs: + allow_failures: + - if: rust = nightly include: - stage: test name: "Build and test root: stable" @@ -17,11 +19,16 @@ jobs: script: - cargo build --verbose --all - cargo test --verbose --all + - stage: test # remove this stage when nightly passes + name: "Build and test root: nightly-2020-09-21" + rust: nightly-2020-09-21 + script: + - cargo build --verbose --all + - cargo test --verbose --all - stage: test name: "Build and test root: nightly" rust: nightly script: - - rustup default nightly-2020-09-21 # remove this line when current external bug is fixed - cargo build --verbose --all - cargo test --verbose --all - stage: test-features @@ -36,6 +43,12 @@ jobs: script: - cd core - ./test-feature-matrix.sh + - stage: test-features # remove this stage when nightly passes + rust: nightly-2020-09-21 + name: "Test feature matrix: nightly" + script: + - cd core + - ./test-feature-matrix.sh - stage: deploy rust: nightly name: "Github Release" diff --git a/core/Cargo.toml b/core/Cargo.toml index b156ca96..a9c3aeb0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -54,7 +54,7 @@ slog = "2" slog-async = "2" slog-term = "2" rustc-hash = "1.1" -hocon = {version = "0.3.5", default-features = false} +hocon = {version = "0.3", default-features = false} hierarchical_hash_wheel_timer = "1.0" owning_ref = "0.4" futures = "0.3" diff --git a/core/src/dispatch/mod.rs b/core/src/dispatch/mod.rs index 7aeeb439..8902a1bf 100644 --- a/core/src/dispatch/mod.rs +++ b/core/src/dispatch/mod.rs @@ -75,7 +75,7 @@ impl NetworkConfig { NetworkConfig { addr, transport: Transport::TCP, - buffer_config: BufferConfig::default(), + buffer_config: BufferConfig::new(), custom_allocator: None, } } @@ -139,7 +139,7 @@ impl Default for NetworkConfig { NetworkConfig { addr: "127.0.0.1:0".parse().unwrap(), transport: Transport::TCP, - buffer_config: BufferConfig::default(), + buffer_config: BufferConfig::new(), custom_allocator: None, } } diff --git a/core/src/net/buffer.rs b/core/src/net/buffer.rs index 96d3fe6e..6d3bf114 100644 --- a/core/src/net/buffer.rs +++ b/core/src/net/buffer.rs @@ -32,8 +32,8 @@ pub struct BufferConfig { } impl BufferConfig { - /// Create a new BufferConfig with reasonable values - pub fn default() -> Self { + /// Create a new BufferConfig with default values which may be overwritten + pub fn new() -> Self { BufferConfig { chunk_size: 128 * 1000, // 128Kb chunks initial_pool_count: 2, // 256Kb initial/minimum BufferPools @@ -42,27 +42,27 @@ impl BufferConfig { } } - /// Creates a new BufferConfig with specified values and performs validation - pub fn new( - chunk_size: usize, - initial_pool_count: usize, - max_pool_count: usize, - encode_min_remaining: usize, - ) -> Self { - let buffer_config = BufferConfig { - chunk_size, - initial_pool_count, - max_pool_count, - encode_min_remaining, - }; - buffer_config.validate(); - buffer_config + /// Sets the BufferConfigs chunk_size to the given value + pub fn chunk_size(&mut self, size: usize) -> () { + self.chunk_size = size; + } + /// Sets the BufferConfigs initial_pool_count to the given value + pub fn initial_pool_count(&mut self, count: usize) -> () { + self.initial_pool_count = count; + } + /// Sets the BufferConfigs max_pool_count to the given value + pub fn max_pool_count(&mut self, count: usize) -> () { + self.max_pool_count = count; + } + /// Sets the BufferConfigs encode_min_remaining to the given value + pub fn encode_min_remaining(&mut self, size: usize) -> () { + self.encode_min_remaining = size; } /// Tries to deserialize a BufferConfig from the specified in the given `config`. /// Returns a default BufferConfig if it fails to read from the config. pub fn from_config(config: &Hocon) -> BufferConfig { - let mut buffer_config = BufferConfig::default(); + let mut buffer_config = BufferConfig::new(); if let Some(chunk_size) = config["buffer_config"]["chunk_size"].as_i64() { buffer_config.chunk_size = chunk_size as usize; } @@ -832,14 +832,17 @@ mod tests { #[should_panic(expected = "chunk_size must be greater than encode_min_remaining")] fn invalid_encode_min_remaining_validation() { // The BufferConfig should panic because encode_min_remain too high - let _ = BufferConfig::new(128, 10, 10, 128); + let mut buffer_config = BufferConfig::new(); + buffer_config.chunk_size(128); + buffer_config.encode_min_remaining(128); + buffer_config.validate(); } // This test instantiates an EncodeBuffer and writes the same string into it enough times that // the EncodeBuffer should overload multiple times and will have to succeed in reusing >=1 Chunk #[test] fn encode_buffer_overload_reuse_default_config() { - let buffer_config = BufferConfig::default(); + let buffer_config = BufferConfig::new(); let encode_buffer = encode_buffer_overload_reuse(&buffer_config); // Check the buffer pool sizes assert_eq!( @@ -854,12 +857,12 @@ mod tests { // As above, except we create a much larger BufferPool with larger Chunks manually configured #[test] fn encode_buffer_overload_reuse_manually_configured_large_buffers() { - let buffer_config = BufferConfig::new( - 50000000, // 50 MB chunk_size - 10, // 500 MB pool init value - 20, // 1 GB pool max size - 256, // 256 B min_remaining - ); + let mut buffer_config = BufferConfig::new(); + buffer_config.chunk_size(50000000); // 50 MB chunk_size + buffer_config.initial_pool_count(10); // 500 MB pool init value + buffer_config.max_pool_count(20); // 1 GB pool max size + buffer_config.encode_min_remaining(256);// 256 B min_remaining + let encode_buffer = encode_buffer_overload_reuse(&buffer_config); assert_eq!(encode_buffer.buffer.len(), 50000000); // Check the buffer pool sizes @@ -944,7 +947,7 @@ mod tests { #[should_panic(expected = "src too big for buffering")] fn encode_buffer_panic() { // Instantiate an encode buffer with default values - let buffer_config = BufferConfig::default(); + let buffer_config = BufferConfig::new(); let mut encode_buffer = EncodeBuffer::with_config(&buffer_config, &None); let buffer_encoder = &mut EncodeBuffer::get_buffer_encoder(&mut encode_buffer); use std::string::ToString; @@ -958,7 +961,7 @@ mod tests { #[test] fn aligned_after_drop() { // Instantiate an encode buffer with default values - let buffer_config = BufferConfig::default(); + let buffer_config = BufferConfig::new(); let mut encode_buffer = EncodeBuffer::with_config(&buffer_config, &None); { let buffer_encoder = &mut EncodeBuffer::get_buffer_encoder(&mut encode_buffer); @@ -1003,10 +1006,13 @@ mod tests { impl ComponentLifecycle for BufferTestActor { fn on_start(&mut self) -> Handled { if self.custom_buf { + let mut buffer_config = BufferConfig::new(); + buffer_config.encode_min_remaining(30); + buffer_config.max_pool_count(5); + buffer_config.initial_pool_count(4); + buffer_config.chunk_size(128); // Initialize the buffers - self.ctx - .borrow() - .init_buffers(Some(BufferConfig::new(128, 4, 5, 30)), None); + self.ctx.borrow().init_buffers(Some(buffer_config), None); } // Use the Buffer let _ = self.ctx.actor_path().clone().tell_serialised(120, self); @@ -1023,6 +1029,11 @@ mod tests { } fn buffer_config_testing_system() -> KompactSystem { let mut cfg = KompactConfig::new(); + let mut network_buffer_config = BufferConfig::new(); + network_buffer_config.chunk_size(512); + network_buffer_config.initial_pool_count(2); + network_buffer_config.max_pool_count(3); + network_buffer_config.encode_min_remaining(10); cfg.load_config_str( r#"{ buffer_config { @@ -1036,7 +1047,7 @@ mod tests { cfg.system_components(DeadletterBox::new, { NetworkConfig::with_buffer_config( "127.0.0.1:0".parse().expect("Address should work"), - BufferConfig::new(512, 2, 3, 10), + network_buffer_config, ) .build() }); diff --git a/core/src/net/buffer_pool.rs b/core/src/net/buffer_pool.rs index 0610bd41..bb502826 100644 --- a/core/src/net/buffer_pool.rs +++ b/core/src/net/buffer_pool.rs @@ -50,6 +50,7 @@ impl BufferPool { config: &BufferConfig, custom_allocator: &Option>, ) -> Self { + config.validate(); let chunk_allocator = { if let Some(allocator) = custom_allocator { allocator.clone() diff --git a/core/src/net/network_thread.rs b/core/src/net/network_thread.rs index 6354c8c1..a98a26f4 100644 --- a/core/src/net/network_thread.rs +++ b/core/src/net/network_thread.rs @@ -802,7 +802,6 @@ fn bind_with_retries( mod tests { use super::*; use crate::{dispatch::NetworkConfig, prelude::BufferConfig}; - use std::net::{IpAddr, Ipv4Addr}; // Cleaner test-cases for manually running the thread fn poll_and_handle(thread: &mut NetworkThread) -> () { @@ -872,8 +871,8 @@ mod tests { fn merge_connections_basic() -> () { // Sets up two NetworkThreads and does mutual connection request - let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7778); - let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 7780); + let addr1 = "127.0.0.1:7778".parse().expect("Address should work"); + let addr2 = "127.0.0.1:7780".parse().expect("Address should work"); let (mut network_thread1, input_queue_1_sender, mut network_thread2, input_queue_2_sender) = setup_two_threads(addr1, addr2); @@ -945,8 +944,8 @@ mod tests { // Sets up two NetworkThreads and does mutual connection request // This test uses a different order of events than basic - let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8778); - let addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8780); + let addr1 = "127.0.0.1:8778".parse().expect("Address should work"); + let addr2 = "127.0.0.1:8780".parse().expect("Address should work"); let (mut network_thread1, input_queue_1_sender, mut network_thread2, input_queue_2_sender) = setup_two_threads(addr1, addr2); @@ -1016,8 +1015,12 @@ mod tests { #[test] fn network_thread_custom_buffer_config() -> () { - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9788); - let buffer_config = BufferConfig::new(128, 13, 14, 10); + let addr = "127.0.0.1:9788".parse().expect("Address should work"); + let mut buffer_config = BufferConfig::new(); + buffer_config.chunk_size(128); + buffer_config.max_pool_count(14); + buffer_config.initial_pool_count(13); + buffer_config.encode_min_remaining(10); let network_config = NetworkConfig::with_buffer_config(addr, buffer_config); let mut cfg = KompactConfig::new(); cfg.system_components(DeadletterBox::new, NetworkConfig::default().build()); From a54edde07a7dc08ce273ada79271937077f20e5a Mon Sep 17 00:00:00 2001 From: adamhass Date: Thu, 24 Sep 2020 11:56:53 +0200 Subject: [PATCH 4/6] revert the travis.yml changes, improved BufferConfig docs and renamed fields and methods, and new() to default(). --- .travis.yml | 14 ----- core/src/dispatch/mod.rs | 4 +- core/src/net/buffer.rs | 107 ++++++++++++++++++--------------- core/src/net/buffer_pool.rs | 6 +- core/src/net/network_thread.rs | 6 +- 5 files changed, 66 insertions(+), 71 deletions(-) diff --git a/.travis.yml b/.travis.yml index d2ac3327..d9fa8978 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,8 +10,6 @@ os: linux dist: xenial language: rust jobs: - allow_failures: - - if: rust = nightly include: - stage: test name: "Build and test root: stable" @@ -19,12 +17,6 @@ jobs: script: - cargo build --verbose --all - cargo test --verbose --all - - stage: test # remove this stage when nightly passes - name: "Build and test root: nightly-2020-09-21" - rust: nightly-2020-09-21 - script: - - cargo build --verbose --all - - cargo test --verbose --all - stage: test name: "Build and test root: nightly" rust: nightly @@ -43,12 +35,6 @@ jobs: script: - cd core - ./test-feature-matrix.sh - - stage: test-features # remove this stage when nightly passes - rust: nightly-2020-09-21 - name: "Test feature matrix: nightly" - script: - - cd core - - ./test-feature-matrix.sh - stage: deploy rust: nightly name: "Github Release" diff --git a/core/src/dispatch/mod.rs b/core/src/dispatch/mod.rs index 8902a1bf..7aeeb439 100644 --- a/core/src/dispatch/mod.rs +++ b/core/src/dispatch/mod.rs @@ -75,7 +75,7 @@ impl NetworkConfig { NetworkConfig { addr, transport: Transport::TCP, - buffer_config: BufferConfig::new(), + buffer_config: BufferConfig::default(), custom_allocator: None, } } @@ -139,7 +139,7 @@ impl Default for NetworkConfig { NetworkConfig { addr: "127.0.0.1:0".parse().unwrap(), transport: Transport::TCP, - buffer_config: BufferConfig::new(), + buffer_config: BufferConfig::default(), custom_allocator: None, } } diff --git a/core/src/net/buffer.rs b/core/src/net/buffer.rs index 6d3bf114..7b660172 100644 --- a/core/src/net/buffer.rs +++ b/core/src/net/buffer.rs @@ -21,71 +21,80 @@ const FRAME_HEAD_LEN: usize = frames::FRAME_HEAD_LEN as usize; /// The configuration for the network buffers #[derive(Debug, PartialEq, Eq, Clone)] pub struct BufferConfig { - /// Specifies the length of `BufferChunk`s + /// Specifies the size in bytes of `BufferChunk`s pub(crate) chunk_size: usize, - /// Specifies the amount of `BufferChunk` a `BufferPool` will pre-allocate on creation - pub(crate) initial_pool_count: usize, - /// Specifies the maximum amount of `BufferChunk`s individual `BufferPool`s will allocate - pub(crate) max_pool_count: usize, - /// Minimum number of bytes an `EncodeBuffer` must have available before serialisation into it - pub(crate) encode_min_remaining: usize, + /// Specifies the number of `BufferChunk` a `BufferPool` will pre-allocate on creation. + pub(crate) initial_chunk_count: usize, + /// Specifies the max number of `BufferChunk`s each `BufferPool` may have. + pub(crate) max_chunk_count: usize, + /// Minimum number of bytes an `EncodeBuffer` must have available before serialisation into it. + pub(crate) encode_buf_min_free_space: usize, } impl BufferConfig { /// Create a new BufferConfig with default values which may be overwritten - pub fn new() -> Self { + /// `chunk_size` default value is 128,000. + /// `initial_chunk_count` default value is 2. + /// `max_chunk_count` default value is 1000. + /// `encode_buf_min_free_space` is 64. + pub fn default() -> Self { BufferConfig { chunk_size: 128 * 1000, // 128Kb chunks - initial_pool_count: 2, // 256Kb initial/minimum BufferPools - max_pool_count: 1000, // 128Mb maximum BufferPools - encode_min_remaining: 64, // typical L1 cache line size + initial_chunk_count: 2, // 256Kb initial/minimum BufferPools + max_chunk_count: 1000, // 128Mb maximum BufferPools + encode_buf_min_free_space: 64, // typical L1 cache line size } } - /// Sets the BufferConfigs chunk_size to the given value + /// Sets the BufferConfigs `chunk_size` to the given number of bytes. + /// Must be greater than 127 AND greater than `encode_buf_min_free_space` pub fn chunk_size(&mut self, size: usize) -> () { self.chunk_size = size; } - /// Sets the BufferConfigs initial_pool_count to the given value + /// Sets the BufferConfigs `initial_chunk_count` to the given number. + /// Must be <= `max_chunk_count`. pub fn initial_pool_count(&mut self, count: usize) -> () { - self.initial_pool_count = count; + self.initial_chunk_count = count; } - /// Sets the BufferConfigs max_pool_count to the given value - pub fn max_pool_count(&mut self, count: usize) -> () { - self.max_pool_count = count; + /// Sets the BufferConfigs `max_chunk_count` to the given number. + /// Must be >= `initial_pool_count`. + pub fn max_chunk_count(&mut self, count: usize) -> () { + self.max_chunk_count = count; } - /// Sets the BufferConfigs encode_min_remaining to the given value - pub fn encode_min_remaining(&mut self, size: usize) -> () { - self.encode_min_remaining = size; + /// Sets the BufferConfigs `encode_buf_min_free_space` to the given number of bytes. + /// Must be < `chunk_size`. + pub fn encode_buf_min_free_space(&mut self, size: usize) -> () { + self.encode_buf_min_free_space = size; } /// Tries to deserialize a BufferConfig from the specified in the given `config`. /// Returns a default BufferConfig if it fails to read from the config. pub fn from_config(config: &Hocon) -> BufferConfig { - let mut buffer_config = BufferConfig::new(); + let mut buffer_config = BufferConfig::default(); if let Some(chunk_size) = config["buffer_config"]["chunk_size"].as_i64() { buffer_config.chunk_size = chunk_size as usize; } if let Some(initial_pool_count) = config["buffer_config"]["initial_pool_count"].as_i64() { - buffer_config.initial_pool_count = initial_pool_count as usize; + buffer_config.initial_chunk_count = initial_pool_count as usize; } if let Some(max_pool_count) = config["buffer_config"]["max_pool_count"].as_i64() { - buffer_config.max_pool_count = max_pool_count as usize; + buffer_config.max_chunk_count = max_pool_count as usize; } if let Some(encode_min_remaining) = config["buffer_config"]["encode_min_remaining"].as_i64() { - buffer_config.encode_min_remaining = encode_min_remaining as usize; + buffer_config.encode_buf_min_free_space = encode_min_remaining as usize; } buffer_config.validate(); buffer_config } - /// Performs basic sanity checks on the config parameters + /// Performs basic sanity checks on the config parameters and causes a Panic if it is invalid. + /// Is called automatically by the `BufferPool` on creation. pub fn validate(&self) { - if self.initial_pool_count > self.max_pool_count { + if self.initial_chunk_count > self.max_chunk_count { panic!("initial_pool_count may not be greater than max_pool_count") } - if self.chunk_size <= self.encode_min_remaining { + if self.chunk_size <= self.encode_buf_min_free_space { panic!("chunk_size must be greater than encode_min_remaining") } if self.chunk_size < 128 { @@ -390,7 +399,7 @@ impl EncodeBuffer { write_offset: 0, read_offset: 0, dispatcher_ref: None, - min_remaining: config.encode_min_remaining, + min_remaining: config.encode_buf_min_free_space, } } else { panic!("Couldn't initialize EncodeBuffer"); @@ -411,7 +420,7 @@ impl EncodeBuffer { write_offset: 0, read_offset: 0, dispatcher_ref: Some(dispatcher_ref), - min_remaining: config.encode_min_remaining, + min_remaining: config.encode_buf_min_free_space, } } else { panic!("Couldn't initialize EncodeBuffer"); @@ -832,9 +841,9 @@ mod tests { #[should_panic(expected = "chunk_size must be greater than encode_min_remaining")] fn invalid_encode_min_remaining_validation() { // The BufferConfig should panic because encode_min_remain too high - let mut buffer_config = BufferConfig::new(); + let mut buffer_config = BufferConfig::default(); buffer_config.chunk_size(128); - buffer_config.encode_min_remaining(128); + buffer_config.encode_buf_min_free_space(128); buffer_config.validate(); } @@ -842,14 +851,14 @@ mod tests { // the EncodeBuffer should overload multiple times and will have to succeed in reusing >=1 Chunk #[test] fn encode_buffer_overload_reuse_default_config() { - let buffer_config = BufferConfig::new(); + let buffer_config = BufferConfig::default(); let encode_buffer = encode_buffer_overload_reuse(&buffer_config); // Check the buffer pool sizes assert_eq!( encode_buffer.buffer_pool.get_pool_sizes(), ( - buffer_config.initial_pool_count, // No additional has been allocated - buffer_config.initial_pool_count - 1 // Currently in the pool + buffer_config.initial_chunk_count, // No additional has been allocated + buffer_config.initial_chunk_count - 1 // Currently in the pool ) ); } @@ -857,11 +866,11 @@ mod tests { // As above, except we create a much larger BufferPool with larger Chunks manually configured #[test] fn encode_buffer_overload_reuse_manually_configured_large_buffers() { - let mut buffer_config = BufferConfig::new(); + let mut buffer_config = BufferConfig::default(); buffer_config.chunk_size(50000000); // 50 MB chunk_size buffer_config.initial_pool_count(10); // 500 MB pool init value - buffer_config.max_pool_count(20); // 1 GB pool max size - buffer_config.encode_min_remaining(256);// 256 B min_remaining + buffer_config.max_chunk_count(20); // 1 GB pool max size + buffer_config.encode_buf_min_free_space(256);// 256 B min_remaining let encode_buffer = encode_buffer_overload_reuse(&buffer_config); assert_eq!(encode_buffer.buffer.len(), 50000000); @@ -893,7 +902,7 @@ mod tests { .hocon(); let buffer_config = BufferConfig::from_config(&hocon.unwrap()); // Ensure we successfully parsed the Config - assert_eq!(buffer_config.encode_min_remaining, 2 as usize); + assert_eq!(buffer_config.encode_buf_min_free_space, 2 as usize); let encode_buffer = encode_buffer_overload_reuse(&buffer_config); // Ensure that the assert_eq!(encode_buffer.buffer.len(), 128); @@ -917,7 +926,7 @@ mod tests { // Create a string that's bigger than the ENCODEBUFFER_MIN_REMAINING let mut test_string = "".to_string(); - for i in 0..=&buffer_config.encode_min_remaining + 1 { + for i in 0..=&buffer_config.encode_buf_min_free_space + 1 { // Make sure the string isn't just the same char over and over again, // Means we test for some more correctness in buffers test_string.push((i.to_string()).chars().next().unwrap()); @@ -925,9 +934,9 @@ mod tests { let mut cnt = 0; // Make sure we churn through all the initial buffers. - for _ in 0..=&buffer_config.initial_pool_count + 1 { + for _ in 0..=&buffer_config.initial_chunk_count + 1 { // Make sure we fill up a chunk per iteration: - for _ in 0..=(&buffer_config.chunk_size / &buffer_config.encode_min_remaining) + 1 { + for _ in 0..=(&buffer_config.chunk_size / &buffer_config.encode_buf_min_free_space) + 1 { buffer_encoder.put_slice(test_string.clone().as_bytes()); let chunk = buffer_encoder.get_chunk_lease(); assert_eq!( @@ -947,7 +956,7 @@ mod tests { #[should_panic(expected = "src too big for buffering")] fn encode_buffer_panic() { // Instantiate an encode buffer with default values - let buffer_config = BufferConfig::new(); + let buffer_config = BufferConfig::default(); let mut encode_buffer = EncodeBuffer::with_config(&buffer_config, &None); let buffer_encoder = &mut EncodeBuffer::get_buffer_encoder(&mut encode_buffer); use std::string::ToString; @@ -961,7 +970,7 @@ mod tests { #[test] fn aligned_after_drop() { // Instantiate an encode buffer with default values - let buffer_config = BufferConfig::new(); + let buffer_config = BufferConfig::default(); let mut encode_buffer = EncodeBuffer::with_config(&buffer_config, &None); { let buffer_encoder = &mut EncodeBuffer::get_buffer_encoder(&mut encode_buffer); @@ -1006,9 +1015,9 @@ mod tests { impl ComponentLifecycle for BufferTestActor { fn on_start(&mut self) -> Handled { if self.custom_buf { - let mut buffer_config = BufferConfig::new(); - buffer_config.encode_min_remaining(30); - buffer_config.max_pool_count(5); + let mut buffer_config = BufferConfig::default(); + buffer_config.encode_buf_min_free_space(30); + buffer_config.max_chunk_count(5); buffer_config.initial_pool_count(4); buffer_config.chunk_size(128); // Initialize the buffers @@ -1029,11 +1038,11 @@ mod tests { } fn buffer_config_testing_system() -> KompactSystem { let mut cfg = KompactConfig::new(); - let mut network_buffer_config = BufferConfig::new(); + let mut network_buffer_config = BufferConfig::default(); network_buffer_config.chunk_size(512); network_buffer_config.initial_pool_count(2); - network_buffer_config.max_pool_count(3); - network_buffer_config.encode_min_remaining(10); + network_buffer_config.max_chunk_count(3); + network_buffer_config.encode_buf_min_free_space(10); cfg.load_config_str( r#"{ buffer_config { diff --git a/core/src/net/buffer_pool.rs b/core/src/net/buffer_pool.rs index bb502826..c3af8cfc 100644 --- a/core/src/net/buffer_pool.rs +++ b/core/src/net/buffer_pool.rs @@ -61,14 +61,14 @@ impl BufferPool { } }; let mut pool = VecDeque::::new(); - for _ in 0..config.initial_pool_count { + for _ in 0..config.initial_chunk_count { pool.push_front(BufferChunk::from_chunk(chunk_allocator.get_chunk())); } BufferPool { pool, - pool_size: config.initial_pool_count, + pool_size: config.initial_chunk_count, chunk_allocator, - max_pool_size: config.max_pool_count, + max_pool_size: config.max_chunk_count, } } diff --git a/core/src/net/network_thread.rs b/core/src/net/network_thread.rs index a98a26f4..decee15b 100644 --- a/core/src/net/network_thread.rs +++ b/core/src/net/network_thread.rs @@ -1016,11 +1016,11 @@ mod tests { #[test] fn network_thread_custom_buffer_config() -> () { let addr = "127.0.0.1:9788".parse().expect("Address should work"); - let mut buffer_config = BufferConfig::new(); + let mut buffer_config = BufferConfig::default(); buffer_config.chunk_size(128); - buffer_config.max_pool_count(14); + buffer_config.max_chunk_count(14); buffer_config.initial_pool_count(13); - buffer_config.encode_min_remaining(10); + buffer_config.encode_buf_min_free_space(10); let network_config = NetworkConfig::with_buffer_config(addr, buffer_config); let mut cfg = KompactConfig::new(); cfg.system_components(DeadletterBox::new, NetworkConfig::default().build()); From 2c2712f87232620679ce63266c1d277745c2aec0 Mon Sep 17 00:00:00 2001 From: adamhass Date: Mon, 28 Sep 2020 10:58:59 +0200 Subject: [PATCH 5/6] Adding 'Configuring Buffers' chapter to the tutorial --- core/src/net/buffer.rs | 8 +- docs/src/SUMMARY.md | 1 + docs/src/distributed/networkbuffers.md | 112 +++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 docs/src/distributed/networkbuffers.md diff --git a/core/src/net/buffer.rs b/core/src/net/buffer.rs index 7b660172..d11481c8 100644 --- a/core/src/net/buffer.rs +++ b/core/src/net/buffer.rs @@ -39,10 +39,10 @@ impl BufferConfig { /// `encode_buf_min_free_space` is 64. pub fn default() -> Self { BufferConfig { - chunk_size: 128 * 1000, // 128Kb chunks - initial_chunk_count: 2, // 256Kb initial/minimum BufferPools - max_chunk_count: 1000, // 128Mb maximum BufferPools - encode_buf_min_free_space: 64, // typical L1 cache line size + chunk_size: 128 * 1000, // 128KB chunks + initial_chunk_count: 2, // 256KB initial/minimum BufferPools + max_chunk_count: 1000, // 128MB maximum BufferPools + encode_buf_min_free_space: 64, // typical L1 cache line size } } diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index b589dfff..50dabd01 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -23,6 +23,7 @@ - [Basic Communication](distributed/basiccommunication.md) - [Named Services](distributed/namedservices.md) - [Serialisation](distributed/serialisation.md) + - [Configuring Buffers](distributed/networkbuffers.md) - [Async/Await Interaction](async/index.md) [Project Info](project.md) diff --git a/docs/src/distributed/networkbuffers.md b/docs/src/distributed/networkbuffers.md new file mode 100644 index 00000000..2200b54f --- /dev/null +++ b/docs/src/distributed/networkbuffers.md @@ -0,0 +1,112 @@ +# Network Buffers + +Kompact uses a BufferPool system to serialize network messages. This section will describe the BufferPools briefly and how they can be configured with different parameters. + +Before we begin describing the Network Buffers we remind the reader that there are two different methods for sending messages over the network in Kompact: +1. **Lazy serialisation** `dst.tell(msg: M, from: S);` +2. **Eager serialisation** `dst.tell_serialised(msg: M, from: &self)?;` + +With lazy serialisation the Actor moves the data to the heap, and transfers it unserialised to the `NetworkDispatcher`, which later serialises the message into its (the `NetworkDispatcher's`) own buffers. +Eager serialisation serialises the data immediately into the Actors Buffers, and then transfers ownership of the serialised the data to the `NetworkDispatcher`. + +## How the BufferPools work + +### BufferPool locations +In a Kompact system where many actors use eager serialisation there will be many `BufferPools`. Ìf the actors in the system only use lazy serialisation there will be two pools, one used by the `NetworkDispatcher` for serialising outbound data, and one used by the `NetworkThread` for receiving incoming data. + +### BufferPool, BufferChunk, ChunkLease +Each `BufferPool` (pool) consists of more than one `BufferChunks` (chunks). A chunk is the concrete memory area used for serialising data into. There may be many messages serialised into a single chunk, and discrete slices of the chunks (i.e. individual messages) can be extracted and sent to other threads/actors through the smart-pointer `ChunkLease` (lease). When a chunk runs out of space it will be locked and returned to the pool. If and only if all outstanding leases created from a chunk have been dropped may the chunk be unlocked and reused, or deallocated. + +When a pool is created it will pre-allocate a configurable amount of chunks, and will attempt to reuse those as long as possible, and only when it needs to will it allocate more chunks, up to a configurable maximum number of chunks. + +**Note: In the current version, behaviour is unstable when a pool runs out of chunks and is unable to allocate more and will often cause a panic, similar to out-of-memory exception.** + +### BufferPool interface +Actors access their pool through the `EncodeBuffer` wrapper which maintains a single active chunk at a time, and automatically swaps the active buffer with the local `BufferPool` when necessary. + +The method `tell_serialised(msg, &self)?;` automatically uses the `EncodeBuffer` interface such that users of Kompact do not need to use the interfaces of the pool (why the method requires the `self` reference). Currently, a serialised message may not be greater than the size of the `BufferChunk`. + +### BufferPool initialization +Actors initialize their local buffers automatically when the first invocation of `tell_serialised(...)` occurs. If an Actor never invokes the method it will not allocate any buffers. + +An actor may call the initialization method through the call `self.ctx.borrow().init_buffers(None, None);`[^1] to explicitly initialize the local `BufferPool` without sending a message. + +## BufferConfig + +### Parameters +There are four configurable parameters in the BufferConfig: + +1. `chunk_size`: the size (in bytes) of the `BufferChunks`, and thereby dictating the maximum serialised message size. Default value is 128KB. +2. `initial_chunk_count`: how many `BufferChunks` the `BufferPool` will pre-allocate. Default value is 2. +3. `max_chunk_count`: the maximum number of `BufferChunks` the `BufferPool` may have allocated simultaneously. Default value is 1000. +4. `encode_buf_min_free_space`: When an Actor begins serialising a message the `EncodeBuffer` will compare how much space (in bytes) is left in the active chunk and compare it to this parameter, if there is less free space the active chunk will be replaced with a new one from the pool before continuing the serialisation. Default value is 64. + +### Configuring the Buffers + +#### Individual Actor Configuration +If no `BufferConfig` is specified Kompact will use the default settings for all `BufferPools`. Actors may be configured with individual `BufferConfigs` through the `init_buffer(Some(config), None)`[^1] config. It is important that the call is made before any calls to `tell_serialised(...)`. For example, the `on_start()` function of the `ComponentLifecycle` may be used to ensure this, as in the following example: + +```rust,edition2018,no_run,noplaypen +impl ComponentLifecycle for CustomBufferConfigActor { + fn on_start(&mut self) -> Handled { + let mut buffer_config = BufferConfig::default(); + buffer_config.encode_buf_min_free_space(128); + buffer_config.max_chunk_count(5); + buffer_config.initial_pool_count(4); + buffer_config.chunk_size(256000); + + self.ctx.borrow().init_buffers(Some(buffer_config), None); + Handled::Ok + } + ... +} +``` + +#### Configuring All Actors +If a programmer wishes for all actors to use the same `BufferConfig` configuration, a Hocon string can be inserted into the `KompactConfig` or loaded from a Hocon-file ([see configuration chapter on loading configurations](./../local/configuration.md)), for example: +```rust,edition2018,no_run,noplaypen +let mut cfg = KompactConfig::new(); +cfg.load_config_str( + r#"{ + buffer_config { + chunk_size: 256000, + initial_pool_count: 3, + max_pool_count: 4, + encode_min_remaining: 20, + } + }"#, +); +... +let system = cfg.build().expect("KompactSystem"); +``` +If a `BufferConfig` is loaded into the systems `KompactConfig` then all actors will use that configuration instead of the default `BufferConfig`, however individual actors may still override the configuration by using the `init_buffers(...)` method. + +#### Configuring the NetworkDispatcher and NetworkThread +The `NetworkDispatcher` and `NetworkThread` are configured separately from the Actors and use their buffers for lazy serialisation and receiving data from the network. To configure their buffers the `NetworkConfig` may be created using the method `::with_buffer_config(...)` as in the example below: + +```rust,edition2018,no_run,noplaypen +let mut cfg = KompactConfig::new(); +let mut network_buffer_config = BufferConfig::default(); +network_buffer_config.chunk_size(512); +network_buffer_config.initial_pool_count(2); +network_buffer_config.max_chunk_count(3); +network_buffer_config.encode_buf_min_free_space(10); +cfg.system_components(DeadletterBox::new, { + NetworkConfig::with_buffer_config( + "127.0.0.1:0".parse().expect("Address should work"), + network_buffer_config, + ) + .build() +}); +let system = cfg.build().expect("KompactSystem"); +``` + +#### BufferConfig Validation + +`BufferConfig` implements the method `validate()` which causes a panic if any of the parameters are valid. It is invoked whenever a `BufferPool` is created from the given configuration. The validation checks the following conditions hold true: +`chunk_size` > `encode_buf_min_free_space` +`chunk_size` > 127 +`max_chunk_count` >= `initial_chunk_count` + +- - - +[^1]: The method `init_buffers(...)` takes two `Option` arguments, of which the second argument has not been covered. The second argument allows users of Kompact to specify a `CustomAllocator`, an untested experimental feature which is left undocumented for now. \ No newline at end of file From ccec9c5b76200930c3d4c20a55b7b5c7ef19b42c Mon Sep 17 00:00:00 2001 From: adamhass Date: Tue, 29 Sep 2020 13:20:06 +0200 Subject: [PATCH 6/6] Fixed incorrect renaming from before, muting debug prints from the features tested in test-feature-matrix, changed the test_actors info! prints to debug! --- core/Cargo.toml | 12 ++++++------ core/src/dispatch/mod.rs | 8 ++++---- core/src/net/buffer.rs | 24 ++++++++++++------------ core/src/net/network_thread.rs | 2 +- 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index a9c3aeb0..d6774abc 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -26,12 +26,12 @@ maintenance = { status = "actively-developed" } [features] default = ["serde_support","slog/max_level_info", "slog/release_max_level_info", "ser_id_64", "use_local_executor"] silent_logging = ["serde_support", "slog/max_level_info", "slog/release_max_level_error"] -low_latency = ["executors/ws-no-park"] -ser_id_64 = [] -ser_id_32 = [] -ser_id_16 = [] -ser_id_8 = [] -thread_pinning = ["core_affinity", "executors/thread-pinning"] +low_latency = ["executors/ws-no-park", "slog/max_level_info"] +ser_id_64 = ["slog/max_level_info"] +ser_id_32 = ["slog/max_level_info"] +ser_id_16 = ["slog/max_level_info"] +ser_id_8 = ["slog/max_level_info"] +thread_pinning = ["core_affinity", "executors/thread-pinning", "slog/max_level_info"] serde_support = ["serde", "bytes/serde"] type_erasure = [] use_local_executor = [] diff --git a/core/src/dispatch/mod.rs b/core/src/dispatch/mod.rs index 7aeeb439..a3f99513 100644 --- a/core/src/dispatch/mod.rs +++ b/core/src/dispatch/mod.rs @@ -2129,7 +2129,7 @@ mod dispatch_tests { impl ComponentLifecycle for PingerAct { fn on_start(&mut self) -> Handled { - info!(self.ctx.log(), "Starting"); + debug!(self.ctx.log(), "Starting"); if self.eager { self.target .tell_serialised(PingMsg { i: 0 }, self) @@ -2151,7 +2151,7 @@ mod dispatch_tests { fn receive_network(&mut self, msg: NetMessage) -> Handled { match msg.try_deserialise::() { Ok(pong) => { - info!(self.ctx.log(), "Got msg {:?}", pong); + debug!(self.ctx.log(), "Got msg {:?}", pong); self.count += 1; if self.count < PING_COUNT { if self.eager { @@ -2204,7 +2204,7 @@ mod dispatch_tests { let sender = msg.sender; match_deser! {msg.data; { ping: PingMsg [PingPongSer] => { - info!(self.ctx.log(), "Got msg {:?} from {}", ping, sender); + debug!(self.ctx.log(), "Got msg {:?} from {}", ping, sender); let pong = PongMsg { i: ping.i }; if self.eager { sender @@ -2242,7 +2242,7 @@ mod dispatch_tests { } fn receive_network(&mut self, msg: NetMessage) -> Handled { - info!( + debug!( self.ctx.log(), "Forwarding some msg from {} to {}", msg.sender, self.forward_to ); diff --git a/core/src/net/buffer.rs b/core/src/net/buffer.rs index d11481c8..f78592d6 100644 --- a/core/src/net/buffer.rs +++ b/core/src/net/buffer.rs @@ -53,11 +53,11 @@ impl BufferConfig { } /// Sets the BufferConfigs `initial_chunk_count` to the given number. /// Must be <= `max_chunk_count`. - pub fn initial_pool_count(&mut self, count: usize) -> () { + pub fn initial_chunk_count(&mut self, count: usize) -> () { self.initial_chunk_count = count; } /// Sets the BufferConfigs `max_chunk_count` to the given number. - /// Must be >= `initial_pool_count`. + /// Must be >= `initial_chunk_count`. pub fn max_chunk_count(&mut self, count: usize) -> () { self.max_chunk_count = count; } @@ -74,8 +74,8 @@ impl BufferConfig { if let Some(chunk_size) = config["buffer_config"]["chunk_size"].as_i64() { buffer_config.chunk_size = chunk_size as usize; } - if let Some(initial_pool_count) = config["buffer_config"]["initial_pool_count"].as_i64() { - buffer_config.initial_chunk_count = initial_pool_count as usize; + if let Some(initial_chunk_count) = config["buffer_config"]["initial_chunk_count"].as_i64() { + buffer_config.initial_chunk_count = initial_chunk_count as usize; } if let Some(max_pool_count) = config["buffer_config"]["max_pool_count"].as_i64() { buffer_config.max_chunk_count = max_pool_count as usize; @@ -92,7 +92,7 @@ impl BufferConfig { /// Is called automatically by the `BufferPool` on creation. pub fn validate(&self) { if self.initial_chunk_count > self.max_chunk_count { - panic!("initial_pool_count may not be greater than max_pool_count") + panic!("initial_chunk_count may not be greater than max_pool_count") } if self.chunk_size <= self.encode_buf_min_free_space { panic!("chunk_size must be greater than encode_min_remaining") @@ -815,14 +815,14 @@ mod tests { use std::{borrow::Borrow, time::Duration}; #[test] - #[should_panic(expected = "initial_pool_count may not be greater than max_pool_count")] + #[should_panic(expected = "initial_chunk_count may not be greater than max_pool_count")] fn invalid_pool_counts_config_validation() { let hocon = HoconLoader::new() .load_str( r#"{ buffer_config { chunk_size: 64, - initial_pool_count: 3, + initial_chunk_count: 3, max_pool_count: 2, encode_min_remaining: 2, } @@ -868,7 +868,7 @@ mod tests { fn encode_buffer_overload_reuse_manually_configured_large_buffers() { let mut buffer_config = BufferConfig::default(); buffer_config.chunk_size(50000000); // 50 MB chunk_size - buffer_config.initial_pool_count(10); // 500 MB pool init value + buffer_config.initial_chunk_count(10); // 500 MB pool init value buffer_config.max_chunk_count(20); // 1 GB pool max size buffer_config.encode_buf_min_free_space(256);// 256 B min_remaining @@ -892,7 +892,7 @@ mod tests { r#"{ buffer_config { chunk_size: 128, - initial_pool_count: 2, + initial_chunk_count: 2, max_pool_count: 2, encode_min_remaining: 2, } @@ -1018,7 +1018,7 @@ mod tests { let mut buffer_config = BufferConfig::default(); buffer_config.encode_buf_min_free_space(30); buffer_config.max_chunk_count(5); - buffer_config.initial_pool_count(4); + buffer_config.initial_chunk_count(4); buffer_config.chunk_size(128); // Initialize the buffers self.ctx.borrow().init_buffers(Some(buffer_config), None); @@ -1040,14 +1040,14 @@ mod tests { let mut cfg = KompactConfig::new(); let mut network_buffer_config = BufferConfig::default(); network_buffer_config.chunk_size(512); - network_buffer_config.initial_pool_count(2); + network_buffer_config.initial_chunk_count(2); network_buffer_config.max_chunk_count(3); network_buffer_config.encode_buf_min_free_space(10); cfg.load_config_str( r#"{ buffer_config { chunk_size: 256, - initial_pool_count: 3, + initial_chunk_count: 3, max_pool_count: 4, encode_min_remaining: 20, } diff --git a/core/src/net/network_thread.rs b/core/src/net/network_thread.rs index decee15b..df557858 100644 --- a/core/src/net/network_thread.rs +++ b/core/src/net/network_thread.rs @@ -1019,7 +1019,7 @@ mod tests { let mut buffer_config = BufferConfig::default(); buffer_config.chunk_size(128); buffer_config.max_chunk_count(14); - buffer_config.initial_pool_count(13); + buffer_config.initial_chunk_count(13); buffer_config.encode_buf_min_free_space(10); let network_config = NetworkConfig::with_buffer_config(addr, buffer_config); let mut cfg = KompactConfig::new();