External source with new processor #4277

Merged
1 change: 0 additions & 1 deletion common/contexts/Cargo.toml
@@ -9,6 +9,5 @@ doctest = false
test = false

[dependencies]
-
async-trait = "0.1.52"
opendal = "0.1.3"
5 changes: 3 additions & 2 deletions common/planners/src/lib.rs
@@ -46,6 +46,7 @@ mod plan_node_display;
mod plan_node_display_indent;
mod plan_node_extras;
mod plan_node_rewriter;
+mod plan_node_s3_external_table;
mod plan_node_stage;
mod plan_node_statistics;
mod plan_node_visitor;
@@ -82,7 +83,6 @@ mod plan_user_create;
mod plan_user_drop;
mod plan_user_privilege_grant;
mod plan_user_privilege_revoke;
-mod plan_user_stage;
mod plan_user_stage_create;
mod plan_user_stage_describe;
mod plan_user_stage_drop;
@@ -149,6 +149,7 @@ pub use plan_node_builder::PlanBuilder;
pub use plan_node_extras::Extras;
pub use plan_node_rewriter::PlanRewriter;
pub use plan_node_rewriter::RewriteHelper;
+pub use plan_node_s3_external_table::S3ExternalTableInfo;
pub use plan_node_stage::StageKind;
pub use plan_node_stage::StagePlan;
pub use plan_node_statistics::Statistics;
@@ -157,6 +158,7 @@ pub use plan_partition::Part;
pub use plan_partition::Partitions;
pub use plan_projection::ProjectionPlan;
pub use plan_read_datasource::ReadDataSourcePlan;
+pub use plan_read_datasource::SourceInfo;
pub use plan_remote::RemotePlan;
pub use plan_role_create::CreateRolePlan;
pub use plan_role_drop::DropRolePlan;
@@ -192,7 +194,6 @@ pub use plan_user_create::CreateUserPlan;
pub use plan_user_drop::DropUserPlan;
pub use plan_user_privilege_grant::GrantPrivilegePlan;
pub use plan_user_privilege_revoke::RevokePrivilegePlan;
-pub use plan_user_stage::UserStagePlan;
pub use plan_user_stage_create::CreateUserStagePlan;
pub use plan_user_stage_describe::DescribeUserStagePlan;
pub use plan_user_stage_drop::DropUserStagePlan;
6 changes: 3 additions & 3 deletions common/planners/src/plan_copy.rs
@@ -19,7 +19,7 @@ use std::str::FromStr;
use common_datavalues::DataSchemaRef;
use common_meta_types::MetaId;

-use crate::UserStagePlan;
+use crate::ReadDataSourcePlan;

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone, Debug)]
pub enum ValidationMode {
@@ -58,7 +58,7 @@ pub struct CopyPlan {
pub tbl_name: String,
pub tbl_id: MetaId,
pub schema: DataSchemaRef,
-pub stage_plan: UserStagePlan,
+pub from: ReadDataSourcePlan,
pub validation_mode: ValidationMode,
pub files: Vec<String>,
}
@@ -73,7 +73,7 @@ impl Debug for CopyPlan {
// Ignore the schema.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Copy into {:}.{:}", self.db_name, self.tbl_name)?;
-write!(f, " ,{:?}", self.stage_plan)?;
+write!(f, ", {:?}", self.from)?;
if !self.files.is_empty() {
write!(f, " ,files:{:?}", self.files)?;
}
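To make the reworked Debug output concrete, here is a hypothetical rendering (the database, table, and file names are illustrative; the middle part is whatever the derived Debug of the embedded ReadDataSourcePlan prints):

Copy into mydb.mytable, ReadDataSourcePlan { source_info: S3ExternalSource(..), .. } ,files:["part-0001.csv"]

The remaining " ,files:" spacing comes from the third write! call, which this PR leaves as-is.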
common/planners/src/{plan_user_stage.rs → plan_node_s3_external_table.rs}
@@ -19,12 +19,23 @@ use common_datavalues::DataSchemaRef;
use common_meta_types::UserStageInfo;

#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq)]
-pub struct UserStagePlan {
+pub struct S3ExternalTableInfo {
pub schema: DataSchemaRef,
+pub file_name: Option<String>,
pub stage_info: UserStageInfo,
}

-impl Debug for UserStagePlan {
+impl S3ExternalTableInfo {
+pub fn schema(&self) -> DataSchemaRef {
+self.schema.clone()
+}
+
+pub fn desc(&self) -> String {
+self.stage_info.stage_name.clone()
+}
+}
+
+impl Debug for S3ExternalTableInfo {
// Ignore the schema.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.stage_info)
37 changes: 32 additions & 5 deletions common/planners/src/plan_read_datasource.rs
@@ -16,18 +16,45 @@ use std::collections::BTreeMap;
use std::sync::Arc;

use common_datavalues::DataField;
+use common_datavalues::DataSchema;
use common_datavalues::DataSchemaRef;
use common_meta_types::TableInfo;

use crate::Expression;
use crate::Extras;
use crate::Partitions;
+use crate::S3ExternalTableInfo;
use crate::Statistics;

+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
+pub enum SourceInfo {
+// Normal table source, 'fuse/system'.
+TableSource(TableInfo),
+
+// S3 external source, 's3://'.
+S3ExternalSource(S3ExternalTableInfo),

[Review thread on the S3ExternalSource variant]

Member: Why not make s3 a special table? Such as a table function that holds the temporary table.

Member (author): Making s3 a table engine means we can also do the unloading, as copy from table into s3://.

Member: It seems not extensible enough, considering we will support azblob, gcs, and other locations.

Member (@sundy-li, Mar 2, 2022): But a table function can also support insert.

copy from table into s3://
select * from s3://

1. as_table generates a temporary table.
2. Select or insert works with that table.

I do think it's better to introduce an external table function, like:

select * from external(stage_name, ...)

This may work for s3, azblob, ... anything a stage supports.

Member (author): I have added a description of why it is made a table engine.

Member (author): BTW, for select * from @stage ..., we will convert the @stage alias to the external table info fetched from the metasrv (after we support stages) in the SQL parser phase; it will work the same way as select * from s3://.

(A sketch of how this dispatch could extend to other sources follows this file's diff.)
+}
+
+impl SourceInfo {
+pub fn schema(&self) -> Arc<DataSchema> {
+match self {
+SourceInfo::TableSource(table_info) => table_info.schema(),
+SourceInfo::S3ExternalSource(table_info) => table_info.schema(),
+}
+}
+
+pub fn desc(&self) -> String {
+match self {
+SourceInfo::TableSource(table_info) => table_info.desc.clone(),
+SourceInfo::S3ExternalSource(table_info) => table_info.desc(),
+}
+}
+}

// TODO: Delete the scan plan field, but it depends on plan_parser:L394
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub struct ReadDataSourcePlan {
-pub table_info: TableInfo,
+pub source_info: SourceInfo,

/// Required fields to scan.
///
@@ -52,21 +79,21 @@ impl ReadDataSourcePlan {
.clone()
.map(|x| {
let fields: Vec<_> = x.iter().map(|(_, f)| f.clone()).collect();
-Arc::new(self.table_info.schema().project_by_fields(fields))
+Arc::new(self.source_info.schema().project_by_fields(fields))
})
-.unwrap_or_else(|| self.table_info.schema())
+.unwrap_or_else(|| self.source_info.schema())
}

/// Return designated required fields or all fields in a hash map.
pub fn scan_fields(&self) -> BTreeMap<usize, DataField> {
self.scan_fields
.clone()
-.unwrap_or_else(|| self.table_info.schema().fields_map())
+.unwrap_or_else(|| self.source_info.schema().fields_map())
}

pub fn projections(&self) -> Vec<usize> {
let default_proj = || {
-(0..self.table_info.schema().fields().len())
+(0..self.source_info.schema().fields().len())
.into_iter()
.collect::<Vec<usize>>()
};
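To ground the extensibility point from the review thread above: supporting another location (azblob, gcs, ...) later would mean one more SourceInfo variant plus one more match arm at each dispatch site. A minimal sketch of such a dispatch site, assuming only the types from this diff; Table, build_catalog_table, and build_s3_external_table are hypothetical stand-ins, not part of this PR:

use common_exception::Result;
use common_planners::ReadDataSourcePlan;
use common_planners::SourceInfo;

// Hypothetical consumer: choose a table implementation from the plan's source.
fn build_source(plan: &ReadDataSourcePlan) -> Result<Box<dyn Table>> {
    match &plan.source_info {
        // Normal tables resolve through the catalog.
        SourceInfo::TableSource(table_info) => build_catalog_table(table_info),
        // S3 external sources get a one-off external table.
        SourceInfo::S3ExternalSource(s3_info) => build_s3_external_table(s3_info),
    }
}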
3 changes: 2 additions & 1 deletion common/planners/tests/it/test.rs
@@ -19,6 +19,7 @@ use common_planners::Part;
use common_planners::Partitions;
use common_planners::PlanNode;
use common_planners::ReadDataSourcePlan;
+use common_planners::SourceInfo;
use common_planners::Statistics;

pub struct Test {}
@@ -40,7 +41,7 @@ impl Test {
};

Ok(PlanNode::ReadSource(ReadDataSourcePlan {
-table_info: TableInfo::simple("system", "numbers_mt", schema),
+source_info: SourceInfo::TableSource(TableInfo::simple("system", "numbers_mt", schema)),
scan_fields: None,
parts: Self::generate_partitions(8, total as u64),
statistics: statistics.clone(),
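For comparison with the TableSource construction above, building the new S3 variant would look roughly as follows. A sketch only: schema is bound as in the test, the file name is illustrative, and a Default impl for UserStageInfo is assumed:

use common_meta_types::UserStageInfo;
use common_planners::S3ExternalTableInfo;
use common_planners::SourceInfo;

// Wrap an S3ExternalTableInfo instead of a TableInfo.
let source_info = SourceInfo::S3ExternalSource(S3ExternalTableInfo {
    schema,                                       // DataSchemaRef describing the file contents
    file_name: Some("part-0001.csv".to_string()), // illustrative file name
    stage_info: UserStageInfo::default(),         // assumed Default impl, sketch only
});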
2 changes: 1 addition & 1 deletion query/Cargo.toml
@@ -54,7 +54,6 @@ common-tracing = { path = "../common/tracing" }
# Github dependencies
cargo-license = { git = "https://github.com/datafuse-extras/cargo-license", rev = "f1ce4a2" }
msql-srv = { git = "https://github.com/datafuse-extras/msql-srv", rev = "70aa0b2" }
-opendal = "0.1.3"
sqlparser = { git = "https://github.com/datafuse-extras/sqlparser-rs", rev = "472f5b6" }

# Crates.io dependencies
@@ -85,6 +84,7 @@ num = "0.4.0"
num_cpus = "1.13.1"
octocrab = "0.15.4"
once_cell = "1.9.0"
+opendal = "0.1.3"
parquet-format-async-temp = "0.2.0"
paste = "1.0.6"
petgraph = "0.6.0"
93 changes: 50 additions & 43 deletions query/src/interpreters/interpreter_copy.rs
@@ -14,21 +14,22 @@

use std::sync::Arc;

-use common_exception::ErrorCode;
use common_exception::Result;
-use common_meta_types::StageFileFormatType;
-use common_meta_types::StageType;
use common_planners::CopyPlan;
+use common_planners::PlanNode;
+use common_planners::ReadDataSourcePlan;
+use common_planners::SourceInfo;
use common_streams::DataBlockStream;
use common_streams::ProgressStream;
use common_streams::SendableDataBlockStream;
use common_tracing::tracing;
use futures::TryStreamExt;

+use crate::interpreters::stream::ProcessorExecutorStream;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
-use crate::pipelines::processors::Processor;
-use crate::pipelines::transforms::CsvSourceTransform;
+use crate::pipelines::new::executor::PipelinePullingExecutor;
+use crate::pipelines::new::QueryPipelineBuilder;
use crate::sessions::QueryContext;

pub struct CopyInterpreter {
@@ -41,38 +42,46 @@ impl CopyInterpreter {
Ok(Arc::new(CopyInterpreter { ctx, plan }))
}

+// Rewrite ReadDataSourcePlan.S3ExternalSource.file_name to the new file name.
+fn rewrite_read_plan_file_name(
+mut plan: ReadDataSourcePlan,
+file_name: Option<String>,
+) -> ReadDataSourcePlan {
+if let SourceInfo::S3ExternalSource(ref mut s3) = plan.source_info {
+s3.file_name = file_name;
+}
+plan
+}
+
// Read a file and commit it to the table.
// If a file_name is given, we read the file at {path}/{file_name}.
-async fn write_one_file(&self, file_name: Option<String>, commit: bool) -> Result<()> {
+// Progress:
+// 1. Build a select pipeline.
+// 2. Execute the pipeline and get the stream.
+// 3. Read from the stream and write to the table.
+// Note:
+// We parse `s3://` into a ReadSourcePlan rather than a SELECT plan because COPY
+// must handle the files one by one and apply error handling per the OnError strategy.
+#[tracing::instrument(level = "debug", name = "copy_one_file_to_table", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
+async fn copy_one_file_to_table(&self, file_name: Option<String>) -> Result<()> {
let ctx = self.ctx.clone();
-let stage_plan = self.plan.stage_plan.clone();

-let source_stream = match stage_plan.stage_info.stage_type {
-StageType::External => {
-match stage_plan.stage_info.file_format_options.format {
-// CSV.
-StageFileFormatType::Csv => {
-CsvSourceTransform::try_create(
-self.ctx.clone(),
-file_name,
-stage_plan.clone(),
-)?
-.execute()
-.await
-}
-// Unsupported.
-format => Err(ErrorCode::LogicalError(format!(
-"Unsupported file format: {:?}",
-format
-))),
-}
-}
+let settings = self.ctx.get_settings();
+
+let read_source_plan = self.plan.from.clone();
+let read_source_plan = Self::rewrite_read_plan_file_name(read_source_plan, file_name);

-StageType::Internal => Err(ErrorCode::LogicalError(
-"Unsupported copy from internal stage",
-)),
-}?;
+tracing::info!("copy_one_file_to_table: source plan:{:?}", read_source_plan);

+let from_plan = common_planners::SelectPlan {
+input: Arc::new(PlanNode::ReadSource(read_source_plan)),
+};
+
+let pipeline_builder = QueryPipelineBuilder::create(ctx.clone());
+let mut pipeline = pipeline_builder.finalize(&from_plan)?;
+pipeline.set_max_threads(settings.get_max_threads()? as usize);
+
+let executor = PipelinePullingExecutor::try_create(pipeline)?;
+let source_stream = Box::pin(ProcessorExecutorStream::create(executor)?);
let progress_stream = Box::pin(ProgressStream::try_create(
source_stream,
ctx.get_scan_progress(),
)?);

let table = ctx
.get_table(&self.plan.db_name, &self.plan.tbl_name)
.await?;
-let r = table
+let operations = table
.append_data(ctx.clone(), progress_stream)
.await?
.try_collect()
.await?;

-if commit {
-table.commit_insertion(ctx.clone(), r, false).await?;
-}
+// Commit.
+table
+.commit_insertion(ctx.clone(), operations, false)
+.await?;

Ok(())
}
impl Interpreter for CopyInterpreter {
fn name(&self) -> &str {
"CopyInterpreter"
}

+#[tracing::instrument(level = "debug", name = "copy_interpreter_execute", skip(self, _input_stream), fields(ctx.id = self.ctx.get_id().as_str()))]
async fn execute(
&self,
mut _input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
tracing::info!("Plan:{:?}", self.plan);

-// Commit after each file write.
-let commit = true;
let files = self.plan.files.clone();

if files.is_empty() {
-self.write_one_file(None, commit).await?;
+self.copy_one_file_to_table(None).await?;
} else {
for file in files {
-self.write_one_file(Some(file), commit).await?;
+self.copy_one_file_to_table(Some(file)).await?;
}
}
