fix: more robustness in the face of a trampling-herd of threads loading a single index.

The motivating example is here: praetorian-inc/noseyparker#179
Byron committed May 11, 2024
1 parent 7b3dc92 commit 9ca81df
Showing 5 changed files with 98 additions and 19 deletions.
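For context, the failure reported in praetorian-inc/noseyparker#179 appears when many threads issue their first object lookups against a single repository at the same moment and race to load the same pack index. Below is a minimal reproduction sketch, using only the API surface the diff itself exercises (`repo.objects.iter()`, `repo.objects.clone().into_inner()`, `HeaderExt::header`); `gix::open`, `anyhow`, and the thread/chunk counts are assumptions:

```rust
use gix::odb::HeaderExt; // the same trait import the statistics code below uses

fn main() -> anyhow::Result<()> {
    let repo = gix::open(".")?;
    // Collect all object ids up front, then hammer the ODB from many threads.
    let ids: Vec<_> = repo.objects.iter()?.filter_map(Result::ok).collect();
    std::thread::scope(|s| {
        for chunk in ids.chunks(ids.len() / 64 + 1) {
            // One store handle per thread, mirroring the worker setup below.
            let handle = repo.objects.clone().into_inner();
            s.spawn(move || {
                for &id in chunk {
                    // Prior to this fix, the racing index-load could make a
                    // present object appear to be missing here.
                    handle.header(id).expect("object known to be present");
                }
            });
        }
    });
    Ok(())
}
```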
77 changes: 68 additions & 9 deletions gitoxide-core/src/repository/odb.rs
@@ -1,4 +1,5 @@
use std::io;
use std::sync::atomic::Ordering;

use anyhow::bail;

@@ -50,6 +51,8 @@ pub mod statistics {
pub struct Options {
pub format: OutputFormat,
pub thread_limit: Option<usize>,
/// A debug-flag that triggers looking up the headers of all objects again, but without indices preloaded
pub extra_header_lookup: bool,
}
}

@@ -59,7 +62,11 @@ pub fn statistics(
mut progress: impl gix::Progress,
out: impl io::Write,
mut err: impl io::Write,
statistics::Options { format, thread_limit }: statistics::Options,
statistics::Options {
format,
thread_limit,
extra_header_lookup,
}: statistics::Options,
) -> anyhow::Result<()> {
use bytesize::ByteSize;
use gix::odb::{find, HeaderExt};
@@ -76,6 +83,10 @@ pub fn statistics(
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[derive(Default)]
struct Statistics {
/// All objects that were used to produce these statistics.
/// Only `Some` if we are doing an extra round of header queries on a repository without loaded indices.
#[cfg_attr(feature = "serde", serde(skip_serializing))]
ids: Option<Vec<gix::ObjectId>>,
total_objects: usize,
loose_objects: usize,
packed_objects: usize,
@@ -135,14 +146,17 @@ pub fn statistics(
}

impl gix::parallel::Reduce for Reduce {
type Input = Result<Vec<gix::odb::find::Header>, anyhow::Error>;
type Input = Result<Vec<(gix::ObjectId, gix::odb::find::Header)>, anyhow::Error>;
type FeedProduce = ();
type Output = Statistics;
type Error = anyhow::Error;

fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
for item in items? {
for (id, item) in items? {
self.stats.consume(item);
if let Some(ids) = self.stats.ids.as_mut() {
ids.push(id);
}
}
Ok(())
}
@@ -154,9 +168,9 @@ pub fn statistics(
}

let cancelled = || anyhow::anyhow!("Cancelled by user");
let object_ids = repo.objects.store_ref().iter()?.filter_map(Result::ok);
let object_ids = repo.objects.iter()?.filter_map(Result::ok);
let chunk_size = 1_000;
let stats = if gix::parallel::num_threads(thread_limit) > 1 {
let mut stats = if gix::parallel::num_threads(thread_limit) > 1 {
gix::parallel::in_parallel(
gix::interrupt::Iter::new(
gix::features::iter::Chunks {
@@ -166,19 +180,30 @@ pub fn statistics(
cancelled,
),
thread_limit,
move |_| (repo.objects.clone().into_inner(), counter),
{
let objects = repo.objects.clone();
move |_| (objects.clone().into_inner(), counter)
},
|ids, (handle, counter)| {
let ids = ids?;
counter.fetch_add(ids.len(), std::sync::atomic::Ordering::Relaxed);
counter.fetch_add(ids.len(), Ordering::Relaxed);
let out = ids
.into_iter()
.map(|id| handle.header(id))
.map(|id| handle.header(id).map(|hdr| (id, hdr)))
.collect::<Result<Vec<_>, _>>()?;
Ok(out)
},
Reduce::default(),
Reduce {
stats: Statistics {
ids: extra_header_lookup.then(Vec::new),
..Default::default()
},
},
)?
} else {
if extra_header_lookup {
bail!("extra-header-lookup is only meaningful in threaded mode");
}
let mut stats = Statistics::default();

for (count, id) in object_ids.enumerate() {
@@ -193,6 +218,40 @@ pub fn statistics(

progress.show_throughput(start);

if let Some(mut ids) = stats.ids.take() {
// Critical to re-open the repo to assure we don't have any ODB state and start fresh.
let start = std::time::Instant::now();
let repo = gix::open_opts(repo.git_dir(), repo.open_options().to_owned())?;
progress.set_name("re-counting".into());
progress.init(Some(ids.len()), gix::progress::count("objects"));
let counter = progress.counter();
counter.store(0, Ordering::Relaxed);
dbg!("starting");
let errors = gix::parallel::in_parallel_with_slice(
&mut ids,
thread_limit,
{
let objects = repo.objects.clone();
move |_| (objects.clone().into_inner(), counter, false)
},
|id, (odb, counter, has_error), _threads_left, _stop_everything| -> anyhow::Result<()> {
counter.fetch_add(1, Ordering::Relaxed);
if let Err(_err) = odb.header(id) {
*has_error = true;
gix::trace::error!(err = ?_err, "Object that is known to be present wasn't found");
}
Ok(())
},
|| Some(std::time::Duration::from_millis(100)),
|(_, _, has_error)| has_error,
)?;

progress.show_throughput(start);
if errors.contains(&true) {
bail!("At least one object couldn't be looked up even though it must exist");
}
}

#[cfg(feature = "serde")]
{
serde_json::to_writer_pretty(out, &stats)?;
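The second new hunk above implements the `extra_header_lookup` pass: it keeps every id seen while counting, re-opens the repository so no ODB state is carried over, and re-queries each header in parallel via `gix::parallel::in_parallel_with_slice`, bailing if any known object cannot be found. A rough std-only sketch of that control flow (hypothetical `verify_all` name, stubbed `resolve` lookup), where each worker carries a `has_error` flag as its thread state and the caller inspects all flags at the end, just like `errors.contains(&true)` above:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn verify_all(ids: &[u64], threads: usize, resolve: impl Fn(u64) -> bool + Sync) -> bool {
    let counter = AtomicUsize::new(0); // shared progress counter, as in the diff
    let chunk_size = ids.len().div_ceil(threads.max(1)).max(1);
    let errors: Vec<bool> = thread::scope(|s| {
        let handles: Vec<_> = ids
            .chunks(chunk_size)
            .map(|ids| {
                let (counter, resolve) = (&counter, &resolve);
                s.spawn(move || {
                    let mut has_error = false; // per-thread state, like the diff's tuple
                    for &id in ids {
                        counter.fetch_add(1, Ordering::Relaxed);
                        if !resolve(id) {
                            has_error = true; // record the miss, keep scanning
                        }
                    }
                    has_error
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    !errors.contains(&true)
}

fn main() {
    // Stub resolver: pretend ids below 100 resolve, everything else is missing.
    assert!(verify_all(&(0..100).collect::<Vec<_>>(), 4, |id| id < 100));
    assert!(!verify_all(&[1, 2, 999], 2, |id| id < 100));
}
```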
24 changes: 18 additions & 6 deletions gix-odb/src/store_impls/dynamic/load_index.rs
@@ -86,7 +86,7 @@ impl super::Store {
Ok(Some(self.collect_snapshot()))
} else {
// always compare to the latest state
// Nothing changed in the mean time, try to load another index…
// Nothing changed in the meantime, try to load another index…
if self.load_next_index(index) {
Ok(Some(self.collect_snapshot()))
} else {
@@ -119,7 +119,7 @@ impl super::Store {
let slot = &self.files[index.slot_indices[slot_map_index]];
let _lock = slot.write.lock();
if slot.generation.load(Ordering::SeqCst) > index.generation {
// There is a disk consolidation in progress which just overwrote a slot that cold be disposed with some other
// There is a disk consolidation in progress which just overwrote a slot that could be disposed with some other
// index, one we didn't intend to load.
// Continue with the next slot index in the hope there is something else we can do…
continue 'retry_with_next_slot_index;
@@ -134,7 +134,8 @@ impl super::Store {
slot.files.store(bundle);
break 'retry_with_next_slot_index;
}
Err(_) => {
Err(_err) => {
gix_features::trace::error!(err=?_err, "Failed to load index file - some objects may seem to not exist");
slot.files.store(bundle);
continue 'retry_with_next_slot_index;
}
@@ -183,27 +184,30 @@ impl super::Store {
let previous_index_state = Arc::as_ptr(&index) as usize;

// IMPORTANT: get a lock after we recorded the previous state.
eprintln!("needs_init: {needs_init}, load_new_index: {load_new_index}");
let write = self.write.lock();
let objects_directory = &self.path;

// Now we know the index isn't going to change anymore, even though threads might still load indices in the meantime.
let index = self.index.load();
if previous_index_state != Arc::as_ptr(&index) as usize {
// Someone else took the look before and changed the index. Return it without doing any additional work.
dbg!("somebody already did the work - arc pointer changed");
return Ok(Some(self.collect_snapshot()));
}

let was_uninitialized = !index.is_initialized();

// We might not be able to detect by pointer if the state changed, as this itself is racy. So we keep track of double-initialization
// using a flag, which means that if `needs_init` was true we saw the index uninitialized once, but now that we are here it's
// initialized meaning that somebody was faster and we couldn't detect it by comparisons to the index.
// initialized meaning that somebody was faster, and we couldn't detect it by comparisons to the index.
// If so, make sure we collect the snapshot instead of returning None in case nothing actually changed, which is likely with a
// race like this.
if !was_uninitialized && needs_init {
dbg!("somebody already did the work - isn't uninitialized anymore");
return Ok(Some(self.collect_snapshot()));
}
self.num_disk_state_consolidation.fetch_add(1, Ordering::Relaxed);
dbg!(self.num_disk_state_consolidation.fetch_add(1, Ordering::Relaxed));

let db_paths: Vec<_> = std::iter::once(objects_directory.to_owned())
.chain(crate::alternate::resolve(objects_directory.clone(), &self.current_dir)?)
@@ -382,6 +386,7 @@ impl super::Store {
.unwrap_or_else(|| Arc::new(num_loaded_indices.into())),
num_indices_currently_being_loaded: Default::default(),
});
dbg!("storing new index");
self.index.store(new_index);
}

@@ -408,7 +413,14 @@

let new_index = self.index.load();
Ok(if index.state_id() == new_index.state_id() {
// there was no change, and nothing was loaded in the meantime, reflect that in the return value to not get into loops
dbg!("nothing changed after reloading");
// there was no change, and nothing was loaded in the meantime, reflect that in the return value to not get into loops.
// However, if we came here in desperation for something, we should collect what's there for a chance that
// it will be what was needed.
// The original problem here is that a trampling-herd of threads want to load just a single index. Most of these threads
// notice that something changed in the meantime and can avoid checking disk once again. However, one might not get it
// as their index is just too new. Maybe there is a way to detect this kind of race or to prevent it?
// load_new_index.then(|| self.collect_snapshot())
None
} else {
if load_new_index {
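The comment block above spells out the defense: record the previous index state, take the write lock, then re-check both pointer identity and an initialization flag before touching disk. Below is a toy model of that double-checked scheme; `Store`, `Index`, and the `Mutex`-based storage are illustrative stand-ins, since the real code uses lock-free snapshots with slot maps and generations.

```rust
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct Index {
    initialized: bool,
    disk_loads: u32,
}

#[derive(Default)]
struct Store {
    index: Mutex<Arc<Index>>, // the real store swaps snapshots atomically
    write: Mutex<()>,         // serializes consolidation, like `self.write`
}

impl Store {
    fn load_one_index(&self) -> Arc<Index> {
        let snapshot = self.index.lock().unwrap().clone();
        if snapshot.initialized {
            return snapshot; // fast path, no disk access
        }
        // IMPORTANT: record the previous state *before* taking the lock.
        let previous = Arc::as_ptr(&snapshot) as usize;
        let _write = self.write.lock().unwrap();
        let current = self.index.lock().unwrap().clone();
        if previous != Arc::as_ptr(&current) as usize || current.initialized {
            // Somebody else of the herd did the work while we waited; the
            // `initialized` flag catches what pointer comparison can miss.
            return current;
        }
        // Only this thread actually consults the disk.
        let new_index = Arc::new(Index {
            initialized: true,
            disk_loads: current.disk_loads + 1,
        });
        *self.index.lock().unwrap() = new_index.clone();
        new_index
    }
}

fn main() {
    let store = Store::default();
    std::thread::scope(|s| {
        for _ in 0..16 {
            let store = &store;
            s.spawn(move || assert!(store.load_one_index().disk_loads <= 1));
        }
    });
}
```

The pointer comparison catches most "somebody else already did the work" cases cheaply, while the flag closes the race the diff comment calls out, where the pointer check alone is not reliable.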
2 changes: 1 addition & 1 deletion gix-odb/src/store_impls/dynamic/types.rs
@@ -18,7 +18,7 @@ pub(crate) type AtomicGeneration = AtomicU32;

/// A way to indicate which pack indices we have seen already and which of them are loaded, along with an idea
/// of whether stored `PackId`s are still usable.
#[derive(Default, Copy, Clone)]
#[derive(Default, Copy, Clone, Debug)]
pub struct SlotIndexMarker {
/// The generation the `loaded_until_index` belongs to. Indices of different generations are completely incompatible.
/// This value changes once the internal representation is compacted, something that may happen only if there is no handle
8 changes: 6 additions & 2 deletions src/plumbing/main.rs
@@ -1153,7 +1153,7 @@ pub fn main() -> Result<()> {
),
},
Subcommands::Odb(cmd) => match cmd {
odb::Subcommands::Stats => prepare_and_run(
odb::Subcommands::Stats { extra_header_lookup } => prepare_and_run(
"odb-stats",
trace,
auto_verbose,
@@ -1166,7 +1166,11 @@
progress,
out,
err,
core::repository::odb::statistics::Options { format, thread_limit },
core::repository::odb::statistics::Options {
format,
thread_limit,
extra_header_lookup,
},
)
},
),
6 changes: 5 additions & 1 deletion src/plumbing/options/mod.rs
@@ -586,7 +586,11 @@ pub mod odb {
Info,
/// Count and obtain information on all, possibly duplicate, objects in the database.
#[clap(visible_alias = "statistics")]
Stats,
Stats {
/// Lookup headers again, but without preloading indices.
#[clap(long)]
extra_header_lookup: bool,
},
}
}

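With the wiring above the check is opt-in from the command line, presumably `gix odb stats --extra-header-lookup` (or via the `statistics` alias). Note that the `gitoxide-core` change bails out when running effectively single-threaded, since the extra lookup is only meaningful in threaded mode.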
