decode metadata concurrently
jiacai2050 committed May 24, 2023
1 parent d1ce062 commit 3adc7cf
Showing 3 changed files with 46 additions and 19 deletions.
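
The commit replaces the sequential per-file decode loop with one tokio task per SST file, spawned through a JoinSet onto a runtime sized to the machine's CPU count, and only sorts the collected results afterwards. A minimal sketch of that pattern follows; decode_one, the task count, and the tuple payload are placeholders standing in for the tool's parse_metadata and are not part of the commit.

use tokio::task::JoinSet;

// Placeholder for the real fetch-and-decode of one SST file's footer.
async fn decode_one(id: usize) -> anyhow::Result<(usize, usize)> {
    Ok((id, id * 100))
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
    let mut join_set = JoinSet::new();
    for id in 0..8 {
        // One task per file; tasks run concurrently on the runtime's worker threads.
        join_set.spawn(async move { decode_one(id).await });
    }

    let mut metas = Vec::new();
    while let Some(res) = join_set.join_next().await {
        // Outer `?` surfaces join errors (panic/cancel), inner `?` the task's own error.
        metas.push(res??);
    }

    // Completion order is arbitrary, so sort before printing, as the tool does.
    metas.sort_by(|a, b| a.0.cmp(&b.0));
    println!("{metas:?}");
    Ok(())
}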
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions tools/Cargo.toml
@@ -18,6 +18,7 @@ common_types = { workspace = true }
 common_util = { workspace = true }
 env_logger = { workspace = true }
 futures = { workspace = true }
+num_cpus = "1.15.0"
 object_store = { workspace = true }
 parquet = { workspace = true }
 parquet_ext = { workspace = true }
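
The num_cpus dependency added above is what lets main() size the worker pool to the host instead of the previous hard-coded two threads. A rough sketch of the idea using tokio's builder directly (the tool itself goes through the common_util::runtime wrapper, so the names here are illustrative only):

fn main() {
    let workers = num_cpus::get();
    // Build a multi-threaded runtime with one worker per logical CPU.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .thread_name("sst-metadata")
        .worker_threads(workers)
        .enable_all()
        .build()
        .expect("build tokio runtime");
    rt.block_on(async move {
        println!("running with {workers} worker threads");
    });
}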
63 changes: 44 additions & 19 deletions tools/src/bin/sst-metadata.rs
@@ -8,7 +8,7 @@ use analytic_engine::sst::{
     meta_data::cache::MetaData,
     parquet::{async_reader::ChunkReaderAdapter, meta_data::ParquetMetaDataRef},
 };
-use anyhow::Result;
+use anyhow::{Context, Result};
 use clap::Parser;
 use common_util::{
     runtime::{self, Runtime},
@@ -17,6 +17,7 @@ use common_util::{
 use futures::StreamExt;
 use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path};
 use parquet_ext::meta_data::fetch_parquet_metadata;
+use tokio::{runtime::Handle, task::JoinSet};

 #[derive(Parser, Debug)]
 #[clap(author, version, about, long_about = None)]
@@ -28,11 +29,15 @@ struct Args {
     /// Sst directory(relative to store_path)
     #[clap(short, long, required(true))]
     dir: String,
+
+    /// Verbose print
+    #[clap(short, long, required(false))]
+    verbose: bool,
 }

 fn new_runtime(thread_num: usize) -> Runtime {
     runtime::Builder::default()
-        .thread_name("tools")
+        .thread_name("sst-metadata")
         .worker_threads(thread_num)
         .enable_all()
         .build()
@@ -41,7 +46,7 @@ fn new_runtime(thread_num: usize) -> Runtime {

 fn main() {
     let args = Args::parse();
-    let rt = Arc::new(new_runtime(2));
+    let rt = Arc::new(new_runtime(num_cpus::get()));
     rt.block_on(async move {
         if let Err(e) = run(args).await {
             eprintln!("Run failed, err:{e}");
@@ -50,46 +55,66 @@ fn main() {
 }

 async fn run(args: Args) -> Result<()> {
+    let handle = Handle::current();
     let storage = LocalFileSystem::new_with_prefix(args.store_path)?;
     let storage: ObjectStoreRef = Arc::new(storage);
     let prefix_path = Path::parse(args.dir)?;
-    let mut ssts = storage.list(Some(&prefix_path)).await?;

-    let mut metas = Vec::new();
+    let mut join_set = JoinSet::new();
+    let mut ssts = storage.list(Some(&prefix_path)).await?;
     while let Some(object_meta) = ssts.next().await {
         let object_meta = object_meta?;
-        let md = parse_metadata(&storage, &object_meta.location, object_meta.size).await?;
-        metas.push((object_meta, md));
+        let storage = storage.clone();
+        let location = object_meta.location.clone();
+        join_set.spawn_on(
+            async move {
+                let md = parse_metadata(storage, location, object_meta.size)
+                    .await
+                    .context("parse metadata")?;
+                Ok::<_, anyhow::Error>((object_meta, md))
+            },
+            &handle,
+        );
     }

+    let mut metas = Vec::new();
+    while let Some(res) = join_set.join_next().await {
+        let res = res.context("join err")?;
+        let res = res.context("parse metadata err")?;
+        metas.push(res);
+    }
+
     // sort by time_range asc
-    metas.sort_unstable_by(|a, b| {
+    metas.sort_by(|a, b| {
         a.1.time_range
             .inclusive_start()
             .cmp(&b.1.time_range.inclusive_start())
     });

-    for (object_meta, md) in metas {
-        let ObjectMeta { location, size, .. } = object_meta;
-        let time_range = md.time_range;
+    for (object_meta, parquet_meta) in metas {
+        let ObjectMeta { location, size, .. } = &object_meta;
+        let time_range = parquet_meta.time_range;
         let start = format_as_ymdhms(time_range.inclusive_start().as_i64());
         let end = format_as_ymdhms(time_range.exclusive_end().as_i64());
-        let seq = md.max_sequence;
-        println!(
-            "Location:{location}, time_range:[{start}, {end}), size:{size},
-max_seq:{seq}"
-        );
+        let seq = parquet_meta.max_sequence;
+        if args.verbose {
+            println!("object_meta:{object_meta:?}, parquet_meta:{parquet_meta:?}, time_range::[{start}, {end})");
+        } else {
+            println!(
+                "Location:{location}, time_range:[{start}, {end}), size:{size}, max_seq:{seq}"
+            );
+        }
     }

     Ok(())
 }

 async fn parse_metadata(
-    storage: &ObjectStoreRef,
-    path: &Path,
+    storage: ObjectStoreRef,
+    path: Path,
     size: usize,
 ) -> Result<ParquetMetaDataRef> {
-    let reader = ChunkReaderAdapter::new(path, storage);
+    let reader = ChunkReaderAdapter::new(&path, &storage);
     let parquet_meta_data = fetch_parquet_metadata(size, &reader).await?;

     let md = MetaData::try_new(&parquet_meta_data, true)?;
