diff --git a/iceoryx2-cal/src/zero_copy_connection/mod.rs b/iceoryx2-cal/src/zero_copy_connection/mod.rs index f865b604e..0dd355129 100644 --- a/iceoryx2-cal/src/zero_copy_connection/mod.rs +++ b/iceoryx2-cal/src/zero_copy_connection/mod.rs @@ -25,6 +25,8 @@ pub use iceoryx2_bb_system_types::path::Path; pub enum ZeroCopyCreationError { InternalError, AnotherInstanceIsAlreadyConnected, + InsufficientPermissions, + VersionMismatch, ConnectionMaybeCorrupted, InvalidSampleSize, IncompatibleBufferSize, diff --git a/iceoryx2-cal/src/zero_copy_connection/posix_shared_memory.rs b/iceoryx2-cal/src/zero_copy_connection/posix_shared_memory.rs index d9393813d..70f3c239e 100644 --- a/iceoryx2-cal/src/zero_copy_connection/posix_shared_memory.rs +++ b/iceoryx2-cal/src/zero_copy_connection/posix_shared_memory.rs @@ -12,10 +12,12 @@ use std::cell::UnsafeCell; use std::fmt::Debug; -use std::ptr::NonNull; use std::sync::atomic::{AtomicU64, AtomicU8, Ordering}; -use std::time::Duration; +use crate::dynamic_storage::{ + DynamicStorage, DynamicStorageBuilder, DynamicStorageCreateError, DynamicStorageOpenError, + DynamicStorageOpenOrCreateError, +}; use crate::named_concept::*; pub use crate::zero_copy_connection::*; use iceoryx2_bb_elementary::relocatable_container::RelocatableContainer; @@ -23,17 +25,12 @@ use iceoryx2_bb_lock_free::spsc::{ index_queue::RelocatableIndexQueue, safely_overflowing_index_queue::RelocatableSafelyOverflowingIndexQueue, }; -use iceoryx2_bb_log::{error, fail, fatal_panic}; -use iceoryx2_bb_memory::bump_allocator::BumpAllocator; +use iceoryx2_bb_log::{fail, fatal_panic}; use iceoryx2_bb_posix::adaptive_wait::AdaptiveWaitBuilder; -use iceoryx2_bb_posix::creation_mode::CreationMode; -use iceoryx2_bb_posix::permission::Permission; -use iceoryx2_bb_posix::shared_memory::{SharedMemory, SharedMemoryBuilder}; use self::used_chunk_list::RelocatableUsedChunkList; -const MAX_CREATION_DURATION: Duration = Duration::from_millis(10); -const IS_INITIALIZED_STATE_VALUE: u64 = 0xbeefaffedeadbeef; +type Storage = crate::dynamic_storage::posix_shared_memory::Storage; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub struct Configuration { @@ -42,6 +39,15 @@ pub struct Configuration { path_hint: Path, } +impl Configuration { + fn convert(&self) -> ::Configuration { + ::Configuration::default() + .prefix(self.prefix) + .suffix(self.suffix) + .path_hint(self.path_hint) + } +} + impl Default for Configuration { fn default() -> Self { Self { @@ -96,15 +102,8 @@ impl State { } } -fn cleanup_shared_memory( - origin: &T, - shared_memory: &SharedMemory, - state_to_remove: State, -) { - let mgmt_ref = - unsafe { &*(shared_memory.base_address().as_ptr() as *const SharedManagementData) }; - - let mut current_state = mgmt_ref.state.load(Ordering::Relaxed); +fn cleanup_shared_memory(storage: &Storage, state_to_remove: State) { + let mut current_state = storage.get().state.load(Ordering::Relaxed); loop { let new_state = if current_state == state_to_remove.value() { State::MarkedForDestruction.value() @@ -112,7 +111,7 @@ fn cleanup_shared_memory( current_state & !state_to_remove.value() }; - match mgmt_ref.state.compare_exchange( + match storage.get().state.compare_exchange( current_state, new_state, Ordering::Relaxed, @@ -128,13 +127,12 @@ fn cleanup_shared_memory( } } - if current_state == State::MarkedForDestruction.value() - && SharedMemory::remove(shared_memory.name()).is_err() - { - error!(from origin, "Failed to remove shared memory when cleaning up the connection."); + if current_state == State::MarkedForDestruction.value() { + storage.acquire_ownership() } } +#[derive(Debug)] #[repr(C)] struct SharedManagementData { submission_channel: RelocatableSafelyOverflowingIndexQueue, @@ -181,8 +179,7 @@ impl SharedManagementData { completion_channel_buffer_capacity: usize, number_of_samples: usize, ) -> usize { - std::mem::size_of::() + std::mem::align_of::() - 1 - + RelocatableIndexQueue::const_memory_size(completion_channel_buffer_capacity) + RelocatableIndexQueue::const_memory_size(completion_channel_buffer_capacity) + RelocatableSafelyOverflowingIndexQueue::const_memory_size( submission_channel_buffer_capacity, ) @@ -210,122 +207,104 @@ impl Builder { self.buffer_size + self.max_borrowed_samples + 1 } - fn create_or_open_shm(&self) -> Result { - let shm_size = SharedManagementData::const_memory_size( + fn create_or_open_shm(&self) -> Result { + let supplementary_size = SharedManagementData::const_memory_size( self.submission_channel_size(), self.completion_channel_size(), self.number_of_samples, ); + let dynamic_storage_config = self.config.convert(); + let msg = "Failed to acquire underlying shared memory"; - let full_name = self.config.path_for(&self.name).file_name(); - let shm = fail!(from self, when SharedMemoryBuilder::new(&full_name) - .creation_mode(CreationMode::OpenOrCreate) - .size(shm_size) - .permission(Permission::OWNER_ALL) - .create(), - with ZeroCopyCreationError::InternalError, - "{} since it could not be opened/created. This can be caused by incompatible builder settings.", msg); - - let mgmt_ptr = shm.base_address().as_ptr() as *mut SharedManagementData; - match shm.has_ownership() { - true => { - let msg = "Failed to set up newly created connection"; - unsafe { - mgmt_ptr.write(SharedManagementData::new( - self.submission_channel_size(), - self.completion_channel_size(), - self.enable_safe_overflow, - self.max_borrowed_samples, - self.sample_size, - self.number_of_samples, - )) - }; - - let supplementary_ptr = - (mgmt_ptr as usize + std::mem::size_of::()) as *mut u8; - let supplementary_len = shm_size - std::mem::size_of::(); - - let allocator = BumpAllocator::new( - unsafe { NonNull::new_unchecked(supplementary_ptr) }, - supplementary_len, - ); - - fatal_panic!(from self, when unsafe { (*mgmt_ptr).submission_channel.init(&allocator) }, - "{} since the receive channel allocation failed. - This is an implementation bug!", msg); - fatal_panic!(from self, when unsafe { (*mgmt_ptr).completion_channel.init(&allocator) }, - "{} since the retrieve channel allocation failed. - This is an implementation bug!", msg); - fatal_panic!(from self, when unsafe { (*mgmt_ptr).used_chunk_list.init(&allocator) }, - "{} since the used chunk list allocation failed. - This is an implementation bug!", msg); - - ////////////////////////////////////////// - // SYNC POINT: write SharedManagementData - ////////////////////////////////////////// - unsafe { - (*mgmt_ptr) - .init_state - .store(IS_INITIALIZED_STATE_VALUE, Ordering::Release) - }; - shm.release_ownership(); + let storage = <>::Builder<'_> as NamedConceptBuilder< + Storage, + >>::new(&self.name) + .config(&dynamic_storage_config) + .supplementary_size(supplementary_size) + .initializer(|data, allocator| { + fatal_panic!(from self, when unsafe { data.submission_channel.init(allocator) }, + "{} since the receive channel allocation failed. - This is an implementation bug!", msg); + fatal_panic!(from self, when unsafe { data.completion_channel.init(allocator) }, + "{} since the retrieve channel allocation failed. - This is an implementation bug!", msg); + fatal_panic!(from self, when unsafe { data.used_chunk_list.init(allocator) }, + "{} since the used chunk list allocation failed. - This is an implementation bug!", msg); + + true + }) + .open_or_create( + SharedManagementData::new( + self.submission_channel_size(), + self.completion_channel_size(), + self.enable_safe_overflow, + self.max_borrowed_samples, + self.sample_size, + self.number_of_samples, + ) + ); + + let storage = match storage { + Ok(storage) => storage, + Err(DynamicStorageOpenOrCreateError::DynamicStorageCreateError( + DynamicStorageCreateError::InsufficientPermissions, + )) => { + fail!(from self, with ZeroCopyCreationError::InsufficientPermissions, + "{} due to insufficient permissions to create underlying dynamic storage.", msg); } - false => { - let msg = "Failed to open existing connection"; - - let mut adaptive_wait = fail!(from self, when AdaptiveWaitBuilder::new().create(), - with ZeroCopyCreationError::InternalError, "{} since the adaptive wait could not be created.", msg); - - let mgmt_ref = unsafe { &mut *mgmt_ptr }; - - ////////////////////////////////////////// - // SYNC POINT: read SharedManagementData - ////////////////////////////////////////// - while mgmt_ref.init_state.load(Ordering::Acquire) != IS_INITIALIZED_STATE_VALUE { - if fail!(from self, when adaptive_wait.wait(), with ZeroCopyCreationError::InternalError, - "{} since a failure while waiting for creation finalization occurred.", msg) - < MAX_CREATION_DURATION - { - break; - } - } + Err(DynamicStorageOpenOrCreateError::DynamicStorageOpenError( + DynamicStorageOpenError::VersionMismatch, + )) => { + fail!(from self, with ZeroCopyCreationError::VersionMismatch, + "{} since the version of the connection does not match.", msg); + } + Err(e) => { + fail!(from self, with ZeroCopyCreationError::VersionMismatch, + "{} due to an internal failure ({:?}).", msg, e); + } + }; - if mgmt_ref.submission_channel.capacity() != self.submission_channel_size() { - fail!(from self, with ZeroCopyCreationError::IncompatibleBufferSize, + if storage.has_ownership() { + storage.release_ownership(); + } else { + let msg = "Failed to open existing connection"; + + if storage.get().submission_channel.capacity() != self.submission_channel_size() { + fail!(from self, with ZeroCopyCreationError::IncompatibleBufferSize, "{} since the connection has a buffer size of {} but a buffer size of {} is required.", - msg, mgmt_ref.submission_channel.capacity(), self.submission_channel_size()); - } + msg, storage.get().submission_channel.capacity(), self.submission_channel_size()); + } - if mgmt_ref.completion_channel.capacity() != self.completion_channel_size() { - fail!(from self, with ZeroCopyCreationError::IncompatibleMaxBorrowedSampleSetting, + if storage.get().completion_channel.capacity() != self.completion_channel_size() { + fail!(from self, with ZeroCopyCreationError::IncompatibleMaxBorrowedSampleSetting, "{} since the max borrowed sample setting is set to {} but a value of {} is required.", - msg, mgmt_ref.completion_channel.capacity() - mgmt_ref.submission_channel.capacity(), self.max_borrowed_samples); - } + msg, storage.get().completion_channel.capacity() - storage.get().submission_channel.capacity(), self.max_borrowed_samples); + } - if mgmt_ref.enable_safe_overflow != self.enable_safe_overflow { - fail!(from self, with ZeroCopyCreationError::IncompatibleOverflowSetting, + if storage.get().enable_safe_overflow != self.enable_safe_overflow { + fail!(from self, with ZeroCopyCreationError::IncompatibleOverflowSetting, "{} since the safe overflow is set to {} but should be set to {}.", - msg, mgmt_ref.enable_safe_overflow, self.enable_safe_overflow); - } + msg, storage.get().enable_safe_overflow, self.enable_safe_overflow); + } - if mgmt_ref.sample_size != self.sample_size { - fail!(from self, with ZeroCopyCreationError::IncompatibleSampleSize, + if storage.get().sample_size != self.sample_size { + fail!(from self, with ZeroCopyCreationError::IncompatibleSampleSize, "{} since the requested sample size is set to {} but should be set to {}.", - msg, self.sample_size, mgmt_ref.sample_size); - } + msg, self.sample_size, storage.get().sample_size); + } - if mgmt_ref.number_of_samples != self.number_of_samples { - fail!(from self, with ZeroCopyCreationError::IncompatibleNumberOfSamples, + if storage.get().number_of_samples != self.number_of_samples { + fail!(from self, with ZeroCopyCreationError::IncompatibleNumberOfSamples, "{} since the requested number of samples is set to {} but should be set to {}.", - msg, self.number_of_samples, mgmt_ref.number_of_samples); - } + msg, self.number_of_samples, storage.get().number_of_samples); } } - Ok(shm) + Ok(storage) } fn reserve_port( &self, - mgmt_ref: &mut SharedManagementData, + mgmt_ref: &SharedManagementData, new_state: u8, msg: &str, ) -> Result<(), ZeroCopyCreationError> { @@ -400,16 +379,14 @@ impl ZeroCopyConnectionBuilder for Builder { self.sample_size = sample_size; let msg = "Unable to create sender"; - let shm = fail!(from self, when self.create_or_open_shm(), + let storage = fail!(from self, when self.create_or_open_shm(), "{} since the corresponding connection could not be created or opened", msg); - let mgmt = unsafe { &mut *(shm.base_address().as_ptr() as *mut SharedManagementData) }; - self.reserve_port(mgmt, State::Sender.value(), msg)?; + self.reserve_port(storage.get(), State::Sender.value(), msg)?; Ok(Sender { - shared_memory: shm, + storage, name: self.name, - mgmt, }) } @@ -420,38 +397,28 @@ impl ZeroCopyConnectionBuilder for Builder { self.sample_size = sample_size; let msg = "Unable to create receiver"; - let shm = fail!(from self, when self.create_or_open_shm(), + let storage = fail!(from self, when self.create_or_open_shm(), "{} since the corresponding connection could not be created or opened", msg); - let mgmt = unsafe { &mut *(shm.base_address().as_ptr() as *mut SharedManagementData) }; - self.reserve_port(mgmt, State::Receiver.value(), msg)?; + self.reserve_port(storage.get(), State::Receiver.value(), msg)?; Ok(Receiver { - shared_memory: shm, + storage, borrow_counter: UnsafeCell::new(0), name: self.name, - mgmt, }) } } #[derive(Debug)] pub struct Sender { - shared_memory: SharedMemory, + storage: Storage, name: FileName, - mgmt: *const SharedManagementData, } impl Drop for Sender { fn drop(&mut self) { - cleanup_shared_memory(self, &self.shared_memory, State::Sender); - } -} - -impl Sender { - #[inline(always)] - fn mgmt(&self) -> &SharedManagementData { - unsafe { &*self.mgmt } + cleanup_shared_memory(&self.storage, State::Sender); } } @@ -463,19 +430,20 @@ impl NamedConcept for Sender { impl ZeroCopyPortDetails for Sender { fn buffer_size(&self) -> usize { - self.mgmt().submission_channel.capacity() + self.storage.get().submission_channel.capacity() } fn max_borrowed_samples(&self) -> usize { - self.mgmt().max_borrowed_samples + self.storage.get().max_borrowed_samples } fn has_enabled_safe_overflow(&self) -> bool { - self.mgmt().enable_safe_overflow + self.storage.get().enable_safe_overflow } fn is_connected(&self) -> bool { - self.mgmt().state.load(Ordering::Relaxed) == State::Sender.value() | State::Receiver.value() + self.storage.get().state.load(Ordering::Relaxed) + == State::Sender.value() | State::Receiver.value() } } @@ -483,26 +451,30 @@ impl ZeroCopySender for Sender { fn try_send(&self, ptr: PointerOffset) -> Result, ZeroCopySendError> { let msg = "Unable to send sample"; - if !self.mgmt().enable_safe_overflow && self.mgmt().submission_channel.is_full() { + if !self.storage.get().enable_safe_overflow + && self.storage.get().submission_channel.is_full() + { fail!(from self, with ZeroCopySendError::ReceiveBufferFull, "{} since the receive buffer is full.", msg); } if !self - .mgmt() + .storage + .get() .used_chunk_list - .insert(ptr.value() / self.mgmt().sample_size) + .insert(ptr.value() / self.storage.get().sample_size) { fail!(from self, with ZeroCopySendError::UsedChunkListFull, "{} since the used chunk list is full.", msg); } - match unsafe { self.mgmt().submission_channel.push(ptr.value()) } { + match unsafe { self.storage.get().submission_channel.push(ptr.value()) } { Some(v) => { if !self - .mgmt() + .storage + .get() .used_chunk_list - .remove(v / self.mgmt().sample_size) + .remove(v / self.storage.get().sample_size) { fail!(from self, with ZeroCopySendError::ConnectionCorrupted, "{} since an invalid offset was returned on overflow.", msg); @@ -518,11 +490,11 @@ impl ZeroCopySender for Sender { &self, ptr: PointerOffset, ) -> Result, ZeroCopySendError> { - if !self.mgmt().enable_safe_overflow { + if !self.storage.get().enable_safe_overflow { AdaptiveWaitBuilder::new() .create() .unwrap() - .wait_while(|| self.mgmt().submission_channel.is_full()) + .wait_while(|| self.storage.get().submission_channel.is_full()) .unwrap(); } @@ -530,13 +502,14 @@ impl ZeroCopySender for Sender { } fn reclaim(&self) -> Result, ZeroCopyReclaimError> { - match unsafe { self.mgmt().completion_channel.pop() } { + match unsafe { self.storage.get().completion_channel.pop() } { None => Ok(None), Some(v) => { if !self - .mgmt() + .storage + .get() .used_chunk_list - .remove(v / self.mgmt().sample_size) + .remove(v / self.storage.get().sample_size) { fail!(from self, with ZeroCopyReclaimError::ReceiverReturnedCorruptedOffset, "Unable to reclaim sample since the receiver returned the corrupted offset {}.", v); @@ -547,8 +520,9 @@ impl ZeroCopySender for Sender { } unsafe fn acquire_used_offsets(&self, mut callback: F) { - let sample_size = self.mgmt().sample_size; - self.mgmt() + let sample_size = self.storage.get().sample_size; + self.storage + .get() .used_chunk_list .remove_all(|index| callback(PointerOffset::new(index * sample_size))); } @@ -556,24 +530,18 @@ impl ZeroCopySender for Sender { #[derive(Debug)] pub struct Receiver { - shared_memory: SharedMemory, + storage: Storage, borrow_counter: UnsafeCell, name: FileName, - mgmt: *const SharedManagementData, } impl Drop for Receiver { fn drop(&mut self) { - cleanup_shared_memory(self, &self.shared_memory, State::Receiver); + cleanup_shared_memory(&self.storage, State::Receiver); } } impl Receiver { - #[inline(always)] - fn mgmt(&self) -> &SharedManagementData { - unsafe { &*self.mgmt } - } - #[allow(clippy::mut_from_ref)] // convenience to access internal mutable object fn borrow_counter(&self) -> &mut usize { @@ -592,31 +560,32 @@ impl NamedConcept for Receiver { impl ZeroCopyPortDetails for Receiver { fn buffer_size(&self) -> usize { - self.mgmt().submission_channel.capacity() + self.storage.get().submission_channel.capacity() } fn max_borrowed_samples(&self) -> usize { - self.mgmt().max_borrowed_samples + self.storage.get().max_borrowed_samples } fn has_enabled_safe_overflow(&self) -> bool { - self.mgmt().enable_safe_overflow + self.storage.get().enable_safe_overflow } fn is_connected(&self) -> bool { - self.mgmt().state.load(Ordering::Relaxed) == State::Sender.value() | State::Receiver.value() + self.storage.get().state.load(Ordering::Relaxed) + == State::Sender.value() | State::Receiver.value() } } impl ZeroCopyReceiver for Receiver { fn receive(&self) -> Result, ZeroCopyReceiveError> { - if *self.borrow_counter() >= self.mgmt().max_borrowed_samples { + if *self.borrow_counter() >= self.storage.get().max_borrowed_samples { fail!(from self, with ZeroCopyReceiveError::ReceiveWouldExceedMaxBorrowValue, "Unable to receive another sample since already {} samples were borrowed and this would exceed the max borrow value of {}.", self.borrow_counter(), self.max_borrowed_samples()); } - match unsafe { self.mgmt().submission_channel.pop() } { + match unsafe { self.storage.get().submission_channel.pop() } { None => Ok(None), Some(v) => { *self.borrow_counter() += 1; @@ -626,7 +595,7 @@ impl ZeroCopyReceiver for Receiver { } fn release(&self, ptr: PointerOffset) -> Result<(), ZeroCopyReleaseError> { - match unsafe { self.mgmt().completion_channel.push(ptr.value()) } { + match unsafe { self.storage.get().completion_channel.push(ptr.value()) } { true => { *self.borrow_counter() -= 1; Ok(()) @@ -649,45 +618,33 @@ impl NamedConceptMgmt for Connection { name: &FileName, cfg: &Self::Configuration, ) -> Result { - Ok(SharedMemory::does_exist(&cfg.path_for(name).file_name())) + Ok( + fail!(from "posix_shared_memory::ZeroCopyConnection::does_exist_cfg()", + when Storage::does_exist_cfg(name, &cfg.convert()), + "Failed to check if ZeroCopyConnection \"{}\" exists.", + name), + ) } fn list_cfg( - config: &Self::Configuration, + cfg: &Self::Configuration, ) -> Result, crate::static_storage::file::NamedConceptListError> { - let entries = SharedMemory::list(); - - let mut result = vec![]; - for entry in &entries { - if let Some(entry_name) = config.extract_name_from_file(entry) { - result.push(entry_name); - } - } - - Ok(result) + Ok( + fail!(from "posix_shared_memory::ZeroCopyConnection::list_cfg()", + when Storage::list_cfg(&cfg.convert()), + "Failed to list all ZeroCopyConnections."), + ) } unsafe fn remove_cfg( name: &FileName, cfg: &Self::Configuration, ) -> Result { - let full_name = cfg.path_for(name).file_name(); - let msg = "Unable to remove zero_copy_connection::posix_shared_memory"; - let origin = "zero_copy_connection::posix_shared_memory::Connection::remove_cfg()"; - - match iceoryx2_bb_posix::shared_memory::SharedMemory::remove(&full_name) { - Ok(v) => Ok(v), - Err( - iceoryx2_bb_posix::shared_memory::SharedMemoryRemoveError::InsufficientPermissions, - ) => { - fail!(from origin, with NamedConceptRemoveError::InsufficientPermissions, - "{} \"{}\" due to insufficient permissions.", msg, name); - } - Err(v) => { - fail!(from origin, with NamedConceptRemoveError::InternalError, - "{} \"{}\" due to an internal failure ({:?}).", msg, name, v); - } - } + Ok( + fail!(from "posix_shared_memory::ZeroCopyConnection::remove_cfg()", + when Storage::remove_cfg(name, &cfg.convert()), + "Failed to remove ZeroCopyConnection \"{}\".", name), + ) } }