diff --git a/iceoryx2/src/port/details/data_segment.rs b/iceoryx2/src/port/details/data_segment.rs new file mode 100644 index 000000000..dcdbe0930 --- /dev/null +++ b/iceoryx2/src/port/details/data_segment.rs @@ -0,0 +1,35 @@ +// Copyright (c) 2023 - 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +use iceoryx2_cal::shm_allocator::AllocationStrategy; + +use crate::service; + +#[repr(C)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum DataSegmentType { + Dynamic, + Static, +} + +impl DataSegmentType { + pub(crate) fn new_from_allocation_strategy(v: AllocationStrategy) -> Self { + match v { + AllocationStrategy::Static => DataSegmentType::Static, + _ => DataSegmentType::Dynamic, + } + } +} + +pub(crate) struct DataSegment { + memory: Service::SharedMemory, +} diff --git a/iceoryx2/src/port/details/mod.rs b/iceoryx2/src/port/details/mod.rs index 1cf265a4c..aca86cae8 100644 --- a/iceoryx2/src/port/details/mod.rs +++ b/iceoryx2/src/port/details/mod.rs @@ -10,5 +10,6 @@ // // SPDX-License-Identifier: Apache-2.0 OR MIT +pub(crate) mod data_segment; pub(crate) mod publisher_connections; pub(crate) mod subscriber_connections; diff --git a/iceoryx2/src/port/publisher.rs b/iceoryx2/src/port/publisher.rs index 2d5a711f1..b8f940500 100644 --- a/iceoryx2/src/port/publisher.rs +++ b/iceoryx2/src/port/publisher.rs @@ -101,6 +101,7 @@ //! # } //! ``` +use super::details::data_segment::DataSegmentType; use super::port_identifiers::UniquePublisherId; use super::UniqueSubscriberId; use crate::port::details::subscriber_connections::*; @@ -110,9 +111,7 @@ use crate::raw_sample::RawSampleMut; use crate::sample_mut_uninit::SampleMutUninit; use crate::service::builder::publish_subscribe::CustomPayloadMarker; use crate::service::config_scheme::{connection_config, data_segment_config}; -use crate::service::dynamic_config::publish_subscribe::{ - DataSegmentType, PublisherDetails, SubscriberDetails, -}; +use crate::service::dynamic_config::publish_subscribe::{PublisherDetails, SubscriberDetails}; use crate::service::header::publish_subscribe::Header; use crate::service::naming_scheme::{ data_segment_name, extract_publisher_id_from_connection, extract_subscriber_id_from_connection, @@ -240,9 +239,9 @@ pub(crate) enum RemovePubSubPortFromAllConnectionsError { } #[derive(Debug)] -pub(crate) struct DataSegment { +pub(crate) struct PublisherBackend { sample_reference_counter: Vec, - memory: Service::SharedMemory, + data_segment: Service::SharedMemory, payload_size: usize, payload_type_layout: Layout, port_id: UniquePublisherId, @@ -257,7 +256,7 @@ pub(crate) struct DataSegment { is_active: IoxAtomicBool, } -impl DataSegment { +impl PublisherBackend { fn sample_index(&self, distance_to_chunk: usize) -> usize { distance_to_chunk / self.payload_size } @@ -266,7 +265,7 @@ impl DataSegment { self.retrieve_returned_samples(); let msg = "Unable to allocate Sample"; - let ptr = self.memory.allocate(layout)?; + let ptr = self.data_segment.allocate(layout)?; if self.sample_reference_counter[self.sample_index(ptr.offset.offset())] .fetch_add(1, Ordering::Relaxed) != 0 @@ -289,7 +288,7 @@ impl DataSegment { == 1 { unsafe { - self.memory + self.data_segment .deallocate(distance_to_chunk, self.payload_type_layout); } } @@ -538,7 +537,7 @@ pub struct Publisher< Payload: Debug + ?Sized + 'static, UserHeader: Debug, > { - pub(crate) data_segment: Arc>, + pub(crate) backend: Arc>, dynamic_publisher_handle: Option, payload_size: usize, _payload: PhantomData, @@ -550,7 +549,7 @@ impl Drop { fn drop(&mut self) { if let Some(handle) = self.dynamic_publisher_handle { - self.data_segment + self.backend .service_state .dynamic_storage .get() @@ -584,28 +583,38 @@ impl .messaging_pattern .required_amount_of_samples_per_data_segment(config.max_loaned_samples); - let data_segment_type = if config.allocation_strategy == AllocationStrategy::Static { - DataSegmentType::Static - } else { - DataSegmentType::Dynamic + let data_segment_type = + DataSegmentType::new_from_allocation_strategy(config.allocation_strategy); + + let sample_layout = static_config + .message_type_details + .sample_layout(config.initial_max_slice_len); + + let max_slice_len = config.initial_max_slice_len; + let publisher_details = PublisherDetails { + data_segment_type, + publisher_id: port_id, + number_of_samples, + max_slice_len, + node_id: *service.__internal_state().shared_node.id(), }; + let global_config = service.__internal_state().shared_node.config(); let data_segment = fail!(from origin, - when Self::create_data_segment(&port_id, service.__internal_state().shared_node.config(), number_of_samples, static_config, &config), + when Self::create_data_segment(&publisher_details, &global_config, sample_layout), with PublisherCreateError::UnableToCreateDataSegment, "{} since the data segment could not be acquired.", msg); - let max_slice_len = config.initial_max_slice_len; - let data_segment = Arc::new(DataSegment { + let backend = Arc::new(PublisherBackend { is_active: IoxAtomicBool::new(true), - memory: data_segment, + data_segment, payload_size: static_config .message_type_details() - .sample_layout(config.initial_max_slice_len) + .sample_layout(max_slice_len) .size(), payload_type_layout: static_config .message_type_details() - .payload_layout(config.initial_max_slice_len), + .payload_layout(max_slice_len), sample_reference_counter: { let mut v = Vec::with_capacity(number_of_samples); for _ in 0..number_of_samples { @@ -632,7 +641,7 @@ impl loan_counter: IoxAtomicUsize::new(0), }); - let payload_size = data_segment + let payload_size = backend .subscriber_connections .static_config .message_type_details @@ -640,14 +649,14 @@ impl .size; let mut new_self = Self { - data_segment, + backend, dynamic_publisher_handle: None, payload_size, _payload: PhantomData, _user_header: PhantomData, }; - if let Err(e) = new_self.data_segment.populate_subscriber_channels() { + if let Err(e) = new_self.backend.populate_subscriber_channels() { warn!(from new_self, "The new Publisher port is unable to connect to every Subscriber port, caused by {:?}.", e); } @@ -660,13 +669,8 @@ impl .dynamic_storage .get() .publish_subscribe() - .add_publisher_id(PublisherDetails { - data_segment_type, - publisher_id: port_id, - number_of_samples, - max_slice_len, - node_id: *service.__internal_state().shared_node.id(), - }) { + .add_publisher_id(publisher_details) + { Some(unique_index) => unique_index, None => { fail!(from origin, with PublisherCreateError::ExceedsMaxSupportedPublishers, @@ -681,59 +685,54 @@ impl } fn create_data_segment( - port_id: &UniquePublisherId, + details: &PublisherDetails, global_config: &config::Config, - number_of_samples: usize, - static_config: &publish_subscribe::StaticConfig, - config: &LocalPublisherConfig, + sample_layout: Layout, ) -> Result { - let l = static_config - .message_type_details - .sample_layout(config.initial_max_slice_len); - let allocator_config = shm_allocator::pool_allocator::Config { bucket_layout: l }; + let allocator_config = shm_allocator::pool_allocator::Config { + bucket_layout: sample_layout, + }; Ok(fail!(from "Publisher::create_data_segment()", when <>::Builder as NamedConceptBuilder< Service::SharedMemory, - >>::new(&data_segment_name(port_id)) + >>::new(&data_segment_name(&details.publisher_id)) .config(&data_segment_config::(global_config)) - .size(l.size() * number_of_samples + l.align() - 1) + .size(sample_layout.size() * details.number_of_samples + sample_layout.align() - 1) .create(&allocator_config), "Unable to create the data segment.")) } /// Returns the [`UniquePublisherId`] of the [`Publisher`] pub fn id(&self) -> UniquePublisherId { - self.data_segment.port_id + self.backend.port_id } /// Returns the strategy the [`Publisher`] follows when a [`SampleMut`] cannot be delivered /// since the [`Subscriber`](crate::port::subscriber::Subscriber)s buffer is full. pub fn unable_to_deliver_strategy(&self) -> UnableToDeliverStrategy { - self.data_segment.config.unable_to_deliver_strategy + self.backend.config.unable_to_deliver_strategy } /// Returns the maximum slice length configured for this [`Publisher`]. pub fn initial_max_slice_len(&self) -> usize { - self.data_segment.config.initial_max_slice_len + self.backend.config.initial_max_slice_len } fn allocate(&self, layout: Layout) -> Result { let msg = "Unable to allocate Sample with"; - if self.data_segment.loan_counter.load(Ordering::Relaxed) - >= self.data_segment.config.max_loaned_samples + if self.backend.loan_counter.load(Ordering::Relaxed) + >= self.backend.config.max_loaned_samples { fail!(from self, with PublisherLoanError::ExceedsMaxLoanedSamples, "{} {:?} since already {} samples were loaned and it would exceed the maximum of parallel loans of {}. Release or send a loaned sample to loan another sample.", - msg, layout, self.data_segment.loan_counter.load(Ordering::Relaxed), self.data_segment.config.max_loaned_samples); + msg, layout, self.backend.loan_counter.load(Ordering::Relaxed), self.backend.config.max_loaned_samples); } - match self.data_segment.allocate(layout) { + match self.backend.allocate(layout) { Ok(chunk) => { - self.data_segment - .loan_counter - .fetch_add(1, Ordering::Relaxed); + self.backend.loan_counter.fetch_add(1, Ordering::Relaxed); Ok(chunk) } Err(ShmAllocationError::AllocationError(AllocationError::OutOfMemory)) => { @@ -752,7 +751,7 @@ impl } fn sample_layout(&self, number_of_elements: usize) -> Layout { - self.data_segment + self.backend .subscriber_connections .static_config .message_type_details @@ -760,7 +759,7 @@ impl } fn user_header_ptr(&self, header: *const Header) -> *const u8 { - self.data_segment + self.backend .subscriber_connections .static_config .message_type_details @@ -769,7 +768,7 @@ impl } fn payload_ptr(&self, header: *const Header) -> *const u8 { - self.data_segment + self.backend .subscriber_connections .static_config .message_type_details @@ -778,7 +777,7 @@ impl } fn payload_type_variant(&self) -> TypeVariant { - self.data_segment + self.backend .subscriber_connections .static_config .message_type_details @@ -859,13 +858,13 @@ impl let user_header_ptr = self.user_header_ptr(header_ptr) as *mut UserHeader; let payload_ptr = self.payload_ptr(header_ptr) as *mut MaybeUninit; - unsafe { header_ptr.write(Header::new(self.data_segment.port_id, 1)) }; + unsafe { header_ptr.write(Header::new(self.backend.port_id, 1)) }; let sample = unsafe { RawSampleMut::new_unchecked(header_ptr, user_header_ptr, payload_ptr) }; Ok( SampleMutUninit::, UserHeader>::new( - &self.data_segment, + &self.backend, sample, chunk.offset, ), @@ -1004,7 +1003,7 @@ impl underlying_number_of_slice_elements: usize, ) -> Result], UserHeader>, PublisherLoanError> { - let max_slice_len = self.data_segment.config.initial_max_slice_len; + let max_slice_len = self.backend.config.initial_max_slice_len; if max_slice_len < slice_len { fail!(from self, with PublisherLoanError::ExceedsMaxLoanSize, "Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.", @@ -1017,7 +1016,7 @@ impl let user_header_ptr = self.user_header_ptr(header_ptr) as *mut UserHeader; let payload_ptr = self.payload_ptr(header_ptr) as *mut MaybeUninit; - unsafe { header_ptr.write(Header::new(self.data_segment.port_id, slice_len as _)) }; + unsafe { header_ptr.write(Header::new(self.backend.port_id, slice_len as _)) }; let sample = unsafe { RawSampleMut::new_unchecked( @@ -1029,7 +1028,7 @@ impl Ok( SampleMutUninit::], UserHeader>::new( - &self.data_segment, + &self.backend, sample, chunk.offset, ), @@ -1069,7 +1068,7 @@ impl Upda for Publisher { fn update_connections(&self) -> Result<(), ConnectionFailure> { - self.data_segment.update_connections() + self.backend.update_connections() } } diff --git a/iceoryx2/src/sample_mut.rs b/iceoryx2/src/sample_mut.rs index de4b29404..b5ff7882e 100644 --- a/iceoryx2/src/sample_mut.rs +++ b/iceoryx2/src/sample_mut.rs @@ -64,7 +64,7 @@ //! ``` use crate::{ - port::publisher::{DataSegment, PublisherSendError}, + port::publisher::{PublisherBackend, PublisherSendError}, raw_sample::RawSampleMut, service::header::publish_subscribe::Header, }; @@ -87,7 +87,7 @@ use std::{ /// Does not implement [`Send`] since it releases unsent samples in the [`crate::port::publisher::Publisher`] and the /// [`crate::port::publisher::Publisher`] is not thread-safe! pub struct SampleMut { - pub(crate) data_segment: Arc>, + pub(crate) publisher_backend: Arc>, pub(crate) ptr: RawSampleMut, pub(crate) offset_to_chunk: PointerOffset, } @@ -98,11 +98,11 @@ impl Debu fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "SampleMut<{}, {}, {}> {{ data_segment: {:?}, offset_to_chunk: {:?} }}", + "SampleMut<{}, {}, {}> {{ publisher_backend: {:?}, offset_to_chunk: {:?} }}", core::any::type_name::(), core::any::type_name::(), core::any::type_name::(), - self.data_segment, + self.publisher_backend, self.offset_to_chunk ) } @@ -112,7 +112,8 @@ impl Drop for SampleMut { fn drop(&mut self) { - self.data_segment.return_loaned_sample(self.offset_to_chunk); + self.publisher_backend + .return_loaned_sample(self.offset_to_chunk); } } @@ -288,6 +289,7 @@ impl< /// # } /// ``` pub fn send(self) -> Result { - self.data_segment.send_sample(self.offset_to_chunk.offset()) + self.publisher_backend + .send_sample(self.offset_to_chunk.offset()) } } diff --git a/iceoryx2/src/sample_mut_uninit.rs b/iceoryx2/src/sample_mut_uninit.rs index e4d23950c..5b5591aa1 100644 --- a/iceoryx2/src/sample_mut_uninit.rs +++ b/iceoryx2/src/sample_mut_uninit.rs @@ -95,7 +95,7 @@ use std::{fmt::Debug, mem::MaybeUninit, sync::Arc}; use iceoryx2_cal::shm_allocator::PointerOffset; use crate::{ - port::publisher::DataSegment, raw_sample::RawSampleMut, sample_mut::SampleMut, + port::publisher::PublisherBackend, raw_sample::RawSampleMut, sample_mut::SampleMut, service::header::publish_subscribe::Header, }; @@ -261,13 +261,13 @@ impl SampleMutUninit, UserHeader> { pub(crate) fn new( - data_segment: &Arc>, + publisher_backend: &Arc>, ptr: RawSampleMut>, offset_to_chunk: PointerOffset, ) -> Self { Self { sample: SampleMut { - data_segment: Arc::clone(data_segment), + publisher_backend: Arc::clone(publisher_backend), ptr, offset_to_chunk, }, @@ -341,13 +341,13 @@ impl SampleMutUninit], UserHeader> { pub(crate) fn new( - data_segment: &Arc>, + publisher_backend: &Arc>, ptr: RawSampleMut]>, offset_to_chunk: PointerOffset, ) -> Self { Self { sample: SampleMut { - data_segment: Arc::clone(data_segment), + publisher_backend: Arc::clone(publisher_backend), ptr, offset_to_chunk, }, diff --git a/iceoryx2/src/service/dynamic_config/publish_subscribe.rs b/iceoryx2/src/service/dynamic_config/publish_subscribe.rs index 26dbd87cc..591c1efe3 100644 --- a/iceoryx2/src/service/dynamic_config/publish_subscribe.rs +++ b/iceoryx2/src/service/dynamic_config/publish_subscribe.rs @@ -33,18 +33,14 @@ use iceoryx2_bb_memory::bump_allocator::BumpAllocator; use crate::{ node::NodeId, - port::port_identifiers::{UniquePortId, UniquePublisherId, UniqueSubscriberId}, + port::{ + details::data_segment::DataSegmentType, + port_identifiers::{UniquePortId, UniquePublisherId, UniqueSubscriberId}, + }, }; use super::PortCleanupAction; -#[repr(C)] -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -pub(crate) enum DataSegmentType { - Dynamic, - Static, -} - #[repr(C)] #[derive(Debug, Clone, Copy)] pub(crate) struct DynamicConfigSettings {