Skip to content

Commit

Permalink
[eclipse-iceoryx#532] Renamed DataSegment into PublisherBackend
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Nov 28, 2024
1 parent b15ae4f commit 5739a13
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 79 deletions.
35 changes: 35 additions & 0 deletions iceoryx2/src/port/details/data_segment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2023 - 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

use iceoryx2_cal::shm_allocator::AllocationStrategy;

use crate::service;

#[repr(C)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum DataSegmentType {
Dynamic,
Static,
}

impl DataSegmentType {
pub(crate) fn new_from_allocation_strategy(v: AllocationStrategy) -> Self {
match v {
AllocationStrategy::Static => DataSegmentType::Static,
_ => DataSegmentType::Dynamic,
}
}
}

pub(crate) struct DataSegment<Service: service::Service> {
memory: Service::SharedMemory,
}
1 change: 1 addition & 0 deletions iceoryx2/src/port/details/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

pub(crate) mod data_segment;
pub(crate) mod publisher_connections;
pub(crate) mod subscriber_connections;
119 changes: 59 additions & 60 deletions iceoryx2/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
//! # }
//! ```
use super::details::data_segment::DataSegmentType;
use super::port_identifiers::UniquePublisherId;
use super::UniqueSubscriberId;
use crate::port::details::subscriber_connections::*;
Expand All @@ -110,9 +111,7 @@ use crate::raw_sample::RawSampleMut;
use crate::sample_mut_uninit::SampleMutUninit;
use crate::service::builder::publish_subscribe::CustomPayloadMarker;
use crate::service::config_scheme::{connection_config, data_segment_config};
use crate::service::dynamic_config::publish_subscribe::{
DataSegmentType, PublisherDetails, SubscriberDetails,
};
use crate::service::dynamic_config::publish_subscribe::{PublisherDetails, SubscriberDetails};
use crate::service::header::publish_subscribe::Header;
use crate::service::naming_scheme::{
data_segment_name, extract_publisher_id_from_connection, extract_subscriber_id_from_connection,
Expand Down Expand Up @@ -240,9 +239,9 @@ pub(crate) enum RemovePubSubPortFromAllConnectionsError {
}

#[derive(Debug)]
pub(crate) struct DataSegment<Service: service::Service> {
pub(crate) struct PublisherBackend<Service: service::Service> {
sample_reference_counter: Vec<IoxAtomicU64>,
memory: Service::SharedMemory,
data_segment: Service::SharedMemory,
payload_size: usize,
payload_type_layout: Layout,
port_id: UniquePublisherId,
Expand All @@ -257,7 +256,7 @@ pub(crate) struct DataSegment<Service: service::Service> {
is_active: IoxAtomicBool,
}

impl<Service: service::Service> DataSegment<Service> {
impl<Service: service::Service> PublisherBackend<Service> {
fn sample_index(&self, distance_to_chunk: usize) -> usize {
distance_to_chunk / self.payload_size
}
Expand All @@ -266,7 +265,7 @@ impl<Service: service::Service> DataSegment<Service> {
self.retrieve_returned_samples();

let msg = "Unable to allocate Sample";
let ptr = self.memory.allocate(layout)?;
let ptr = self.data_segment.allocate(layout)?;
if self.sample_reference_counter[self.sample_index(ptr.offset.offset())]
.fetch_add(1, Ordering::Relaxed)
!= 0
Expand All @@ -289,7 +288,7 @@ impl<Service: service::Service> DataSegment<Service> {
== 1
{
unsafe {
self.memory
self.data_segment
.deallocate(distance_to_chunk, self.payload_type_layout);
}
}
Expand Down Expand Up @@ -538,7 +537,7 @@ pub struct Publisher<
Payload: Debug + ?Sized + 'static,
UserHeader: Debug,
> {
pub(crate) data_segment: Arc<DataSegment<Service>>,
pub(crate) backend: Arc<PublisherBackend<Service>>,
dynamic_publisher_handle: Option<ContainerHandle>,
payload_size: usize,
_payload: PhantomData<Payload>,
Expand All @@ -550,7 +549,7 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug> Drop
{
fn drop(&mut self) {
if let Some(handle) = self.dynamic_publisher_handle {
self.data_segment
self.backend
.service_state
.dynamic_storage
.get()
Expand Down Expand Up @@ -584,28 +583,38 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
.messaging_pattern
.required_amount_of_samples_per_data_segment(config.max_loaned_samples);

let data_segment_type = if config.allocation_strategy == AllocationStrategy::Static {
DataSegmentType::Static
} else {
DataSegmentType::Dynamic
let data_segment_type =
DataSegmentType::new_from_allocation_strategy(config.allocation_strategy);

let sample_layout = static_config
.message_type_details
.sample_layout(config.initial_max_slice_len);

let max_slice_len = config.initial_max_slice_len;
let publisher_details = PublisherDetails {
data_segment_type,
publisher_id: port_id,
number_of_samples,
max_slice_len,
node_id: *service.__internal_state().shared_node.id(),
};
let global_config = service.__internal_state().shared_node.config();

let data_segment = fail!(from origin,
when Self::create_data_segment(&port_id, service.__internal_state().shared_node.config(), number_of_samples, static_config, &config),
when Self::create_data_segment(&publisher_details, &global_config, sample_layout),
with PublisherCreateError::UnableToCreateDataSegment,
"{} since the data segment could not be acquired.", msg);

let max_slice_len = config.initial_max_slice_len;
let data_segment = Arc::new(DataSegment {
let backend = Arc::new(PublisherBackend {
is_active: IoxAtomicBool::new(true),
memory: data_segment,
data_segment,
payload_size: static_config
.message_type_details()
.sample_layout(config.initial_max_slice_len)
.sample_layout(max_slice_len)
.size(),
payload_type_layout: static_config
.message_type_details()
.payload_layout(config.initial_max_slice_len),
.payload_layout(max_slice_len),
sample_reference_counter: {
let mut v = Vec::with_capacity(number_of_samples);
for _ in 0..number_of_samples {
Expand All @@ -632,22 +641,22 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
loan_counter: IoxAtomicUsize::new(0),
});

let payload_size = data_segment
let payload_size = backend
.subscriber_connections
.static_config
.message_type_details
.payload
.size;

let mut new_self = Self {
data_segment,
backend,
dynamic_publisher_handle: None,
payload_size,
_payload: PhantomData,
_user_header: PhantomData,
};

if let Err(e) = new_self.data_segment.populate_subscriber_channels() {
if let Err(e) = new_self.backend.populate_subscriber_channels() {
warn!(from new_self, "The new Publisher port is unable to connect to every Subscriber port, caused by {:?}.", e);
}

Expand All @@ -660,13 +669,8 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
.dynamic_storage
.get()
.publish_subscribe()
.add_publisher_id(PublisherDetails {
data_segment_type,
publisher_id: port_id,
number_of_samples,
max_slice_len,
node_id: *service.__internal_state().shared_node.id(),
}) {
.add_publisher_id(publisher_details)
{
Some(unique_index) => unique_index,
None => {
fail!(from origin, with PublisherCreateError::ExceedsMaxSupportedPublishers,
Expand All @@ -681,59 +685,54 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
}

fn create_data_segment(
port_id: &UniquePublisherId,
details: &PublisherDetails,
global_config: &config::Config,
number_of_samples: usize,
static_config: &publish_subscribe::StaticConfig,
config: &LocalPublisherConfig,
sample_layout: Layout,
) -> Result<Service::SharedMemory, SharedMemoryCreateError> {
let l = static_config
.message_type_details
.sample_layout(config.initial_max_slice_len);
let allocator_config = shm_allocator::pool_allocator::Config { bucket_layout: l };
let allocator_config = shm_allocator::pool_allocator::Config {
bucket_layout: sample_layout,
};

Ok(fail!(from "Publisher::create_data_segment()",
when <<Service::SharedMemory as SharedMemory<PoolAllocator>>::Builder as NamedConceptBuilder<
Service::SharedMemory,
>>::new(&data_segment_name(port_id))
>>::new(&data_segment_name(&details.publisher_id))
.config(&data_segment_config::<Service>(global_config))
.size(l.size() * number_of_samples + l.align() - 1)
.size(sample_layout.size() * details.number_of_samples + sample_layout.align() - 1)
.create(&allocator_config),
"Unable to create the data segment."))
}

/// Returns the [`UniquePublisherId`] of the [`Publisher`]
pub fn id(&self) -> UniquePublisherId {
self.data_segment.port_id
self.backend.port_id
}

/// Returns the strategy the [`Publisher`] follows when a [`SampleMut`] cannot be delivered
/// since the [`Subscriber`](crate::port::subscriber::Subscriber)s buffer is full.
pub fn unable_to_deliver_strategy(&self) -> UnableToDeliverStrategy {
self.data_segment.config.unable_to_deliver_strategy
self.backend.config.unable_to_deliver_strategy
}

/// Returns the maximum slice length configured for this [`Publisher`].
pub fn initial_max_slice_len(&self) -> usize {
self.data_segment.config.initial_max_slice_len
self.backend.config.initial_max_slice_len
}

fn allocate(&self, layout: Layout) -> Result<ShmPointer, PublisherLoanError> {
let msg = "Unable to allocate Sample with";

if self.data_segment.loan_counter.load(Ordering::Relaxed)
>= self.data_segment.config.max_loaned_samples
if self.backend.loan_counter.load(Ordering::Relaxed)
>= self.backend.config.max_loaned_samples
{
fail!(from self, with PublisherLoanError::ExceedsMaxLoanedSamples,
"{} {:?} since already {} samples were loaned and it would exceed the maximum of parallel loans of {}. Release or send a loaned sample to loan another sample.",
msg, layout, self.data_segment.loan_counter.load(Ordering::Relaxed), self.data_segment.config.max_loaned_samples);
msg, layout, self.backend.loan_counter.load(Ordering::Relaxed), self.backend.config.max_loaned_samples);
}

match self.data_segment.allocate(layout) {
match self.backend.allocate(layout) {
Ok(chunk) => {
self.data_segment
.loan_counter
.fetch_add(1, Ordering::Relaxed);
self.backend.loan_counter.fetch_add(1, Ordering::Relaxed);
Ok(chunk)
}
Err(ShmAllocationError::AllocationError(AllocationError::OutOfMemory)) => {
Expand All @@ -752,15 +751,15 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
}

fn sample_layout(&self, number_of_elements: usize) -> Layout {
self.data_segment
self.backend
.subscriber_connections
.static_config
.message_type_details
.sample_layout(number_of_elements)
}

fn user_header_ptr(&self, header: *const Header) -> *const u8 {
self.data_segment
self.backend
.subscriber_connections
.static_config
.message_type_details
Expand All @@ -769,7 +768,7 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
}

fn payload_ptr(&self, header: *const Header) -> *const u8 {
self.data_segment
self.backend
.subscriber_connections
.static_config
.message_type_details
Expand All @@ -778,7 +777,7 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug>
}

fn payload_type_variant(&self) -> TypeVariant {
self.data_segment
self.backend
.subscriber_connections
.static_config
.message_type_details
Expand Down Expand Up @@ -859,13 +858,13 @@ impl<Service: service::Service, Payload: Debug + Sized, UserHeader: Debug>
let user_header_ptr = self.user_header_ptr(header_ptr) as *mut UserHeader;
let payload_ptr = self.payload_ptr(header_ptr) as *mut MaybeUninit<Payload>;

unsafe { header_ptr.write(Header::new(self.data_segment.port_id, 1)) };
unsafe { header_ptr.write(Header::new(self.backend.port_id, 1)) };

let sample =
unsafe { RawSampleMut::new_unchecked(header_ptr, user_header_ptr, payload_ptr) };
Ok(
SampleMutUninit::<Service, MaybeUninit<Payload>, UserHeader>::new(
&self.data_segment,
&self.backend,
sample,
chunk.offset,
),
Expand Down Expand Up @@ -1004,7 +1003,7 @@ impl<Service: service::Service, Payload: Debug, UserHeader: Debug>
underlying_number_of_slice_elements: usize,
) -> Result<SampleMutUninit<Service, [MaybeUninit<Payload>], UserHeader>, PublisherLoanError>
{
let max_slice_len = self.data_segment.config.initial_max_slice_len;
let max_slice_len = self.backend.config.initial_max_slice_len;
if max_slice_len < slice_len {
fail!(from self, with PublisherLoanError::ExceedsMaxLoanSize,
"Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.",
Expand All @@ -1017,7 +1016,7 @@ impl<Service: service::Service, Payload: Debug, UserHeader: Debug>
let user_header_ptr = self.user_header_ptr(header_ptr) as *mut UserHeader;
let payload_ptr = self.payload_ptr(header_ptr) as *mut MaybeUninit<Payload>;

unsafe { header_ptr.write(Header::new(self.data_segment.port_id, slice_len as _)) };
unsafe { header_ptr.write(Header::new(self.backend.port_id, slice_len as _)) };

let sample = unsafe {
RawSampleMut::new_unchecked(
Expand All @@ -1029,7 +1028,7 @@ impl<Service: service::Service, Payload: Debug, UserHeader: Debug>

Ok(
SampleMutUninit::<Service, [MaybeUninit<Payload>], UserHeader>::new(
&self.data_segment,
&self.backend,
sample,
chunk.offset,
),
Expand Down Expand Up @@ -1069,7 +1068,7 @@ impl<Service: service::Service, Payload: Debug + ?Sized, UserHeader: Debug> Upda
for Publisher<Service, Payload, UserHeader>
{
fn update_connections(&self) -> Result<(), ConnectionFailure> {
self.data_segment.update_connections()
self.backend.update_connections()
}
}

Expand Down
Loading

0 comments on commit 5739a13

Please sign in to comment.