feat: add sst-metadata tool to query sst metadata (#927)
## Related Issues
Closes #

When debugging compaction-related issues, it's helpful to inspect SST metadata.

## Detailed Changes
- Add new cli tool `sst-metadata`.

## Test Plan
Manually:
```
$ ./target/release/sst-metadata --dir ~/bench/data/store/2/2199023255817/

Location:2/2199023255817/51491.sst, time_range:[2022-09-05 10:00:00, 2022-09-05 12:00:00), max_seq:305309, size:440.000M, metadata:55.219M, kv:51.565M, filter:38.470M, row_num:14480000
Location:2/2199023255817/53873.sst, time_range:[2022-09-05 10:00:00, 2022-09-05 12:00:00), max_seq:319320, size:666.969M, metadata:83.759M, kv:78.198M, filter:58.342M, row_num:21960000
Location:2/2199023255817/51538.sst, time_range:[2022-09-05 10:00:00, 2022-09-05 12:00:00), max_seq:305001, size:1161.511M, metadata:145.723M, kv:136.025M, filter:101.489M, row_num:38200000
Location:2/2199023255817/53269.sst, time_range:[2022-09-05 10:00:00, 2022-09-05 12:00:00), max_seq:315057, size:1176.751M, metadata:147.631M, kv:137.805M, filter:102.817M, row_num:38700000
Location:2/2199023255817/53973.sst, time_range:[2022-09-05 10:00:00, 2022-09-05 12:00:00), max_seq:320198, size:1183.643M, metadata:148.508M, kv:138.623M, filter:103.428M, row_num:38930000
```
jiacai2050 authored May 26, 2023
1 parent 6303f4c commit 7692507
Showing 11 changed files with 216 additions and 16 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 6 additions & 1 deletion Makefile
@@ -32,6 +32,7 @@ build-arm64:
build-with-console:
ls -alh
cd $(DIR); RUSTFLAGS="--cfg tokio_unstable" cargo build --release

test:
cd $(DIR); cargo test --workspace -- --test-threads=4

@@ -124,4 +125,8 @@ else
dev-setup:
echo "Error: Unsupported OS. Exiting..."
exit 1
endif
endif

fix:
cargo fmt
cargo sort --workspace
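
The new `fix` target chains `cargo fmt` with `cargo sort --workspace`, so formatting and `Cargo.toml` dependency order can be normalized in one step; an invocation sketch from the repo root (assuming GNU make and cargo-sort are installed):

```
$ make fix
```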
15 changes: 9 additions & 6 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -294,12 +294,9 @@ impl<'a> Reader<'a> {

async fn load_meta_data_from_storage(&self) -> Result<parquet_ext::ParquetMetaDataRef> {
let file_size = self.load_file_size().await?;
let chunk_reader_adapter = ChunkReaderAdapter {
path: self.path,
store: self.store,
};
let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store);

let meta_data =
let (meta_data, _) =
parquet_ext::meta_data::fetch_parquet_metadata(file_size, &chunk_reader_adapter)
.await
.with_context(|| FetchAndDecodeSstMeta {
@@ -440,11 +437,17 @@ impl AsyncFileReader for ObjectStoreReader {
}
}

struct ChunkReaderAdapter<'a> {
pub struct ChunkReaderAdapter<'a> {
path: &'a Path,
store: &'a ObjectStoreRef,
}

impl<'a> ChunkReaderAdapter<'a> {
pub fn new(path: &'a Path, store: &'a ObjectStoreRef) -> Self {
Self { path, store }
}
}

#[async_trait]
impl<'a> ChunkReader for ChunkReaderAdapter<'a> {
async fn get_bytes(&self, range: Range<usize>) -> GenericResult<Bytes> {
32 changes: 28 additions & 4 deletions analytic_engine/src/sst/parquet/meta_data.rs
@@ -64,6 +64,11 @@ trait Filter: fmt::Debug {
/// Serialize the bitmap index to binary array.
fn to_bytes(&self) -> Vec<u8>;

/// Serialized size
fn size(&self) -> usize {
self.to_bytes().len()
}

/// Deserialize the binary array to bitmap index.
fn from_bytes(buf: Vec<u8>) -> Result<Self>
where
@@ -189,6 +194,13 @@ impl RowGroupFilter {
.as_ref()
.map(|v| v.contains(data))
}

fn size(&self) -> usize {
self.column_filters
.iter()
.map(|cf| cf.as_ref().map(|cf| cf.size()).unwrap_or(0))
.sum()
}
}

#[derive(Debug, Clone, PartialEq, Default)]
@@ -209,6 +221,10 @@ impl ParquetFilter {
pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn size(&self) -> usize {
self.row_group_filters.iter().map(|f| f.size()).sum()
}
}

impl Index<usize> for ParquetFilter {
@@ -324,14 +340,22 @@ impl From<ParquetMetaData> for MetaData {

impl fmt::Debug for ParquetMetaData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use common_util::byte::encode;

f.debug_struct("ParquetMetaData")
.field("min_key", &self.min_key)
.field("max_key", &self.max_key)
.field("min_key", &encode(&self.min_key))
.field("max_key", &encode(&self.max_key))
.field("time_range", &self.time_range)
.field("max_sequence", &self.max_sequence)
.field("schema", &self.schema)
// Avoid the messy output from filter.
.field("has_filter", &self.parquet_filter.is_some())
.field(
"filter_size",
&self
.parquet_filter
.as_ref()
.map(|filter| filter.size())
.unwrap_or(0),
)
.field("collapsible_cols_idx", &self.collapsible_cols_idx)
.finish()
}
1 change: 1 addition & 0 deletions common_util/Cargo.toml
@@ -22,6 +22,7 @@ chrono = { workspace = true }
common_types = { workspace = true, features = ["test"] }
crossbeam-utils = "0.8.7"
env_logger = { workspace = true, optional = true }
hex = "0.4.3"
lazy_static = { workspace = true }
libc = "0.2"
log = { workspace = true }
3 changes: 3 additions & 0 deletions common_util/src/byte.rs
@@ -0,0 +1,3 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

pub use hex::encode;
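
`common_util::byte::encode` simply re-exports `hex::encode`, which renders arbitrary bytes as a lowercase hex string; a minimal sketch of the behavior the `ParquetMetaData` Debug impl above relies on (assumes the `hex` crate added in `common_util/Cargo.toml`):

```rust
fn main() {
    // Arbitrary bytes become a lowercase hex string, keeping binary
    // min_key/max_key values readable in Debug output.
    assert_eq!(hex::encode([0xde, 0xad, 0xbe, 0xef]), "deadbeef");
    assert_eq!(hex::encode(b"key"), "6b6579");
}
```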
1 change: 1 addition & 0 deletions common_util/src/lib.rs
@@ -9,6 +9,7 @@ pub mod macros;

// TODO(yingwen): Move some mod into components as a crate
pub mod alloc_tracker;
pub mod byte;
pub mod codec;
pub mod config;
pub mod error;
12 changes: 9 additions & 3 deletions common_util/src/time.rs
@@ -6,10 +6,10 @@

use std::{
convert::TryInto,
time::{Duration, Instant},
time::{Duration, Instant, UNIX_EPOCH},
};

use chrono::Utc;
use chrono::{DateTime, Utc};

pub trait DurationExt {
/// Convert into u64.
@@ -47,7 +47,7 @@ impl InstantExt for Instant {

#[inline]
pub fn secs_to_nanos(s: u64) -> u64 {
s * 1000000000
s * 1_000_000_000
}

#[inline]
@@ -60,6 +60,12 @@ pub fn current_as_rfc3339() -> String {
Utc::now().to_rfc3339()
}

#[inline]
pub fn format_as_ymdhms(unix_timestamp: i64) -> String {
let dt = DateTime::<Utc>::from(UNIX_EPOCH + Duration::from_millis(unix_timestamp as u64));
dt.format("%Y-%m-%d %H:%M:%S").to_string()
}

#[cfg(test)]
mod tests {
use std::thread;
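
Note that `format_as_ymdhms` interprets its `i64` argument as Unix milliseconds (it feeds `Duration::from_millis`), matching the millisecond timestamps in the SST time range; negative inputs would wrap through `as u64`, so callers are expected to pass non-negative values. A small usage sketch:

```rust
use common_util::time::format_as_ymdhms;

fn main() {
    // 1662372000000 ms since the epoch is 2022-09-05 10:00:00 UTC,
    // the inclusive start shown in the sample output above.
    assert_eq!(format_as_ymdhms(1_662_372_000_000), "2022-09-05 10:00:00");
}
```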
4 changes: 2 additions & 2 deletions components/parquet_ext/src/meta_data.rs
@@ -21,7 +21,7 @@ pub trait ChunkReader: Sync + Send {
pub async fn fetch_parquet_metadata(
file_size: usize,
file_reader: &dyn ChunkReader,
) -> Result<ParquetMetaData> {
) -> Result<(ParquetMetaData, usize)> {
const FOOTER_LEN: usize = 8;

if file_size < FOOTER_LEN {
@@ -63,5 +63,5 @@ pub async fn fetch_parquet_metadata(
ParquetError::General(err_msg)
})?;

footer::decode_metadata(&metadata_bytes)
footer::decode_metadata(&metadata_bytes).map(|v| (v, metadata_len))
}
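
A sketch of calling the changed API from the reader side, mirroring `parse_metadata` in the new tool below (the `storage`/`path`/`file_size` arguments would come from an object store listing, as in `sst-metadata.rs`):

```rust
use analytic_engine::sst::parquet::async_reader::ChunkReaderAdapter;
use object_store::{ObjectStoreRef, Path};
use parquet_ext::meta_data::fetch_parquet_metadata;

async fn footer_summary(
    storage: &ObjectStoreRef,
    path: &Path,
    file_size: usize,
) -> anyhow::Result<()> {
    let reader = ChunkReaderAdapter::new(path, storage);
    // The second tuple element is the decoded footer metadata length,
    // which the tool reports as the `metadata:...M` column.
    let (parquet_metadata, metadata_size) = fetch_parquet_metadata(file_size, &reader).await?;
    println!(
        "rows:{}, metadata_size:{metadata_size}",
        parquet_metadata.file_metadata().num_rows()
    );
    Ok(())
}
```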
1 change: 1 addition & 0 deletions tools/Cargo.toml
@@ -18,6 +18,7 @@ common_types = { workspace = true }
common_util = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
num_cpus = "1.15.0"
object_store = { workspace = true }
parquet = { workspace = true }
parquet_ext = { workspace = true }
148 changes: 148 additions & 0 deletions tools/src/bin/sst-metadata.rs
@@ -0,0 +1,148 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! A CLI to query SST metadata

use std::sync::Arc;

use analytic_engine::sst::{meta_data::cache::MetaData, parquet::async_reader::ChunkReaderAdapter};
use anyhow::{Context, Result};
use clap::Parser;
use common_util::{
runtime::{self, Runtime},
time::format_as_ymdhms,
};
use futures::StreamExt;
use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path};
use parquet_ext::meta_data::fetch_parquet_metadata;
use tokio::{runtime::Handle, task::JoinSet};

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
/// SST directory
#[clap(short, long, required(true))]
dir: String,

/// Verbose print
#[clap(short, long, required(false))]
verbose: bool,

/// Thread num, 0 means cpu num
#[clap(short, long, default_value_t = 0)]
threads: usize,
}

fn new_runtime(thread_num: usize) -> Runtime {
runtime::Builder::default()
.thread_name("sst-metadata")
.worker_threads(thread_num)
.enable_all()
.build()
.unwrap()
}

fn main() {
let args = Args::parse();
let thread_num = if args.threads == 0 {
num_cpus::get()
} else {
args.threads
};
let rt = Arc::new(new_runtime(thread_num));
rt.block_on(async move {
if let Err(e) = run(args).await {
eprintln!("Run failed, err:{e}");
}
});
}

async fn run(args: Args) -> Result<()> {
let handle = Handle::current();
let storage = LocalFileSystem::new_with_prefix(&args.dir)?;
let storage: ObjectStoreRef = Arc::new(storage);

let mut join_set = JoinSet::new();
let mut ssts = storage.list(None).await?;
while let Some(object_meta) = ssts.next().await {
let object_meta = object_meta?;
let storage = storage.clone();
let location = object_meta.location.clone();
join_set.spawn_on(
async move {
let (metadata, metadata_size, kv_size) =
parse_metadata(storage, location, object_meta.size).await?;
Ok::<_, anyhow::Error>((object_meta, metadata, metadata_size, kv_size))
},
&handle,
);
}

let mut metas = Vec::with_capacity(join_set.len());
while let Some(meta) = join_set.join_next().await {
let meta = meta.context("join err")?;
let meta = meta.context("parse metadata err")?;
metas.push(meta);
}

// sort by time_range asc
metas.sort_by(|a, b| {
a.1.custom()
.time_range
.inclusive_start()
.cmp(&b.1.custom().time_range.inclusive_start())
});

for (object_meta, sst_metadata, metadata_size, kv_size) in metas {
let ObjectMeta { location, size, .. } = &object_meta;
let custom_meta = sst_metadata.custom();
let parquet_meta = sst_metadata.parquet();
let time_range = custom_meta.time_range;
let start = format_as_ymdhms(time_range.inclusive_start().as_i64());
let end = format_as_ymdhms(time_range.exclusive_end().as_i64());
let seq = custom_meta.max_sequence;
let filter_size = custom_meta
.parquet_filter
.as_ref()
.map(|f| f.size())
.unwrap_or(0);
let file_metadata = parquet_meta.file_metadata();
let row_num = file_metadata.num_rows();
if args.verbose {
println!("object_meta:{object_meta:?}, parquet_meta:{parquet_meta:?}");
} else {
let size_mb = as_mb(*size);
let metadata_mb = as_mb(metadata_size);
let filter_mb = as_mb(filter_size);
let kv_mb = as_mb(kv_size);
println!(
"Location:{location}, time_range:[{start}, {end}), max_seq:{seq}, size:{size_mb:.3}M, metadata:{metadata_mb:.3}M, kv:{kv_mb:.3}M, filter:{filter_mb:.3}M, row_num:{row_num}"
);
}
}

Ok(())
}

fn as_mb(v: usize) -> f64 {
v as f64 / 1024.0 / 1024.0
}

async fn parse_metadata(
storage: ObjectStoreRef,
path: Path,
size: usize,
) -> Result<(MetaData, usize, usize)> {
let reader = ChunkReaderAdapter::new(&path, &storage);
let (parquet_metadata, metadata_size) = fetch_parquet_metadata(size, &reader).await?;
let kv_metadata = parquet_metadata.file_metadata().key_value_metadata();
let kv_size = kv_metadata
.map(|kvs| {
kvs.iter()
.map(|kv| kv.key.as_bytes().len() + kv.value.as_ref().map(|v| v.len()).unwrap_or(0))
.sum()
})
.unwrap_or(0);

let md = MetaData::try_new(&parquet_metadata, false)?;
Ok((md, metadata_size, kv_size))
}
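
Beyond the default listing in the test plan above, the flags defined in `Args` can be combined (directory path illustrative):

```
$ ./target/release/sst-metadata --dir ~/bench/data/store/2/2199023255817/ --threads 8
$ ./target/release/sst-metadata --dir ~/bench/data/store/2/2199023255817/ --verbose
```

`--verbose` replaces each one-line summary with the full `object_meta`/`parquet_meta` Debug dump, and `--threads 0` (the default) sizes the runtime to the CPU count.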
