DLPX-81923 zfs object agent panicked while retrieving blob and is not recoverable (blob doesn't exist) (openzfs#512)

Signed-off-by: Paul Dagnelie <pcd@delphix.com>
pcd1193182 authored Jul 18, 2022
1 parent a8c3356 commit 9ed4bb6
Showing 5 changed files with 34 additions and 26 deletions.
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions cmd/zfs_object_agent/zettaobject/Cargo.toml
@@ -31,6 +31,7 @@ futures = "0.3.13"
futures-core = "0.3.13"
hostname = "0.3.1"
http = "0.2.4"
itertools = "0.10.3"
lazy_static = "1.4.0"
libc = "0.2"
log = "0.4"
19 changes: 12 additions & 7 deletions cmd/zfs_object_agent/zettaobject/src/object_based_log.rs
@@ -9,6 +9,7 @@ use futures::future::join_all;
use futures::stream;
use futures::stream::StreamExt;
use futures_core::Stream;
use itertools::Either;
use log::*;
use serde::de::DeserializeOwned;
use serde::Deserialize;
@@ -252,7 +253,7 @@ impl<T: ObjectBasedLogEntry> ObjectBasedLog<T> {
}

/// Iterates the on-disk state; panics if there are pending changes.
pub fn iterate(&self) -> impl Stream<Item = T> {
pub fn iterate(&self) -> impl Stream<Item = Result<T>> {
assert_eq!(self.num_flushed_chunks, self.num_chunks);
assert!(self.pending_entries.is_empty());
assert!(self.pending_flushes.is_empty());
@@ -265,7 +266,7 @@ impl<T: ObjectBasedLogEntry> ObjectBasedLog<T> {
fn iter_impl(
&self,
first_chunk_opt: Option<ObjectBasedLogRemainder>,
) -> (impl Stream<Item = T>, ObjectBasedLogRemainder) {
) -> (impl Stream<Item = Result<T>>, ObjectBasedLogRemainder) {
let first_chunk = match first_chunk_opt {
Some(remainder) => remainder.chunk,
None => 0,
@@ -277,9 +278,8 @@ impl<T: ObjectBasedLogEntry> ObjectBasedLog<T> {
let shared_state = shared_state.clone();
let name = name.clone();
async move {
ObjectBasedLogChunk::get(&shared_state.object_access, &name, generation, chunk)
ObjectBasedLogChunk::<T>::get(&shared_state.object_access, &name, generation, chunk)
.await
.unwrap()
}
});
(
@@ -288,7 +288,12 @@ impl<T: ObjectBasedLogEntry> ObjectBasedLog<T> {
// connections we'd run into the open file descriptor limit.
stream::iter(futures)
.buffered(*OBJECT_LOG_ITERATE_QUEUE_DEPTH)
.flat_map(|chunk| stream::iter(chunk.entries.into_iter())),
.flat_map(|chunk| {
stream::iter(match chunk {
Ok(c) => Either::Left(c.entries.into_iter().map(|e| Ok(e))),
Err(e) => Either::Right(vec![Err(e)].into_iter()),
})
}),
ObjectBasedLogRemainder {
chunk: self.num_flushed_chunks,
},
@@ -298,7 +303,7 @@ impl<T: ObjectBasedLogEntry> ObjectBasedLog<T> {
/// Iterates the on-disk state; pending changes (including pending_entries
/// and pending_flushes) will not be visited. Returns token for iterating
/// the remainder (entries after those visited here).
pub fn iter_most(&self) -> (impl Stream<Item = T>, ObjectBasedLogRemainder) {
pub fn iter_most(&self) -> (impl Stream<Item = Result<T>>, ObjectBasedLogRemainder) {
self.iter_impl(None)
}

@@ -312,6 +317,6 @@ impl<T: ObjectBasedLogEntry> ObjectBasedLog<T> {
self.flush(txg).await;
// XXX It would be faster if we kept all the "remainder" entries in RAM
// until we iter the remainder and transfer it to the new generation.
self.iter_impl(Some(first_chunk)).0
self.iter_impl(Some(first_chunk)).0.map(|r| r.unwrap())
}
}
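The core of the change in this file: `iterate()`, `iter_impl()`, and `iter_most()` now yield `Result<T>` instead of unwrapping inside the chunk-fetch future, so a missing blob surfaces as an `Err` item rather than a panic. The per-chunk error is merged into the flattened entry stream with `itertools::Either`. Below is a minimal, self-contained sketch of that pattern; `Chunk`, `fetch_chunk`, and the fixed buffer depth are hypothetical stand-ins for `ObjectBasedLogChunk::get` and `OBJECT_LOG_ITERATE_QUEUE_DEPTH`.

```rust
use anyhow::Result;
use futures::stream::{self, Stream, StreamExt};
use itertools::Either;

// Hypothetical chunk type; the real code deserializes ObjectBasedLogChunk<T>.
struct Chunk {
    entries: Vec<u64>,
}

// Hypothetical fetch that can fail, e.g. when the backing blob is missing.
async fn fetch_chunk(chunk_id: u64) -> Result<Chunk> {
    Ok(Chunk {
        entries: vec![chunk_id * 10, chunk_id * 10 + 1],
    })
}

// Flatten a stream of fallible chunk fetches into a stream of fallible
// entries: a successful chunk yields all of its entries as Ok(..), while a
// failed fetch yields exactly one Err(..) item.
fn iterate(num_chunks: u64) -> impl Stream<Item = Result<u64>> {
    stream::iter((0..num_chunks).map(fetch_chunk))
        .buffered(16) // stand-in for OBJECT_LOG_ITERATE_QUEUE_DEPTH
        .flat_map(|chunk| {
            stream::iter(match chunk {
                Ok(c) => Either::Left(c.entries.into_iter().map(|e| Ok(e))),
                Err(e) => Either::Right(vec![Err(e)].into_iter()),
            })
        })
}
```

Both match arms produce the same `Either` iterator type, so `stream::iter` can flatten either a full chunk of `Ok` entries or a single `Err` without boxing. `iter_remainder` keeps its old infallible signature by mapping `r.unwrap()` over each item.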
17 changes: 8 additions & 9 deletions cmd/zfs_object_agent/zettaobject/src/object_block_map.rs
@@ -4,7 +4,7 @@ use std::sync::RwLock;
use std::time::Instant;

use futures::future;
use futures::StreamExt;
use futures::TryStreamExt;
use log::*;
use more_asserts::*;
use serde::Deserialize;
@@ -41,14 +41,14 @@ impl ObjectBlockMap {
pub async fn load(
storage_object_log: &ObjectBasedLog<StorageObjectLogEntry>,
next_block: BlockId,
) -> Self {
) -> Result<Self, anyhow::Error> {
let begin = Instant::now();
let mut num_alloc_entries: u64 = 0;
let mut num_free_entries: u64 = 0;
let mut map: BTreeSet<ObjectId> = BTreeSet::new();
storage_object_log
.iterate()
.for_each(|ent| {
.try_for_each(|ent| {
match ent {
StorageObjectLogEntry::Alloc { object } => {
let inserted = with_alloctag(Self::MAP_TAG, || map.insert(object));
@@ -62,20 +62,19 @@ impl ObjectBlockMap {
}
}

future::ready(())
future::ready(Ok(()))
})
.await;
.await?;
info!(
"loaded mapping from {} objects with {} allocs and {} frees in {}ms",
storage_object_log.num_chunks,
num_alloc_entries,
num_free_entries,
begin.elapsed().as_millis()
begin.elapsed().as_millis(),
);

ObjectBlockMap {
Ok(ObjectBlockMap {
state: RwLock::new(ObjectBlockMapState { map, next_block }),
}
})
}

pub fn insert(&self, object: ObjectId, next_block: BlockId) {
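On the consuming side, `ObjectBlockMap::load` now returns `Result<Self, anyhow::Error>` and drives the log with `TryStreamExt::try_for_each`, which stops at the first `Err` and lets `?` hand the error back to the caller. A hedged sketch of that consumption shape, with a placeholder entry type and a vector standing in for the on-disk log:

```rust
use std::collections::BTreeSet;

use anyhow::Result;
use futures::{future, stream, TryStreamExt};

// Placeholder log entry; the real code replays StorageObjectLogEntry values.
enum LogEntry {
    Alloc { object: u64 },
    Free { object: u64 },
}

// Rebuild the object set from a fallible stream of entries. The first Err
// aborts the try_for_each and is propagated to the caller by `?`.
async fn load(entries: Vec<Result<LogEntry>>) -> Result<BTreeSet<u64>> {
    let mut map = BTreeSet::new();
    stream::iter(entries)
        .try_for_each(|ent| {
            match ent {
                LogEntry::Alloc { object } => {
                    map.insert(object);
                }
                LogEntry::Free { object } => {
                    map.remove(&object);
                }
            }
            future::ready(Ok(()))
        })
        .await?;
    Ok(map)
}
```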
22 changes: 12 additions & 10 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
@@ -815,7 +815,7 @@ impl Pool {
// load block -> object mapping
let storage_object_log =
ObjectBasedLog::open_by_phys(shared_state.clone(), &phys.storage_object_log);
let object_block_map = ObjectBlockMap::load(&storage_object_log, phys.next_block).await;
let object_block_map = ObjectBlockMap::load(&storage_object_log, phys.next_block).await?;
let mut logs = Vec::new();
for (i, log_phys) in phys.reclaim_info.reclaim_logs.iter().enumerate() {
logs.push(ReclaimLog {
@@ -922,7 +922,7 @@ impl Pool {
shared_state.clone(),
&format!("zfs/{}/StorageObjectLog", guid),
);
let object_block_map = ObjectBlockMap::load(&storage_object_log, BlockId(0)).await;
let object_block_map = ObjectBlockMap::load(&storage_object_log, BlockId(0)).await?;

// start with a table of size 1 and a single log
let mut logs = Vec::new();
@@ -2236,15 +2236,16 @@ async fn split_pending_frees_log(
reclaim_log.pending_free_bytes = 0;

pending_frees_log_stream
.for_each(|ent| {
.try_for_each(|ent| {
let id = syncing_state.get_log_id(state.object_block_map.block_to_object(ent.block));
assert!(id == original_id || id == sibling_id);
let this_reclaim_log = syncing_state.get_pending_frees_log(id);
this_reclaim_log.pending_frees_log.append(txg, ent);
this_reclaim_log.pending_free_bytes += u64::from(ent.size);
future::ready(())
future::ready(Ok(()))
})
.await;
.await
.unwrap();

for id in &[original_id, sibling_id] {
let log = syncing_state.get_pending_frees_log(*id);
Expand Down Expand Up @@ -2274,7 +2275,7 @@ async fn split_object_sizes_log(
reclaim_log.object_size_log.clear(txg).await;

object_size_log_stream
.for_each(|ent| {
.try_for_each(|ent| {
let object = match ent {
ObjectSizeLogEntry::Exists(object_size) => object_size.object,
ObjectSizeLogEntry::Freed { object } => object,
@@ -2285,9 +2286,10 @@ async fn split_object_sizes_log(
.get_pending_frees_log(id)
.object_size_log
.append(txg, ent);
future::ready(())
future::ready(Ok(()))
})
.await;
.await
.unwrap();

for id in &[original_id, sibling_id] {
let log = syncing_state.get_pending_frees_log(*id);
@@ -2459,7 +2461,7 @@ fn try_reclaim_frees(state: Arc<PoolState>, syncing_state: &mut PoolSyncingState
measure!("try_reclaim_frees()").spawn(async move {
// load pending frees
let (freed_bytes, mut frees_per_object) =
get_frees_per_obj(&state, pending_frees_log_stream).await;
get_frees_per_obj(&state, pending_frees_log_stream.map(|r| r.unwrap())).await;

let required_free_bytes = FREE_LOWWATER_PCT.apply(freed_bytes);

@@ -2479,7 +2481,7 @@ fn try_reclaim_frees(state: Arc<PoolState>, syncing_state: &mut PoolSyncingState
}

// load object sizes
let object_sizes = get_object_sizes(object_size_log_stream).await;
let object_sizes = get_object_sizes(object_size_log_stream.map(|r| r.unwrap())).await;

let begin = Instant::now();

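pool.rs absorbs the new fallible streams in two ways: the load paths propagate errors with `?` (or `.unwrap()` in the split helpers, where a failure mid-split is fatal), while `try_reclaim_frees` leaves `get_frees_per_obj` and `get_object_sizes` untouched by unwrapping each item at the call site. A small sketch of that adapter, using hypothetical `consume` and `reclaim` functions:

```rust
use anyhow::Result;
use futures::stream::{Stream, StreamExt};

// Placeholder for helpers like get_frees_per_obj()/get_object_sizes(),
// which still expect an infallible stream of entries.
async fn consume(entries: impl Stream<Item = u64>) -> u64 {
    entries.fold(0u64, |acc, e| async move { acc + e }).await
}

// Adapter used at call sites that treat a fetch failure as fatal: unwrap
// each item as it flows through, turning Stream<Item = Result<T>> back
// into Stream<Item = T>.
async fn reclaim(fallible: impl Stream<Item = Result<u64>>) -> u64 {
    consume(fallible.map(|r| r.unwrap())).await
}
```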
