feat: separate metadata from parquet's kv_metadata #1120

Merged on Aug 24, 2023 (49 commits).

Commits
84b2141  add critical path comment (tanruixiang, Jul 27, 2023)
bec3378  draft (tanruixiang, Jul 31, 2023)
f814484  draft: write into another file success (tanruixiang, Jul 31, 2023)
a4bb37d  decode encode custom success (tanruixiang, Aug 1, 2023)
c0e8a86  draft (tanruixiang, Aug 1, 2023)
0603689  Merge remote-tracking branch 'origin/main' into separate_metadata (tanruixiang, Aug 1, 2023)
9e58acc  fix compile error (tanruixiang, Aug 1, 2023)
cf7b45c  fix ut (tanruixiang, Aug 1, 2023)
7bb1fde  update (tanruixiang, Aug 2, 2023)
dfa6af3  read sst meta path from parquet's metadata (tanruixiang, Aug 2, 2023)
0e048ed  fmt (tanruixiang, Aug 2, 2023)
29a4d7a  Merge branch 'main' into separate_metadata (tanruixiang, Aug 7, 2023)
422a4b6  beauty code (tanruixiang, Aug 7, 2023)
8f88b68  add error handle (tanruixiang, Aug 8, 2023)
3f48468  add error handle (tanruixiang, Aug 8, 2023)
665c774  clippy (tanruixiang, Aug 8, 2023)
ab86aff  Merge branch 'main' into separate_metadata (tanruixiang, Aug 8, 2023)
1b4e500  Merge branch 'main' into separate_metadata (tanruixiang, Aug 9, 2023)
8734400  Merge branch 'main' into separate_metadata (tanruixiang, Aug 9, 2023)
c9b5f57  encode decode pb binary (tanruixiang, Aug 9, 2023)
43ed794  Merge branch 'main' into separate_metadata (tanruixiang, Aug 9, 2023)
f6d4ec3  update (tanruixiang, Aug 9, 2023)
350f4b3  store path version in parquet kv (tanruixiang, Aug 9, 2023)
146c950  update comment (tanruixiang, Aug 9, 2023)
e0c422b  fix bug (tanruixiang, Aug 9, 2023)
e07391e  fix (tanruixiang, Aug 9, 2023)
40440fc  write into upper layer (tanruixiang, Aug 9, 2023)
45de991  format (tanruixiang, Aug 9, 2023)
449d7af  fix (tanruixiang, Aug 9, 2023)
8857adb  beauty code (tanruixiang, Aug 10, 2023)
9821195  format (tanruixiang, Aug 10, 2023)
a5766a4  delete unuse import (tanruixiang, Aug 10, 2023)
d8b6049  add empty line (tanruixiang, Aug 10, 2023)
c623d03  add empty line (tanruixiang, Aug 10, 2023)
787e94c  update by review (tanruixiang, Aug 10, 2023)
923bc62  update (tanruixiang, Aug 10, 2023)
e1dff07  fmt (tanruixiang, Aug 10, 2023)
38b05a2  update (tanruixiang, Aug 10, 2023)
8b3cbc2  draft: delete files (tanruixiang, Aug 10, 2023)
d039ad0  delete files (tanruixiang, Aug 14, 2023)
fb17c6b  make fix (tanruixiang, Aug 14, 2023)
f7fde77  fmt (tanruixiang, Aug 14, 2023)
5c3fc48  Merge branch 'main' into separate_metadata (jiacai2050, Aug 22, 2023)
91b005c  refactor metadata reader (jiacai2050, Aug 22, 2023)
ef5642b  bump proto, add associated_files (jiacai2050, Aug 23, 2023)
4111c6e  remove meta_path from sst reader (jiacai2050, Aug 23, 2023)
da1a6b6  remove clone (jiacai2050, Aug 24, 2023)
32a0f2f  retry when delete failed (jiacai2050, Aug 24, 2023)
8fe3ef6  rename meta file (jiacai2050, Aug 24, 2023)
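
For orientation before the file diffs: the PR stops embedding the full custom SST metadata in parquet's key-value footer and instead writes it to a separate metadata object, leaving behind only a pointer and a format version. A minimal sketch of that footer layout follows; the key names (`encoding::META_PATH_KEY`, `encoding::META_VERSION_KEY`) appear in the diff below, while their string values here are assumptions for illustration.

```rust
use parquet::format::KeyValue;

// Hypothetical string values; the real constants live in
// `analytic_engine::sst::parquet::encoding`.
const META_PATH_KEY: &str = "meta_path";
const META_VERSION_KEY: &str = "meta_version";

/// Build the small kv list written into the parquet footer: where the
/// separated metadata object lives, and which codec version wrote it.
fn footer_kv_metadata(meta_path: &str, meta_version: &str) -> Vec<KeyValue> {
    vec![
        KeyValue {
            key: META_PATH_KEY.to_string(),
            value: Some(meta_path.to_string()),
        },
        KeyValue {
            key: META_VERSION_KEY.to_string(),
            value: Some(meta_version.to_string()),
        },
    ]
}
```

Keeping the footer this small matches the existing comment in `cache.rs` about reducing memory consumption in the metadata cache.
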
Files changed

4 changes: 2 additions & 2 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -98,7 +98,7 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = "1.0.10"
+ceresdbproto = "1.0.11"
 codec = { path = "components/codec" }
 chrono = "0.4"
 clap = "3.0"
2 changes: 2 additions & 0 deletions analytic_engine/src/compaction/picker.rs
@@ -873,6 +873,7 @@ mod tests {
             row_num: 0,
             max_seq: 0,
             storage_format: StorageFormat::default(),
+            associated_files: Vec::new(),
         };
         let queue = FilePurgeQueue::new(1, 1.into(), tx.clone());
         FileHandle::new(file_meta, queue)
@@ -893,6 +894,7 @@
             row_num: 0,
             max_seq,
             storage_format: StorageFormat::default(),
+            associated_files: Vec::new(),
         };
         let queue = FilePurgeQueue::new(1, 1.into(), tx.clone());
         FileHandle::new(file_meta, queue)
7 changes: 4 additions & 3 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -496,7 +496,6 @@ impl FlushTask {
             .context(AllocFileId)?;

         let sst_file_path = self.table_data.set_sst_file_path(file_id);
-
         // TODO: `min_key` & `max_key` should be figured out when writing sst.
         let sst_meta = MetaData {
             min_key: min_key.clone(),
@@ -583,6 +582,7 @@
                 time_range: sst_meta.time_range,
                 max_seq: sst_meta.max_sequence,
                 storage_format: sst_info.storage_format,
+                associated_files: vec![sst_info.meta_path],
             },
         })
     }
@@ -621,7 +621,6 @@
             .context(AllocFileId)?;

         let sst_file_path = self.table_data.set_sst_file_path(file_id);
-
         let storage_format_hint = self.table_data.table_options().storage_format_hint;
         let sst_write_options = SstWriteOptions {
             storage_format_hint,
@@ -665,6 +664,7 @@
                 time_range: memtable_state.time_range,
                 max_seq: memtable_state.last_sequence(),
                 storage_format: sst_info.storage_format,
+                associated_files: vec![sst_info.meta_path],
             }))
     }
 }
@@ -851,6 +851,7 @@ impl SpaceStore {
                 .fetch_metas(&input.files)
                 .await
                 .context(ReadSstMeta)?;
+
             MetaData::merge(sst_metas.into_iter().map(MetaData::from), schema)
         };

@@ -861,7 +862,6 @@
             .context(AllocFileId)?;

         let sst_file_path = table_data.set_sst_file_path(file_id);
-
         let mut sst_writer = self
             .sst_factory
             .create_writer(
@@ -926,6 +926,7 @@
                 max_seq: sst_meta.max_sequence,
                 time_range: sst_meta.time_range,
                 storage_format: sst_info.storage_format,
+                associated_files: vec![sst_info.meta_path],
             },
         });

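
Both the flush and compaction paths above record the metadata object's path in `associated_files`, so the purger can later remove it together with the SST. A toy stand-in for that bookkeeping (types simplified and hypothetical; the real `FileMeta` is in `sst/file.rs` below):

```rust
/// Simplified stand-in for this PR's `FileMeta`: every side file an SST
/// depends on is recorded up front, because deletion is triggered from
/// `Drop`, where nothing can be recomputed or fetched asynchronously.
struct FileMetaSketch {
    id: u64,
    associated_files: Vec<String>,
}

fn file_meta_after_flush(file_id: u64, meta_path: String) -> FileMetaSketch {
    FileMetaSketch {
        id: file_id,
        // Mirrors `associated_files: vec![sst_info.meta_path]` in the diff.
        associated_files: vec![meta_path],
    }
}
```
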
39 changes: 28 additions & 11 deletions analytic_engine/src/sst/file.rs
@@ -24,16 +24,18 @@ use std::{
         atomic::{AtomicBool, Ordering},
         Arc,
     },
+    time::Duration,
 };

 use common_types::{
     time::{TimeRange, Timestamp},
     SequenceNumber,
 };
+use future_ext::{retry_async, RetryConfig};
 use log::{error, info, warn};
 use macros::define_result;
 use metric_ext::Meter;
-use object_store::ObjectStoreRef;
+use object_store::{ObjectStoreRef, Path};
 use runtime::{JoinHandle, Runtime};
 use snafu::{ResultExt, Snafu};
 use table_engine::table::TableId;
@@ -303,7 +305,7 @@ impl Drop for FileHandleInner {
         info!("FileHandle is dropped, meta:{:?}", self.meta);

         // Push file cannot block or be async because we are in drop().
-        self.purge_queue.push_file(self.meta.id);
+        self.purge_queue.push_file(&self.meta);
     }
 }


Expand Down Expand Up @@ -441,6 +443,8 @@ pub struct FileMeta {
pub max_seq: u64,
/// The format of the file.
pub storage_format: StorageFormat,
/// Associated files, such as: meta_path
pub associated_files: Vec<String>,
}

impl FileMeta {
@@ -475,9 +479,9 @@ impl FilePurgeQueue {
         self.inner.closed.store(true, Ordering::SeqCst);
     }

-    fn push_file(&self, file_id: FileId) {
+    fn push_file(&self, file_meta: &FileMeta) {
         if self.inner.closed.load(Ordering::SeqCst) {
-            warn!("Purger closed, ignore file_id:{file_id}");
+            warn!("Purger closed, ignore file_id:{}", file_meta.id);
             return;
         }

@@ -486,7 +490,8 @@
         let request = FilePurgeRequest {
             space_id: self.inner.space_id,
             table_id: self.inner.table_id,
-            file_id,
+            file_id: file_meta.id,
+            associated_files: file_meta.associated_files.clone(),
         };

         if let Err(send_res) = self.inner.sender.send(Request::Purge(request)) {
@@ -510,6 +515,7 @@ pub struct FilePurgeRequest {
     space_id: SpaceId,
     table_id: TableId,
     file_id: FileId,
+    associated_files: Vec<String>,
 }

 #[derive(Debug)]
@@ -525,6 +531,11 @@ pub struct FilePurger {
 }

 impl FilePurger {
+    const RETRY_CONFIG: RetryConfig = RetryConfig {
+        max_retries: 3,
+        interval: Duration::from_millis(500),
+    };
+
     pub fn start(runtime: &Runtime, store: ObjectStoreRef) -> Self {
         // We must use unbound channel, so the sender wont block when the handle is
         // dropped.
@@ -561,6 +572,13 @@ impl FilePurger {
         FilePurgeQueue::new(space_id, table_id, self.sender.clone())
     }

+    // TODO: currently we ignore errors when delete.
+    async fn delete_file(store: &ObjectStoreRef, path: &Path) {
+        if let Err(e) = retry_async(|| store.delete(path), &Self::RETRY_CONFIG).await {
+            error!("File purger failed to delete file, path:{path}, err:{e}");
+        }
+    }
+
     async fn purge_file_loop(store: ObjectStoreRef, mut receiver: UnboundedReceiver<Request>) {
         info!("File purger start");

@@ -579,13 +597,12 @@
                         sst_file_path.to_string()
                     );

-                    if let Err(e) = store.delete(&sst_file_path).await {
-                        error!(
-                            "File purger failed to delete file, sst_file_path:{}, err:{}",
-                            sst_file_path.to_string(),
-                            e
-                        );
+                    for path in purge_request.associated_files {
+                        let path = Path::from(path);
+                        Self::delete_file(&store, &path).await;
                     }
+
+                    Self::delete_file(&store, &sst_file_path).await;
                 }
                 Request::Exit => break,
             }
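
`delete_file` wraps every object-store delete in a bounded retry and degrades to a log line on final failure, so a flaky store cannot wedge the purge loop. A self-contained approximation of the same pattern (the real code uses the crate-internal `future_ext::retry_async`; this sketch assumes a tokio runtime and a plain `io::Result`):

```rust
use std::time::Duration;

struct RetryConfig {
    max_retries: usize,
    interval: Duration,
}

// Mirrors the diff's policy (3 retries, 500ms apart): errors after the last
// attempt are only logged, since leaking an orphan object is preferable to
// blocking the purge loop forever.
async fn delete_with_retry<F, Fut>(mut delete: F, config: &RetryConfig)
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = std::io::Result<()>>,
{
    for attempt in 0..=config.max_retries {
        match delete().await {
            Ok(()) => return,
            Err(e) if attempt == config.max_retries => {
                eprintln!("delete failed after {attempt} retries: {e}")
            }
            Err(_) => tokio::time::sleep(config.interval).await,
        }
    }
}
```

In `purge_file_loop` above, the associated files are deleted before the SST itself, each through this retry path.
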
1 change: 1 addition & 0 deletions analytic_engine/src/sst/manager.rs
@@ -162,6 +162,7 @@ pub mod tests {
                 time_range: sst_meta.time_range(),
                 max_seq: sst_meta.max_sequence(),
                 storage_format: StorageFormat::Columnar,
+                associated_files: Vec::new(),
             },
         );
     }
59 changes: 36 additions & 23 deletions analytic_engine/src/sst/meta_data/cache.rs
@@ -18,11 +18,15 @@
 };

 use lru::LruCache;
-use parquet::file::metadata::FileMetaData;
-use snafu::{ensure, OptionExt, ResultExt};
+use object_store::{ObjectStoreRef, Path};
+use parquet::{file::metadata::FileMetaData, format::KeyValue};
+use snafu::{ensure, OptionExt};

 use crate::sst::{
-    meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result},
+    meta_data::{
+        metadata_reader::parse_metadata, KvMetaDataNotFound, KvMetaVersionEmpty,
+        ParquetMetaDataRef, Result,
+    },
     parquet::encoding,
 };
@@ -45,38 +49,43 @@ impl MetaData {
     /// contains no extended custom information.
     // TODO: remove it and use the suggested api.
     #[allow(deprecated)]
-    pub fn try_new(
+    pub async fn try_new(
         parquet_meta_data: &parquet_ext::ParquetMetaData,
         ignore_sst_filter: bool,
+        store: ObjectStoreRef,
     ) -> Result<Self> {
         let file_meta_data = parquet_meta_data.file_metadata();
         let kv_metas = file_meta_data
             .key_value_metadata()
             .context(KvMetaDataNotFound)?;

         ensure!(!kv_metas.is_empty(), KvMetaDataNotFound);
-        let mut other_kv_metas = Vec::with_capacity(kv_metas.len() - 1);
+
+        let mut meta_path = None;
+        let mut other_kv_metas: Vec<KeyValue> = Vec::with_capacity(kv_metas.len() - 1);
         let mut custom_kv_meta = None;
+        let mut meta_version = encoding::META_VERSION_V1; // default is v1

         for kv_meta in kv_metas {
             // Remove our extended custom meta data from the parquet metadata for small
             // memory consumption in the cache.
             if kv_meta.key == encoding::META_KEY {
                 custom_kv_meta = Some(kv_meta);
+            } else if kv_meta.key == encoding::META_PATH_KEY {
+                meta_path = kv_meta.value.as_ref().map(|path| Path::from(path.as_str()))
+            } else if kv_meta.key == encoding::META_VERSION_KEY {
+                meta_version = kv_meta.value.as_ref().context(KvMetaVersionEmpty)?;
             } else {
                 other_kv_metas.push(kv_meta.clone());
             }
         }

-        let custom = {
-            let custom_kv_meta = custom_kv_meta.context(KvMetaDataNotFound)?;
-            let mut sst_meta =
-                encoding::decode_sst_meta_data(custom_kv_meta).context(DecodeCustomMetaData)?;
-            if ignore_sst_filter {
-                sst_meta.parquet_filter = None;
-            }
-
-            Arc::new(sst_meta)
-        };
+        let custom = parse_metadata(
+            meta_version,
+            custom_kv_meta,
+            ignore_sst_filter,
+            meta_path.clone(),
+            store,
+        )
+        .await?;

         // let's build a new parquet metadata without the extended key value
         // metadata.
@@ -103,7 +112,6 @@

             Arc::new(thin_parquet_meta_data)
         };
-
         Ok(Self { parquet, custom })
     }

Expand Down Expand Up @@ -155,6 +163,7 @@ mod tests {
schema::Builder as CustomSchemaBuilder,
time::{TimeRange, Timestamp},
};
use object_store::LocalFileSystem;
use parquet::{arrow::ArrowWriter, file::footer};
use parquet_ext::ParquetMetaData;

@@ -238,15 +247,16 @@
             .unwrap();
         let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();

-        let encoded_meta_data = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
+        let encoded_meta_data =
+            encoding::encode_sst_meta_data_v1(custom_meta_data.clone()).unwrap();
         writer.append_key_value_metadata(encoded_meta_data);

         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }

-    #[test]
-    fn test_arrow_meta_data() {
+    #[tokio::test]
+    async fn test_arrow_meta_data() {
         let temp_dir = tempfile::tempdir().unwrap();
         let parquet_file_path = temp_dir.path().join("test_arrow_meta_data.par");
         let schema = {
@@ -284,8 +294,11 @@

         let parquet_file = File::open(parquet_file_path.as_path()).unwrap();
         let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap();
-
-        let meta_data = MetaData::try_new(&parquet_meta_data, false).unwrap();
+        let store =
+            Arc::new(LocalFileSystem::new_with_prefix(parquet_file_path.as_path()).unwrap());
+        let meta_data = MetaData::try_new(&parquet_meta_data, false, store)
+            .await
+            .unwrap();

         assert_eq!(**meta_data.custom(), custom_meta_data);
         check_parquet_meta_data(&parquet_meta_data, meta_data.parquet());
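
On the read side, `try_new` now splits the footer's key-value pairs three ways (inline v1 payload, v2 path pointer, version tag) before delegating to the version-aware `parse_metadata`. A reduced sketch of that dispatch; the key-name string values and the default-version handling are assumptions, and the real constants and error types live in `sst::parquet::encoding` and `sst::meta_data`:

```rust
use parquet::format::KeyValue;

// Assumed string values for the constants referenced in the diff.
const META_KEY: &str = "meta";
const META_PATH_KEY: &str = "meta_path";
const META_VERSION_KEY: &str = "meta_version";

struct SplitKvMetas<'a> {
    custom: Option<&'a KeyValue>, // inline v1 payload, if present
    meta_path: Option<&'a str>,   // v2 pointer to the separated meta object
    version: &'a str,             // old files carry no tag and default to v1
    others: Vec<&'a KeyValue>,    // user metadata, kept for the cache
}

fn split_kv_metas(kv_metas: &[KeyValue]) -> SplitKvMetas<'_> {
    let mut split = SplitKvMetas {
        custom: None,
        meta_path: None,
        version: "1",
        others: Vec::new(),
    };
    for kv in kv_metas {
        match kv.key.as_str() {
            META_KEY => split.custom = Some(kv),
            META_PATH_KEY => split.meta_path = kv.value.as_deref(),
            // The real code errors on a missing value (`KvMetaVersionEmpty`);
            // the sketch just keeps the default.
            META_VERSION_KEY => {
                split.version = kv.value.as_deref().unwrap_or(split.version)
            }
            _ => split.others.push(kv),
        }
    }
    split
}
```

This keeps old SSTs readable: files written before this PR carry the inline payload under `META_KEY` and no version tag, so they fall through to the v1 path in `parse_metadata`.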