Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
refactor untar_snapshot_in to push parallelism deeper for further ref…
Browse files Browse the repository at this point in the history
…actoring (#18310)
  • Loading branch information
jeffwashington authored Jun 29, 2021
1 parent 52fd10c commit ce53b84
Showing 1 changed file with 74 additions and 43 deletions.
117 changes: 74 additions & 43 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,9 @@ pub struct BankFromArchiveTimings {
pub verify_snapshot_bank_us: u64,
}

/// Default number of parallel readers used when untarring a snapshot archive.
///
/// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
pub const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;

#[allow(clippy::too_many_arguments)]
pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
account_paths: &[PathBuf],
Expand All @@ -620,26 +623,17 @@ pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
.tempdir_in(snapshot_path)?;

let mut untar = Measure::start("snapshot untar");
// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
let divisions = std::cmp::min(4, std::cmp::max(1, num_cpus::get() / 4));
// create 'divisions' # of parallel workers, each responsible for 1/divisions of all the files to extract.
let all_unpacked_append_vec_map = (0..divisions)
.into_par_iter()
.map(|index| {
untar_snapshot_in(
&snapshot_tar,
unpack_dir.as_ref(),
account_paths,
archive_format,
Some(ParallelSelector { index, divisions }),
)
})
.collect::<Vec<_>>();
let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
for h in all_unpacked_append_vec_map {
unpacked_append_vec_map.extend(h?);
}

let divisions = std::cmp::min(
PARALLEL_UNTAR_READERS_DEFAULT,
std::cmp::max(1, num_cpus::get() / 4),
);
let unpacked_append_vec_map = untar_snapshot_in(
&snapshot_tar,
unpack_dir.as_ref(),
account_paths,
archive_format,
divisions,
)?;
untar.stop();
info!("{}", untar);

Expand Down Expand Up @@ -783,35 +777,72 @@ pub fn purge_old_snapshot_archives<P: AsRef<Path>>(
}
}

/// Unpacks one snapshot archive with `parallel_archivers` workers running in parallel.
///
/// `reader` is called once per worker to produce an independent stream over the same
/// archive. Worker `index` is handed a `ParallelSelector { index, divisions }` so that it
/// is responsible for 1/`parallel_archivers` of all the files to extract. The maps
/// returned by the workers are merged into a single `UnpackedAppendVecMap`.
///
/// # Panics
///
/// Panics if `parallel_archivers` is 0.
///
/// # Errors
///
/// Returns the error of a failed worker. All workers are run to completion before any
/// result is inspected, and the results of the other workers are discarded on failure.
fn unpack_snapshot_local<T: 'static + Read + std::marker::Send, F: Fn() -> T>(
    reader: F,
    ledger_dir: &Path,
    account_paths: &[PathBuf],
    parallel_archivers: usize,
) -> Result<UnpackedAppendVecMap> {
    assert!(parallel_archivers > 0);

    // `F` is not required to be `Sync`, so every stream is created here on the calling
    // thread; only the resulting `Send` readers are moved into the parallel workers.
    let readers = (0..parallel_archivers)
        .map(|_| reader())
        .collect::<Vec<_>>();

    // create 'parallel_archivers' # of parallel workers, each responsible for
    // 1/parallel_archivers of all the files to extract.
    let per_worker_results = readers
        .into_par_iter()
        .enumerate()
        .map(|(index, reader)| {
            let parallel_selector = Some(ParallelSelector {
                index,
                divisions: parallel_archivers,
            });
            let mut archive = Archive::new(reader);
            unpack_snapshot(&mut archive, ledger_dir, account_paths, parallel_selector)
        })
        .collect::<Vec<_>>();

    // Merge the per-worker maps, bailing out on the first worker that reported an error.
    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
    for worker_result in per_worker_results {
        unpacked_append_vec_map.extend(worker_result?);
    }

    Ok(unpacked_append_vec_map)
}

// NOTE(review): this listing is a diff rendered without +/- markers. Presumably the
// `parallel_selector` parameter, `tar_name`, and the four arms that build an `Archive`
// and call `unpack_snapshot` directly are the pre-change lines, and the
// `parallel_divisions` parameter, `open_file`, and the four arms that call
// `unpack_snapshot_local` are their replacements — confirm against the commit.
/// Unpacks the snapshot archive at `snapshot_tar`, wrapping the file in the decoder that
/// matches `archive_format`, and returns the resulting `UnpackedAppendVecMap`.
fn untar_snapshot_in<P: AsRef<Path>>(
snapshot_tar: P,
unpack_dir: &Path,
account_paths: &[PathBuf],
archive_format: ArchiveFormat,
parallel_selector: Option<ParallelSelector>,
parallel_divisions: usize,
) -> Result<UnpackedAppendVecMap> {
let tar_name = File::open(&snapshot_tar)?;
// Factory that reopens the archive; handed (wrapped in a decoder) to
// `unpack_snapshot_local` in the arms below.
// NOTE(review): the `unwrap` panics if the archive cannot be opened, whereas the
// `File::open(..)?` line above returns the error to the caller.
let open_file = || File::open(&snapshot_tar).unwrap();
let account_paths_map = match archive_format {
ArchiveFormat::TarBzip2 => {
let tar = BzDecoder::new(BufReader::new(tar_name));
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
}
ArchiveFormat::TarGzip => {
let tar = GzDecoder::new(BufReader::new(tar_name));
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
}
ArchiveFormat::TarZstd => {
let tar = zstd::stream::read::Decoder::new(BufReader::new(tar_name))?;
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
}
ArchiveFormat::Tar => {
let tar = BufReader::new(tar_name);
let mut archive = Archive::new(tar);
unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
}
ArchiveFormat::TarBzip2 => unpack_snapshot_local(
|| BzDecoder::new(BufReader::new(open_file())),
unpack_dir,
account_paths,
parallel_divisions,
)?,
ArchiveFormat::TarGzip => unpack_snapshot_local(
|| GzDecoder::new(BufReader::new(open_file())),
unpack_dir,
account_paths,
parallel_divisions,
)?,
ArchiveFormat::TarZstd => unpack_snapshot_local(
// NOTE(review): a zstd decoder construction failure panics here via `unwrap`; the
// direct `unpack_snapshot` zstd arm above propagates the same failure with `?`.
|| zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
unpack_dir,
account_paths,
parallel_divisions,
)?,
ArchiveFormat::Tar => unpack_snapshot_local(
|| BufReader::new(open_file()),
unpack_dir,
account_paths,
parallel_divisions,
)?,
};
Ok(account_paths_map)
}
Expand Down Expand Up @@ -929,7 +960,7 @@ pub fn verify_snapshot_archive<P, Q, R>(
unpack_dir,
&[unpack_dir.to_path_buf()],
archive_format,
None,
1,
)
.unwrap();

Expand Down

0 comments on commit ce53b84

Please sign in to comment.