diff --git a/vulkano/src/command_buffer/traits.rs b/vulkano/src/command_buffer/traits.rs
index a7f87d0a76..b8ff619428 100644
--- a/vulkano/src/command_buffer/traits.rs
+++ b/vulkano/src/command_buffer/traits.rs
@@ -419,10 +419,7 @@ where
     #[inline]
     unsafe fn signal_finished(&self) {
-        if !self.finished.swap(true, Ordering::SeqCst) {
-            self.command_buffer.unlock();
-        }
-
+        self.finished.store(true, Ordering::SeqCst);
         self.previous.signal_finished();
     }
 
@@ -503,7 +500,6 @@ where
             self.flush().unwrap();
             // Block until the queue finished.
             self.queue.lock().wait_idle().unwrap();
-            self.command_buffer.unlock();
             self.previous.signal_finished();
         }
     }
diff --git a/vulkano/src/device/queue.rs b/vulkano/src/device/queue.rs
index 672d6ca4d5..15bb995a06 100644
--- a/vulkano/src/device/queue.rs
+++ b/vulkano/src/device/queue.rs
@@ -19,16 +19,17 @@ use crate::{
     },
     swapchain::{PresentInfo, SwapchainPresentInfo},
     sync::{Fence, PipelineStage},
-    OomError, RequirementNotMet, RequiresOneOf, SynchronizedVulkanObject, Version, VulkanError,
-    VulkanObject,
+    OomError, RequirementNotMet, RequiresOneOf, Version, VulkanError, VulkanObject,
 };
 use parking_lot::{Mutex, MutexGuard};
 use smallvec::SmallVec;
 use std::{
+    collections::VecDeque,
     error::Error,
     ffi::CString,
     fmt::{Display, Error as FmtError, Formatter},
     hash::{Hash, Hasher},
+    mem::take,
     ptr,
     sync::Arc,
 };
@@ -37,10 +38,12 @@
 // TODO: should use internal synchronization?
 #[derive(Debug)]
 pub struct Queue {
-    handle: Mutex<ash::vk::Queue>,
+    handle: ash::vk::Queue,
     device: Arc<Device>,
     queue_family_index: u32,
     id: u32, // id within family
+
+    state: Mutex<QueueState>,
 }
 
 impl Queue {
@@ -52,10 +55,11 @@
         id: u32,
     ) -> Arc<Self> {
         Arc::new(Queue {
-            handle: Mutex::new(handle),
+            handle,
             device,
             queue_family_index,
             id,
+            state: Mutex::new(Default::default()),
         })
     }
 
@@ -82,17 +86,25 @@
     pub fn lock(&self) -> QueueGuard {
         QueueGuard {
             queue: self,
-            handle: self.handle.lock(),
+            state: self.state.lock(),
         }
     }
 }
 
-unsafe impl SynchronizedVulkanObject for Queue {
+impl Drop for Queue {
+    #[inline]
+    fn drop(&mut self) {
+        let mut queue_guard = self.lock();
+        queue_guard.wait_idle().unwrap();
+    }
+}
+
+unsafe impl VulkanObject for Queue {
     type Object = ash::vk::Queue;
 
     #[inline]
-    fn internal_object_guard(&self) -> MutexGuard<ash::vk::Queue> {
-        self.handle.lock()
+    fn internal_object(&self) -> Self::Object {
+        self.handle
     }
 }
 
@@ -123,11 +135,32 @@ impl Hash for Queue {
 pub struct QueueGuard<'a> {
     queue: &'a Queue,
-    handle: MutexGuard<'a, ash::vk::Queue>,
+    state: MutexGuard<'a, QueueState>,
 }
 
 impl<'a> QueueGuard<'a> {
-    /// Waits until all work on this queue has finished.
+    /// Releases ownership of resources belonging to queue operations that have completed.
+    ///
+    /// This is implemented by checking for operations that have a signaled fence, and then
+    /// releasing the resources of that operation and all preceding ones. If you execute an
+    /// operation without a fence, it will not be cleaned up until you execute another operation
+    /// with a fence after it, as a fence is the only way for the CPU to know that the queue has
+    /// reached a certain point in its execution.
+    ///
+    /// It is highly recommended to call `cleanup_finished` from time to time, for example once
+    /// every frame. Otherwise, the queue will hold onto resources indefinitely (using up memory)
+    /// and resource locks will not be released, which may cause errors when submitting future
+    /// queue operations.
+    #[inline]
+    pub fn cleanup_finished(&mut self) {
+        self.state.cleanup_finished();
+    }
+
+    /// Waits until all work on this queue has finished, then releases ownership of all resources
+    /// that were in use by the queue.
+    ///
+    /// This is equivalent to submitting a fence to the queue, waiting on it, and then calling
+    /// `cleanup_finished`.
     ///
     /// Just like [`Device::wait_idle`], you shouldn't have to call this function in a typical
     /// program.
@@ -135,9 +168,16 @@
     pub fn wait_idle(&mut self) -> Result<(), OomError> {
         unsafe {
             let fns = self.queue.device.fns();
-            (fns.v1_0.queue_wait_idle)(*self.handle)
+            (fns.v1_0.queue_wait_idle)(self.queue.handle)
                 .result()
                 .map_err(VulkanError::from)?;
+
+            // Since we now know that the queue is finished with all work,
+            // we can safely release all resources.
+            for (operation, _) in take(&mut self.state.operations) {
+                operation.unlock();
+            }
+
             Ok(())
         }
     }
@@ -148,7 +188,7 @@
         bind_infos: impl IntoIterator<Item = BindSparseInfo>,
         fence: Option<Arc<Fence>>,
     ) -> Result<(), VulkanError> {
-        let bind_infos = bind_infos.into_iter();
+        let bind_infos: SmallVec<[_; 4]> = bind_infos.into_iter().collect();
 
         #[allow(unused)]
         struct PerBindSparseInfo {
@@ -163,6 +203,7 @@
         }
 
         let (mut bind_infos_vk, mut per_bind_vk): (SmallVec<[_; 4]>, SmallVec<[_; 4]>) = bind_infos
+            .iter()
             .map(|bind_info| {
                 let &BindSparseInfo {
                     ref wait_semaphores,
@@ -171,7 +212,7 @@
                     ref image_binds,
                     ref signal_semaphores,
                     _ne: _,
-                } = &bind_info;
+                } = bind_info;
 
                 let wait_semaphores_vk: SmallVec<[_; 4]> = wait_semaphores
                     .iter()
@@ -400,14 +441,18 @@
 
         let fns = self.queue.device.fns();
         (fns.v1_0.queue_bind_sparse)(
-            *self.handle,
+            self.queue.handle,
             bind_infos_vk.len() as u32,
             bind_infos_vk.as_ptr(),
-            fence.map_or_else(Default::default, |fence| fence.internal_object()),
+            fence
+                .as_ref()
+                .map_or_else(Default::default, |fence| fence.internal_object()),
         )
         .result()
         .map_err(VulkanError::from)?;
 
+        self.state.operations.push_back((bind_infos.into(), fence));
+
         Ok(())
     }
@@ -511,7 +556,11 @@
         }
 
         let fns = self.queue.device().fns();
-        let _ = (fns.khr_swapchain.queue_present_khr)(*self.handle, &info_vk);
+        let _ = (fns.khr_swapchain.queue_present_khr)(self.queue.handle, &info_vk);
+
+        // Some presents may succeed and some may fail. Since we don't know what the implementation
+        // is going to do, we act as if they all succeeded.
+        self.state.operations.push_back((present_info.into(), None));
 
         results
             .into_iter()
@@ -521,10 +570,10 @@
     #[cfg_attr(not(feature = "document_unchecked"), doc(hidden))]
     pub(crate) unsafe fn submit_unchecked(
         &mut self,
-        submits: impl IntoIterator<Item = SubmitInfo>,
+        submit_infos: impl IntoIterator<Item = SubmitInfo>,
         fence: Option<Arc<Fence>>,
     ) -> Result<(), VulkanError> {
-        let submits = submits.into_iter();
+        let submit_infos: SmallVec<[_; 4]> = submit_infos.into_iter().collect();
 
         if self.queue.device.enabled_features().synchronization2 {
             struct PerSubmitInfo {
                 wait_semaphore_infos_vk: SmallVec<[ash::vk::SemaphoreSubmitInfo; 4]>,
                 command_buffer_infos_vk: SmallVec<[ash::vk::CommandBufferSubmitInfo; 4]>,
                 signal_semaphore_infos_vk: SmallVec<[ash::vk::SemaphoreSubmitInfo; 4]>,
             }
 
@@ -533,81 +582,83 @@
-            let (mut submit_info_vk, per_submit_vk): (SmallVec<[_; 4]>, SmallVec<[_; 4]>) = submits
-                .map(|submit_info| {
-                    let SubmitInfo {
-                        wait_semaphores,
-                        command_buffers,
-                        signal_semaphores,
-                        _ne: _,
-                    } = submit_info;
-
-                    let wait_semaphore_infos_vk = wait_semaphores
-                        .into_iter()
-                        .map(|semaphore_submit_info| {
-                            let SemaphoreSubmitInfo {
-                                semaphore,
-                                stages,
-                                _ne: _,
-                            } = semaphore_submit_info;
-
-                            ash::vk::SemaphoreSubmitInfo {
-                                semaphore: semaphore.internal_object(),
-                                value: 0, // TODO:
-                                stage_mask: stages.into(),
-                                device_index: 0, // TODO:
+            let (mut submit_info_vk, per_submit_vk): (SmallVec<[_; 4]>, SmallVec<[_; 4]>) =
+                submit_infos
+                    .iter()
+                    .map(|submit_info| {
+                        let &SubmitInfo {
+                            ref wait_semaphores,
+                            ref command_buffers,
+                            ref signal_semaphores,
+                            _ne: _,
+                        } = submit_info;
+
+                        let wait_semaphore_infos_vk = wait_semaphores
+                            .iter()
+                            .map(|semaphore_submit_info| {
+                                let &SemaphoreSubmitInfo {
+                                    ref semaphore,
+                                    stages,
+                                    _ne: _,
+                                } = semaphore_submit_info;
+
+                                ash::vk::SemaphoreSubmitInfo {
+                                    semaphore: semaphore.internal_object(),
+                                    value: 0, // TODO:
+                                    stage_mask: stages.into(),
+                                    device_index: 0, // TODO:
+                                    ..Default::default()
+                                }
+                            })
+                            .collect();
+
+                        let command_buffer_infos_vk = command_buffers
+                            .iter()
+                            .map(|cb| ash::vk::CommandBufferSubmitInfo {
+                                command_buffer: cb.inner().internal_object(),
+                                device_mask: 0, // TODO:
                                 ..Default::default()
-                            }
-                        })
-                        .collect();
-
-                    let command_buffer_infos_vk = command_buffers
-                        .into_iter()
-                        .map(|cb| ash::vk::CommandBufferSubmitInfo {
-                            command_buffer: cb.inner().internal_object(),
-                            device_mask: 0, // TODO:
-                            ..Default::default()
-                        })
-                        .collect();
-
-                    let signal_semaphore_infos_vk = signal_semaphores
-                        .into_iter()
-                        .map(|semaphore_submit_info| {
-                            let SemaphoreSubmitInfo {
-                                semaphore,
-                                stages,
-                                _ne: _,
-                            } = semaphore_submit_info;
-
-                            ash::vk::SemaphoreSubmitInfo {
-                                semaphore: semaphore.internal_object(),
-                                value: 0, // TODO:
-                                stage_mask: stages.into(),
-                                device_index: 0, // TODO:
+                            })
+                            .collect();
+
+                        let signal_semaphore_infos_vk = signal_semaphores
+                            .iter()
+                            .map(|semaphore_submit_info| {
+                                let &SemaphoreSubmitInfo {
+                                    ref semaphore,
+                                    stages,
+                                    _ne: _,
+                                } = semaphore_submit_info;
+
+                                ash::vk::SemaphoreSubmitInfo {
+                                    semaphore: semaphore.internal_object(),
+                                    value: 0, // TODO:
+                                    stage_mask: stages.into(),
+                                    device_index: 0, // TODO:
+                                    ..Default::default()
+                                }
+                            })
+                            .collect();
+
+                        (
+                            ash::vk::SubmitInfo2 {
+                                flags: ash::vk::SubmitFlags::empty(), // TODO:
+                                wait_semaphore_info_count: 0,
+                                p_wait_semaphore_infos: ptr::null(),
+                                command_buffer_info_count: 0,
+                                p_command_buffer_infos: ptr::null(),
+                                signal_semaphore_info_count: 0,
+                                p_signal_semaphore_infos: ptr::null(),
                                 ..Default::default()
-                            }
-                        })
-                        .collect();
-
-                    (
-                        ash::vk::SubmitInfo2 {
-                            flags: ash::vk::SubmitFlags::empty(), // TODO:
-                            wait_semaphore_info_count: 0,
-                            p_wait_semaphore_infos: ptr::null(),
-                            command_buffer_info_count: 0,
-                            p_command_buffer_infos: ptr::null(),
-                            signal_semaphore_info_count: 0,
-                            p_signal_semaphore_infos: ptr::null(),
-                            ..Default::default()
-                        },
-                        PerSubmitInfo {
-                            wait_semaphore_infos_vk,
-                            command_buffer_infos_vk,
-                            signal_semaphore_infos_vk,
-                        },
-                    )
-                })
-                .unzip();
+                            },
+                            PerSubmitInfo {
+                                wait_semaphore_infos_vk,
+                                command_buffer_infos_vk,
+                                signal_semaphore_infos_vk,
+                            },
+                        )
+                    })
+                    .unzip();
 
             for (
                 submit_info_vk,
@@ -633,18 +684,22 @@
 
             if self.queue.device.api_version() >= Version::V1_3 {
                 (fns.v1_3.queue_submit2)(
-                    *self.handle,
+                    self.queue.handle,
                     submit_info_vk.len() as u32,
                     submit_info_vk.as_ptr(),
-                    fence.map_or_else(Default::default, |fence| fence.internal_object()),
+                    fence
+                        .as_ref()
+                        .map_or_else(Default::default, |fence| fence.internal_object()),
                 )
             } else {
                 debug_assert!(self.queue.device.enabled_extensions().khr_synchronization2);
                 (fns.khr_synchronization2.queue_submit2_khr)(
-                    *self.handle,
+                    self.queue.handle,
                     submit_info_vk.len() as u32,
                     submit_info_vk.as_ptr(),
-                    fence.map_or_else(Default::default, |fence| fence.internal_object()),
+                    fence
+                        .as_ref()
+                        .map_or_else(Default::default, |fence| fence.internal_object()),
                 )
             }
             .result()
             .map_err(VulkanError::from)?;
         } else {
             struct PerSubmitInfo {
                 wait_semaphores_vk: SmallVec<[ash::vk::Semaphore; 4]>,
                 wait_dst_stage_mask_vk: SmallVec<[ash::vk::PipelineStageFlags; 4]>,
                 command_buffers_vk: SmallVec<[ash::vk::CommandBuffer; 4]>,
                 signal_semaphores_vk: SmallVec<[ash::vk::Semaphore; 4]>,
             }
 
@@ -657,66 +712,68 @@
-            let (mut submit_info_vk, per_submit_vk): (SmallVec<[_; 4]>, SmallVec<[_; 4]>) = submits
-                .map(|submit_info| {
-                    let SubmitInfo {
-                        wait_semaphores,
-                        command_buffers,
-                        signal_semaphores,
-                        _ne: _,
-                    } = submit_info;
-
-                    let (wait_semaphores_vk, wait_dst_stage_mask_vk) = wait_semaphores
-                        .into_iter()
-                        .map(|semaphore_submit_info| {
-                            let SemaphoreSubmitInfo {
-                                semaphore,
-                                stages,
-                                _ne: _,
-                            } = semaphore_submit_info;
-
-                            (semaphore.internal_object(), stages.into())
-                        })
-                        .unzip();
+            let (mut submit_info_vk, per_submit_vk): (SmallVec<[_; 4]>, SmallVec<[_; 4]>) =
+                submit_infos
+                    .iter()
+                    .map(|submit_info| {
+                        let &SubmitInfo {
+                            ref wait_semaphores,
+                            ref command_buffers,
+                            ref signal_semaphores,
+                            _ne: _,
+                        } = submit_info;
+
+                        let (wait_semaphores_vk, wait_dst_stage_mask_vk) = wait_semaphores
+                            .iter()
+                            .map(|semaphore_submit_info| {
+                                let &SemaphoreSubmitInfo {
+                                    ref semaphore,
+                                    stages,
+                                    _ne: _,
+                                } = semaphore_submit_info;
+
+                                (semaphore.internal_object(), stages.into())
+                            })
+                            .unzip();
+
+                        let command_buffers_vk = command_buffers
+                            .iter()
+                            .map(|cb| cb.inner().internal_object())
+                            .collect();
+
+                        let signal_semaphores_vk = signal_semaphores
+                            .iter()
+                            .map(|semaphore_submit_info| {
+                                let &SemaphoreSubmitInfo {
+                                    ref semaphore,
+                                    stages: _,
+                                    _ne: _,
+                                } = semaphore_submit_info;
+
+                                semaphore.internal_object()
+                            })
+                            .collect();
 
-                    let command_buffers_vk = command_buffers
-                        .into_iter()
-                        .map(|cb| cb.inner().internal_object())
-                        .collect();
-
-                    let signal_semaphores_vk = signal_semaphores
-                        .into_iter()
-                        .map(|semaphore_submit_info| {
-                            let SemaphoreSubmitInfo {
-                                semaphore,
-                                stages: _,
-                                _ne: _,
-                            } = semaphore_submit_info;
-
-                            semaphore.internal_object()
-                        })
-                        .collect();
-
-                    (
-                        ash::vk::SubmitInfo {
-                            wait_semaphore_count: 0,
-                            p_wait_semaphores: ptr::null(),
-                            p_wait_dst_stage_mask: ptr::null(),
-                            command_buffer_count: 0,
-                            p_command_buffers: ptr::null(),
-                            signal_semaphore_count: 0,
-                            p_signal_semaphores: ptr::null(),
-                            ..Default::default()
-                        },
-                        PerSubmitInfo {
-                            wait_semaphores_vk,
-                            wait_dst_stage_mask_vk,
-                            command_buffers_vk,
-                            signal_semaphores_vk,
-                        },
-                    )
-                })
-                .unzip();
+                        (
+                            ash::vk::SubmitInfo {
+                                wait_semaphore_count: 0,
+                                p_wait_semaphores: ptr::null(),
+                                p_wait_dst_stage_mask: ptr::null(),
+                                command_buffer_count: 0,
+                                p_command_buffers: ptr::null(),
+                                signal_semaphore_count: 0,
+                                p_signal_semaphores: ptr::null(),
+                                ..Default::default()
+                            },
+                            PerSubmitInfo {
+                                wait_semaphores_vk,
+                                wait_dst_stage_mask_vk,
+                                command_buffers_vk,
+                                signal_semaphores_vk,
+                            },
+                        )
+                    })
+                    .unzip();
 
             for (
                 submit_info_vk,
@@ -742,15 +799,21 @@
 
             let fns = self.queue.device.fns();
             (fns.v1_0.queue_submit)(
-                *self.handle,
+                self.queue.handle,
                 submit_info_vk.len() as u32,
                 submit_info_vk.as_ptr(),
-                fence.map_or_else(Default::default, |fence| fence.internal_object()),
+                fence
+                    .as_ref()
+                    .map_or_else(Default::default, |fence| fence.internal_object()),
             )
             .result()
             .map_err(VulkanError::from)?;
         }
 
+        self.state
+            .operations
+            .push_back((submit_infos.into(), fence));
+
         Ok(())
     }
 
@@ -810,7 +873,7 @@
         };
 
         let fns = self.queue.device.instance().fns();
-        (fns.ext_debug_utils.queue_begin_debug_utils_label_ext)(*self.handle, &label_info);
+        (fns.ext_debug_utils.queue_begin_debug_utils_label_ext)(self.queue.handle, &label_info);
     }
 
     /// Closes a queue debug label region.
@@ -856,7 +919,7 @@
     #[cfg_attr(not(feature = "document_unchecked"), doc(hidden))]
     pub unsafe fn end_debug_utils_label_unchecked(&mut self) {
         let fns = self.queue.device.instance().fns();
-        (fns.ext_debug_utils.queue_end_debug_utils_label_ext)(*self.handle);
+        (fns.ext_debug_utils.queue_end_debug_utils_label_ext)(self.queue.handle);
     }
 
     /// Inserts a queue debug label.
@@ -915,7 +978,7 @@
         };
 
         let fns = self.queue.device.instance().fns();
-        (fns.ext_debug_utils.queue_insert_debug_utils_label_ext)(*self.handle, &label_info);
+        (fns.ext_debug_utils.queue_insert_debug_utils_label_ext)(self.queue.handle, &label_info);
     }
 }
 
@@ -1053,3 +1116,81 @@ impl From<RequirementNotMet> for QueueError {
         }
     }
 }
+
+#[derive(Debug, Default)]
+struct QueueState {
+    operations: VecDeque<(QueueOperation, Option<Arc<Fence>>)>,
+}
+
+impl QueueState {
+    fn cleanup_finished(&mut self) {
+        // Find the most recent operation that has a signaled fence.
+        let last_signaled_fence_index =
+            self.operations
+                .iter()
+                .enumerate()
+                .rev()
+                .find_map(|(index, (_, fence))| {
+                    fence
+                        .as_ref()
+                        // If `is_signaled` returns an error, treat it as not signaled.
+                        .map_or(false, |fence| fence.is_signaled().unwrap_or(false))
+                        .then_some(index)
+                });
+
+        if let Some(index) = last_signaled_fence_index {
+            // Remove all operations up to this index, and perform cleanup if needed.
+            for (operation, _) in self.operations.drain(..index + 1) {
+                unsafe {
+                    operation.unlock();
+                }
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+enum QueueOperation {
+    BindSparse(SmallVec<[BindSparseInfo; 4]>),
+    Present(PresentInfo),
+    Submit(SmallVec<[SubmitInfo; 4]>),
+}
+
+impl QueueOperation {
+    unsafe fn unlock(self) {
+        match self {
+            QueueOperation::BindSparse(_bind_infos) => {
+                // TODO: Do we need to unlock buffers and images here?
+            }
+            QueueOperation::Present(_) => (),
+            QueueOperation::Submit(submit_infos) => {
+                for submit_info in submit_infos {
+                    for command_buffer in submit_info.command_buffers {
+                        command_buffer.unlock();
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl From<SmallVec<[BindSparseInfo; 4]>> for QueueOperation {
+    #[inline]
+    fn from(val: SmallVec<[BindSparseInfo; 4]>) -> Self {
+        Self::BindSparse(val)
+    }
+}
+
+impl From<PresentInfo> for QueueOperation {
+    #[inline]
+    fn from(val: PresentInfo) -> Self {
+        Self::Present(val)
+    }
+}
+
+impl From<SmallVec<[SubmitInfo; 4]>> for QueueOperation {
+    #[inline]
+    fn from(val: SmallVec<[SubmitInfo; 4]>) -> Self {
+        Self::Submit(val)
+    }
+}
diff --git a/vulkano/src/lib.rs b/vulkano/src/lib.rs
index 0fb9674cbe..7997094832 100644
--- a/vulkano/src/lib.rs
+++ b/vulkano/src/lib.rs
@@ -84,7 +84,6 @@
 pub use ash::vk::Handle;
 pub use half;
 pub use library::{LoadingError, VulkanLibrary};
-use parking_lot::MutexGuard;
 use std::{
     error::Error,
     fmt::{Display, Error as FmtError, Formatter},
@@ -139,16 +138,6 @@ pub unsafe trait VulkanObject {
     fn internal_object(&self) -> Self::Object;
 }
 
-/// Gives access to the internal identifier of an object.
-// TODO: remove ; crappy design
-pub unsafe trait SynchronizedVulkanObject {
-    /// The type of the object.
-    type Object: ash::vk::Handle;
-
-    /// Returns a reference to the object.
-    fn internal_object_guard(&self) -> MutexGuard<Self::Object>;
-}
-
 /// Error type returned by most Vulkan functions.
 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
 pub enum OomError {
diff --git a/vulkano/src/sync/future/fence_signal.rs b/vulkano/src/sync/future/fence_signal.rs
index f9f1249972..8ee1666a5f 100644
--- a/vulkano/src/sync/future/fence_signal.rs
+++ b/vulkano/src/sync/future/fence_signal.rs
@@ -153,6 +153,11 @@ where
             unsafe {
                 previous.signal_finished();
             }
+
+            if let Some(queue) = previous.queue() {
+                queue.lock().cleanup_finished();
+            }
+
             Ok(())
         }
         FenceSignalFutureState::Cleaned => Ok(()),
@@ -174,27 +179,28 @@ where
         match *state {
             FenceSignalFutureState::Flushed(ref mut prev, ref fence) => {
                 match fence.wait(Some(Duration::from_secs(0))) {
-                    Ok(()) => unsafe { prev.signal_finished() },
+                    Ok(()) => {
+                        unsafe { prev.signal_finished() }
+
+                        if let Some(queue) = prev.queue() {
+                            queue.lock().cleanup_finished();
+                        }
+
+                        *state = FenceSignalFutureState::Cleaned;
+                    }
                     Err(_) => {
                         prev.cleanup_finished();
-                        return;
                     }
                 }
             }
             FenceSignalFutureState::Pending(ref mut prev, _) => {
                 prev.cleanup_finished();
-                return;
             }
             FenceSignalFutureState::PartiallyFlushed(ref mut prev, _) => {
                 prev.cleanup_finished();
-                return;
             }
-            _ => return,
-        };
-
-        // This code can only be reached if we're already flushed and waiting on the fence
-        // succeeded.
-        *state = FenceSignalFutureState::Cleaned;
+            _ => (),
+        }
     }
 
     // Implementation of `flush`. You must lock the state and pass the mutex guard here.
@@ -443,6 +449,10 @@ where
                 fence.wait(None).unwrap();
                 unsafe {
                     previous.signal_finished();
+
+                    if let Some(queue) = previous.queue() {
+                        queue.lock().cleanup_finished();
+                    }
                 }
            }
            FenceSignalFutureState::Cleaned => {
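
Usage sketch (illustrative, not part of the patch): with this change, resource cleanup is no longer tied to command-buffer unlocking at submission time but is driven through the queue's own fence-tracked operation list. A minimal example of how calling code might use the new API — the `end_of_frame` helper and `queue` parameter are hypothetical; only `Queue::lock`, `QueueGuard::cleanup_finished`, and `QueueGuard::wait_idle` come from this patch:

    use std::sync::Arc;
    use vulkano::device::Queue;

    // Hypothetical per-frame helper.
    fn end_of_frame(queue: &Arc<Queue>) {
        // Locks the queue's internal Mutex<QueueState> (replacing the old handle mutex).
        let mut guard = queue.lock();
        // Releases resources of every operation up to and including the most recent one
        // whose fence has signaled; trailing operations without a fence stay tracked.
        guard.cleanup_finished();
    }

On teardown, the new `Drop` impl for `Queue` calls `wait_idle`, which drains and unlocks all remaining tracked operations, so resources are released even if `cleanup_finished` was never called.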