Make StandardCommandPool lockless #1939

Merged · 1 commit · Aug 1, 2022
2 changes: 1 addition & 1 deletion vulkano/src/command_buffer/auto.rs
@@ -233,7 +233,7 @@ impl<L> AutoCommandBufferBuilder<L, StandardCommandPoolBuilder> {
             }
         }
 
-        let pool_builder_alloc = Device::standard_command_pool(&device, queue_family)
+        let pool_builder_alloc = Device::standard_command_pool(&device, queue_family)?
             .allocate(level, 1)?
             .next()
             .expect("Requested one command buffer from the command pool, but got zero.");
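Since the standard pool is now created eagerly (and pool creation can fail with `OomError`), `Device::standard_command_pool` returns a `Result` and call sites gain an extra `?`. A minimal sketch of the caller-side pattern; the helper name `one_primary_builder` is illustrative and the import paths are a best guess for the vulkano of this era, not code from this PR:

```rust
use std::sync::Arc;
use vulkano::{
    command_buffer::{
        pool::{CommandPool, CommandPoolBuilderAlloc},
        CommandBufferLevel,
    },
    device::{physical::QueueFamily, Device},
    OomError,
};

// Hypothetical helper showing the changed call site: looking up the pool is
// now itself fallible, so it takes its own `?` before `allocate` is called.
fn one_primary_builder(
    device: &Arc<Device>,
    queue_family: QueueFamily,
) -> Result<impl CommandPoolBuilderAlloc, OomError> {
    let alloc = Device::standard_command_pool(device, queue_family)?
        .allocate(CommandBufferLevel::Primary, 1)?
        .next()
        .expect("requested one command buffer from the command pool, but got zero");
    Ok(alloc)
}
```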
209 changes: 73 additions & 136 deletions vulkano/src/command_buffer/pool/standard.rs
@@ -9,15 +9,7 @@ use crate::{
     OomError, VulkanObject,
 };
 use crossbeam_queue::SegQueue;
-use std::{
-    collections::HashMap,
-    marker::PhantomData,
-    mem::ManuallyDrop,
-    ptr,
-    sync::{Arc, Mutex, Weak},
-    thread,
-    vec::IntoIter as VecIntoIter,
-};
+use std::{marker::PhantomData, mem::ManuallyDrop, ptr, sync::Arc, vec::IntoIter as VecIntoIter};
 
 // Copyright (c) 2016 The vulkano developers
 // Licensed under the Apache License, Version 2.0
@@ -30,31 +22,15 @@ use std::{
 
 /// Standard implementation of a command pool.
 ///
-/// It is guaranteed that the allocated command buffers keep the `Arc<StandardCommandPool>` alive.
-/// This is desirable so that we can store a `Weak<StandardCommandPool>`.
-///
-/// Will use one Vulkan pool per thread in order to avoid locking. Will try to reuse command
-/// buffers. Command buffers can't be moved between threads during the building process, but
-/// finished command buffers can.
+/// A thread can have as many `Arc<StandardCommandPool>`s as needed, but none of them can escape the
+/// thread they were created on. This is done so that there are no locks involved when creating
+/// command buffers. Command buffers can't be moved between threads during the building process, but
+/// finished command buffers can. When a command buffer is dropped, it is returned back to the pool
+/// for reuse.
 #[derive(Debug)]
 pub struct StandardCommandPool {
-    // The device.
-    device: Arc<Device>,
-
-    // Identifier of the queue family.
-    queue_family: u32,
-
-    // For each thread, we store thread-specific info.
-    per_thread: Mutex<HashMap<thread::ThreadId, Weak<StandardCommandPoolPerThread>>>,
-}
-
-unsafe impl Send for StandardCommandPool {}
-unsafe impl Sync for StandardCommandPool {}
-
-#[derive(Debug)]
-struct StandardCommandPoolPerThread {
-    // The Vulkan pool of this thread.
-    pool: Mutex<UnsafeCommandPool>,
+    // The Vulkan pool specific to a device's queue family.
+    inner: UnsafeCommandPool,
     // List of existing primary command buffers that are available for reuse.
     available_primary_command_buffers: SegQueue<UnsafeCommandPoolAlloc>,
     // List of existing secondary command buffers that are available for reuse.
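The struct now owns a single `UnsafeCommandPool` plus two lock-free free lists, replacing the `Mutex<HashMap<ThreadId, …>>`. What this diff doesn't show is how each thread still ends up with its own pool; presumably `Device::standard_command_pool` hands out per-thread `Arc`s from thread-local storage. A minimal sketch of that scheme, where `Pool`, `POOLS`, `standard_pool`, and the key type are all illustrative assumptions rather than vulkano's actual code:

```rust
use std::{cell::RefCell, collections::HashMap, sync::Arc};

// Stand-in for `StandardCommandPool`.
struct Pool {/* inner: UnsafeCommandPool, free lists, ... */}

thread_local! {
    // Each thread owns its own map, so lookup and insertion need no Mutex,
    // and the Arc handed out never has to cross a thread boundary.
    static POOLS: RefCell<HashMap<(usize, u32), Arc<Pool>>> = RefCell::new(HashMap::new());
}

// Stand-in for `Device::standard_command_pool`, keyed by a hypothetical
// (device identity, queue family index) pair.
fn standard_pool(device_key: usize, queue_family: u32) -> Arc<Pool> {
    POOLS.with(|pools| {
        pools
            .borrow_mut()
            .entry((device_key, queue_family))
            .or_insert_with(|| Arc::new(Pool {}))
            .clone()
    })
}
```

Because the map lives in thread-local storage, lookup and insertion are single-threaded by construction, which is exactly why the pool itself no longer needs locks or the `unsafe impl Send/Sync`.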
@@ -64,21 +40,36 @@ struct StandardCommandPoolPerThread {
 impl StandardCommandPool {
     /// Builds a new pool.
     ///
-    /// # Panic
+    /// # Panics
     ///
     /// - Panics if the device and the queue family don't belong to the same physical device.
     ///
-    pub fn new(device: Arc<Device>, queue_family: QueueFamily) -> StandardCommandPool {
+    pub fn new(
+        device: Arc<Device>,
+        queue_family: QueueFamily,
+    ) -> Result<StandardCommandPool, OomError> {
         assert_eq!(
             device.physical_device().internal_object(),
             queue_family.physical_device().internal_object()
         );
 
-        StandardCommandPool {
-            device: device,
-            queue_family: queue_family.id(),
-            per_thread: Mutex::new(Default::default()),
-        }
+        let inner = UnsafeCommandPool::new(
+            device,
+            UnsafeCommandPoolCreateInfo {
+                queue_family_index: queue_family.id(),
+                reset_command_buffer: true,
+                ..Default::default()
+            },
+        )
+        .map_err(|err| match err {
+            UnsafeCommandPoolCreationError::OomError(err) => err,
+            _ => panic!("Unexpected error: {}", err),
+        })?;
+
+        Ok(StandardCommandPool {
+            inner,
+            available_primary_command_buffers: Default::default(),
+            available_secondary_command_buffers: Default::default(),
+        })
     }
 }
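`UnsafeCommandPool::new` reports a broader error enum, but out-of-memory is the only failure `new` can legitimately hit here, so the `map_err` narrows the type and treats every other variant as a bug. A generic sketch of this narrowing pattern, with the `CreationError` enum and `narrow` function invented for illustration:

```rust
#[derive(Debug)]
enum CreationError {
    OomError(String),
    Unsupported,
}

// Keep the public signature at the one variant callers can act on; any other
// variant means we passed bad arguments ourselves, so panicking is preferred
// over widening the return type.
fn narrow(result: Result<u32, CreationError>) -> Result<u32, String> {
    result.map_err(|err| match err {
        CreationError::OomError(oom) => oom,
        _ => panic!("unexpected error: {:?}", err),
    })
}
```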

@@ -92,84 +83,47 @@ unsafe impl CommandPool for Arc<StandardCommandPool> {
         level: CommandBufferLevel,
         mut command_buffer_count: u32,
     ) -> Result<Self::Iter, OomError> {
-        // Find the correct `StandardCommandPoolPerThread` structure.
-        let mut hashmap = self.per_thread.lock().unwrap();
-        // TODO: meh for iterating everything every time
-        hashmap.retain(|_, w| w.upgrade().is_some());
-
-        let this_thread = thread::current().id();
-
-        // Get an appropriate `Arc<StandardCommandPoolPerThread>`.
-        let per_thread = if let Some(entry) = hashmap.get(&this_thread).and_then(Weak::upgrade) {
-            entry
-        } else {
-            let new_pool = UnsafeCommandPool::new(
-                self.device.clone(),
-                UnsafeCommandPoolCreateInfo {
-                    queue_family_index: self.queue_family().id(),
-                    reset_command_buffer: true,
-                    ..Default::default()
-                },
-            )
-            .map_err(|err| match err {
-                UnsafeCommandPoolCreationError::OomError(err) => err,
-                _ => panic!("Unexpected error: {}", err),
-            })?;
-            let pt = Arc::new(StandardCommandPoolPerThread {
-                pool: Mutex::new(new_pool),
-                available_primary_command_buffers: SegQueue::new(),
-                available_secondary_command_buffers: SegQueue::new(),
-            });
-
-            hashmap.insert(this_thread, Arc::downgrade(&pt));
-            pt
-        };
-
         // The final output.
         let mut output = Vec::with_capacity(command_buffer_count as usize);
 
         // First, pick from already-existing command buffers.
         {
             let existing = match level {
-                CommandBufferLevel::Primary => &per_thread.available_primary_command_buffers,
-                CommandBufferLevel::Secondary => &per_thread.available_secondary_command_buffers,
+                CommandBufferLevel::Primary => &self.available_primary_command_buffers,
+                CommandBufferLevel::Secondary => &self.available_secondary_command_buffers,
             };
 
             for _ in 0..command_buffer_count as usize {
                 if let Some(cmd) = existing.pop() {
                     output.push(StandardCommandPoolBuilder {
                         inner: StandardCommandPoolAlloc {
                             cmd: ManuallyDrop::new(cmd),
-                            pool: per_thread.clone(),
-                            pool_parent: self.clone(),
-                            level,
-                            device: self.device.clone(),
+                            pool: self.clone(),
                         },
                         dummy_avoid_send_sync: PhantomData,
                     });
                 } else {
                     break;
                 }
             }
-        };
+        }
 
         // Then allocate the rest.
         if output.len() < command_buffer_count as usize {
-            let pool_lock = per_thread.pool.lock().unwrap();
             command_buffer_count -= output.len() as u32;
 
-            for cmd in pool_lock.allocate_command_buffers(CommandBufferAllocateInfo {
-                level,
-                command_buffer_count,
-                ..Default::default()
-            })? {
+            for cmd in self
+                .inner
+                .allocate_command_buffers(CommandBufferAllocateInfo {
+                    level,
+                    command_buffer_count,
+                    ..Default::default()
+                })?
+            {
                 output.push(StandardCommandPoolBuilder {
                     inner: StandardCommandPoolAlloc {
                         cmd: ManuallyDrop::new(cmd),
-                        pool: per_thread.clone(),
-                        pool_parent: self.clone(),
-                        level,
-                        device: self.device.clone(),
+                        pool: self.clone(),
                     },
                     dummy_avoid_send_sync: PhantomData,
                 });
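The reuse path relies on `crossbeam_queue::SegQueue`, an unbounded lock-free MPMC queue: `pop` takes a recycled buffer if one exists, and the `Drop` impl further down can `push` from any thread without a `Mutex`. A standalone sketch of the pop-else-create pattern, assuming crossbeam-queue 0.3 (where `pop` returns `Option`); the `Buffer` type and `take_or_create` helper are illustrative:

```rust
use crossbeam_queue::SegQueue;

struct Buffer(u32);

// Pop a recycled buffer if one is available; otherwise create a new one.
// `SegQueue::pop` is non-blocking and lock-free, which is what lets the
// hot path avoid a Mutex entirely.
fn take_or_create(free_list: &SegQueue<Buffer>, fresh: impl FnOnce() -> Buffer) -> Buffer {
    free_list.pop().unwrap_or_else(fresh)
}

fn main() {
    let free_list = SegQueue::new();
    free_list.push(Buffer(1)); // a "dropped" buffer returned for reuse

    let reused = take_or_create(&free_list, || Buffer(2));
    assert_eq!(reused.0, 1); // came from the free list

    let created = take_or_create(&free_list, || Buffer(2));
    assert_eq!(created.0, 2); // free list empty, so freshly created
}
```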
@@ -182,21 +136,18 @@ unsafe impl CommandPool for Arc<StandardCommandPool> {
 
     #[inline]
     fn queue_family(&self) -> QueueFamily {
-        self.device
-            .physical_device()
-            .queue_family_by_id(self.queue_family)
-            .unwrap()
+        self.inner.queue_family()
     }
 }
 
 unsafe impl DeviceOwned for StandardCommandPool {
     #[inline]
     fn device(&self) -> &Arc<Device> {
-        &self.device
+        self.inner.device()
     }
 }
 
-/// Command buffer allocated from a `StandardCommandPool` and that is currently being built.
+/// Command buffer allocated from a `StandardCommandPool` that is currently being built.
 pub struct StandardCommandPoolBuilder {
     // The only difference between a `StandardCommandPoolBuilder` and a `StandardCommandPoolAlloc`
     // is that the former must not implement `Send` and `Sync`. Therefore we just share the structs.
@@ -236,13 +187,7 @@ pub struct StandardCommandPoolAlloc {
     // The actual command buffer. Extracted in the `Drop` implementation.
     cmd: ManuallyDrop<UnsafeCommandPoolAlloc>,
     // We hold a reference to the command pool for our destructor.
-    pool: Arc<StandardCommandPoolPerThread>,
-    // Keep alive the `StandardCommandPool`, otherwise it would be destroyed.
-    pool_parent: Arc<StandardCommandPool>,
-    // Command buffer level.
-    level: CommandBufferLevel,
-    // The device we belong to. Necessary because of the `DeviceOwned` trait implementation.
-    device: Arc<Device>,
+    pool: Arc<StandardCommandPool>,
 }
 
 unsafe impl Send for StandardCommandPoolAlloc {}
@@ -256,21 +201,14 @@ unsafe impl CommandPoolAlloc for StandardCommandPoolAlloc {
 
     #[inline]
     fn queue_family(&self) -> QueueFamily {
-        let queue_family_id = self.pool.pool.lock().unwrap().queue_family().id();
-
-        self.device
-            .physical_device()
-            .queue_family_by_id(queue_family_id)
-            .unwrap()
+        self.pool.queue_family()
     }
 }
 
 unsafe impl DeviceOwned for StandardCommandPoolAlloc {
     #[inline]
     fn device(&self) -> &Arc<Device> {
-        // Note that we could grab the device from `self.pool`. Unfortunately this requires a mutex
-        // lock, so it isn't compatible with the API of `DeviceOwned`.
-        &self.device
+        self.pool.device()
     }
 }

@@ -279,7 +217,7 @@ impl Drop for StandardCommandPoolAlloc {
         // Safe because `self.cmd` is wrapped in a `ManuallyDrop`.
         let cmd: UnsafeCommandPoolAlloc = unsafe { ptr::read(&*self.cmd) };
 
-        match self.level {
+        match cmd.level() {
             CommandBufferLevel::Primary => self.pool.available_primary_command_buffers.push(cmd),
             CommandBufferLevel::Secondary => {
                 self.pool.available_secondary_command_buffers.push(cmd)
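`Drop` only provides `&mut self`, yet the destructor must move the `UnsafeCommandPoolAlloc` out of the struct to push it onto the free list; `ManuallyDrop` plus `ptr::read` is the standard idiom for that (and the cached `level` field is gone, since `cmd.level()` comes from the allocation itself). A self-contained sketch of the idiom, with `Resource`, `Holder`, and `recycle` as illustrative stand-ins for the PR's types:

```rust
use std::{mem::ManuallyDrop, ptr};

struct Resource(String);

// Stand-in for pushing the value onto the pool's free list.
fn recycle(r: Resource) {
    println!("recycled {}", r.0);
}

struct Holder {
    // ManuallyDrop suppresses the field's own destructor so we can move the
    // value out exactly once, in Holder's Drop.
    inner: ManuallyDrop<Resource>,
}

impl Drop for Holder {
    fn drop(&mut self) {
        // Safe because `inner` is never touched again: ManuallyDrop
        // guarantees the compiler won't drop it a second time after this
        // bitwise move.
        let resource: Resource = unsafe { ptr::read(&*self.inner) };
        recycle(resource);
    }
}

fn main() {
    let h = Holder {
        inner: ManuallyDrop::new(Resource("cmd".into())),
    };
    drop(h); // prints "recycled cmd"
}
```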
@@ -290,26 +228,19 @@
 
 #[cfg(test)]
 mod tests {
-    use crate::command_buffer::pool::CommandPool;
-    use crate::command_buffer::pool::CommandPoolBuilderAlloc;
-    use crate::command_buffer::pool::StandardCommandPool;
+    use std::sync::Arc;
+    use std::thread;
+
+    use crate::command_buffer::pool::{CommandPool, CommandPoolBuilderAlloc};
     use crate::command_buffer::CommandBufferLevel;
     use crate::device::Device;
     use crate::VulkanObject;
-    use std::sync::Arc;
 
     #[test]
     fn reuse_command_buffers() {
-        let (device, _) = gfx_dev_and_queue!();
-        let queue_family = device.physical_device().queue_families().next().unwrap();
+        let (device, queue) = gfx_dev_and_queue!();
 
-        let pool = Device::standard_command_pool(&device, queue_family);
-        // Avoid the weak reference to StandardCommandPoolPerThread expiring.
-        let cb_hold_weakref = pool
-            .allocate(CommandBufferLevel::Primary, 1)
-            .unwrap()
-            .next()
-            .unwrap();
+        let pool = Device::standard_command_pool(&device, queue.family()).unwrap();
 
         let cb = pool
             .allocate(CommandBufferLevel::Primary, 1)
@@ -328,20 +259,26 @@ mod tests {
     }
 
     #[test]
-    fn pool_kept_alive_by_allocs() {
+    fn pool_kept_alive_by_thread() {
         let (device, queue) = gfx_dev_and_queue!();
 
-        let pool = Arc::new(StandardCommandPool::new(device, queue.family()));
-        let pool_weak = Arc::downgrade(&pool);
+        let thread = thread::spawn({
+            let (device, queue) = (device.clone(), queue.clone());
+            move || {
+                let pool = Device::standard_command_pool(&device, queue.family()).unwrap();
+
+                pool.allocate(CommandBufferLevel::Primary, 1)
+                    .unwrap()
+                    .next()
+                    .unwrap()
+                    .inner
+            }
+        });
 
-        let cb = pool
-            .allocate(CommandBufferLevel::Primary, 1)
-            .unwrap()
-            .next()
-            .unwrap();
-        drop(pool);
-        assert!(pool_weak.upgrade().is_some());
+        // The thread-local storage should drop its reference to the pool here
+        let cb = thread.join().unwrap();
+
+        let pool_weak = Arc::downgrade(&cb.pool);
+        drop(cb);
+        assert!(pool_weak.upgrade().is_none());
     }
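The rewritten test pins down the new lifetime contract: an allocation keeps its pool alive through its `Arc`, and once the creating thread (and, presumably, its thread-local reference) is gone, dropping the last allocation frees the pool. The property reduces to plain `Arc`/`Weak` reference counting, sketched here with illustrative `Pool` and `Alloc` types rather than vulkano's:

```rust
use std::sync::{Arc, Weak};

struct Pool;

struct Alloc {
    pool: Arc<Pool>, // each allocation keeps its pool alive
}

fn main() {
    let pool = Arc::new(Pool);
    let alloc = Alloc { pool: pool.clone() };
    let weak: Weak<Pool> = Arc::downgrade(&pool);

    drop(pool); // e.g. the creating thread's reference goes away
    assert!(weak.upgrade().is_some()); // alloc still holds the pool

    drop(alloc);
    assert!(weak.upgrade().is_none()); // last owner gone: pool is freed
}
```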
8 changes: 4 additions & 4 deletions vulkano/src/command_buffer/synced/mod.rs
@@ -546,7 +546,7 @@ mod tests {
         unsafe {
             let (device, queue) = gfx_dev_and_queue!();
 
-            let pool = Device::standard_command_pool(&device, queue.family());
+            let pool = Device::standard_command_pool(&device, queue.family()).unwrap();
             let pool_builder_alloc = pool
                 .allocate(CommandBufferLevel::Primary, 1)
                 .unwrap()
@@ -599,7 +599,7 @@
                 })
                 .collect::<Vec<_>>();
 
-            let pool = Device::standard_command_pool(&device, queue.family());
+            let pool = Device::standard_command_pool(&device, queue.family()).unwrap();
             let allocs = pool
                 .allocate(CommandBufferLevel::Primary, 2)
                 .unwrap()
@@ -659,7 +659,7 @@
         unsafe {
             let (device, queue) = gfx_dev_and_queue!();
 
-            let pool = Device::standard_command_pool(&device, queue.family());
+            let pool = Device::standard_command_pool(&device, queue.family()).unwrap();
             let pool_builder_alloc = pool
                 .allocate(CommandBufferLevel::Primary, 1)
                 .unwrap()
@@ -690,7 +690,7 @@
         unsafe {
             let (device, queue) = gfx_dev_and_queue!();
 
-            let pool = Device::standard_command_pool(&device, queue.family());
+            let pool = Device::standard_command_pool(&device, queue.family()).unwrap();
             let pool_builder_alloc = pool
                 .allocate(CommandBufferLevel::Primary, 1)
                 .unwrap()