feat: separate metadata from parquet's kv_metadata (#1120)
## Rationale
Closes #955. See the title: stop embedding the SST's custom metadata in parquet's `kv_metadata`.
## Detailed Changes
- Introduce a separate, standalone file to store each SST's custom metadata; parquet's `kv_metadata` now keeps only the path of that file plus a format-version key (a minimal sketch follows).
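
In other words, the encoded custom metadata no longer rides in the parquet footer itself; the footer keeps a pointer and a version tag, and the payload becomes a standalone object. Below is a minimal writer-side sketch of that layout, assuming the upstream `object_store` crate API; the literal key names and the version value are illustrative stand-ins for the real `encoding::META_PATH_KEY` / `encoding::META_VERSION_KEY` constants, not this repo's actual values.

```rust
use bytes::Bytes;
use object_store::{path::Path, ObjectStore};
use parquet::format::KeyValue;

// Sketch only: write the custom metadata to its own object, and return the
// two small kv_metadata entries that point at it.
async fn write_sst_meta(
    store: &dyn ObjectStore,
    meta_path: &Path,
    encoded_custom_meta: Vec<u8>,
) -> Result<Vec<KeyValue>, Box<dyn std::error::Error>> {
    // 1. The (potentially large) custom metadata becomes its own object; this
    //    is the "associated file" tracked by `FileMeta` further down.
    store.put(meta_path, Bytes::from(encoded_custom_meta)).await?;

    // 2. Only two small markers stay behind in the parquet footer.
    Ok(vec![
        KeyValue::new("meta_path".to_string(), meta_path.to_string()),
        KeyValue::new("meta_version".to_string(), "2".to_string()),
    ])
}
```

The standalone file is also what `FileMeta::associated_files` tracks in the diffs below, so the purger can delete it together with the SST.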

## Test Plan
- UT
  - `test_parquet_build_and_read` covers the write and read paths.
  - `test_arrow_meta_data` covers compatibility with files written by older versions.
- Manual
  - Upgrade from an existing deployment.
  - Start a fresh deployment and run tsbs.

---------

Co-authored-by: jiacai2050 <dev@liujiacai.net>
tanruixiang and jiacai2050 authored Aug 24, 2023
1 parent e091f5b commit fc0f314
Showing 20 changed files with 466 additions and 100 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -98,7 +98,7 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = "1.0.10"
+ceresdbproto = "1.0.11"
 codec = { path = "components/codec" }
 chrono = "0.4"
 clap = "3.0"
2 changes: 2 additions & 0 deletions analytic_engine/src/compaction/picker.rs
@@ -873,6 +873,7 @@ mod tests {
             row_num: 0,
             max_seq: 0,
             storage_format: StorageFormat::default(),
+            associated_files: Vec::new(),
         };
         let queue = FilePurgeQueue::new(1, 1.into(), tx.clone());
         FileHandle::new(file_meta, queue)
@@ -893,6 +894,7 @@
             row_num: 0,
             max_seq,
             storage_format: StorageFormat::default(),
+            associated_files: Vec::new(),
         };
         let queue = FilePurgeQueue::new(1, 1.into(), tx.clone());
         FileHandle::new(file_meta, queue)
7 changes: 4 additions & 3 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -496,7 +496,6 @@ impl FlushTask {
             .context(AllocFileId)?;
 
         let sst_file_path = self.table_data.set_sst_file_path(file_id);
-
         // TODO: `min_key` & `max_key` should be figured out when writing sst.
         let sst_meta = MetaData {
             min_key: min_key.clone(),
@@ -583,6 +582,7 @@ impl FlushTask {
                 time_range: sst_meta.time_range,
                 max_seq: sst_meta.max_sequence,
                 storage_format: sst_info.storage_format,
+                associated_files: vec![sst_info.meta_path],
             },
         })
     }
@@ -621,7 +621,6 @@ impl FlushTask {
             .context(AllocFileId)?;
 
         let sst_file_path = self.table_data.set_sst_file_path(file_id);
-
         let storage_format_hint = self.table_data.table_options().storage_format_hint;
         let sst_write_options = SstWriteOptions {
             storage_format_hint,
@@ -665,6 +664,7 @@ impl FlushTask {
                 time_range: memtable_state.time_range,
                 max_seq: memtable_state.last_sequence(),
                 storage_format: sst_info.storage_format,
+                associated_files: vec![sst_info.meta_path],
             }))
     }
 }
@@ -851,6 +851,7 @@ impl SpaceStore {
                 .fetch_metas(&input.files)
                 .await
                 .context(ReadSstMeta)?;
+
             MetaData::merge(sst_metas.into_iter().map(MetaData::from), schema)
         };
 
@@ -861,7 +862,6 @@ impl SpaceStore {
             .context(AllocFileId)?;
 
         let sst_file_path = table_data.set_sst_file_path(file_id);
-
         let mut sst_writer = self
             .sst_factory
             .create_writer(
@@ -926,6 +926,7 @@ impl SpaceStore {
                 max_seq: sst_meta.max_sequence,
                 time_range: sst_meta.time_range,
                 storage_format: sst_info.storage_format,
+                associated_files: vec![sst_info.meta_path],
             },
         });
 
39 changes: 28 additions & 11 deletions analytic_engine/src/sst/file.rs
@@ -24,16 +24,18 @@ use std::{
         atomic::{AtomicBool, Ordering},
         Arc,
     },
+    time::Duration,
 };
 
 use common_types::{
     time::{TimeRange, Timestamp},
     SequenceNumber,
 };
+use future_ext::{retry_async, RetryConfig};
 use log::{error, info, warn};
 use macros::define_result;
 use metric_ext::Meter;
-use object_store::ObjectStoreRef;
+use object_store::{ObjectStoreRef, Path};
 use runtime::{JoinHandle, Runtime};
 use snafu::{ResultExt, Snafu};
 use table_engine::table::TableId;
@@ -303,7 +305,7 @@ impl Drop for FileHandleInner {
         info!("FileHandle is dropped, meta:{:?}", self.meta);
 
         // Push file cannot block or be async because we are in drop().
-        self.purge_queue.push_file(self.meta.id);
+        self.purge_queue.push_file(&self.meta);
     }
 }
 
@@ -441,6 +443,8 @@ pub struct FileMeta {
     pub max_seq: u64,
     /// The format of the file.
     pub storage_format: StorageFormat,
+    /// Associated files, such as: meta_path
+    pub associated_files: Vec<String>,
 }
 
 impl FileMeta {
@@ -475,9 +479,9 @@ impl FilePurgeQueue {
         self.inner.closed.store(true, Ordering::SeqCst);
     }
 
-    fn push_file(&self, file_id: FileId) {
+    fn push_file(&self, file_meta: &FileMeta) {
         if self.inner.closed.load(Ordering::SeqCst) {
-            warn!("Purger closed, ignore file_id:{file_id}");
+            warn!("Purger closed, ignore file_id:{}", file_meta.id);
             return;
         }
 
@@ -486,7 +490,8 @@ impl FilePurgeQueue {
         let request = FilePurgeRequest {
             space_id: self.inner.space_id,
            table_id: self.inner.table_id,
-            file_id,
+            file_id: file_meta.id,
+            associated_files: file_meta.associated_files.clone(),
         };
 
         if let Err(send_res) = self.inner.sender.send(Request::Purge(request)) {
@@ -510,6 +515,7 @@ pub struct FilePurgeRequest {
     space_id: SpaceId,
     table_id: TableId,
     file_id: FileId,
+    associated_files: Vec<String>,
 }
 
 #[derive(Debug)]
@@ -525,6 +531,11 @@ pub struct FilePurger {
 }
 
 impl FilePurger {
+    const RETRY_CONFIG: RetryConfig = RetryConfig {
+        max_retries: 3,
+        interval: Duration::from_millis(500),
+    };
+
     pub fn start(runtime: &Runtime, store: ObjectStoreRef) -> Self {
         // We must use unbound channel, so the sender wont block when the handle is
         // dropped.
@@ -561,6 +572,13 @@ impl FilePurger {
         FilePurgeQueue::new(space_id, table_id, self.sender.clone())
     }
 
+    // TODO: currently we ignore errors when delete.
+    async fn delete_file(store: &ObjectStoreRef, path: &Path) {
+        if let Err(e) = retry_async(|| store.delete(path), &Self::RETRY_CONFIG).await {
+            error!("File purger failed to delete file, path:{path}, err:{e}");
+        }
+    }
+
     async fn purge_file_loop(store: ObjectStoreRef, mut receiver: UnboundedReceiver<Request>) {
         info!("File purger start");
 
@@ -579,13 +597,12 @@
                         sst_file_path.to_string()
                    );
 
-                    if let Err(e) = store.delete(&sst_file_path).await {
-                        error!(
-                            "File purger failed to delete file, sst_file_path:{}, err:{}",
-                            sst_file_path.to_string(),
-                            e
-                        );
+                    for path in purge_request.associated_files {
+                        let path = Path::from(path);
+                        Self::delete_file(&store, &path).await;
                     }
+
+                    Self::delete_file(&store, &sst_file_path).await;
                 }
                 Request::Exit => break,
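
When a `FileHandle` is dropped, the purger now receives the whole `FileMeta` rather than just the file id, so it can delete each associated file and then the SST itself, retrying per `RETRY_CONFIG` and only logging failures (per the `TODO` above). Here is a self-contained sketch of that retry-and-give-up pattern; since `future_ext::retry_async` is internal to this repo, the sketch inlines equivalent behavior with plain tokio.

```rust
use std::time::Duration;

// Sketch of the purge-side retry pattern: try the deletion up to
// `max_retries` additional times, sleeping between attempts, and merely log
// if it never succeeds (deletion is best-effort).
async fn delete_with_retry<F, Fut, E: std::fmt::Display>(
    mut delete: F,
    max_retries: usize,
    interval: Duration,
) where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), E>>,
{
    for attempt in 0..=max_retries {
        match delete().await {
            Ok(()) => return,
            // Final attempt failed: give up and log, mirroring the
            // `// TODO: currently we ignore errors when delete.` above.
            Err(e) if attempt == max_retries => {
                eprintln!("file purger gave up, err:{e}");
            }
            Err(_) => tokio::time::sleep(interval).await,
        }
    }
}

#[tokio::main]
async fn main() {
    let mut failures = 2; // simulate two transient failures, then success
    delete_with_retry(
        || {
            let fail = failures > 0;
            failures -= 1;
            async move { if fail { Err("transient") } else { Ok(()) } }
        },
        3,                          // same max_retries as RETRY_CONFIG above
        Duration::from_millis(500), // same interval as RETRY_CONFIG above
    )
    .await;
}
```

Note the order in `purge_file_loop`: associated meta files are deleted before the SST itself, so a metadata object should not normally outlive its SST; with errors ignored after the retries, though, the cleanup remains best-effort.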
1 change: 1 addition & 0 deletions analytic_engine/src/sst/manager.rs
@@ -162,6 +162,7 @@ pub mod tests {
                 time_range: sst_meta.time_range(),
                 max_seq: sst_meta.max_sequence(),
                 storage_format: StorageFormat::Columnar,
+                associated_files: Vec::new(),
             },
         );
     }
59 changes: 36 additions & 23 deletions analytic_engine/src/sst/meta_data/cache.rs
@@ -18,11 +18,15 @@ use std::{
 };
 
 use lru::LruCache;
-use parquet::file::metadata::FileMetaData;
-use snafu::{ensure, OptionExt, ResultExt};
+use object_store::{ObjectStoreRef, Path};
+use parquet::{file::metadata::FileMetaData, format::KeyValue};
+use snafu::{ensure, OptionExt};
 
 use crate::sst::{
-    meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result},
+    meta_data::{
+        metadata_reader::parse_metadata, KvMetaDataNotFound, KvMetaVersionEmpty,
+        ParquetMetaDataRef, Result,
+    },
     parquet::encoding,
 };
 
@@ -45,38 +49,43 @@ impl MetaData {
     /// contains no extended custom information.
     // TODO: remove it and use the suggested api.
     #[allow(deprecated)]
-    pub fn try_new(
+    pub async fn try_new(
         parquet_meta_data: &parquet_ext::ParquetMetaData,
         ignore_sst_filter: bool,
+        store: ObjectStoreRef,
     ) -> Result<Self> {
         let file_meta_data = parquet_meta_data.file_metadata();
         let kv_metas = file_meta_data
             .key_value_metadata()
             .context(KvMetaDataNotFound)?;
 
         ensure!(!kv_metas.is_empty(), KvMetaDataNotFound);
-        let mut other_kv_metas = Vec::with_capacity(kv_metas.len() - 1);
+
+        let mut meta_path = None;
+        let mut other_kv_metas: Vec<KeyValue> = Vec::with_capacity(kv_metas.len() - 1);
         let mut custom_kv_meta = None;
+        let mut meta_version = encoding::META_VERSION_V1; // default is v1
 
         for kv_meta in kv_metas {
             // Remove our extended custom meta data from the parquet metadata for small
             // memory consumption in the cache.
             if kv_meta.key == encoding::META_KEY {
                 custom_kv_meta = Some(kv_meta);
+            } else if kv_meta.key == encoding::META_PATH_KEY {
+                meta_path = kv_meta.value.as_ref().map(|path| Path::from(path.as_str()))
+            } else if kv_meta.key == encoding::META_VERSION_KEY {
+                meta_version = kv_meta.value.as_ref().context(KvMetaVersionEmpty)?;
             } else {
                 other_kv_metas.push(kv_meta.clone());
             }
         }
 
-        let custom = {
-            let custom_kv_meta = custom_kv_meta.context(KvMetaDataNotFound)?;
-            let mut sst_meta =
-                encoding::decode_sst_meta_data(custom_kv_meta).context(DecodeCustomMetaData)?;
-            if ignore_sst_filter {
-                sst_meta.parquet_filter = None;
-            }
-
-            Arc::new(sst_meta)
-        };
+        let custom = parse_metadata(
+            meta_version,
+            custom_kv_meta,
+            ignore_sst_filter,
+            meta_path.clone(),
+            store,
+        )
+        .await?;
 
         // let's build a new parquet metadata without the extended key value
         // metadata.
@@ -103,7 +112,6 @@
 
             Arc::new(thin_parquet_meta_data)
         };
-
         Ok(Self { parquet, custom })
     }
 
@@ -155,6 +163,7 @@ mod tests {
         schema::Builder as CustomSchemaBuilder,
         time::{TimeRange, Timestamp},
     };
+    use object_store::LocalFileSystem;
     use parquet::{arrow::ArrowWriter, file::footer};
     use parquet_ext::ParquetMetaData;
 
@@ -238,15 +247,16 @@
         .unwrap();
         let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();
 
-        let encoded_meta_data = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
+        let encoded_meta_data =
+            encoding::encode_sst_meta_data_v1(custom_meta_data.clone()).unwrap();
         writer.append_key_value_metadata(encoded_meta_data);
 
         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }
 
-    #[test]
-    fn test_arrow_meta_data() {
+    #[tokio::test]
+    async fn test_arrow_meta_data() {
         let temp_dir = tempfile::tempdir().unwrap();
         let parquet_file_path = temp_dir.path().join("test_arrow_meta_data.par");
         let schema = {
@@ -284,8 +294,11 @@
 
         let parquet_file = File::open(parquet_file_path.as_path()).unwrap();
         let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap();
-
-        let meta_data = MetaData::try_new(&parquet_meta_data, false).unwrap();
+        let store =
+            Arc::new(LocalFileSystem::new_with_prefix(parquet_file_path.as_path()).unwrap());
+        let meta_data = MetaData::try_new(&parquet_meta_data, false, store)
+            .await
+            .unwrap();
 
         assert_eq!(**meta_data.custom(), custom_meta_data);
         check_parquet_meta_data(&parquet_meta_data, meta_data.parquet());
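
The new `meta_data/metadata_reader` module that provides `parse_metadata` is not shown on this page; judging only from the call site above, its dispatch is roughly the following. This is a hedged sketch with simplified types, not the real API: the real function returns the decoded custom metadata, the real v1 branch goes through `encoding::decode_sst_meta_data` and honors `ignore_sst_filter`, and the literal "1" stands in for `encoding::META_VERSION_V1`.

```rust
use object_store::{ObjectStoreRef, Path};
use parquet::format::KeyValue;

// Sketch of the version dispatch: v1 decodes the blob inline from
// kv_metadata, v2 fetches the standalone meta file from the object store.
async fn parse_metadata_sketch(
    meta_version: &str,
    custom_kv_meta: Option<&KeyValue>,
    meta_path: Option<Path>,
    store: ObjectStoreRef,
) -> Result<Vec<u8>, String> {
    match meta_version {
        // v1: the whole blob is inline in kv_metadata (the path exercised by
        // `test_arrow_meta_data` for files written by older versions).
        "1" => custom_kv_meta
            .and_then(|kv| kv.value.clone())
            .map(String::into_bytes)
            .ok_or_else(|| "kv metadata not found".to_string()),
        // v2: the footer only holds a path; fetch the standalone meta file.
        _ => {
            let path = meta_path.ok_or_else(|| "meta path not found".to_string())?;
            let result = store.get(&path).await.map_err(|e| e.to_string())?;
            let bytes = result.bytes().await.map_err(|e| e.to_string())?;
            Ok(bytes.to_vec())
        }
    }
}
```

Because `MetaData::try_new` may now fetch that file, it became `async` and takes an `ObjectStoreRef`, which is also why `test_arrow_meta_data` switched to `#[tokio::test]`.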