Skip to content

Commit

Permalink
fix cluster key for distributed copy from query
Browse files Browse the repository at this point in the history
  • Loading branch information
JackTan25 committed Jul 18, 2023
1 parent 1912359 commit f5c6c65
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 2 deletions.
6 changes: 5 additions & 1 deletion src/query/service/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,20 +201,24 @@ impl CopyInterpreter {
)))
} else {
// plan query must exist, so we can use unwrap directly.
let (select_interpreter, _) = self.build_query(plan.query.as_ref().unwrap()).await?;
let (select_interpreter, query_source_schema) =
self.build_query(plan.query.as_ref().unwrap()).await?;
let plan_query = select_interpreter.build_physical_plan().await?;
Ok(Some(CopyPlanType::CopyIntoTableFromQuery(
CopyIntoTableFromQuery {
// add exchange plan node to enable distributed
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id
plan_id: 0,
ignore_result: select_interpreter.get_ignore_result(),
catalog_name: plan.catalog_name.clone(),
database_name: plan.database_name.clone(),
table_name: plan.table_name.clone(),
required_source_schema: plan.required_source_schema.clone(),
values_consts: plan.values_consts.clone(),
required_values_schema: plan.required_values_schema.clone(),
result_columns: select_interpreter.get_result_columns(),
query_source_schema,
write_mode: plan.write_mode,
validation_mode: plan.validation_mode.clone(),
force: plan.force,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/interpreters/interpreter_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use common_pipeline_core::Pipeline;
use common_pipeline_transforms::processors::transforms::TransformDummy;
use common_sql::executor::PhysicalPlan;
use common_sql::parse_result_scan_args;
use common_sql::ColumnBinding;
use common_sql::MetadataRef;
use common_storages_result_cache::gen_result_cache_key;
use common_storages_result_cache::ResultCacheReader;
Expand Down Expand Up @@ -73,6 +74,14 @@ impl SelectInterpreter {
})
}

/// Returns the value of this interpreter's `ignore_result` flag.
pub fn get_ignore_result(&self) -> bool {
self.ignore_result
}

/// Returns an owned copy of the column bindings stored in this
/// interpreter's bind context.
///
/// The columns are cloned on every call, so callers receive an independent
/// `Vec<ColumnBinding>`.
pub fn get_result_columns(&self) -> Vec<ColumnBinding> {
self.bind_context.columns.clone()
}

#[inline]
#[async_backtrace::framed]
pub async fn build_physical_plan(&self) -> Result<PhysicalPlan> {
Expand Down
10 changes: 9 additions & 1 deletion src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,21 @@ impl PipelineBuilder {
copy_plan: &CopyIntoTableFromQuery,
) -> Result<()> {
self.build_pipeline(&copy_plan.input)?;
// render_result for query
PipelineBuilder::render_result_set(
&self.ctx.get_function_context()?,
copy_plan.input.output_schema()?,
&copy_plan.result_columns,
&mut self.main_pipeline,
copy_plan.ignore_result,
)?;
let catalog = self.ctx.get_catalog(&copy_plan.catalog_name)?;
let to_table = catalog.get_table_by_info(&copy_plan.table_info)?;
build_append_data_pipeline(
self.ctx.clone(),
&mut self.main_pipeline,
CopyPlanType::CopyIntoTableFromQuery(copy_plan.clone()),
copy_plan.required_source_schema.clone(),
copy_plan.query_source_schema.clone(),
to_table,
)?;
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions src/query/sql/src/executor/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,10 @@ pub struct CopyIntoTableFromQuery {
pub required_values_schema: DataSchemaRef, // ... into table(<columns>) .. -> <columns>
pub values_consts: Vec<Scalar>, // (1, ?, 'a', ?) -> (1, 'a')
pub required_source_schema: DataSchemaRef, // (1, ?, 'a', ?) -> (?, ?)
// these three fields are used for rendering the query result
pub query_source_schema: DataSchemaRef,
pub ignore_result: bool,
pub result_columns: Vec<ColumnBinding>,

pub write_mode: CopyIntoTableMode,
pub validation_mode: ValidationMode,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
statement ok
set enable_distributed_copy_into = 1;

statement ok
drop table if exists test_order;

statement ok
drop table if exists random_source;

statement ok
drop stage if exists test_stage;

statement ok
drop table if exists parquet_table;

Expand Down Expand Up @@ -166,5 +175,26 @@ select block_count from fuse_snapshot('default','t_query2');
----
2

#test cluster key
statement ok
create table test_order(a int,b string,c timestamp) cluster by(to_yyyymmdd(c),a);

statement ok
create table random_source like test_order Engine = Random;

statement ok
create stage test_stage;

statement ok
copy into @test_stage from (select * from random_source limit 10000000) FILE_FORMAT=(type=parquet);

statement ok
copy into test_order from @test_stage file_format=(type=parquet) force=true;

query I
select count(*) from test_order;
----
10000000

statement ok
set enable_distributed_copy_into = 0;

0 comments on commit f5c6c65

Please sign in to comment.