Skip to content

Commit

Permalink
Make StandardCommandPool lockless (#1939)
Browse files Browse the repository at this point in the history
  • Loading branch information
marc0246 authored Aug 1, 2022
1 parent c0c9c21 commit 91dc544
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 163 deletions.
2 changes: 1 addition & 1 deletion vulkano/src/command_buffer/auto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl<L> AutoCommandBufferBuilder<L, StandardCommandPoolBuilder> {
}
}

let pool_builder_alloc = Device::standard_command_pool(&device, queue_family)
let pool_builder_alloc = Device::standard_command_pool(&device, queue_family)?
.allocate(level, 1)?
.next()
.expect("Requested one command buffer from the command pool, but got zero.");
Expand Down
209 changes: 73 additions & 136 deletions vulkano/src/command_buffer/pool/standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,7 @@ use crate::{
OomError, VulkanObject,
};
use crossbeam_queue::SegQueue;
use std::{
collections::HashMap,
marker::PhantomData,
mem::ManuallyDrop,
ptr,
sync::{Arc, Mutex, Weak},
thread,
vec::IntoIter as VecIntoIter,
};
use std::{marker::PhantomData, mem::ManuallyDrop, ptr, sync::Arc, vec::IntoIter as VecIntoIter};

// Copyright (c) 2016 The vulkano developers
// Licensed under the Apache License, Version 2.0
Expand All @@ -30,31 +22,15 @@ use std::{

/// Standard implementation of a command pool.
///
/// It is guaranteed that the allocated command buffers keep the `Arc<StandardCommandPool>` alive.
/// This is desirable so that we can store a `Weak<StandardCommandPool>`.
///
/// Will use one Vulkan pool per thread in order to avoid locking. Will try to reuse command
/// buffers. Command buffers can't be moved between threads during the building process, but
/// finished command buffers can.
/// A thread can have as many `Arc<StandardCommandPool>`s as needed, but none of them can escape the
/// thread they were created on. This is done so that there are no locks involved when creating
/// command buffers. Command buffers can't be moved between threads during the building process, but
/// finished command buffers can. When a command buffer is dropped, it is returned to the pool
/// for reuse.
#[derive(Debug)]
pub struct StandardCommandPool {
// The device.
device: Arc<Device>,

// Identifier of the queue family.
queue_family: u32,

// For each thread, we store thread-specific info.
per_thread: Mutex<HashMap<thread::ThreadId, Weak<StandardCommandPoolPerThread>>>,
}

unsafe impl Send for StandardCommandPool {}
unsafe impl Sync for StandardCommandPool {}

#[derive(Debug)]
struct StandardCommandPoolPerThread {
// The Vulkan pool of this thread.
pool: Mutex<UnsafeCommandPool>,
// The Vulkan pool specific to a device's queue family.
inner: UnsafeCommandPool,
// List of existing primary command buffers that are available for reuse.
available_primary_command_buffers: SegQueue<UnsafeCommandPoolAlloc>,
// List of existing secondary command buffers that are available for reuse.
Expand All @@ -64,21 +40,36 @@ struct StandardCommandPoolPerThread {
impl StandardCommandPool {
/// Builds a new pool.
///
/// # Panic
/// # Panics
///
/// - Panics if the device and the queue family don't belong to the same physical device.
///
pub fn new(device: Arc<Device>, queue_family: QueueFamily) -> StandardCommandPool {
pub fn new(
device: Arc<Device>,
queue_family: QueueFamily,
) -> Result<StandardCommandPool, OomError> {
assert_eq!(
device.physical_device().internal_object(),
queue_family.physical_device().internal_object()
);

StandardCommandPool {
device: device,
queue_family: queue_family.id(),
per_thread: Mutex::new(Default::default()),
}
let inner = UnsafeCommandPool::new(
device,
UnsafeCommandPoolCreateInfo {
queue_family_index: queue_family.id(),
reset_command_buffer: true,
..Default::default()
},
)
.map_err(|err| match err {
UnsafeCommandPoolCreationError::OomError(err) => err,
_ => panic!("Unexpected error: {}", err),
})?;

Ok(StandardCommandPool {
inner,
available_primary_command_buffers: Default::default(),
available_secondary_command_buffers: Default::default(),
})
}
}

Expand All @@ -92,84 +83,47 @@ unsafe impl CommandPool for Arc<StandardCommandPool> {
level: CommandBufferLevel,
mut command_buffer_count: u32,
) -> Result<Self::Iter, OomError> {
// Find the correct `StandardCommandPoolPerThread` structure.
let mut hashmap = self.per_thread.lock().unwrap();
// TODO: meh for iterating everything every time
hashmap.retain(|_, w| w.upgrade().is_some());

let this_thread = thread::current().id();

// Get an appropriate `Arc<StandardCommandPoolPerThread>`.
let per_thread = if let Some(entry) = hashmap.get(&this_thread).and_then(Weak::upgrade) {
entry
} else {
let new_pool = UnsafeCommandPool::new(
self.device.clone(),
UnsafeCommandPoolCreateInfo {
queue_family_index: self.queue_family().id(),
reset_command_buffer: true,
..Default::default()
},
)
.map_err(|err| match err {
UnsafeCommandPoolCreationError::OomError(err) => err,
_ => panic!("Unexpected error: {}", err),
})?;
let pt = Arc::new(StandardCommandPoolPerThread {
pool: Mutex::new(new_pool),
available_primary_command_buffers: SegQueue::new(),
available_secondary_command_buffers: SegQueue::new(),
});

hashmap.insert(this_thread, Arc::downgrade(&pt));
pt
};

// The final output.
let mut output = Vec::with_capacity(command_buffer_count as usize);

// First, pick from already-existing command buffers.
{
let existing = match level {
CommandBufferLevel::Primary => &per_thread.available_primary_command_buffers,
CommandBufferLevel::Secondary => &per_thread.available_secondary_command_buffers,
CommandBufferLevel::Primary => &self.available_primary_command_buffers,
CommandBufferLevel::Secondary => &self.available_secondary_command_buffers,
};

for _ in 0..command_buffer_count as usize {
if let Some(cmd) = existing.pop() {
output.push(StandardCommandPoolBuilder {
inner: StandardCommandPoolAlloc {
cmd: ManuallyDrop::new(cmd),
pool: per_thread.clone(),
pool_parent: self.clone(),
level,
device: self.device.clone(),
pool: self.clone(),
},
dummy_avoid_send_sync: PhantomData,
});
} else {
break;
}
}
};
}

// Then allocate the rest.
if output.len() < command_buffer_count as usize {
let pool_lock = per_thread.pool.lock().unwrap();
command_buffer_count -= output.len() as u32;

for cmd in pool_lock.allocate_command_buffers(CommandBufferAllocateInfo {
level,
command_buffer_count,
..Default::default()
})? {
for cmd in self
.inner
.allocate_command_buffers(CommandBufferAllocateInfo {
level,
command_buffer_count,
..Default::default()
})?
{
output.push(StandardCommandPoolBuilder {
inner: StandardCommandPoolAlloc {
cmd: ManuallyDrop::new(cmd),
pool: per_thread.clone(),
pool_parent: self.clone(),
level,
device: self.device.clone(),
pool: self.clone(),
},
dummy_avoid_send_sync: PhantomData,
});
Expand All @@ -182,21 +136,18 @@ unsafe impl CommandPool for Arc<StandardCommandPool> {

#[inline]
fn queue_family(&self) -> QueueFamily {
self.device
.physical_device()
.queue_family_by_id(self.queue_family)
.unwrap()
self.inner.queue_family()
}
}

unsafe impl DeviceOwned for StandardCommandPool {
#[inline]
fn device(&self) -> &Arc<Device> {
&self.device
self.inner.device()
}
}

/// Command buffer allocated from a `StandardCommandPool` and that is currently being built.
/// Command buffer allocated from a `StandardCommandPool` that is currently being built.
pub struct StandardCommandPoolBuilder {
// The only difference between a `StandardCommandPoolBuilder` and a `StandardCommandPoolAlloc`
// is that the former must not implement `Send` and `Sync`. Therefore we just share the structs.
Expand Down Expand Up @@ -236,13 +187,7 @@ pub struct StandardCommandPoolAlloc {
// The actual command buffer. Extracted in the `Drop` implementation.
cmd: ManuallyDrop<UnsafeCommandPoolAlloc>,
// We hold a reference to the command pool for our destructor.
pool: Arc<StandardCommandPoolPerThread>,
// Keep alive the `StandardCommandPool`, otherwise it would be destroyed.
pool_parent: Arc<StandardCommandPool>,
// Command buffer level.
level: CommandBufferLevel,
// The device we belong to. Necessary because of the `DeviceOwned` trait implementation.
device: Arc<Device>,
pool: Arc<StandardCommandPool>,
}

unsafe impl Send for StandardCommandPoolAlloc {}
Expand All @@ -256,21 +201,14 @@ unsafe impl CommandPoolAlloc for StandardCommandPoolAlloc {

#[inline]
fn queue_family(&self) -> QueueFamily {
let queue_family_id = self.pool.pool.lock().unwrap().queue_family().id();

self.device
.physical_device()
.queue_family_by_id(queue_family_id)
.unwrap()
self.pool.queue_family()
}
}

unsafe impl DeviceOwned for StandardCommandPoolAlloc {
#[inline]
fn device(&self) -> &Arc<Device> {
// Note that we could grab the device from `self.pool`. Unfortunately this requires a mutex
// lock, so it isn't compatible with the API of `DeviceOwned`.
&self.device
self.pool.device()
}
}

Expand All @@ -279,7 +217,7 @@ impl Drop for StandardCommandPoolAlloc {
// Safe because `self.cmd` is wrapped in a `ManuallyDrop`.
let cmd: UnsafeCommandPoolAlloc = unsafe { ptr::read(&*self.cmd) };

match self.level {
match cmd.level() {
CommandBufferLevel::Primary => self.pool.available_primary_command_buffers.push(cmd),
CommandBufferLevel::Secondary => {
self.pool.available_secondary_command_buffers.push(cmd)
Expand All @@ -290,26 +228,19 @@ impl Drop for StandardCommandPoolAlloc {

#[cfg(test)]
mod tests {
use crate::command_buffer::pool::CommandPool;
use crate::command_buffer::pool::CommandPoolBuilderAlloc;
use crate::command_buffer::pool::StandardCommandPool;
use std::sync::Arc;
use std::thread;

use crate::command_buffer::pool::{CommandPool, CommandPoolBuilderAlloc};
use crate::command_buffer::CommandBufferLevel;
use crate::device::Device;
use crate::VulkanObject;
use std::sync::Arc;

#[test]
fn reuse_command_buffers() {
let (device, _) = gfx_dev_and_queue!();
let queue_family = device.physical_device().queue_families().next().unwrap();
let (device, queue) = gfx_dev_and_queue!();

let pool = Device::standard_command_pool(&device, queue_family);
// Avoid the weak reference to StandardCommandPoolPerThread expiring.
let cb_hold_weakref = pool
.allocate(CommandBufferLevel::Primary, 1)
.unwrap()
.next()
.unwrap();
let pool = Device::standard_command_pool(&device, queue.family()).unwrap();

let cb = pool
.allocate(CommandBufferLevel::Primary, 1)
Expand All @@ -328,20 +259,26 @@ mod tests {
}

#[test]
fn pool_kept_alive_by_allocs() {
fn pool_kept_alive_by_thread() {
let (device, queue) = gfx_dev_and_queue!();

let pool = Arc::new(StandardCommandPool::new(device, queue.family()));
let pool_weak = Arc::downgrade(&pool);
let thread = thread::spawn({
let (device, queue) = (device.clone(), queue.clone());
move || {
let pool = Device::standard_command_pool(&device, queue.family()).unwrap();

let cb = pool
.allocate(CommandBufferLevel::Primary, 1)
.unwrap()
.next()
.unwrap();
drop(pool);
assert!(pool_weak.upgrade().is_some());
pool.allocate(CommandBufferLevel::Primary, 1)
.unwrap()
.next()
.unwrap()
.inner
}
});

// The thread-local storage should drop its reference to the pool here
let cb = thread.join().unwrap();

let pool_weak = Arc::downgrade(&cb.pool);
drop(cb);
assert!(pool_weak.upgrade().is_none());
}
Expand Down
8 changes: 4 additions & 4 deletions vulkano/src/command_buffer/synced/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ mod tests {
unsafe {
let (device, queue) = gfx_dev_and_queue!();

let pool = Device::standard_command_pool(&device, queue.family());
let pool = Device::standard_command_pool(&device, queue.family()).unwrap();
let pool_builder_alloc = pool
.allocate(CommandBufferLevel::Primary, 1)
.unwrap()
Expand Down Expand Up @@ -599,7 +599,7 @@ mod tests {
})
.collect::<Vec<_>>();

let pool = Device::standard_command_pool(&device, queue.family());
let pool = Device::standard_command_pool(&device, queue.family()).unwrap();
let allocs = pool
.allocate(CommandBufferLevel::Primary, 2)
.unwrap()
Expand Down Expand Up @@ -659,7 +659,7 @@ mod tests {
unsafe {
let (device, queue) = gfx_dev_and_queue!();

let pool = Device::standard_command_pool(&device, queue.family());
let pool = Device::standard_command_pool(&device, queue.family()).unwrap();
let pool_builder_alloc = pool
.allocate(CommandBufferLevel::Primary, 1)
.unwrap()
Expand Down Expand Up @@ -690,7 +690,7 @@ mod tests {
unsafe {
let (device, queue) = gfx_dev_and_queue!();

let pool = Device::standard_command_pool(&device, queue.family());
let pool = Device::standard_command_pool(&device, queue.family()).unwrap();
let pool_builder_alloc = pool
.allocate(CommandBufferLevel::Primary, 1)
.unwrap()
Expand Down
Loading

0 comments on commit 91dc544

Please sign in to comment.