Skip to content

Commit

Permalink
[eclipse-iceoryx#162] Integrate IpcCapable trait into semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Mar 21, 2024
1 parent a47960d commit ea42d2a
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 119 deletions.
6 changes: 5 additions & 1 deletion iceoryx2-bb/posix/src/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,8 @@ impl<'a> IpcConstructible<'a, BarrierHandle> for Barrier<'a> {
}
}

impl<'a> IpcCapable<'a, BarrierHandle> for Barrier<'a> {}
impl<'a> IpcCapable<'a, BarrierHandle> for Barrier<'a> {
fn is_interprocess_capable(&self) -> bool {
self.handle.is_inter_process_capable()
}
}
6 changes: 4 additions & 2 deletions iceoryx2-bb/posix/src/ipc_capable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,11 @@ pub trait Handle: Send + Sync {
fn state(&self) -> HandleState;
}

/// Every struct that implements this trait is inter-process capable without any restriction,
/// meaning there is no configuration/variation that is not inter-process capable.
/// Represents struct that can be configured for inter-process use.
pub trait IpcCapable<'a, T: Handle>: internal::IpcConstructible<'a, T> + Sized {
/// Returns true if the object is interprocess capable, otherwise false
fn is_interprocess_capable(&self) -> bool;

/// Creates an IPC Capable object from its handle.
///
/// # Safety
Expand Down
152 changes: 66 additions & 86 deletions iceoryx2-bb/posix/src/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
//! Provides the [`NamedSemaphore`] and the [`UnnamedSemaphore`]. Both can be used in an
//! inter-process context to signal events between processes.
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::{cell::UnsafeCell, fmt::Debug};
pub use crate::ipc_capable::{Handle, IpcCapable};

pub use crate::unmovable_ipc_handle::IpcCapable;
use crate::unmovable_ipc_handle::{internal::*, IpcHandleState};
use std::cell::UnsafeCell;
use std::fmt::Debug;

use crate::ipc_capable::internal::{Capability, HandleStorage, IpcConstructible};
use crate::ipc_capable::HandleState;
use iceoryx2_bb_container::semantic_string::*;
use iceoryx2_bb_elementary::enum_gen;
use iceoryx2_bb_log::{debug, fail, fatal_panic};
use iceoryx2_bb_log::{debug, fail, fatal_panic, warn};
use iceoryx2_bb_system_types::file_name::FileName;
use iceoryx2_bb_system_types::file_path::*;
use iceoryx2_bb_system_types::path::*;
Expand Down Expand Up @@ -604,101 +606,94 @@ impl UnnamedSemaphoreBuilder {
self
}

/// Creates an [`UnnamedSemaphore`].
pub fn create(
self,
handle: &UnnamedSemaphoreHandle,
) -> Result<UnnamedSemaphore, UnnamedSemaphoreCreationError> {
fn initialize_semaphore(
&self,
sem: *mut posix::sem_t,
) -> Result<Capability, UnnamedSemaphoreCreationError> {
let msg = "Unable to create semaphore";

if handle
.reference_counter
.compare_exchange(
IpcHandleState::Uninitialized as _,
IpcHandleState::PerformingInitialization as _,
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_err()
{
fail!(from self, with UnnamedSemaphoreCreationError::HandleAlreadyInitialized,
"{} since the handle is already initialized with another semaphore.", msg);
}

handle
.is_interprocess_capable
.store(self.is_interprocess_capable, Ordering::Relaxed);

unsafe { *handle.clock_type.get() = self.clock_type };

if self.initial_value > MAX_INITIAL_SEMAPHORE_VALUE {
handle.reference_counter.store(-1, Ordering::Relaxed);
fail!(from self, with UnnamedSemaphoreCreationError::InitialValueTooLarge,
"{} since the initial value {} is too large.", msg, self.initial_value);
}

if unsafe {
posix::sem_init(
handle.as_ptr(),
sem,
if self.is_interprocess_capable { 1 } else { 0 },
self.initial_value,
)
} != -1
} == -1
{
handle_errno!(UnnamedSemaphoreCreationError, from self,
Errno::EINVAL => (InitialValueTooLarge, "{} since the initial value {} is too large. Please verify posix configuration!", msg, self.initial_value),
Errno::ENOSPC => (ExceedsMaximumNumberOfSemaphores, "{} since it exceeds the maximum amount of semaphores {}.", msg, Limit::MaxNumberOfSemaphores.value()),
Errno::EPERM => (InsufficientPermissions, "{} due to insufficient permissions.", msg),
v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
);
}

match self.is_interprocess_capable {
true => Ok(Capability::InterProcess),
false => Ok(Capability::ProcessLocal),
}
}

/// Creates an [`UnnamedSemaphore`].
pub fn create(
self,
handle: &UnnamedSemaphoreHandle,
) -> Result<UnnamedSemaphore, UnnamedSemaphoreCreationError> {
unsafe {
handle
.reference_counter
.store(IpcHandleState::Initialized as _, Ordering::Release);
return Ok(UnnamedSemaphore::new(handle));
.handle
.initialize(|sem| self.initialize_semaphore(sem))?;
}

handle_errno!(UnnamedSemaphoreCreationError, from self,
Errno::EINVAL => (InitialValueTooLarge, "{} since the initial value {} is too large. Please verify posix configuration!", msg, self.initial_value),
Errno::ENOSPC => (ExceedsMaximumNumberOfSemaphores, "{} since it exceeds the maximum amount of semaphores {}.", msg, Limit::MaxNumberOfSemaphores.value()),
Errno::EPERM => (InsufficientPermissions, "{} due to insufficient permissions.", msg),
v => (UnknownError(v as i32), "{} since an unknown error occurred ({}).", msg, v)
);
unsafe { *handle.clock_type.get() = self.clock_type };

Ok(UnnamedSemaphore::new(handle))
}
}

#[derive(Debug)]
pub struct UnnamedSemaphoreHandle {
handle: UnsafeCell<posix::sem_t>,
handle: HandleStorage<posix::sem_t>,
clock_type: UnsafeCell<ClockType>,
is_interprocess_capable: AtomicBool,
reference_counter: AtomicI64,
}

unsafe impl Send for UnnamedSemaphoreHandle {}
unsafe impl Sync for UnnamedSemaphoreHandle {}

impl crate::unmovable_ipc_handle::internal::UnmovableIpcHandle for UnnamedSemaphoreHandle {
fn reference_counter(&self) -> &AtomicI64 {
&self.reference_counter
}

fn is_interprocess_capable(&self) -> bool {
self.is_interprocess_capable.load(Ordering::Relaxed)
}
}

impl Default for UnnamedSemaphoreHandle {
fn default() -> Self {
impl Handle for UnnamedSemaphoreHandle {
fn new() -> Self {
Self {
handle: UnsafeCell::new(posix::sem_t::new()),
handle: HandleStorage::new(posix::sem_t::new()),
clock_type: UnsafeCell::new(ClockType::default()),
is_interprocess_capable: AtomicBool::new(false),
reference_counter: AtomicI64::new(IpcHandleState::Uninitialized as _),
}
}
}

impl UnnamedSemaphoreHandle {
pub fn new() -> Self {
Self::default()
fn is_inter_process_capable(&self) -> bool {
self.handle.is_inter_process_capable()
}

fn as_ptr(&self) -> *mut posix::sem_t {
self.handle.get()
fn state(&self) -> HandleState {
self.handle.state()
}
}

impl Drop for UnnamedSemaphoreHandle {
fn drop(&mut self) {
if self.handle.state() == HandleState::Initialized {
unsafe {
self.handle.cleanup(|sem| {
if posix::sem_destroy(sem) != 0 {
warn!(from self,
"Unable to destroy unnamed semaphore. Was it already destroyed by another instance in another process?");
}
});
};
}
}
}

Expand Down Expand Up @@ -739,36 +734,21 @@ pub struct UnnamedSemaphore<'a> {
unsafe impl Send for UnnamedSemaphore<'_> {}
unsafe impl Sync for UnnamedSemaphore<'_> {}

impl Drop for UnnamedSemaphore<'_> {
fn drop(&mut self) {
if self.handle.reference_counter.fetch_sub(1, Ordering::AcqRel) == 1 {
if unsafe { posix::sem_destroy(self.handle.as_ptr()) } != 0 {
fatal_panic!(from self, "This should never happen! Unable to destroy semaphore since the file-descriptor was invalid.");
}

self.handle.reference_counter.store(-1, Ordering::Release);
}
}
}

impl<'a> CreateIpcConstruct<'a, UnnamedSemaphoreHandle> for UnnamedSemaphore<'a> {
impl<'a> IpcConstructible<'a, UnnamedSemaphoreHandle> for UnnamedSemaphore<'a> {
fn new(handle: &'a UnnamedSemaphoreHandle) -> Self {
Self { handle }
}
}

impl<'a> IpcCapable<'a, UnnamedSemaphoreHandle> for UnnamedSemaphore<'a> {}

impl<'a> UnnamedSemaphore<'a> {
/// Returns true if the semaphore is interprocess capable, otherwise false
pub fn is_interprocess_capable(&self) -> bool {
self.handle.is_interprocess_capable.load(Ordering::Relaxed)
impl<'a> IpcCapable<'a, UnnamedSemaphoreHandle> for UnnamedSemaphore<'a> {
fn is_interprocess_capable(&self) -> bool {
self.handle.is_inter_process_capable()
}
}

impl internal::SemaphoreHandle for UnnamedSemaphore<'_> {
fn handle(&self) -> *mut posix::sem_t {
self.handle.as_ptr()
unsafe { self.handle.handle.get() }
}

fn get_clock_type(&self) -> ClockType {
Expand Down
35 changes: 5 additions & 30 deletions iceoryx2-bb/posix/tests/semaphore_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ fn unnamed_semaphore_multiple_ipc_semaphores_are_working() {
.create(&handle)
.unwrap();

let sut2 = UnnamedSemaphore::from_ipc_handle(&handle).unwrap();
let sut2 = unsafe { UnnamedSemaphore::from_ipc_handle(&handle) };

assert_that!(sut1.post(), is_ok);
assert_that!(sut2.try_wait().unwrap(), eq true);
Expand All @@ -418,46 +418,21 @@ fn unnamed_semaphore_multiple_ipc_semaphores_are_working() {
}

#[test]
#[should_panic]
fn unnamed_semaphore_acquire_uninitialized_ipc_handle_failes() {
let handle = UnnamedSemaphoreHandle::new();

let sut = UnnamedSemaphore::from_ipc_handle(&handle);
assert_that!(sut, is_err);
assert_that!(*sut.as_ref().err().unwrap(), eq AcquireIpcHandleError::Uninitialized);

let sut1 = UnnamedSemaphoreBuilder::new()
.is_interprocess_capable(true)
.create(&handle)
.unwrap();

let sut2 = UnnamedSemaphore::from_ipc_handle(&handle);
assert_that!(sut2, is_ok);

drop(sut1);

let sut3 = UnnamedSemaphore::from_ipc_handle(&handle);
assert_that!(sut3, is_ok);

drop(sut2);
drop(sut3);

let sut = UnnamedSemaphore::from_ipc_handle(&handle);
assert_that!(sut, is_err);
assert_that!(*sut.as_ref().err().unwrap(), eq AcquireIpcHandleError::Uninitialized);
unsafe { UnnamedSemaphore::from_ipc_handle(&handle) };
}

#[test]
#[should_panic]
fn unnamed_semaphore_acquiring_non_ipc_capable_handle_fails() {
let handle = UnnamedSemaphoreHandle::new();
let _sut1 = UnnamedSemaphoreBuilder::new()
.is_interprocess_capable(false)
.create(&handle)
.unwrap();

let sut = UnnamedSemaphore::from_ipc_handle(&handle);
assert_that!(sut, is_err);
assert_that!(
*sut.as_ref().err().unwrap(), eq
AcquireIpcHandleError::IsNotInterProcessCapable
);
unsafe { UnnamedSemaphore::from_ipc_handle(&handle) };
}
1 change: 1 addition & 0 deletions iceoryx2-bb/threadsafe/tests/trigger_queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::time::Duration;

use iceoryx2_bb_posix::clock::{nanosleep, Time};
use iceoryx2_bb_posix::mutex::MutexHandle;
use iceoryx2_bb_posix::semaphore::*;
use iceoryx2_bb_testing::assert_that;
use iceoryx2_bb_testing::watchdog::Watchdog;
use iceoryx2_bb_threadsafe::trigger_queue::*;
Expand Down

0 comments on commit ea42d2a

Please sign in to comment.