
refactor untar_snapshot_in to push parallelism deeper for further refactoring #18310

Merged
merged 1 commit on Jun 29, 2021
runtime/src/snapshot_utils.rs (117 changes: 74 additions & 43 deletions)
@@ -599,6 +599,9 @@ pub struct BankFromArchiveTimings {
     pub verify_snapshot_bank_us: u64,
 }
 
+// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
+pub const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;
+
 #[allow(clippy::too_many_arguments)]
 pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
     account_paths: &[PathBuf],
@@ -620,26 +623,17 @@ pub fn bank_from_archive<P: AsRef<Path> + std::marker::Sync>(
         .tempdir_in(snapshot_path)?;
 
     let mut untar = Measure::start("snapshot untar");
-    // From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later.
-    let divisions = std::cmp::min(4, std::cmp::max(1, num_cpus::get() / 4));
-    // create 'divisions' # of parallel workers, each responsible for 1/divisions of all the files to extract.
-    let all_unpacked_append_vec_map = (0..divisions)
-        .into_par_iter()
-        .map(|index| {
-            untar_snapshot_in(
-                &snapshot_tar,
-                unpack_dir.as_ref(),
-                account_paths,
-                archive_format,
-                Some(ParallelSelector { index, divisions }),
-            )
-        })
-        .collect::<Vec<_>>();
-    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
-    for h in all_unpacked_append_vec_map {
-        unpacked_append_vec_map.extend(h?);
-    }
-
+    let divisions = std::cmp::min(
+        PARALLEL_UNTAR_READERS_DEFAULT,
+        std::cmp::max(1, num_cpus::get() / 4),
+    );
+    let unpacked_append_vec_map = untar_snapshot_in(
+        &snapshot_tar,
+        unpack_dir.as_ref(),
+        account_paths,
+        archive_format,
+        divisions,
+    )?;
     untar.stop();
     info!("{}", untar);
 
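For reference, a minimal standalone sketch of the reader-count formula above; the helper name divisions_for is illustrative and not part of this change. It shows how the cap from PARALLEL_UNTAR_READERS_DEFAULT plays out for a few core counts.

// Standalone illustration of divisions = min(PARALLEL_UNTAR_READERS_DEFAULT, max(1, cores / 4)).
const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4;

fn divisions_for(cores: usize) -> usize {
    std::cmp::min(PARALLEL_UNTAR_READERS_DEFAULT, std::cmp::max(1, cores / 4))
}

fn main() {
    // 2 cores -> 1 reader, 8 cores -> 2, 16 cores -> 4, 64 cores -> still 4 (capped).
    for &cores in &[2usize, 8, 16, 64] {
        println!("{} cores -> {} parallel readers", cores, divisions_for(cores));
    }
}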
@@ -783,35 +777,72 @@ pub fn purge_old_snapshot_archives<P: AsRef<Path>>(
     }
 }
 
+fn unpack_snapshot_local<T: 'static + Read + std::marker::Send, F: Fn() -> T>(
+    reader: F,
+    ledger_dir: &Path,
+    account_paths: &[PathBuf],
+    parallel_archivers: usize,
+) -> Result<UnpackedAppendVecMap> {
+    assert!(parallel_archivers > 0);
+    let readers = (0..parallel_archivers)
+        .into_iter()
+        .map(|_| reader())
+        .collect::<Vec<_>>();
+
+    // create 'parallel_archivers' # of parallel workers, each responsible for 1/parallel_archivers of all the files to extract.
+    let all_unpacked_append_vec_map = readers
+        .into_par_iter()
+        .enumerate()
+        .map(|(index, reader)| {
+            let parallel_selector = Some(ParallelSelector {
+                index,
+                divisions: parallel_archivers,
+            });
+            let mut archive = Archive::new(reader);
+            unpack_snapshot(&mut archive, ledger_dir, account_paths, parallel_selector)
+        })
+        .collect::<Vec<_>>();
+    let mut unpacked_append_vec_map = UnpackedAppendVecMap::new();
+    for h in all_unpacked_append_vec_map {
+        unpacked_append_vec_map.extend(h?);
+    }
+
+    Ok(unpacked_append_vec_map)
+}
+
 fn untar_snapshot_in<P: AsRef<Path>>(
     snapshot_tar: P,
     unpack_dir: &Path,
     account_paths: &[PathBuf],
     archive_format: ArchiveFormat,
-    parallel_selector: Option<ParallelSelector>,
+    parallel_divisions: usize,
 ) -> Result<UnpackedAppendVecMap> {
-    let tar_name = File::open(&snapshot_tar)?;
+    let open_file = || File::open(&snapshot_tar).unwrap();
     let account_paths_map = match archive_format {
-        ArchiveFormat::TarBzip2 => {
-            let tar = BzDecoder::new(BufReader::new(tar_name));
-            let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
-        }
-        ArchiveFormat::TarGzip => {
-            let tar = GzDecoder::new(BufReader::new(tar_name));
-            let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
-        }
-        ArchiveFormat::TarZstd => {
-            let tar = zstd::stream::read::Decoder::new(BufReader::new(tar_name))?;
-            let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
-        }
-        ArchiveFormat::Tar => {
-            let tar = BufReader::new(tar_name);
-            let mut archive = Archive::new(tar);
-            unpack_snapshot(&mut archive, unpack_dir, account_paths, parallel_selector)?
-        }
+        ArchiveFormat::TarBzip2 => unpack_snapshot_local(
+            || BzDecoder::new(BufReader::new(open_file())),
+            unpack_dir,
+            account_paths,
+            parallel_divisions,
+        )?,
+        ArchiveFormat::TarGzip => unpack_snapshot_local(
+            || GzDecoder::new(BufReader::new(open_file())),
+            unpack_dir,
+            account_paths,
+            parallel_divisions,
+        )?,
+        ArchiveFormat::TarZstd => unpack_snapshot_local(
+            || zstd::stream::read::Decoder::new(BufReader::new(open_file())).unwrap(),
+            unpack_dir,
+            account_paths,
+            parallel_divisions,
+        )?,
+        ArchiveFormat::Tar => unpack_snapshot_local(
+            || BufReader::new(open_file()),
+            unpack_dir,
+            account_paths,
+            parallel_divisions,
+        )?,
     };
     Ok(account_paths_map)
 }
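For reference, a minimal standalone sketch of the fan-out pattern unpack_snapshot_local uses: one reader per worker, with each worker handling only the entries its selector claims. The Selector type and the index-modulo-divisions split below are illustrative stand-ins (assuming the rayon crate for the parallel iterator), not the crate's actual ParallelSelector API.

// Standalone sketch: N workers, each with its own "reader", split a shared list of
// entries so every entry is handled by exactly one worker.
use rayon::prelude::*;

// Illustrative stand-in for a parallel selector; not the actual ParallelSelector type.
struct Selector {
    index: usize,
    divisions: usize,
}

impl Selector {
    fn selects(&self, entry_index: usize) -> bool {
        entry_index % self.divisions == self.index
    }
}

fn main() {
    let divisions = 4;
    let entries: Vec<String> = (0..10).map(|i| format!("accounts/{}.123", i)).collect();

    // One worker per division, mirroring how unpack_snapshot_local builds one reader per worker.
    let per_worker: Vec<Vec<&str>> = (0..divisions)
        .into_par_iter()
        .map(|index| {
            let selector = Selector { index, divisions };
            entries
                .iter()
                .enumerate()
                .filter(|(i, _)| selector.selects(*i))
                .map(|(_, e)| e.as_str())
                .collect()
        })
        .collect();

    // Every entry lands in exactly one worker's slice.
    let total: usize = per_worker.iter().map(|v| v.len()).sum();
    assert_eq!(total, entries.len());
    println!("{:?}", per_worker);
}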
@@ -929,7 +960,7 @@ pub fn verify_snapshot_archive<P, Q, R>(
         unpack_dir,
         &[unpack_dir.to_path_buf()],
         archive_format,
-        None,
+        1,
    )
    .unwrap();
 
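For reference: passing 1 here gives unpack_snapshot_local a single reader with worker index 0. Assuming entries are split by index modulo divisions, as sketched above, that lone worker claims every entry, so verification stays effectively sequential. A tiny standalone check of that degenerate case:

// Degenerate case used by verify_snapshot_archive: one division, worker index 0.
// Under the assumed index-modulo-divisions split, the single worker claims every entry.
fn main() {
    let (index, divisions) = (0usize, 1usize);
    assert!((0..1_000).all(|entry| entry % divisions == index));
    println!("divisions = 1: the single reader handles all entries");
}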