Merge pull request openzfs#554 from delphix/projects/merge-upstream/master

Merge remote-tracking branch '6.0/stage' into 'master'
delphix-devops-bot authored Aug 4, 2022
2 parents a4c482b + 23f75ef commit afcce54
Showing 10 changed files with 103 additions and 73 deletions.
4 changes: 2 additions & 2 deletions cmd/zfs_object_agent/util/src/vec_ext.rs
@@ -145,14 +145,14 @@ impl AlignedVec {
}

pub fn extend_from_value(&mut self, len: usize, value: u8) {
-// Can't allow vec capacity to change, as that could violate the alignment constraint.
+// We can't allow the vec to be reallocated, as that could violate the alignment constraint.
assert_le!(len, self.vec.capacity() - self.vec.len());
self.vec.resize(self.vec.len() + len, value);
self.verify();
}

pub fn extend_from_slice(&mut self, slice: &[u8]) {
-// We can't allow the vec to be resized, as that could violate the alignment constraint.
+// We can't allow the vec to be reallocated, as that could violate the alignment constraint.
assert_le!(slice.len(), self.vec.capacity() - self.vec.len());
self.vec.extend_from_slice(slice);
self.verify();
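
The rewritten comments state the invariant precisely: an AlignedVec must never reallocate, because a reallocation can return a base pointer with a different alignment. The intended calling pattern, sketched here with illustrative sizes (with_capacity, extend_from_slice, extend_from_value, and the AlignedBytes conversion are the APIs used later in this change):

    // Sketch: reserve the full padded size up front, then append in place.
    // Both extend_from_* methods assert the new data fits in the spare
    // capacity, so the sector-aligned base pointer can never move.
    let sector_size = 4096;
    let payload = vec![0xaau8; 1000];
    let mut buf = AlignedVec::with_capacity(sector_size, sector_size);
    buf.extend_from_slice(&payload); // copies the data; no realloc
    buf.extend_from_value(sector_size - payload.len(), 0); // zero-pad to a sector
    let bytes: AlignedBytes = buf.into(); // ready for direct I/O
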
6 changes: 5 additions & 1 deletion cmd/zfs_object_agent/zettacache/src/aggregating_writer.rs
@@ -7,6 +7,7 @@ use util::zettacache_stats::DiskIoType;
use util::AlignedVec;

use crate::base_types::DiskLocation;
+use crate::base_types::Extent;
use crate::block_access::BlockAccess;

#[derive(Debug)]
@@ -73,7 +74,10 @@ impl AggregatingWriter {
tokio::spawn(async move {
block_access
.write_raw(
-pending.location,
+Extent {
+location: pending.location,
+size: pending.vec.len() as u64,
+},
pending.vec.into(),
DiskIoType::MaintenanceWrite,
)
85 changes: 57 additions & 28 deletions cmd/zfs_object_agent/zettacache/src/block_access.rs
@@ -26,6 +26,7 @@ use derivative::Derivative;
use futures::Future;
use libc::c_void;
use log::*;
+use more_asserts::*;
use nix::errno::Errno;
use nix::sys::stat::SFlag;
use num_traits::Num;
@@ -68,6 +69,7 @@ tunable! {
static ref DISK_WRITE_MAX_QUEUE_DEPTH: usize = 32;
static ref DISK_METADATA_WRITE_MAX_QUEUE_DEPTH: usize = 16;
pub static ref DISK_READ_MAX_QUEUE_DEPTH: usize = 64;
+static ref DISK_SECTOR_SIZE_OVERRIDE: Option<ByteSize> = None;
}

#[derive(Serialize, Deserialize, Debug)]
@@ -210,6 +212,7 @@ struct ReadMessage {

struct WriteMessage {
offset: u64,
+size: usize,
bytes: AlignedBytes,
tx: oneshot::Sender<()>,
}
@@ -439,18 +442,17 @@ impl Disk {
mut iter: I,
) -> (Vec<u64>, usize) {
let (mut run, mut len) = if let Some((&offset, message)) = iter.next() {
-(vec![offset], message.bytes.len())
+(vec![offset], message.size)
} else {
return (Vec::new(), 0);
};
for (&offset, message) in iter {
-if len > 0 && len + message.bytes.len() > DISK_WRITE_MAX_AGGREGATION_SIZE.as_usize()
-{
+if len > 0 && len + message.size > DISK_WRITE_MAX_AGGREGATION_SIZE.as_usize() {
break;
}
if offset == run[0] + len as u64 {
run.push(offset);
-len += message.bytes.len();
+len += message.size;
} else {
break;
}
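
The scan above now measures each queued write by message.size, its padded on-disk length, rather than bytes.len(), so contiguity is judged in disk space. The same run-building logic as a standalone function (simplified types, not the agent's code):

    // Standalone illustration: collect offsets of queued writes that abut
    // on disk, stopping at the aggregation limit. `queued` holds
    // (offset, on_disk_size) pairs sorted by offset.
    fn contiguous_run(queued: &[(u64, usize)], max_agg: usize) -> (Vec<u64>, usize) {
        let mut iter = queued.iter();
        let (mut run, mut len) = match iter.next() {
            Some(&(offset, size)) => (vec![offset], size),
            None => return (Vec::new(), 0),
        };
        for &(offset, size) in iter {
            if len > 0 && len + size > max_agg {
                break; // aggregate is full
            }
            if offset == run[0] + len as u64 {
                run.push(offset); // physically contiguous: extend the run
                len += size;
            } else {
                break; // gap on disk: stop
            }
        }
        (run, len)
    }

    // contiguous_run(&[(0, 4096), (4096, 4096), (16384, 4096)], 1 << 20)
    // returns (vec![0, 4096], 8192): the third write is not contiguous.
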
@@ -482,11 +484,15 @@ impl Disk {
let message = sorted.remove(&run[0]).unwrap();
let mut bytes = message.bytes;
// Directio requires the pointer to be sector-aligned
-if bytes.alignment() % sector_size != 0 {
+let padding = message.size - bytes.len();
+if bytes.alignment() % sector_size != 0 || padding != 0 {
// We need to copy AlignedBytes created from a plain Bytes (e.g. ingesting
// from a cache miss where we read the object and then write its blocks to
// the zettacache).
-bytes = AlignedBytes::copy_from_slice(&bytes, sector_size);
+let mut vec = AlignedVec::with_capacity(bytes.len() + padding, sector_size);
+vec.extend_from_slice(&bytes);
+vec.extend_from_value(padding, 0);
+bytes = vec.into();
};
assert_eq!(bytes.alignment() % sector_size, 0);
assert_eq!(bytes.as_ptr() as usize % sector_size, 0);
@@ -516,6 +522,7 @@ impl Disk {
for offset in run {
let message = sorted.remove(&offset).unwrap();
aggregate.extend_from_slice(&message.bytes);
+aggregate.extend_from_value(message.size - message.bytes.len(), 0);
txs.push(message.tx);
}
let op = OpInProgress::new(stat_values);
@@ -555,14 +562,21 @@ impl Disk {
fn write(
&self,
offset: u64,
+size: usize,
bytes: AlignedBytes,
io_type: DiskIoType,
) -> impl Future<Output = ()> {
self.verify_aligned(offset);
-self.verify_aligned(bytes.len());
+self.verify_aligned(size);
+assert_le!(bytes.len(), size);

let (tx, rx) = oneshot::channel();
-let message = WriteMessage { offset, bytes, tx };
+let message = WriteMessage {
+offset,
+size,
+bytes,
+tx,
+};

let txs = match io_type {
DiskIoType::WriteDataForInsert => &self.writer_txs,
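
Disk::write now takes the on-disk size separately from the buffer: offset and size must both be sector-aligned, while bytes.len() may be smaller, with the tail zero-filled before the I/O is issued. A hypothetical call with made-up values (this is a private method; the public entry point is BlockAccess::write_raw below):

    // Hypothetical: persist a 1000-byte blob into a 4 KiB slot; the write
    // path pads the remaining 3096 bytes with zeros before direct I/O.
    let blob = [0u8; 1000];
    disk.write(
        8192,                                      // sector-aligned offset
        4096,                                      // sector-aligned on-disk size
        AlignedBytes::copy_from_slice(&blob, 512), // buffer; len() <= size
        DiskIoType::MaintenanceWrite,
    )
    .await; // resolves once the (possibly aggregated) write completes
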
@@ -766,28 +780,29 @@ impl BlockAccess {
fut.await
}

-// The location.offset() and bytes.len() must be sector-aligned. However,
-// bytes.alignment() need not be the sector size (it will be copied if not).
-pub async fn write_raw(
-&self,
-location: DiskLocation,
-bytes: AlignedBytes,
-io_type: DiskIoType,
-) {
+/// The extent.location.offset() and extent.size must be sector-aligned. However,
+/// bytes.alignment() may not be the sector size, and bytes.len() may be less than
+/// extent.size (it will be copied/padded in these cases).
+pub async fn write_raw(&self, extent: Extent, bytes: AlignedBytes, io_type: DiskIoType) {
assert!(
!self.readonly,
"attempting zettacache write in readonly mode"
);
-self.verify_aligned(location.offset());
-self.verify_aligned(bytes.len());
-let disk = location.disk();
-let fut =
-self.disks
-.read()
-.unwrap()
-.get(disk)
-.unwrap()
-.write(location.offset(), bytes, io_type);
+self.verify_aligned(extent.location.offset());
+self.verify_aligned(extent.size);
+assert_le!(bytes.len() as u64, extent.size);
+let fut = self
+.disks
+.read()
+.unwrap()
+.get(extent.location.disk())
+.unwrap()
+.write(
+extent.location.offset(),
+extent.size.as_usize(),
+bytes,
+io_type,
+);
// drop disks RwLock before waiting for io
fut.await;
}
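
The public contract follows suit: callers describe the destination with an Extent, and write_raw asserts the buffer fits inside it before routing the padded size down to Disk::write. A hedged sketch of the new calling convention, mirroring the superblock and checkpoint call sites later in this change (disk, offset, and extent_size are assumed to be in scope):

    // Sketch: write serialized bytes `raw` into a span we already own.
    // extent_size may exceed raw.len(); write_raw zero-pads the tail.
    block_access
        .write_raw(
            Extent::new(disk, offset, extent_size),
            raw,
            DiskIoType::MaintenanceWrite,
        )
        .await;
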
@@ -975,7 +990,7 @@ fn blkgetsize64(file: &File) -> Result<u64> {
Ok(cap)
}

-/// Get sector size of block device.
+/// Get logical sector size of block device.
fn blksszget(file: &File) -> Result<usize> {
nix::ioctl_read_bad!(ioctl_blksszget, 0x1268, usize);
let mut ssz: usize = 0;
@@ -984,13 +999,27 @@
Ok(ssz)
}

+/// Get physical sector size of block device.
+fn blkpbszget(file: &File) -> Result<usize> {
+nix::ioctl_read_bad!(ioctl_blkpbszget, 0x127B, usize);
+let mut ssz: usize = 0;
+let ssz_ptr = &mut ssz as *mut usize;
+unsafe { ioctl_blkpbszget(file.as_raw_fd(), ssz_ptr) }?;
+Ok(ssz)
+}

/// get (sector_size, disk_size), both in bytes
fn disk_sizes(file: &File) -> Result<(usize, u64)> {
let stat = nix::sys::stat::fstat(file.as_raw_fd())?;
trace!("stat: {:?}", stat);
let mode = SFlag::from_bits_truncate(stat.st_mode);
if mode.contains(SFlag::S_IFBLK) {
-Ok((blksszget(file)?, blkgetsize64(file)?))
+let sector_size = DISK_SECTOR_SIZE_OVERRIDE
+.map(|s| s.as_usize())
+.ok_or(())
+.or_else(|_| blkpbszget(file))
+.or_else(|_| blksszget(file))?;
+Ok((sector_size, blkgetsize64(file)?))
} else if mode.contains(SFlag::S_IFREG) {
Ok((*MIN_SECTOR_SIZE, u64::try_from(stat.st_size)?))
} else {
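
disk_sizes() now resolves the sector size through a three-step fallback: an explicit tunable override, then the physical sector size (BLKPBSZGET), then the logical one (BLKSSZGET). On a 512e drive the logical sector size is 512 bytes but the physical size is 4096, so preferring the physical value lets the cache write in units the device can commit without an internal read-modify-write. The ok_or/or_else chain reads as a general pattern; a standalone sketch with stand-in probe closures:

    // Standalone sketch of the override -> physical -> logical fallback.
    fn pick_sector_size(
        override_size: Option<usize>,
        physical: impl FnOnce() -> std::io::Result<usize>,
        logical: impl FnOnce() -> std::io::Result<usize>,
    ) -> std::io::Result<usize> {
        override_size
            .ok_or(())                                // no override: probe the device
            .or_else(|()| physical().map_err(|_| ())) // prefer the physical size
            .or_else(|()| logical())                  // last resort: logical size
    }

    // pick_sector_size(None, || Ok(4096), || Ok(512)) yields 4096;
    // pick_sector_size(Some(512), || Ok(4096), || Ok(512)) yields 512.
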
35 changes: 16 additions & 19 deletions cmd/zfs_object_agent/zettacache/src/block_allocator/mod.rs
@@ -1222,20 +1222,24 @@ impl BlockAllocator {
Some(extent)
}

+/// The returned Extent may be larger than the requested size, e.g. due to padding to the
+/// disk's sector size. Returns None if there's insufficient space.
pub fn allocate(&mut self, request_size: u32) -> Option<Extent> {
+assert_ge!(self.slab_size, request_size);

-// Note: we assume allocation sizes are guaranteed to be aligned from the caller for now.
-self.block_access.verify_aligned(request_size);

-assert_le!(request_size, self.slab_size);
let bucket = self
.slab_buckets
.get_bucket_size_for_allocation_size(request_size);

-self.allocate_impl(bucket, request_size)
+self.allocate_impl(bucket, self.block_access.round_up_to_sector(request_size))
}

-fn allocate_impl(&mut self, bucket_size: SlabBucketSize, request_size: u32) -> Option<Extent> {
+/// allocation_size must be sector-aligned
+fn allocate_impl(
+&mut self,
+bucket_size: SlabBucketSize,
+allocation_size: u32,
+) -> Option<Extent> {
+self.block_access.verify_aligned(allocation_size);
let bucket = self.slab_buckets.get_bucket_for_bucket_size(bucket_size);

let slabs_in_bucket = bucket.by_freeness.len();
@@ -1261,12 +1265,10 @@
//
loop {
match bucket.get_current() {
-Some(id) => match self.slabs.get_mut(id).allocate(request_size) {
+Some(id) => match self.slabs.get_mut(id).allocate(allocation_size) {
Some(extent) => {
super_trace!(
"satisfied {} byte allocation request: {:?}",
request_size,
extent
"satisfied {allocation_size} byte allocation request: {extent:?}",
);
self.dirty_slab_id(id);
self.stats_track_allocation(extent);
@@ -1281,21 +1283,16 @@
);
}
},
-None => match self.allocate_from_new_slab(request_size) {
+None => match self.allocate_from_new_slab(allocation_size) {
Some(extent) => {
super_trace!(
"satisfied {} byte allocation request: {:?}",
request_size,
extent
"satisfied {allocation_size} byte allocation request: {extent:?}",
);
return Some(extent);
}
None => {
super_trace!(
"allocation of {} bytes failed; no free slabs left; {} slabs used for {:?}",
request_size,
slabs_in_bucket,
bucket_size
"allocation of {allocation_size} bytes failed; no free slabs left; {slabs_in_bucket} slabs used for {bucket_size:?}",
);
return None;
}
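
The bucket is still chosen from the caller's requested size, but the slab is asked for a sector-rounded size, which is why the returned Extent can be larger than the request. round_up_to_sector itself is not shown in this diff; for a power-of-two sector size it presumably amounts to:

    // Assumed behavior of round_up_to_sector (sector_size a power of two).
    fn round_up_to_sector(size: u32, sector_size: u32) -> u32 {
        debug_assert!(sector_size.is_power_of_two());
        (size + sector_size - 1) & !(sector_size - 1)
    }

    // round_up_to_sector(6000, 4096) == 8192: a 6000-byte block spans two sectors.
    // round_up_to_sector(4096, 4096) == 4096: an aligned size is unchanged.
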
6 changes: 1 addition & 5 deletions cmd/zfs_object_agent/zettacache/src/block_based_log/mod.rs
@@ -149,11 +149,7 @@ impl<T: BlockBasedLogEntry> BlockBasedLogPhys<T> {
.read_raw(slab_extent, DiskIoType::MaintenanceRead)
.await;
block_access
-.write_raw(
-new_slab_extent.location,
-bytes,
-DiskIoType::MaintenanceWrite,
-)
+.write_raw(new_slab_extent, bytes, DiskIoType::MaintenanceWrite)
.await;
slab_allocator.free(*slab);
*slab = new_slab;
6 changes: 1 addition & 5 deletions cmd/zfs_object_agent/zettacache/src/checkpoint.rs
@@ -87,11 +87,7 @@ impl CheckpointPhys {
.range(0, chunk.len() as u64);
async move {
block_access
-.write_raw(
-extent.location,
-raw.slice_ref(chunk),
-DiskIoType::MaintenanceWrite,
-)
+.write_raw(extent, raw.slice_ref(chunk), DiskIoType::MaintenanceWrite)
.await;
extent
}
2 changes: 1 addition & 1 deletion cmd/zfs_object_agent/zettacache/src/superblock.rs
@@ -230,7 +230,7 @@ impl SuperblockPhys {
// XXX pad it out to SUPERBLOCK_SIZE?
block_access
.write_raw(
-DiskLocation::new(disk, 0),
+Extent::new(disk, 0, raw.len() as u64),
raw,
DiskIoType::MaintenanceWrite,
)
6 changes: 5 additions & 1 deletion cmd/zfs_object_agent/zettacache/src/zettacache/merge.rs
@@ -532,7 +532,11 @@ impl MergeState {
.read_raw(*old, DiskIoType::MaintenanceRead)
.await;
block_access
-.write_raw(*new, bytes, DiskIoType::MaintenanceWrite)
+.write_raw(
+Extent::new(new.disk(), new.offset(), old.size),
+bytes,
+DiskIoType::MaintenanceWrite,
+)
.await;
}
},
23 changes: 12 additions & 11 deletions cmd/zfs_object_agent/zettacache/src/zettacache/mod.rs
@@ -1781,17 +1781,20 @@ impl Locked {
return future::ready(Err(InsertError::PendingChanges)).left_future();
}

-let buf_size = bytes.len();
-let location = match self.allocate_block(u32::try_from(buf_size).unwrap()) {
-Some(location) => location,
+let extent = match self.allocate_block(u32::try_from(bytes.len()).unwrap()) {
+Some(extent) => extent,
None => return future::ready(Err(InsertError::Allocation)).left_future(),
};

// XXX if this is past the last block of the main index, we can write it
// there (and location_dirty:false) instead of logging it

let key = locked_key.key(pool_guids);
-let value = IndexValue::new(Some(location), u32::try_from(buf_size).unwrap(), self.atime);
+let value = IndexValue::new(
+Some(extent.location),
+u32::try_from(extent.size).unwrap(),
+self.atime,
+);

if let Some(pc) = with_alloctag(Self::PENDING_CHANGES_TAG, || {
self.pending_changes
@@ -1832,7 +1835,7 @@ impl Locked {
let block_access = self.block_access.clone();
async move {
block_access
-.write_raw(location, bytes, DiskIoType::WriteDataForInsert)
+.write_raw(extent, bytes, DiskIoType::WriteDataForInsert)
.await;

// We need to move the write_permit and locked_key guards into this closure so that
@@ -1851,12 +1854,10 @@
.right_future()
}

-/// returns offset, or None if there's no space
-fn allocate_block(&mut self, size: u32) -> Option<DiskLocation> {
-self.block_allocator.allocate(size).map(|extent| {
-self.block_access.verify_aligned(extent.location.offset());
-extent.location
-})
+/// The returned Extent may be larger than the requested size, e.g. due to padding to the
+/// disk's sector size. Returns None if there's insufficient space.
+fn allocate_block(&mut self, size: u32) -> Option<Extent> {
+self.block_allocator.allocate(size)
}

/// Flush out the current set of pending index changes. This is a recovery point in case of
3 changes: 3 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/root_connection.rs
@@ -440,6 +440,9 @@ impl RootConnectionState {
measure!("RootConnectionState::read_block").spawn(async move {
let mut data = pool.read_block(BlockId(request.block), heal).await;

+// The zettacache may return a larger block than we expected, due to padding.
+data.truncate(request.size as usize);

//
// If the cache has the wrong content/size for this BlockId, then proactively do a
// healing read from the object store.
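
This is the consumer-side consequence of the padding: the index records the sector-rounded size, so a cache hit can return more bytes than the block logically holds, and the agent trims them before the content/size check. A tiny sketch of the trim, using bytes::BytesMut for illustration (the buffer type pool.read_block returns is not shown in this diff):

    // Sketch: a 1000-byte block stored in a 4096-byte extent comes back as
    // 4096 bytes (data plus zero padding); truncate() drops the tail in place.
    let mut data = bytes::BytesMut::zeroed(4096);
    data.truncate(1000);
    assert_eq!(data.len(), 1000);
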
