Skip to content

Commit

Permalink
add comments, remove serialized_reader
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Oct 19, 2022
1 parent 7a2d29e commit 13d99c0
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 608 deletions.
10 changes: 5 additions & 5 deletions analytic_engine/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
use async_trait::async_trait;
use bytes::Bytes;
use common_types::{
projected_schema::{ProjectedSchema, RowProjector},
projected_schema::ProjectedSchema,
record_batch::{ArrowRecordBatchProjector, RecordBatchWithKey},
};
use datafusion::{
Expand Down Expand Up @@ -315,7 +315,7 @@ impl AsyncFileReader for CachableParquetFileReader {

struct RecordBatchProjector {
stream: SendableRecordBatchStream,
row_projector: RowProjector,
row_projector: ArrowRecordBatchProjector,
storage_format_opts: StorageFormatOptions,
row_num: usize,
}
Expand All @@ -340,8 +340,6 @@ impl Stream for RecordBatchProjector {
{
Err(e) => Poll::Ready(Some(Err(e))),
Ok(record_batch) => {
let arrow_record_batch_projector =
ArrowRecordBatchProjector::from(projector.row_projector.clone());
let parquet_decoder =
ParquetDecoder::new(projector.storage_format_opts.clone());
let record_batch = parquet_decoder
Expand All @@ -351,7 +349,8 @@ impl Stream for RecordBatchProjector {

projector.row_num += record_batch.num_rows();

let projected_batch = arrow_record_batch_projector
let projected_batch = projector
.row_projector
.project_to_record_batch_with_key(record_batch)
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch {});
Expand Down Expand Up @@ -392,6 +391,7 @@ impl<'a> SstReader for ParquetSstReader<'a> {
.try_project_with_key(&metadata.schema)
.map_err(|e| Box::new(e) as _)
.context(Projection)?;
let row_projector = ArrowRecordBatchProjector::from(row_projector);
let storage_format_opts = metadata.storage_format_opts.clone();

Ok(Box::new(RecordBatchProjector {
Expand Down
12 changes: 8 additions & 4 deletions analytic_engine/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use common_types::{
};
use common_util::{config::ReadableDuration, runtime};
use futures::stream::StreamExt;
use log::info;
use log::{error, info};
use table_engine::{
engine::{
CreateTableRequest, DropTableRequest, EngineRuntimes, OpenTableRequest,
Expand Down Expand Up @@ -421,11 +421,15 @@ impl Builder {
// When running tests in CI, errors like the following can occur:
// /tmp/.tmpIGahGc/store/100/2199023255554/29.sst not found: No such file or
// directory
// So we use the `test-data` directory under the current working directory when running in CI
let (data_path, wal_path) = match env {
Env::CI => {
let now = Timestamp::now();
let data_dir = format!("data-{}", now.as_i64());
(data_dir.clone(), data_dir)
let data_dir = "test-data";
if let Err(e) = std::fs::remove_dir_all(data_dir) {
error!("delete data dir failed, err:{}", e)
}

(data_dir.to_string(), data_dir.to_string())
}
_ => {
let tmp = dir.path().to_str().unwrap().to_string();
Expand Down
2 changes: 1 addition & 1 deletion common_types/src/projected_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub enum Error {

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct RowProjector {
schema_with_key: RecordSchemaWithKey,
source_schema: Schema,
Expand Down
3 changes: 0 additions & 3 deletions components/parquet_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
pub mod cache;
#[allow(deprecated)]
pub mod reverse_reader;
mod serialized_reader;
#[cfg(test)]
pub mod tests;

// use cache::Cache;
use std::sync::Arc;

pub use serialized_reader::CacheableSerializedFileReader;

use crate::cache::{DataCache, MetaCache};

pub type MetaCacheRef = Arc<dyn MetaCache + Send + Sync>;
Expand Down
Loading

0 comments on commit 13d99c0

Please sign in to comment.