feat(query): new input format framework.
youngsofun committed Sep 15, 2022
1 parent 640238d commit 4c75403
Showing 14 changed files with 1,095 additions and 0 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

5 changes: 5 additions & 0 deletions src/query/pipeline/sources/Cargo.toml
@@ -9,17 +9,22 @@ doctest = false
test = false

[dependencies]
async-channel = "1.7.1"
common-arrow = { path = "../../../common/arrow" }
common-base = { path = "../../../common/base" }
common-catalog = { path = "../../catalog" }
common-datablocks = { path = "../../datablocks" }
common-datavalues = { path = "../../datavalues" }
common-exception = { path = "../../../common/exception" }
common-formats = { path = "../../formats" }
common-io = { path = "../../../common/io" }
common-meta-types = { path = "../../../meta/types" }
common-pipeline-core = { path = "../core" }
common-settings = { path = "../../settings" }
common-storage = { path = "../../../common/storage" }
common-streams = { path = "../../streams" }

tracing = "0.1.35"
async-trait = { version = "0.1.0", package = "async-trait-fn" }
futures = "0.3.21"
futures-util = "0.3.21"
@@ -0,0 +1,117 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use common_arrow::parquet::metadata::RowGroupMetaData;
use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_core::Pipeline;
use opendal::Object;

use crate::processors::sources::input_formats::input_context::InputContext;
use crate::processors::sources::input_formats::input_format::FileInfo;
use crate::processors::sources::input_formats::input_format::InputData;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
use crate::processors::sources::input_formats::InputFormat;

struct InputFormatParquet;

#[async_trait::async_trait]
impl InputFormat for InputFormatParquet {
async fn read_file_meta(
&self,
obj: &Object,
size: usize,
) -> Result<Option<Arc<dyn InputData>>> {
todo!()
}

async fn read_split_meta(
&self,
obj: &Object,
split_info: &SplitInfo,
) -> Result<Option<Box<dyn InputData>>> {
todo!()
}

fn split_files(&self, file_infos: Vec<FileInfo>) -> Vec<SplitInfo> {
todo!()
}

fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
todo!()
}

fn exec_stream(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
todo!()
}
}

pub struct ParquetFormatPipe;

#[async_trait::async_trait]
impl InputFormatPipe for ParquetFormatPipe {
type ReadBatch = ReadBatch;
type RowBatch = RowGroupInMemory;
type AligningState = AligningState;
type BlockBuilder = ParquetBlockBuilder;
}

pub struct SplitMeta {
row_groups: Vec<RowGroupMetaData>,
}

pub struct RowGroupInMemory {}

impl Debug for RowGroupInMemory {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "RowGroupInMemory")
}
}

#[derive(Debug)]
pub enum ReadBatch {
Buffer(Vec<u8>),
RowGroup(RowGroupInMemory),
}

impl From<Vec<u8>> for ReadBatch {
fn from(v: Vec<u8>) -> Self {
Self::Buffer(v)
}
}

pub struct ParquetBlockBuilder {
ctx: Arc<InputContext>,
}

impl BlockBuilderTrait for ParquetBlockBuilder {
type Pipe = ParquetFormatPipe;

fn create(ctx: Arc<InputContext>) -> Self {
ParquetBlockBuilder { ctx }
}

fn deserialize(&mut self, batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
todo!()
}
}

pub struct AligningState {
buffers: Vec<Vec<u8>>,
}

impl AligningStateTrait for AligningState {
type Pipe = ParquetFormatPipe;

fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
todo!()
}

fn align(&mut self, read_batch: Option<ReadBatch>) -> Result<Vec<RowGroupInMemory>> {
todo!()
}
}
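
Aside: the core abstraction in this framework is the `InputFormatPipe` trait, which lets each format name its stage types (read batch, row batch, aligning state, block builder) through associated types, as `ParquetFormatPipe` does above. The snippet below is a reduced, self-contained sketch of that wiring; every name in it (FormatPipe, CsvPipe, align, build) is an illustrative stand-in, not the crate's actual API.

// Reduced model of the "pipe" idea: a pipe names its stage types via associated
// types and data flows read batch -> row batch -> output block.
// All names here are invented for illustration.
trait FormatPipe {
    type ReadBatch;
    type RowBatch;
    fn align(batch: Self::ReadBatch) -> Vec<Self::RowBatch>;
    fn build(rows: Self::RowBatch) -> String;
}

struct CsvPipe;

impl FormatPipe for CsvPipe {
    type ReadBatch = Vec<u8>;
    type RowBatch = Vec<String>;

    // Cut a raw read buffer into complete rows (here: newline-delimited CSV).
    fn align(batch: Vec<u8>) -> Vec<Vec<String>> {
        String::from_utf8_lossy(&batch)
            .lines()
            .map(|l| l.split(',').map(str::to_string).collect())
            .collect()
    }

    // Turn one row batch into an output "block" (here: just a display string).
    fn build(rows: Vec<String>) -> String {
        rows.join(" | ")
    }
}

fn main() {
    for row in CsvPipe::align(b"1,a\n2,b\n".to_vec()) {
        println!("{}", CsvPipe::build(row));
    }
}
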
@@ -0,0 +1,5 @@
use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;

pub struct InputFormatTSV {}

impl InputFormatTextBase for InputFormatTSV {}
@@ -0,0 +1,2 @@
pub mod input_format_parquet;
pub mod input_format_tsv;
@@ -0,0 +1,158 @@
use std::sync::Arc;

use common_base::base::tokio::sync::Mutex;
use common_base::base::Progress;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::StageFileCompression;
use common_meta_types::StageFileFormatType;
use common_meta_types::UserStageInfo;
use common_settings::Settings;
use opendal::io_util::CompressAlgorithm;
use opendal::Operator;

use crate::processors::sources::input_formats::impls::input_format_tsv::InputFormatTSV;
use crate::processors::sources::input_formats::input_format::FileInfo;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::input_format_text::InputFormatText;
use crate::processors::sources::input_formats::InputFormat;

pub enum InputPlan {
CopyInto(Box<CopyIntoPlan>),
StreamingLoad,
ClickHouseInsert,
}

pub struct CopyIntoPlan {
pub stage_info: UserStageInfo,
pub files: Vec<String>,
}

pub struct InputProgress {
// todo(youngsofun): add write progress and errors
scan_progress: Progress,
size: usize,
}

pub struct InputContext {
pub plan: InputPlan,
pub schema: DataSchemaRef,
pub operator: Operator,
pub format: Arc<dyn InputFormat>,
pub splits: Vec<SplitInfo>,

// runtime config
pub settings: Settings,
pub read_buffer_size: usize,
pub min_split_size: usize,
pub min_block_size: usize,

pub progress_total: Mutex<Vec<Arc<InputProgress>>>,
pub progress_by_file: Mutex<Vec<Arc<InputProgress>>>,
}

impl InputContext {
pub fn get_input_format(format: &StageFileFormatType) -> Result<Arc<dyn InputFormat>> {
match format {
StageFileFormatType::Tsv => Ok(Arc::new(InputFormatText::<InputFormatTSV>::create())),
format => {
Err(ErrorCode::LogicalError(format!(
"Unsupported file format: {:?}",
format
)))
}
}
}

async fn try_create_from_copy(
operator: Operator,
settings: Settings,
schema: DataSchemaRef,
plan: CopyIntoPlan,
) -> Result<Self> {
let format = Self::get_input_format(&plan.stage_info.file_format_options.format)?;
let files = Self::get_file_infos(&format, &operator, &plan).await?;
let splits = format.split_files(files);
Ok(InputContext {
format,
schema,
operator,
splits,
settings,
read_buffer_size: 0,
min_split_size: 0,
min_block_size: 0,
progress_total: Default::default(),
plan: InputPlan::CopyInto(Box::new(plan)),
progress_by_file: Default::default(),
})
}

async fn get_file_infos(
format: &Arc<dyn InputFormat>,
op: &Operator,
plan: &CopyIntoPlan,
) -> Result<Vec<FileInfo>> {
let mut infos = vec![];
for p in &plan.files {
let obj = op.object(p);
let size = obj.metadata().await?.content_length() as usize;
let file_meta = format.read_file_meta(&obj, size).await?;
let compress_alg = InputContext::get_compression_alg_copy(
plan.stage_info.file_format_options.compression,
p,
)?;
let info = FileInfo {
path: p.clone(),
size,
compress_alg,
file_meta,
};
infos.push(info)
}
Ok(infos)
}

pub fn num_prefetch_splits(&self) -> Result<usize> {
Ok(self.settings.get_max_threads()? as usize)
}

pub fn num_prefetch_per_split(&self) -> usize {
1
}

pub fn get_compression_alg(&self, path: &str) -> Result<Option<CompressAlgorithm>> {
let opt = match &self.plan {
InputPlan::CopyInto(p) => p.stage_info.file_format_options.compression,
_ => StageFileCompression::None,
};
Self::get_compression_alg_copy(opt, path)
}

pub fn get_compression_alg_copy(
compress_option: StageFileCompression,
path: &str,
) -> Result<Option<CompressAlgorithm>> {
let compression_algo = match compress_option {
StageFileCompression::Auto => CompressAlgorithm::from_path(path),
StageFileCompression::Gzip => Some(CompressAlgorithm::Gzip),
StageFileCompression::Bz2 => Some(CompressAlgorithm::Bz2),
StageFileCompression::Brotli => Some(CompressAlgorithm::Brotli),
StageFileCompression::Zstd => Some(CompressAlgorithm::Zstd),
StageFileCompression::Deflate => Some(CompressAlgorithm::Zlib),
StageFileCompression::RawDeflate => Some(CompressAlgorithm::Deflate),
StageFileCompression::Xz => Some(CompressAlgorithm::Xz),
StageFileCompression::Lzo => {
return Err(ErrorCode::UnImplement("compress type lzo is unimplemented"));
}
StageFileCompression::Snappy => {
return Err(ErrorCode::UnImplement(
"compress type snappy is unimplemented",
));
}
StageFileCompression::None => None,
};
Ok(compression_algo)
}
}
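
Aside: `get_compression_alg_copy` maps an explicit stage compression option to a decompression algorithm, and `Auto` infers the algorithm from the file extension via opendal's `CompressAlgorithm::from_path`. The standalone sketch below shows that inference pattern with invented local names (Algo, algo_from_path) rather than opendal's types.

// Stand-in for extension-based compression detection; the enum, the helper, and
// the supported extensions are illustrative, not opendal's actual API.
#[derive(Debug, PartialEq)]
enum Algo {
    Gzip,
    Bz2,
    Zstd,
    Xz,
}

fn algo_from_path(path: &str) -> Option<Algo> {
    match path.rsplit('.').next() {
        Some("gz") => Some(Algo::Gzip),
        Some("bz2") => Some(Algo::Bz2),
        Some("zst") => Some(Algo::Zstd),
        Some("xz") => Some(Algo::Xz),
        _ => None, // unknown or missing extension: treat as uncompressed
    }
}

fn main() {
    assert_eq!(algo_from_path("stage/part-0001.csv.gz"), Some(Algo::Gzip));
    assert_eq!(algo_from_path("stage/part-0002.csv"), None);
}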