diff --git a/cmd/zfs_object_agent/Cargo.lock b/cmd/zfs_object_agent/Cargo.lock index da31414a4b63..8ec0d972c92a 100644 --- a/cmd/zfs_object_agent/Cargo.lock +++ b/cmd/zfs_object_agent/Cargo.lock @@ -3422,6 +3422,7 @@ dependencies = [ "futures-core", "hostname", "http", + "itertools", "lazy_static", "libc", "log", diff --git a/cmd/zfs_object_agent/zettaobject/Cargo.toml b/cmd/zfs_object_agent/zettaobject/Cargo.toml index 61c0c253f661..2ede3fcd0ba2 100644 --- a/cmd/zfs_object_agent/zettaobject/Cargo.toml +++ b/cmd/zfs_object_agent/zettaobject/Cargo.toml @@ -31,6 +31,7 @@ futures = "0.3.13" futures-core = "0.3.13" hostname = "0.3.1" http = "0.2.4" +itertools = "0.10.3" lazy_static = "1.4.0" libc = "0.2" log = "0.4" diff --git a/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs b/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs index 4abca05945d1..9d4c35eb1387 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs @@ -9,6 +9,7 @@ use futures::future::join_all; use futures::stream; use futures::stream::StreamExt; use futures_core::Stream; +use itertools::Either; use log::*; use serde::de::DeserializeOwned; use serde::Deserialize; @@ -252,7 +253,7 @@ impl ObjectBasedLog { } /// Iterates the on-disk state; panics if there are pending changes. - pub fn iterate(&self) -> impl Stream { + pub fn iterate(&self) -> impl Stream> { assert_eq!(self.num_flushed_chunks, self.num_chunks); assert!(self.pending_entries.is_empty()); assert!(self.pending_flushes.is_empty()); @@ -265,7 +266,7 @@ impl ObjectBasedLog { fn iter_impl( &self, first_chunk_opt: Option, - ) -> (impl Stream, ObjectBasedLogRemainder) { + ) -> (impl Stream>, ObjectBasedLogRemainder) { let first_chunk = match first_chunk_opt { Some(remainder) => remainder.chunk, None => 0, @@ -277,9 +278,8 @@ impl ObjectBasedLog { let shared_state = shared_state.clone(); let name = name.clone(); async move { - ObjectBasedLogChunk::get(&shared_state.object_access, &name, generation, chunk) + ObjectBasedLogChunk::::get(&shared_state.object_access, &name, generation, chunk) .await - .unwrap() } }); ( @@ -288,7 +288,12 @@ impl ObjectBasedLog { // connections we'd run into the open file descriptor limit. stream::iter(futures) .buffered(*OBJECT_LOG_ITERATE_QUEUE_DEPTH) - .flat_map(|chunk| stream::iter(chunk.entries.into_iter())), + .flat_map(|chunk| { + stream::iter(match chunk { + Ok(c) => Either::Left(c.entries.into_iter().map(|e| Ok(e))), + Err(e) => Either::Right(vec![Err(e)].into_iter()), + }) + }), ObjectBasedLogRemainder { chunk: self.num_flushed_chunks, }, @@ -298,7 +303,7 @@ impl ObjectBasedLog { /// Iterates the on-disk state; pending changes (including pending_entries /// and pending_flushes) will not be visited. Returns token for iterating /// the remainder (entries after those visited here). - pub fn iter_most(&self) -> (impl Stream, ObjectBasedLogRemainder) { + pub fn iter_most(&self) -> (impl Stream>, ObjectBasedLogRemainder) { self.iter_impl(None) } @@ -312,6 +317,6 @@ impl ObjectBasedLog { self.flush(txg).await; // XXX It would be faster if we kept all the "remainder" entries in RAM // until we iter the remainder and transfer it to the new generation. - self.iter_impl(Some(first_chunk)).0 + self.iter_impl(Some(first_chunk)).0.map(|r| r.unwrap()) } } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_block_map.rs b/cmd/zfs_object_agent/zettaobject/src/object_block_map.rs index b62464810863..4c273b193896 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_block_map.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_block_map.rs @@ -4,7 +4,7 @@ use std::sync::RwLock; use std::time::Instant; use futures::future; -use futures::StreamExt; +use futures::TryStreamExt; use log::*; use more_asserts::*; use serde::Deserialize; @@ -41,14 +41,14 @@ impl ObjectBlockMap { pub async fn load( storage_object_log: &ObjectBasedLog, next_block: BlockId, - ) -> Self { + ) -> Result { let begin = Instant::now(); let mut num_alloc_entries: u64 = 0; let mut num_free_entries: u64 = 0; let mut map: BTreeSet = BTreeSet::new(); storage_object_log .iterate() - .for_each(|ent| { + .try_for_each(|ent| { match ent { StorageObjectLogEntry::Alloc { object } => { let inserted = with_alloctag(Self::MAP_TAG, || map.insert(object)); @@ -62,20 +62,19 @@ impl ObjectBlockMap { } } - future::ready(()) + future::ready(Ok(())) }) - .await; + .await?; info!( "loaded mapping from {} objects with {} allocs and {} frees in {}ms", storage_object_log.num_chunks, num_alloc_entries, num_free_entries, - begin.elapsed().as_millis() + begin.elapsed().as_millis(), ); - - ObjectBlockMap { + Ok(ObjectBlockMap { state: RwLock::new(ObjectBlockMapState { map, next_block }), - } + }) } pub fn insert(&self, object: ObjectId, next_block: BlockId) { diff --git a/cmd/zfs_object_agent/zettaobject/src/pool.rs b/cmd/zfs_object_agent/zettaobject/src/pool.rs index 4f223abe8321..ec0864d0272e 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool.rs @@ -815,7 +815,7 @@ impl Pool { // load block -> object mapping let storage_object_log = ObjectBasedLog::open_by_phys(shared_state.clone(), &phys.storage_object_log); - let object_block_map = ObjectBlockMap::load(&storage_object_log, phys.next_block).await; + let object_block_map = ObjectBlockMap::load(&storage_object_log, phys.next_block).await?; let mut logs = Vec::new(); for (i, log_phys) in phys.reclaim_info.reclaim_logs.iter().enumerate() { logs.push(ReclaimLog { @@ -922,7 +922,7 @@ impl Pool { shared_state.clone(), &format!("zfs/{}/StorageObjectLog", guid), ); - let object_block_map = ObjectBlockMap::load(&storage_object_log, BlockId(0)).await; + let object_block_map = ObjectBlockMap::load(&storage_object_log, BlockId(0)).await?; // start with a table of size 1 and a single log let mut logs = Vec::new(); @@ -2236,15 +2236,16 @@ async fn split_pending_frees_log( reclaim_log.pending_free_bytes = 0; pending_frees_log_stream - .for_each(|ent| { + .try_for_each(|ent| { let id = syncing_state.get_log_id(state.object_block_map.block_to_object(ent.block)); assert!(id == original_id || id == sibling_id); let this_reclaim_log = syncing_state.get_pending_frees_log(id); this_reclaim_log.pending_frees_log.append(txg, ent); this_reclaim_log.pending_free_bytes += u64::from(ent.size); - future::ready(()) + future::ready(Ok(())) }) - .await; + .await + .unwrap(); for id in &[original_id, sibling_id] { let log = syncing_state.get_pending_frees_log(*id); @@ -2274,7 +2275,7 @@ async fn split_object_sizes_log( reclaim_log.object_size_log.clear(txg).await; object_size_log_stream - .for_each(|ent| { + .try_for_each(|ent| { let object = match ent { ObjectSizeLogEntry::Exists(object_size) => object_size.object, ObjectSizeLogEntry::Freed { object } => object, @@ -2285,9 +2286,10 @@ async fn split_object_sizes_log( .get_pending_frees_log(id) .object_size_log .append(txg, ent); - future::ready(()) + future::ready(Ok(())) }) - .await; + .await + .unwrap(); for id in &[original_id, sibling_id] { let log = syncing_state.get_pending_frees_log(*id); @@ -2459,7 +2461,7 @@ fn try_reclaim_frees(state: Arc, syncing_state: &mut PoolSyncingState measure!("try_reclaim_frees()").spawn(async move { // load pending frees let (freed_bytes, mut frees_per_object) = - get_frees_per_obj(&state, pending_frees_log_stream).await; + get_frees_per_obj(&state, pending_frees_log_stream.map(|r| r.unwrap())).await; let required_free_bytes = FREE_LOWWATER_PCT.apply(freed_bytes); @@ -2479,7 +2481,7 @@ fn try_reclaim_frees(state: Arc, syncing_state: &mut PoolSyncingState } // load object sizes - let object_sizes = get_object_sizes(object_size_log_stream).await; + let object_sizes = get_object_sizes(object_size_log_stream.map(|r| r.unwrap())).await; let begin = Instant::now();