feat: add sst-metadata tool to query sst metadata #927

Merged (12 commits) on May 26, 2023

8 changes: 8 additions & 0 deletions Cargo.lock

(Generated file; the diff is not rendered.)

7 changes: 6 additions & 1 deletion Makefile
@@ -32,6 +32,7 @@ build-arm64:
build-with-console:
ls -alh
cd $(DIR); RUSTFLAGS="--cfg tokio_unstable" cargo build --release

test:
cd $(DIR); cargo test --workspace -- --test-threads=4

@@ -124,4 +125,8 @@ else
dev-setup:
echo "Error: Unsupported OS. Exiting..."
exit 1
endif
endif

fix:
cargo fmt
cargo sort --workspace
15 changes: 9 additions & 6 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -294,12 +294,9 @@ impl<'a> Reader<'a> {

async fn load_meta_data_from_storage(&self) -> Result<parquet_ext::ParquetMetaDataRef> {
let file_size = self.load_file_size().await?;
let chunk_reader_adapter = ChunkReaderAdapter {
path: self.path,
store: self.store,
};
let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store);

let meta_data =
let (meta_data, _) =
parquet_ext::meta_data::fetch_parquet_metadata(file_size, &chunk_reader_adapter)
.await
.with_context(|| FetchAndDecodeSstMeta {
@@ -440,11 +437,17 @@ impl AsyncFileReader for ObjectStoreReader {
}
}

struct ChunkReaderAdapter<'a> {
pub struct ChunkReaderAdapter<'a> {
path: &'a Path,
store: &'a ObjectStoreRef,
}

impl<'a> ChunkReaderAdapter<'a> {
pub fn new(path: &'a Path, store: &'a ObjectStoreRef) -> Self {
Self { path, store }
}
}

#[async_trait]
impl<'a> ChunkReader for ChunkReaderAdapter<'a> {
async fn get_bytes(&self, range: Range<usize>) -> GenericResult<Bytes> {
32 changes: 28 additions & 4 deletions analytic_engine/src/sst/parquet/meta_data.rs
@@ -64,6 +64,11 @@ trait Filter: fmt::Debug {
/// Serialize the bitmap index to binary array.
fn to_bytes(&self) -> Vec<u8>;

/// Serialized size
fn size(&self) -> usize {
self.to_bytes().len()
}

/// Deserialize the binary array to bitmap index.
fn from_bytes(buf: Vec<u8>) -> Result<Self>
where
@@ -189,6 +194,13 @@ impl RowGroupFilter {
.as_ref()
.map(|v| v.contains(data))
}

fn size(&self) -> usize {
self.column_filters
.iter()
.map(|cf| cf.as_ref().map(|cf| cf.size()).unwrap_or(0))
.sum()
}
}

#[derive(Debug, Clone, PartialEq, Default)]
@@ -209,6 +221,10 @@ impl ParquetFilter {
pub fn is_empty(&self) -> bool {
self.len() == 0
}

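/// Total serialized size in bytes of all row group filters.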
pub fn size(&self) -> usize {
self.row_group_filters.iter().map(|f| f.size()).sum()
}
}

impl Index<usize> for ParquetFilter {
@@ -324,14 +340,22 @@ impl From<ParquetMetaData> for MetaData {

impl fmt::Debug for ParquetMetaData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use common_util::byte::encode;

f.debug_struct("ParquetMetaData")
.field("min_key", &self.min_key)
.field("max_key", &self.max_key)
.field("min_key", &encode(&self.min_key))
.field("max_key", &encode(&self.max_key))
.field("time_range", &self.time_range)
.field("max_sequence", &self.max_sequence)
.field("schema", &self.schema)
// Avoid the messy output from filter.
.field("has_filter", &self.parquet_filter.is_some())
.field(
"filter_size",
&self
.parquet_filter
.as_ref()
.map(|filter| filter.size())
.unwrap_or(0),
)
.field("collapsible_cols_idx", &self.collapsible_cols_idx)
.finish()
}
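
Since common_util::byte::encode is just a re-export of hex::encode (added in common_util/src/byte.rs below), the keys now appear in the Debug output as hex strings instead of raw byte vectors. A minimal illustration with made-up key bytes (editor's sketch, not part of the diff):

#[test]
fn key_debug_rendering() {
    // `encode` is the hex::encode re-export introduced by this PR.
    use common_util::byte::encode;
    let min_key = vec![0x01u8, 0xab];
    assert_eq!(encode(&min_key), "01ab");
}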
1 change: 1 addition & 0 deletions common_util/Cargo.toml
@@ -22,6 +22,7 @@ chrono = { workspace = true }
common_types = { workspace = true, features = ["test"] }
crossbeam-utils = "0.8.7"
env_logger = { workspace = true, optional = true }
hex = "0.4.3"
lazy_static = { workspace = true }
libc = "0.2"
log = { workspace = true }
3 changes: 3 additions & 0 deletions common_util/src/byte.rs
@@ -0,0 +1,3 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

pub use hex::encode;
1 change: 1 addition & 0 deletions common_util/src/lib.rs
@@ -9,6 +9,7 @@ pub mod macros;

// TODO(yingwen): Move some mod into components as a crate
pub mod alloc_tracker;
pub mod byte;
pub mod codec;
pub mod config;
pub mod error;
12 changes: 9 additions & 3 deletions common_util/src/time.rs
@@ -6,10 +6,10 @@

use std::{
convert::TryInto,
time::{Duration, Instant},
time::{Duration, Instant, UNIX_EPOCH},
};

use chrono::Utc;
use chrono::{DateTime, Utc};

pub trait DurationExt {
/// Convert into u64.
@@ -47,7 +47,7 @@ impl InstantExt for Instant {

#[inline]
pub fn secs_to_nanos(s: u64) -> u64 {
s * 1000000000
s * 1_000_000_000
}

#[inline]
@@ -60,6 +60,12 @@ pub fn current_as_rfc3339() -> String {
Utc::now().to_rfc3339()
}

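/// Format a unix timestamp given in milliseconds as "%Y-%m-%d %H:%M:%S" in UTC.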
#[inline]
pub fn format_as_ymdhms(unix_timestamp: i64) -> String {
let dt = DateTime::<Utc>::from(UNIX_EPOCH + Duration::from_millis(unix_timestamp as u64));
dt.format("%Y-%m-%d %H:%M:%S").to_string()
}

#[cfg(test)]
mod tests {
use std::thread;
4 changes: 2 additions & 2 deletions components/parquet_ext/src/meta_data.rs
@@ -21,7 +21,7 @@ pub trait ChunkReader: Sync + Send {
pub async fn fetch_parquet_metadata(
file_size: usize,
file_reader: &dyn ChunkReader,
) -> Result<ParquetMetaData> {
) -> Result<(ParquetMetaData, usize)> {
const FOOTER_LEN: usize = 8;

if file_size < FOOTER_LEN {
@@ -63,5 +63,5 @@ pub async fn fetch_parquet_metadata(
ParquetError::General(err_msg)
})?;

footer::decode_metadata(&metadata_bytes)
footer::decode_metadata(&metadata_bytes).map(|v| (v, metadata_len))
}
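
fetch_parquet_metadata now returns the decoded metadata together with the encoded metadata length taken from the footer. A minimal call-site sketch (editor's illustration; file_size and reader are placeholder names for a size and a ChunkReader already in scope):

// Callers that only need the metadata can ignore the size, as async_reader.rs does above.
let (parquet_metadata, metadata_size) = fetch_parquet_metadata(file_size, &reader).await?;
println!("footer metadata occupies {metadata_size} bytes");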
1 change: 1 addition & 0 deletions tools/Cargo.toml
@@ -18,6 +18,7 @@ common_types = { workspace = true }
common_util = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
num_cpus = "1.15.0"
object_store = { workspace = true }
parquet = { workspace = true }
parquet_ext = { workspace = true }
148 changes: 148 additions & 0 deletions tools/src/bin/sst-metadata.rs
@@ -0,0 +1,148 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! A CLI to query SST metadata

use std::sync::Arc;

use analytic_engine::sst::{meta_data::cache::MetaData, parquet::async_reader::ChunkReaderAdapter};
use anyhow::{Context, Result};
use clap::Parser;
use common_util::{
runtime::{self, Runtime},
time::format_as_ymdhms,
};
use futures::StreamExt;
use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path};
use parquet_ext::meta_data::fetch_parquet_metadata;
use tokio::{runtime::Handle, task::JoinSet};

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
/// SST directory
#[clap(short, long, required(true))]
dir: String,

/// Verbose print
#[clap(short, long, required(false))]
verbose: bool,

/// Number of worker threads; 0 means the number of CPUs
#[clap(short, long, default_value_t = 0)]
threads: usize,
}

fn new_runtime(thread_num: usize) -> Runtime {
runtime::Builder::default()
.thread_name("sst-metadata")
.worker_threads(thread_num)
.enable_all()
.build()
.unwrap()
}

fn main() {
let args = Args::parse();
let thread_num = if args.threads == 0 {
num_cpus::get()
} else {
args.threads
};
let rt = Arc::new(new_runtime(thread_num));
rt.block_on(async move {
if let Err(e) = run(args).await {
eprintln!("Run failed, err:{e}");
}
});
}

async fn run(args: Args) -> Result<()> {
let handle = Handle::current();
let storage = LocalFileSystem::new_with_prefix(&args.dir)?;
let storage: ObjectStoreRef = Arc::new(storage);

let mut join_set = JoinSet::new();
let mut ssts = storage.list(None).await?;
while let Some(object_meta) = ssts.next().await {
let object_meta = object_meta?;
let storage = storage.clone();
let location = object_meta.location.clone();
join_set.spawn_on(
async move {
let (metadata, metadata_size, kv_size) =
parse_metadata(storage, location, object_meta.size).await?;
Ok::<_, anyhow::Error>((object_meta, metadata, metadata_size, kv_size))
},
&handle,
);
}

let mut metas = Vec::with_capacity(join_set.len());
while let Some(meta) = join_set.join_next().await {
let meta = meta.context("join err")?;
let meta = meta.context("parse metadata err")?;
metas.push(meta);
}

// sort by time_range asc
metas.sort_by(|a, b| {
a.1.custom()
.time_range
.inclusive_start()
.cmp(&b.1.custom().time_range.inclusive_start())
});

for (object_meta, sst_metadata, metadata_size, kv_size) in metas {
let ObjectMeta { location, size, .. } = &object_meta;
let custom_meta = sst_metadata.custom();
let parquet_meta = sst_metadata.parquet();
let time_range = custom_meta.time_range;
let start = format_as_ymdhms(time_range.inclusive_start().as_i64());
let end = format_as_ymdhms(time_range.exclusive_end().as_i64());
let seq = custom_meta.max_sequence;
let filter_size = custom_meta
.parquet_filter
.as_ref()
.map(|f| f.size())
.unwrap_or(0);
let file_metadata = parquet_meta.file_metadata();
let row_num = file_metadata.num_rows();
if args.verbose {
println!("object_meta:{object_meta:?}, parquet_meta:{parquet_meta:?}");
} else {
let size_mb = as_mb(*size);
let metadata_mb = as_mb(metadata_size);
let filter_mb = as_mb(filter_size);
let kv_mb = as_mb(kv_size);
println!(
"Location:{location}, time_range:[{start}, {end}), max_seq:{seq}, size:{size_mb:.3}M, metadata:{metadata_mb:.3}M, kv:{kv_mb:.3}M, filter:{filter_mb:.3}M, row_num:{row_num}"
);
}
}

Ok(())
}

fn as_mb(v: usize) -> f64 {
v as f64 / 1024.0 / 1024.0
}

async fn parse_metadata(
storage: ObjectStoreRef,
path: Path,
size: usize,
) -> Result<(MetaData, usize, usize)> {
let reader = ChunkReaderAdapter::new(&path, &storage);
let (parquet_metadata, metadata_size) = fetch_parquet_metadata(size, &reader).await?;
let kv_metadata = parquet_metadata.file_metadata().key_value_metadata();
let kv_size = kv_metadata
.map(|kvs| {
kvs.iter()
.map(|kv| kv.key.as_bytes().len() + kv.value.as_ref().map(|v| v.len()).unwrap_or(0))
.sum()
})
.unwrap_or(0);

let md = MetaData::try_new(&parquet_metadata, false)?;
Ok((md, metadata_size, kv_size))
}
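
Given the CLI arguments above, the tool would presumably be invoked with something like cargo run --release --bin sst-metadata -- --dir /path/to/sst (the path is a placeholder): --threads 0 (the default) sizes the runtime to the number of CPUs via num_cpus, and --verbose switches from the one-line-per-file summary to dumping the full object and parquet metadata.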