From ff7edcd5bf9ea5af3eff56876cdcdfa5c9ae098b Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Wed, 24 May 2023 22:06:40 +0800 Subject: [PATCH] decode metadata concurrently --- Cargo.lock | 1 + tools/Cargo.toml | 1 + tools/src/bin/sst-metadata.rs | 63 ++++++++++++++++++++++++----------- 3 files changed, 46 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 502de2e6dd..cb044b2ee0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6929,6 +6929,7 @@ dependencies = [ "common_util", "env_logger", "futures 0.3.28", + "num_cpus", "object_store 1.2.0", "parquet", "parquet_ext", diff --git a/tools/Cargo.toml b/tools/Cargo.toml index 9d66b723fd..980fe16e58 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -18,6 +18,7 @@ common_types = { workspace = true } common_util = { workspace = true } env_logger = { workspace = true } futures = { workspace = true } +num_cpus = "1.15.0" object_store = { workspace = true } parquet = { workspace = true } parquet_ext = { workspace = true } diff --git a/tools/src/bin/sst-metadata.rs b/tools/src/bin/sst-metadata.rs index 1419bfbafb..d6015315ff 100644 --- a/tools/src/bin/sst-metadata.rs +++ b/tools/src/bin/sst-metadata.rs @@ -8,7 +8,7 @@ use analytic_engine::sst::{ meta_data::cache::MetaData, parquet::{async_reader::ChunkReaderAdapter, meta_data::ParquetMetaDataRef}, }; -use anyhow::Result; +use anyhow::{Context, Result}; use clap::Parser; use common_util::{ runtime::{self, Runtime}, @@ -17,6 +17,7 @@ use common_util::{ use futures::StreamExt; use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path}; use parquet_ext::meta_data::fetch_parquet_metadata; +use tokio::{runtime::Handle, task::JoinSet}; #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] @@ -28,11 +29,15 @@ struct Args { /// Sst directory(relative to store_path) #[clap(short, long, required(true))] dir: String, + + /// Verbose print + #[clap(short, long, required(false))] + verbose: bool, } fn new_runtime(thread_num: usize) -> Runtime { runtime::Builder::default() - .thread_name("tools") + .thread_name("sst-metadata") .worker_threads(thread_num) .enable_all() .build() @@ -41,7 +46,7 @@ fn new_runtime(thread_num: usize) -> Runtime { fn main() { let args = Args::parse(); - let rt = Arc::new(new_runtime(2)); + let rt = Arc::new(new_runtime(num_cpus::get())); rt.block_on(async move { if let Err(e) = run(args).await { eprintln!("Run failed, err:{e}"); @@ -50,46 +55,66 @@ fn main() { } async fn run(args: Args) -> Result<()> { + let handle = Handle::current(); let storage = LocalFileSystem::new_with_prefix(args.store_path)?; let storage: ObjectStoreRef = Arc::new(storage); let prefix_path = Path::parse(args.dir)?; - let mut ssts = storage.list(Some(&prefix_path)).await?; - let mut metas = Vec::new(); + let mut join_set = JoinSet::new(); + let mut ssts = storage.list(Some(&prefix_path)).await?; while let Some(object_meta) = ssts.next().await { let object_meta = object_meta?; - let md = parse_metadata(&storage, &object_meta.location, object_meta.size).await?; - metas.push((object_meta, md)); + let storage = storage.clone(); + let location = object_meta.location.clone(); + join_set.spawn_on( + async move { + let md = parse_metadata(storage, location, object_meta.size) + .await + .context("parse metadata")?; + Ok::<_, anyhow::Error>((object_meta, md)) + }, + &handle, + ); + } + + let mut metas = Vec::new(); + while let Some(res) = join_set.join_next().await { + let res = res.context("join err")?; + let res = res.context("parse metadata err")?; + metas.push(res); } // sort by time_range asc - metas.sort_unstable_by(|a, b| { + metas.sort_by(|a, b| { a.1.time_range .inclusive_start() .cmp(&b.1.time_range.inclusive_start()) }); - for (object_meta, md) in metas { - let ObjectMeta { location, size, .. } = object_meta; - let time_range = md.time_range; + for (object_meta, parquet_meta) in metas { + let ObjectMeta { location, size, .. } = &object_meta; + let time_range = parquet_meta.time_range; let start = format_as_ymdhms(time_range.inclusive_start().as_i64()); let end = format_as_ymdhms(time_range.exclusive_end().as_i64()); - let seq = md.max_sequence; - println!( - "Location:{location}, time_range:[{start}, {end}), size:{size}, - max_seq:{seq}" - ); + let seq = parquet_meta.max_sequence; + if args.verbose { + println!("object_meta:{object_meta:?}, md:{md:?}, time_range::[{start}, {end})"); + } else { + println!( + "Location:{location}, time_range:[{start}, {end}), size:{size}, max_seq:{seq}" + ); + } } Ok(()) } async fn parse_metadata( - storage: &ObjectStoreRef, - path: &Path, + storage: ObjectStoreRef, + path: Path, size: usize, ) -> Result { - let reader = ChunkReaderAdapter::new(path, storage); + let reader = ChunkReaderAdapter::new(&path, &storage); let parquet_meta_data = fetch_parquet_metadata(size, &reader).await?; let md = MetaData::try_new(&parquet_meta_data, true)?;