diff --git a/newsfragments/5435.added.md b/newsfragments/5435.added.md new file mode 100644 index 00000000000..b86073a5525 --- /dev/null +++ b/newsfragments/5435.added.md @@ -0,0 +1 @@ +Added the `pyo3::sync::RwLockExt` trait to allow detaching from the Python interpreter while blocking to acquire a rwlock. The trait is implemented for `std::sync::RwLock` as well as `parking_lot`'s / `lock_api`'s `RwLock`. \ No newline at end of file diff --git a/src/sync.rs b/src/sync.rs index 8557d83cd30..a3c527aa62f 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -559,6 +559,15 @@ mod once_lock_ext_sealed { impl Sealed for std::sync::OnceLock {} } +mod rwlock_ext_sealed { + pub trait Sealed {} + impl Sealed for std::sync::RwLock {} + #[cfg(feature = "lock_api")] + impl Sealed for lock_api::RwLock {} + #[cfg(feature = "arc_lock")] + impl Sealed for std::sync::Arc> {} +} + /// Extension trait for [`Once`] to help avoid deadlocking when using a [`Once`] when attached to a /// Python thread. pub trait OnceExt: Sealed { @@ -609,6 +618,40 @@ pub trait MutexExt: Sealed { fn lock_py_attached(&self, py: Python<'_>) -> Self::LockResult<'_>; } +/// Extension trait for [`std::sync::RwLock`] which helps avoid deadlocks between +/// the Python interpreter and acquiring the `RwLock`. +pub trait RwLockExt: rwlock_ext_sealed::Sealed { + /// The result type returned by the `read_py_attached` method. + type ReadLockResult<'a> + where + Self: 'a; + + /// The result type returned by the `write_py_attached` method. + type WriteLockResult<'a> + where + Self: 'a; + + /// Lock this `RwLock` for reading in a manner that cannot deadlock with + /// the Python interpreter. + /// + /// Before attempting to lock the rwlock, this function detaches from the + /// Python runtime. When the lock is acquired, it re-attaches to the Python + /// runtime before returning the `ReadLockResult`. This avoids deadlocks between + /// the GIL and other global synchronization events triggered by the Python + /// interpreter. + fn read_py_attached(&self, py: Python<'_>) -> Self::ReadLockResult<'_>; + + /// Lock this `RwLock` for writing in a manner that cannot deadlock with + /// the Python interpreter. + /// + /// Before attempting to lock the rwlock, this function detaches from the + /// Python runtime. When the lock is acquired, it re-attaches to the Python + /// runtime before returning the `WriteLockResult`. This avoids deadlocks between + /// the GIL and other global synchronization events triggered by the Python + /// interpreter. + fn write_py_attached(&self, py: Python<'_>) -> Self::WriteLockResult<'_>; +} + impl OnceExt for Once { type OnceState = OnceState; @@ -793,6 +836,137 @@ where } } +impl RwLockExt for std::sync::RwLock { + type ReadLockResult<'a> + = std::sync::LockResult> + where + Self: 'a; + + type WriteLockResult<'a> + = std::sync::LockResult> + where + Self: 'a; + + fn read_py_attached(&self, _py: Python<'_>) -> Self::ReadLockResult<'_> { + // If try_read is successful or returns a poisoned rwlock, return them so + // the caller can deal with them. Otherwise we need to use blocking + // read lock, which requires detaching from the Python runtime to avoid + // possible deadlocks. + match self.try_read() { + Ok(inner) => return Ok(inner), + Err(std::sync::TryLockError::Poisoned(inner)) => { + return std::sync::LockResult::Err(inner) + } + Err(std::sync::TryLockError::WouldBlock) => {} + } + + // SAFETY: detach from the runtime right before a possibly blocking call + // then reattach when the blocking call completes and before calling + // into the C API. + let ts_guard = unsafe { SuspendAttach::new() }; + + let res = self.read(); + drop(ts_guard); + res + } + + fn write_py_attached(&self, _py: Python<'_>) -> Self::WriteLockResult<'_> { + // If try_write is successful or returns a poisoned rwlock, return them so + // the caller can deal with them. Otherwise we need to use blocking + // write lock, which requires detaching from the Python runtime to avoid + // possible deadlocks. + match self.try_write() { + Ok(inner) => return Ok(inner), + Err(std::sync::TryLockError::Poisoned(inner)) => { + return std::sync::LockResult::Err(inner) + } + Err(std::sync::TryLockError::WouldBlock) => {} + } + + // SAFETY: detach from the runtime right before a possibly blocking call + // then reattach when the blocking call completes and before calling + // into the C API. + let ts_guard = unsafe { SuspendAttach::new() }; + + let res = self.write(); + drop(ts_guard); + res + } +} + +#[cfg(feature = "lock_api")] +impl RwLockExt for lock_api::RwLock { + type ReadLockResult<'a> + = lock_api::RwLockReadGuard<'a, R, T> + where + Self: 'a; + + type WriteLockResult<'a> + = lock_api::RwLockWriteGuard<'a, R, T> + where + Self: 'a; + + fn read_py_attached(&self, _py: Python<'_>) -> Self::ReadLockResult<'_> { + if let Some(guard) = self.try_read() { + return guard; + } + + let ts_guard = unsafe { SuspendAttach::new() }; + let res = self.read(); + drop(ts_guard); + res + } + + fn write_py_attached(&self, _py: Python<'_>) -> Self::WriteLockResult<'_> { + if let Some(guard) = self.try_write() { + return guard; + } + + let ts_guard = unsafe { SuspendAttach::new() }; + let res = self.write(); + drop(ts_guard); + res + } +} + +#[cfg(feature = "arc_lock")] +impl RwLockExt for std::sync::Arc> +where + R: lock_api::RawRwLock, +{ + type ReadLockResult<'a> + = lock_api::ArcRwLockReadGuard + where + Self: 'a; + + type WriteLockResult<'a> + = lock_api::ArcRwLockWriteGuard + where + Self: 'a; + + fn read_py_attached(&self, _py: Python<'_>) -> Self::ReadLockResult<'_> { + if let Some(guard) = self.try_read_arc() { + return guard; + } + + let ts_guard = unsafe { SuspendAttach::new() }; + let res = self.read_arc(); + drop(ts_guard); + res + } + + fn write_py_attached(&self, _py: Python<'_>) -> Self::WriteLockResult<'_> { + if let Some(guard) = self.try_write_arc() { + return guard; + } + + let ts_guard = unsafe { SuspendAttach::new() }; + let res = self.write_arc(); + drop(ts_guard); + res + } +} + #[cold] fn init_once_py_attached(once: &Once, _py: Python<'_>, f: F) where @@ -1295,4 +1469,237 @@ mod tests { }); assert!(*guard == 42); } + + #[cfg(feature = "macros")] + #[cfg(not(target_arch = "wasm32"))] // We are building wasm Python with pthreads disabled + #[test] + fn test_rwlock_ext_writer_blocks_reader() { + use std::sync::RwLock; + + let barrier = Barrier::new(2); + + let rwlock = Python::attach(|py| -> RwLock> { + RwLock::new(Py::new(py, BoolWrapper(AtomicBool::new(false))).unwrap()) + }); + + std::thread::scope(|s| { + s.spawn(|| { + Python::attach(|py| { + let b = rwlock.write_py_attached(py).unwrap(); + barrier.wait(); + // sleep to ensure the other thread actually blocks + std::thread::sleep(std::time::Duration::from_millis(10)); + (*b).bind(py).borrow().0.store(true, Ordering::Release); + drop(b); + }); + }); + s.spawn(|| { + barrier.wait(); + Python::attach(|py| { + // blocks until the other thread releases the lock + let b = rwlock.read_py_attached(py).unwrap(); + assert!((*b).bind(py).borrow().0.load(Ordering::Acquire)); + }); + }); + }); + } + + #[cfg(feature = "macros")] + #[cfg(not(target_arch = "wasm32"))] // We are building wasm Python with pthreads disabled + #[test] + fn test_rwlock_ext_reader_blocks_writer() { + use std::sync::RwLock; + + let barrier = Barrier::new(2); + + let rwlock = Python::attach(|py| -> RwLock> { + RwLock::new(Py::new(py, BoolWrapper(AtomicBool::new(false))).unwrap()) + }); + + std::thread::scope(|s| { + s.spawn(|| { + Python::attach(|py| { + let b = rwlock.read_py_attached(py).unwrap(); + barrier.wait(); + + // sleep to ensure the other thread actually blocks + std::thread::sleep(std::time::Duration::from_millis(10)); + + // The bool must still be false (i.e., the writer did not actually write the + // value yet). + assert!(!(*b).bind(py).borrow().0.load(Ordering::Acquire)); + }); + }); + s.spawn(|| { + barrier.wait(); + Python::attach(|py| { + // blocks until the other thread releases the lock + let b = rwlock.write_py_attached(py).unwrap(); + (*b).bind(py).borrow().0.store(true, Ordering::Release); + drop(b); + }); + }); + }); + + // Confirm that the writer did in fact run and write the expected `true` value. + Python::attach(|py| { + let b = rwlock.read_py_attached(py).unwrap(); + assert!((*b).bind(py).borrow().0.load(Ordering::Acquire)); + drop(b); + }); + } + + #[cfg(feature = "macros")] + #[cfg(all( + any(feature = "parking_lot", feature = "lock_api"), + not(target_arch = "wasm32") // We are building wasm Python with pthreads disabled + ))] + #[test] + fn test_parking_lot_rwlock_ext_writer_blocks_reader() { + macro_rules! test_rwlock { + ($write_guard:ty, $read_guard:ty, $rwlock:stmt) => {{ + let barrier = Barrier::new(2); + + let rwlock = Python::attach({ $rwlock }); + + std::thread::scope(|s| { + s.spawn(|| { + Python::attach(|py| { + let b: $write_guard = rwlock.write_py_attached(py); + barrier.wait(); + // sleep to ensure the other thread actually blocks + std::thread::sleep(std::time::Duration::from_millis(10)); + (*b).bind(py).borrow().0.store(true, Ordering::Release); + drop(b); + }); + }); + s.spawn(|| { + barrier.wait(); + Python::attach(|py| { + // blocks until the other thread releases the lock + let b: $read_guard = rwlock.read_py_attached(py); + assert!((*b).bind(py).borrow().0.load(Ordering::Acquire)); + }); + }); + }); + }}; + } + + test_rwlock!( + parking_lot::RwLockWriteGuard<'_, _>, + parking_lot::RwLockReadGuard<'_, _>, + |py| { + parking_lot::RwLock::new(Py::new(py, BoolWrapper(AtomicBool::new(false))).unwrap()) + } + ); + + #[cfg(feature = "arc_lock")] + test_rwlock!( + parking_lot::ArcRwLockWriteGuard<_, _>, + parking_lot::ArcRwLockReadGuard<_, _>, + |py| { + let rwlock = parking_lot::RwLock::new( + Py::new(py, BoolWrapper(AtomicBool::new(false))).unwrap(), + ); + std::sync::Arc::new(rwlock) + } + ); + } + + #[cfg(feature = "macros")] + #[cfg(all( + any(feature = "parking_lot", feature = "lock_api"), + not(target_arch = "wasm32") // We are building wasm Python with pthreads disabled + ))] + #[test] + fn test_parking_lot_rwlock_ext_reader_blocks_writer() { + macro_rules! test_rwlock { + ($write_guard:ty, $read_guard:ty, $rwlock:stmt) => {{ + let barrier = Barrier::new(2); + + let rwlock = Python::attach({ $rwlock }); + + std::thread::scope(|s| { + s.spawn(|| { + Python::attach(|py| { + let b: $read_guard = rwlock.read_py_attached(py); + barrier.wait(); + + // sleep to ensure the other thread actually blocks + std::thread::sleep(std::time::Duration::from_millis(10)); + + // The bool must still be false (i.e., the writer did not actually write the + // value yet). + assert!(!(*b).bind(py).borrow().0.load(Ordering::Acquire)); (*b).bind(py).borrow().0.store(true, Ordering::Release); + + drop(b); + }); + }); + s.spawn(|| { + barrier.wait(); + Python::attach(|py| { + // blocks until the other thread releases the lock + let b: $write_guard = rwlock.write_py_attached(py); + (*b).bind(py).borrow().0.store(true, Ordering::Release); + }); + }); + }); + + // Confirm that the writer did in fact run and write the expected `true` value. + Python::attach(|py| { + let b: $read_guard = rwlock.read_py_attached(py); + assert!((*b).bind(py).borrow().0.load(Ordering::Acquire)); + drop(b); + }); + }}; + } + + test_rwlock!( + parking_lot::RwLockWriteGuard<'_, _>, + parking_lot::RwLockReadGuard<'_, _>, + |py| { + parking_lot::RwLock::new(Py::new(py, BoolWrapper(AtomicBool::new(false))).unwrap()) + } + ); + + #[cfg(feature = "arc_lock")] + test_rwlock!( + parking_lot::ArcRwLockWriteGuard<_, _>, + parking_lot::ArcRwLockReadGuard<_, _>, + |py| { + let rwlock = parking_lot::RwLock::new( + Py::new(py, BoolWrapper(AtomicBool::new(false))).unwrap(), + ); + std::sync::Arc::new(rwlock) + } + ); + } + + #[cfg(not(target_arch = "wasm32"))] // We are building wasm Python with pthreads disabled + #[test] + fn test_rwlock_ext_poison() { + use std::sync::RwLock; + + let rwlock = RwLock::new(42); + + std::thread::scope(|s| { + let lock_result = s.spawn(|| { + Python::attach(|py| { + let _unused = rwlock.write_py_attached(py); + panic!(); + }); + }); + assert!(lock_result.join().is_err()); + assert!(rwlock.is_poisoned()); + Python::attach(|py| { + assert!(rwlock.read_py_attached(py).is_err()); + assert!(rwlock.write_py_attached(py).is_err()); + }); + }); + Python::attach(|py| { + // recover from the poisoning + let guard = rwlock.write_py_attached(py).unwrap_err().into_inner(); + assert!(*guard == 42); + }); + } }