Streams cleanup (#56)
With regard to Streams, and specifically Streams of Futures, the
relationship of .buffered() and Futures[Un]Ordered wasn't clear to me
when I started using them. There's substantial simplification that we
can do in this area.

.buffered() consumes a Stream of Futures. This stream can be created
more simply from an iterator of Futures; FuturesOrdered need not be
involved.

In some cases, the stream! macro isn't needed; we can implement the
stream using the stream combinators in StreamExt.
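
For illustration, here is a minimal sketch of the relationship described above; the fetch function, values, and buffer size are made up. Both variants yield results in input order, but the second builds the Stream of Futures directly from an iterator and lets .buffered() cap how many futures run at once.

use futures::stream::{self, FuturesOrdered, StreamExt};

// Stand-in for an async request.
async fn fetch(i: u32) -> u32 {
    i * 10
}

async fn demo() {
    // Explicitly pushing each future into FuturesOrdered...
    let mut ordered = FuturesOrdered::new();
    for i in 0..4 {
        ordered.push(fetch(i));
    }
    let a: Vec<u32> = ordered.collect().await;

    // ...versus building the stream of futures from an iterator and
    // driving at most two of them at a time, still yielding in order.
    let b: Vec<u32> = stream::iter((0..4).map(fetch))
        .buffered(2)
        .collect()
        .await;

    assert_eq!(a, b);
}

(.buffered() preserves input order, like FuturesOrdered; .buffer_unordered() is the completion-order analogue of FuturesUnordered.)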
ahrens authored Dec 14, 2021
1 parent 808f37e commit c683ce2
Showing 4 changed files with 65 additions and 98 deletions.
17 changes: 3 additions & 14 deletions cmd/zfs_object_agent/zettacache/src/block_based_log.rs
@@ -5,6 +5,7 @@ use crate::extent_allocator::ExtentAllocator;
use crate::extent_allocator::ExtentAllocatorBuilder;
use anyhow::Context;
use async_stream::stream;
use futures::stream;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures_core::Stream;
@@ -118,20 +119,8 @@ impl<T: BlockBasedLogEntry> BlockBasedLogPhys<T> {
}

pub fn iter_entries(&self, block_access: Arc<BlockAccess>) -> impl Stream<Item = T> {
let stream = self.iter_chunks(block_access);
let phys_entries = self.num_entries;

stream! {
let mut num_entries = 0;
for await chunk in stream {
for entry in chunk.entries {
yield entry;
num_entries += 1;
}
};

assert_eq!(phys_entries, num_entries);
}
self.iter_chunks(block_access)
.flat_map(|chunk| stream::iter(chunk.entries.into_iter()))
}

pub fn bytes(&self) -> u64 {
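The iter_entries() change above amounts to flattening a stream of chunks into a stream of their entries with a StreamExt combinator instead of the stream! macro. A self-contained sketch of that shape, using a made-up Chunk type with u64 entries:

use futures::stream::{self, Stream, StreamExt};

struct Chunk {
    entries: Vec<u64>,
}

// Flatten a stream of chunks into a stream of entries, mirroring the
// flat_map(|chunk| stream::iter(chunk.entries)) call in the diff.
fn iter_entries(chunks: impl Stream<Item = Chunk>) -> impl Stream<Item = u64> {
    chunks.flat_map(|chunk| stream::iter(chunk.entries))
}

async fn demo() {
    let chunks = stream::iter(vec![
        Chunk { entries: vec![1, 2] },
        Chunk { entries: vec![3] },
    ]);
    let entries: Vec<u64> = iter_entries(chunks).collect().await;
    assert_eq!(entries, vec![1, 2, 3]);
}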
2 changes: 1 addition & 1 deletion cmd/zfs_object_agent/zettaobject/src/object_access.rs
@@ -680,7 +680,7 @@ impl ObjectAccess {

let result = retry(&format!("put {} ({} bytes)", key, len), timeout, || async {
let my_bytes = bytes.clone();
let stream = ByteStream::new_with_size(stream! { yield Ok(my_bytes)}, len);
let stream = ByteStream::new_with_size(stream::iter(iter::once(Ok(my_bytes))), len);

let req = PutObjectRequest {
bucket: self.bucket_str.clone(),
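The single-yield stream! block here is just a one-element stream, which stream::iter over a one-element iterator produces directly. A minimal sketch with arbitrary bytes and an assumed error type:

use std::iter;
use futures::stream::{self, StreamExt};

async fn demo() {
    let bytes: Vec<u8> = vec![1, 2, 3];
    // One-item stream, equivalent to `stream! { yield Ok(bytes) }`.
    let mut s = stream::iter(iter::once(Ok::<_, std::io::Error>(bytes)));
    assert_eq!(s.next().await.unwrap().unwrap(), vec![1, 2, 3]);
}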
45 changes: 20 additions & 25 deletions cmd/zfs_object_agent/zettaobject/src/object_based_log.rs
@@ -2,10 +2,10 @@ use crate::base_types::*;
use crate::object_access::{ObjectAccess, ObjectAccessStatType};
use crate::pool::PoolSharedState;
use anyhow::{Context, Result};
use async_stream::stream;
use futures::future;
use futures::future::join;
use futures::future::join_all;
use futures::future::{self, join};
use futures::stream::{FuturesOrdered, StreamExt};
use futures::stream::{self, StreamExt};
use futures_core::Stream;
use lazy_static::lazy_static;
use log::*;
@@ -327,34 +327,29 @@ impl<T: ObjectBasedLogEntry> ObjectBasedLog<T> {
&self,
first_chunk_opt: Option<ObjectBasedLogRemainder>,
) -> (impl Stream<Item = T>, ObjectBasedLogRemainder) {
let mut stream = FuturesOrdered::new();
let generation = self.generation;
let first_chunk = match first_chunk_opt {
Some(rem) => rem.chunk,
Some(remainder) => remainder.chunk,
None => 0,
};
for chunk in first_chunk..self.num_flushed_chunks {
let shared_state = self.shared_state.clone();
let n = self.name.clone();
stream.push(future::ready(async move {
ObjectBasedLogChunk::get(&shared_state.object_access, &n, generation, chunk)
let shared_state = self.shared_state.clone();
let name = self.name.clone();
let generation = self.generation;
let futures = (first_chunk..self.num_flushed_chunks).map(move |chunk| {
let shared_state = shared_state.clone();
let name = name.clone();
async move {
ObjectBasedLogChunk::get(&shared_state.object_access, &name, generation, chunk)
.await
.unwrap()
}));
}
// Note: buffered() is needed because rust-s3 creates one connection for
// each request, rather than using a connection pool. If we created 1000
// connections we'd run into the open file descriptor limit.
let mut buffered_stream = stream.buffered(*OBJECT_LOG_ITERATE_QUEUE_DEPTH);
}
});
(
stream! {
while let Some(chunk) = buffered_stream.next().await {
trace!("yielding entries of chunk {}", chunk.chunk);
for ent in chunk.entries {
yield ent;
}
}
},
// Note: buffered() is needed because rust-s3 creates one connection for
// each request, rather than using a connection pool. If we created 1000
// connections we'd run into the open file descriptor limit.
stream::iter(futures)
.buffered(*OBJECT_LOG_ITERATE_QUEUE_DEPTH)
.flat_map(|chunk| stream::iter(chunk.entries.into_iter())),
ObjectBasedLogRemainder {
chunk: self.num_flushed_chunks,
},
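Reduced to a standalone sketch, the new iteration body above maps a range of chunk numbers to futures, keeps a bounded number of GETs in flight with .buffered() (the point of the file-descriptor note in the comment), and flattens the chunk entries in order. The Chunk type, get_chunk function, and queue depth below are illustrative stand-ins:

use futures::stream::{self, StreamExt};

struct Chunk {
    entries: Vec<String>,
}

// Stand-in for fetching one chunk of the log from object storage.
async fn get_chunk(chunk: u64) -> Chunk {
    Chunk {
        entries: vec![format!("entry-from-chunk-{}", chunk)],
    }
}

async fn iterate_chunks(first_chunk: u64, num_flushed_chunks: u64) -> Vec<String> {
    let futures = (first_chunk..num_flushed_chunks).map(get_chunk);
    stream::iter(futures)
        .buffered(16) // at most 16 GETs in flight; results still arrive in chunk order
        .flat_map(|chunk| stream::iter(chunk.entries))
        .collect()
        .await
}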
99 changes: 41 additions & 58 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
@@ -2099,67 +2099,53 @@ async fn get_frees_per_obj(
}

async fn reclaim_frees_object(
state: Arc<PoolState>,
state: &PoolState,
objects: Vec<(ObjectSize, Vec<PendingFreesLogEntry>)>,
) -> ObjectSize {
let first_object = objects[0].0.object;
let shared_state = state.shared_state.clone();
// objects should be sorted
assert!(objects.windows(2).all(|w| w[0].0.object < w[1].0.object));

let first_object = objects.first().unwrap().0.object;
let last_object = objects.last().unwrap().0.object;

// Note that the .reduce() below can't completely determine the next_block
// because if we skip the GET (because this object doesn't have any
// non-freed blocks), it won't be visited by the .reduce() when the
// filter_map() below returns None.
let next_block = state.object_block_map.object_to_next_block(last_object);
trace!(
"reclaim: consolidating {} objects into {:?} to free {} blocks",
"reclaim: consolidating {} objects into {:?} to free {} blocks (last {:?} next {:?})",
objects.len(),
first_object,
objects.iter().map(|x| x.1.len()).sum::<usize>()
objects.iter().map(|(_, frees)| frees.len()).sum::<usize>(),
last_object,
next_block,
);

struct FirstInfo {
object: ObjectId,
next_block: BlockId,
}

let stream = FuturesUnordered::new();
let mut to_delete = Vec::new();
let mut first = None;
for (object_size, frees) in objects {
let futures = objects.into_iter().filter_map(move |(object_size, frees)| {
let object = object_size.object;
let min_block = object.as_min_block();
let next_block = state.object_block_map.object_to_next_block(object);

match &mut first {
None => {
// This is the first object.
first = Some(FirstInfo { object, next_block })
}
Some(first) => {
// This is not the first object. It needs to be deleted, and
// its min/next_block needs to be folded into the FirstInfo.
// Note that the .reduce() below can't completely determine the
// min/next_block because if we skip the GET (because this
// object doesn't have any non-freed blocks), it won't be
// visited by the .reduce(), because we `continue` here.
assert_gt!(object, first.object);
to_delete.push(object);
first.next_block = max(first.next_block, next_block);
if object_size.num_blocks == 0 {
trace!(
"reclaim: moving 0 blocks from {:?} (BlockID[{},{})) because all {} blocks were freed",
object,
min_block,
next_block,
frees.len(),
);
assert_eq!(object_size.num_bytes, 0);
continue;
}
}
if object != first_object && object_size.num_blocks == 0 {
trace!(
"reclaim: moving 0 blocks from {:?} (BlockID[{},{})) because all {} blocks were freed",
object,
min_block,
next_block,
frees.len(),
);
assert_eq!(object_size.num_bytes, 0);
return None;
}

let my_shared_state = shared_state.clone();
stream.push(future::ready(async move {
let shared_state = state.shared_state.clone();
Some(async move {
// Bypass object cache so that it isn't added, so that when we
// overwrite it with put(), we don't need to copy the data into the
// cache to invalidate.
let mut phys =
DataObject::get(&my_shared_state.object_access, my_shared_state.guid, object, ObjectAccessStatType::ReclaimGet, true)
DataObject::get(&shared_state.object_access, shared_state.guid, object, ObjectAccessStatType::ReclaimGet, true)
.await
.unwrap();

@@ -2188,8 +2174,7 @@ async fn reclaim_frees_object(
// uncommitted consolidation. Therefore, if the expected size is
// zero, we can remove this object without reading it because it
// doesn't have any required blocks. That happens above, where we
// `continue`.

// `return None`.
if phys.header.next_block != next_block {
debug!("reclaim: {:?} expected next {:?}, found next {:?}, trimming uncommitted consolidation",
object, next_block, phys.header.next_block);
@@ -2208,9 +2193,9 @@ async fn reclaim_frees_object(
assert_eq!(phys.blocks_len(), object_size.num_blocks);

phys
}));
}
let mut new_phys = stream
})
});
let mut new_phys = stream::iter(futures)
.buffered(*RECLAIM_ONE_BUFFERED)
.reduce(|mut a, mut b| async move {
assert_eq!(a.header.guid, b.header.guid);
@@ -2256,19 +2241,17 @@ async fn reclaim_frees_object(
.await
.unwrap();

if let Some(first) = first {
// Fold in the next_block info which includes the skipped objects.
assert_eq!(new_phys.header.object, first.object);
assert_le!(new_phys.header.next_block, first.next_block);
new_phys.header.next_block = first.next_block;
}
// Fold in the next_block info which includes the skipped objects.
assert_eq!(new_phys.header.object, first_object);
assert_le!(new_phys.header.next_block, next_block);
new_phys.header.next_block = next_block;

// XXX would be nice to skip this if we didn't actually make any change
// (because we already did it all before crashing)
trace!("reclaim: rewriting {}", new_phys);
new_phys
.put(
&shared_state.object_access,
&state.shared_state.object_access,
ObjectAccessStatType::ReclaimPut,
)
.await;
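
The consolidation path above fetches the surviving objects with bounded concurrency and merges them pairwise with .reduce(). A minimal sketch of that shape, with an invented Phys type and a trivial merge rule standing in for the real DataObject logic:

use futures::stream::{self, StreamExt};

struct Phys {
    object: u64,
    blocks: Vec<u64>,
}

// Stand-in for fetching one object's contents.
async fn get_phys(object: u64) -> Phys {
    Phys { object, blocks: vec![object] }
}

// `objects` is assumed to be sorted ascending, as in the diff's assert.
async fn consolidate(objects: Vec<u64>) -> Option<Phys> {
    let futures = objects.into_iter().map(get_phys);
    stream::iter(futures)
        .buffered(4) // bounded number of concurrent GETs
        .reduce(|mut a, b| async move {
            // Fold each later object's blocks into the first one.
            a.blocks.extend(b.blocks);
            a
        })
        .await
}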
@@ -2622,10 +2605,10 @@ fn try_reclaim_frees(state: Arc<PoolState>, syncing_state: &mut PoolSyncingState
.try_into()
.unwrap();
let permit = outstanding.clone().acquire_many_owned(num_permits).await;
let state2 = state.clone();
let state = state.clone();
join_handles.push(tokio::spawn(async move {
let _permit = permit; // force permit to be moved & dropped in the task
reclaim_frees_object(state2, objects_to_consolidate).await
reclaim_frees_object(&state, objects_to_consolidate).await
}));
if freed_blocks_bytes > required_free_bytes {
break;
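For context, the loop above bounds total outstanding reclaim work with an owned semaphore permit that is moved into each spawned task. A sketch of that pattern with assumed names and weights; reclaim_batch, the permit total, and the per-batch weight are illustrative:

use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-in for the real consolidation work.
async fn reclaim_batch(batch: Vec<u64>) -> usize {
    batch.len()
}

async fn spawn_bounded(batches: Vec<Vec<u64>>) {
    // Total "weight" of work allowed in flight at once; each batch is
    // assumed to weigh less than this total.
    let outstanding = Arc::new(Semaphore::new(1000));
    let mut join_handles = Vec::new();
    for batch in batches {
        let num_permits = batch.len() as u32;
        // Waits here until enough capacity is free.
        let permit = outstanding
            .clone()
            .acquire_many_owned(num_permits)
            .await
            .unwrap();
        join_handles.push(tokio::spawn(async move {
            let _permit = permit; // force the permit to be moved & dropped in the task
            reclaim_batch(batch).await
        }));
    }
    for handle in join_handles {
        handle.await.unwrap();
    }
}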
