Skip to content

Commit

Permalink
feat: support NDJSON format in SELECT from stage.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Jun 8, 2023
1 parent 50d06a4 commit dde523e
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 55 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct StageTableInfo {
pub files_info: StageFilesInfo,
pub stage_info: StageInfo,
pub files_to_copy: Option<Vec<StageFileInfo>>,
pub is_select: bool,
}

impl StageTableInfo {
Expand Down
2 changes: 2 additions & 0 deletions src/query/formats/src/field_decoder/json_ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::FileFormatOptionsExt;
/// Decodes JSON AST values (`serde_json::Value`) into table columns,
/// used by the NDJSON input format.
pub struct FieldJsonAstDecoder {
    // Timezone applied when decoding fields — presumably date/timestamp
    // parsing; confirm in the individual read_* implementations.
    pub timezone: Tz,
    // When false, JSON object keys are lowercased before field lookup,
    // i.e. column-name matching is case-insensitive.
    pub ident_case_sensitive: bool,
    // True when decoding for `SELECT ... FROM @stage`: the whole JSON row
    // is read into a single variant column ($1) instead of being split
    // across per-field columns.
    pub is_select: bool,
}

impl FieldDecoder for FieldJsonAstDecoder {
Expand All @@ -60,6 +61,7 @@ impl FieldJsonAstDecoder {
FieldJsonAstDecoder {
timezone: options.timezone,
ident_case_sensitive: options.ident_case_sensitive,
is_select: options.is_select,
}
}

Expand Down
8 changes: 7 additions & 1 deletion src/query/formats/src/file_format_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,14 @@ pub struct FileFormatOptionsExt {
pub json_strings: bool,
pub disable_variant_check: bool,
pub timezone: Tz,
pub is_select: bool,
}

impl FileFormatOptionsExt {
pub fn create_from_settings(settings: &Settings) -> Result<FileFormatOptionsExt> {
pub fn create_from_settings(
settings: &Settings,
is_select: bool,
) -> Result<FileFormatOptionsExt> {
let timezone = parse_timezone(settings)?;
let options = FileFormatOptionsExt {
ident_case_sensitive: false,
Expand All @@ -56,6 +60,7 @@ impl FileFormatOptionsExt {
json_strings: false,
disable_variant_check: false,
timezone,
is_select,
};
Ok(options)
}
Expand All @@ -72,6 +77,7 @@ impl FileFormatOptionsExt {
json_strings: false,
disable_variant_check: false,
timezone,
is_select: false,
};
let suf = &clickhouse_type.suffixes;
options.headers = suf.headers;
Expand Down
4 changes: 2 additions & 2 deletions src/query/formats/tests/it/output_format_tcsv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ fn test_data_block(is_nullable: bool) -> Result<()> {

let params =
FileFormatParams::try_from_ast(FileFormatOptionsAst::new(options.clone()), false)?;
let mut options = FileFormatOptionsExt::create_from_settings(&settings)?;
let mut options = FileFormatOptionsExt::create_from_settings(&settings, false)?;
let mut output_format = options.get_output_format(schema, params)?;
let buffer = output_format.serialize_block(&block)?;

Expand Down Expand Up @@ -131,7 +131,7 @@ fn test_field_delimiter_with_ascii_control_code() -> Result<()> {
options.insert("field_delimiter".to_string(), "\x01".to_string());
options.insert("record_delimiter".to_string(), "\r\n".to_string());
let params = FileFormatParams::try_from_ast(FileFormatOptionsAst::new(options.clone()), false)?;
let mut options = FileFormatOptionsExt::create_from_settings(&settings)?;
let mut options = FileFormatOptionsExt::create_from_settings(&settings, false)?;
let mut output_format = options.get_output_format(schema, params)?;
let buffer = output_format.serialize_block(&block)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,43 @@ impl InputFormatNDJson {
schema: &TableSchemaRef,
) -> Result<()> {
let mut json: serde_json::Value = serde_json::from_reader(buf)?;
// if it's not case_sensitive, we convert to lowercase
if !field_decoder.ident_case_sensitive {
if let serde_json::Value::Object(x) = json {
let y = x.into_iter().map(|(k, v)| (k.to_lowercase(), v)).collect();
json = serde_json::Value::Object(y);
// todo: this is temporary
if field_decoder.is_select {
field_decoder
.read_field(&mut columns[0], &json)
.map_err(|e| {
let value_str = format!("{:?}", json);
ErrorCode::BadBytes(format!(
"fail to{}. column=$1 value={}",
e,
maybe_truncated(&value_str, 1024),
))
})?;
} else {
// if it's not case_sensitive, we convert to lowercase
if !field_decoder.ident_case_sensitive {
if let serde_json::Value::Object(x) = json {
let y = x.into_iter().map(|(k, v)| (k.to_lowercase(), v)).collect();
json = serde_json::Value::Object(y);
}
}
}

for (f, column) in schema.fields().iter().zip(columns.iter_mut()) {
let value = if field_decoder.ident_case_sensitive {
&json[f.name().to_owned()]
} else {
&json[f.name().to_lowercase()]
};
field_decoder.read_field(column, value).map_err(|e| {
let value_str = format!("{:?}", value);
ErrorCode::BadBytes(format!(
"{}. column={} value={}",
e,
f.name(),
maybe_truncated(&value_str, 1024),
))
})?;
for (f, column) in schema.fields().iter().zip(columns.iter_mut()) {
let value = if field_decoder.ident_case_sensitive {
&json[f.name().to_owned()]
} else {
&json[f.name().to_lowercase()]
};
field_decoder.read_field(column, value).map_err(|e| {
let value_str = format!("{:?}", value);
ErrorCode::BadBytes(format!(
"{}. column={} value={}",
e,
f.name(),
maybe_truncated(&value_str, 1024),
))
})?;
}
}
Ok(())
}
Expand Down
6 changes: 4 additions & 2 deletions src/query/pipeline/sources/src/input_formats/input_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,10 @@ impl InputContext {
scan_progress: Arc<Progress>,
block_compact_thresholds: BlockThresholds,
on_error_map: Arc<DashMap<String, HashMap<u16, InputError>>>,
is_select: bool,
) -> Result<Self> {
let mut file_format_options_ext = FileFormatOptionsExt::create_from_settings(&settings)?;
let mut file_format_options_ext =
FileFormatOptionsExt::create_from_settings(&settings, is_select)?;
file_format_options_ext.disable_variant_check =
stage_info.copy_options.disable_variant_check;
let on_error_mode = stage_info.copy_options.on_error.clone();
Expand Down Expand Up @@ -255,7 +257,7 @@ impl InputContext {
block_compact_thresholds: BlockThresholds,
) -> Result<Self> {
let read_batch_size = settings.get_input_read_buffer_size()? as usize;
let file_format_options_ext = FileFormatOptionsExt::create_from_settings(&settings)?;
let file_format_options_ext = FileFormatOptionsExt::create_from_settings(&settings, false)?;
let format = Self::get_input_format(&file_format_params)?;

let plan = StreamPlan {
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl CopyInterpreter {
pattern: None,
},
files_to_copy: None,
is_select: false,
};
let table = StageTable::try_create(stage_table_info)?;
append2table(
Expand Down
1 change: 1 addition & 0 deletions src/query/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ common-settings = { path = "../settings" }
common-storage = { path = "../../common/storage" }
common-storages-parquet = { path = "../storages/parquet" }
common-storages-result-cache = { path = "../storages/result_cache" }
common-storages-stage = { path = "../storages/stage" }
common-storages-view = { path = "../storages/view" }
common-users = { path = "../users" }
data-mask-feature = { path = "../ee-features/data-mask" }
Expand Down
4 changes: 4 additions & 0 deletions src/query/sql/src/planner/binder/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl<'a> Binder {
files_info,
stage_info,
files_to_copy: None,
is_select: false,
},
values_consts: vec![],
required_source_schema: required_values_schema.clone(),
Expand Down Expand Up @@ -195,6 +196,7 @@ impl<'a> Binder {
files_info,
stage_info,
files_to_copy: None,
is_select: false,
},
values_consts: vec![],
required_source_schema: required_values_schema.clone(),
Expand Down Expand Up @@ -338,6 +340,7 @@ impl<'a> Binder {
files_info,
stage_info,
files_to_copy: None,
is_select: false,
},
write_mode: CopyIntoTableMode::Copy,
query: None,
Expand Down Expand Up @@ -456,6 +459,7 @@ impl<'a> Binder {
files_info,
stage_info,
files_to_copy: None,
is_select: false,
},
write_mode,
query: None,
Expand Down
77 changes: 49 additions & 28 deletions src/query/sql/src/planner/binder/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use common_ast::parser::tokenize_sql;
use common_ast::Dialect;
use common_catalog::catalog_kind::CATALOG_DEFAULT;
use common_catalog::plan::ParquetReadOptions;
use common_catalog::plan::StageTableInfo;
use common_catalog::table::ColumnStatistics;
use common_catalog::table::NavigationPoint;
use common_catalog::table::Table;
Expand All @@ -46,6 +47,9 @@ use common_expression::ColumnId;
use common_expression::ConstantFolder;
use common_expression::FunctionKind;
use common_expression::Scalar;
use common_expression::TableDataType;
use common_expression::TableField;
use common_expression::TableSchema;
use common_functions::BUILTIN_FUNCTIONS;
use common_license::license_manager::get_license_manager;
use common_meta_app::principal::FileFormatParams;
Expand All @@ -60,6 +64,7 @@ use common_storages_parquet::ParquetTable;
use common_storages_result_cache::ResultCacheMetaManager;
use common_storages_result_cache::ResultCacheReader;
use common_storages_result_cache::ResultScan;
use common_storages_stage::StageTable;
use common_storages_view::view_table::QUERY;
use common_users::UserApiProvider;
use dashmap::DashMap;
Expand Down Expand Up @@ -523,39 +528,55 @@ impl Binder {
alias: &Option<TableAlias>,
files_to_copy: Option<Vec<StageFileInfo>>,
) -> Result<(SExpr, BindContext)> {
if matches!(stage_info.file_format_params, FileFormatParams::Parquet(..)) {
let read_options = ParquetReadOptions::default();
let table = match stage_info.file_format_params {
FileFormatParams::Parquet(..) => {
let read_options = ParquetReadOptions::default();

let table =
ParquetTable::create(stage_info.clone(), files_info, read_options, files_to_copy)
.await?;

let table_alias_name = if let Some(table_alias) = alias {
Some(normalize_identifier(&table_alias.name, &self.name_resolution_ctx).name)
} else {
None
};

let table_index = self.metadata.write().add_table(
CATALOG_DEFAULT.to_string(),
"system".to_string(),
table.clone(),
table_alias_name,
false,
);

let (s_expr, mut bind_context) = self
.bind_base_table(bind_context, "system", table_index)
.await?;
if let Some(alias) = alias {
bind_context.apply_table_alias(alias, &self.name_resolution_ctx)?;
.await?
}
Ok((s_expr, bind_context))
FileFormatParams::NdJson(..) => {
let schema = Arc::new(TableSchema::new(vec![TableField::new(
"_$1", // TODO: this name should be invisible (hidden from the user-facing schema)
TableDataType::Variant,
)]));
let info = StageTableInfo {
schema,
stage_info,
files_info,
files_to_copy: None,
is_select: true,
};
StageTable::try_create(info)?
}
_ => {
return Err(ErrorCode::Unimplemented(
"stage table function only support parquet/NDJson format for now",
));
}
};

let table_alias_name = if let Some(table_alias) = alias {
Some(normalize_identifier(&table_alias.name, &self.name_resolution_ctx).name)
} else {
Err(ErrorCode::Unimplemented(
"stage table function only support parquet format for now",
))
None
};

let table_index = self.metadata.write().add_table(
CATALOG_DEFAULT.to_string(),
"system".to_string(),
table.clone(),
table_alias_name,
false,
);

let (s_expr, mut bind_context) = self
.bind_base_table(bind_context, "system", table_index)
.await?;
if let Some(alias) = alias {
bind_context.apply_table_alias(alias, &self.name_resolution_ctx)?;
}
Ok((s_expr, bind_context))
}

#[async_backtrace::framed]
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/stage/src/stage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ impl Table for StageTable {
ctx.get_scan_progress(),
compact_threshold,
on_error_map,
self.table_info.is_select,
)?);

input_ctx.format.exec_copy(input_ctx.clone(), pipeline)?;
Expand Down
3 changes: 2 additions & 1 deletion src/query/storages/stage/src/stage_table_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ impl StageTableSink {
uuid: String,
group_id: usize,
) -> Result<ProcessorPtr> {
let mut options_ext = FileFormatOptionsExt::create_from_settings(&ctx.get_settings())?;
let mut options_ext =
FileFormatOptionsExt::create_from_settings(&ctx.get_settings(), false)?;
let output_format = options_ext.get_output_format(
table_info.schema(),
table_info.stage_info.file_format_params.clone(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{"a":false,"b":4,"c":4.4,"d":"gh","e":"2023-01-01","f":"2023-01-01 00:00:00","g":[10,11],"h":{"0":3,"1":"d"},"i":[1,2]}
{"a":false,"b":3,"c":3.3,"d":"ef","e":"2022-01-01","f":"2022-01-01 00:00:00","g":[7,8,9],"h":{"0":2,"1":"c"},"i":"xyz"}
{"a":true,"b":1,"c":1.1,"d":"ab","e":"2020-01-01","f":"2020-01-01 00:00:00","g":[1,2,3],"h":{"0":0,"1":"a"},"i":{"k":"v"}}
{"a":true,"b":2,"c":2.2,"d":"cd","e":"2021-01-01","f":"2021-01-01 00:00:00","g":[4,5,6],"h":{"0":1,"1":"b"},"i":123}
false
false
true
true
false 1.1
false 1.1
true 1.1
true 1.1
1.1
{"a":false,"b":3,"c":3.3,"d":"ef","e":"2022-01-01","f":"2022-01-01 00:00:00","g":[7,8,9],"h":{"0":2,"1":"c"},"i":"xyz"}
{"a":false,"b":4,"c":4.4,"d":"gh","e":"2023-01-01","f":"2023-01-01 00:00:00","g":[10,11],"h":{"0":3,"1":"d"},"i":[1,2]}
{"a":true,"b":1,"c":1.1,"d":"ab","e":"2020-01-01","f":"2020-01-01 00:00:00","g":[1,2,3],"h":{"0":0,"1":"a"},"i":{"k":"v"}}
{"a":true,"b":2,"c":2.2,"d":"cd","e":"2021-01-01","f":"2021-01-01 00:00:00","g":[4,5,6],"h":{"0":1,"1":"b"},"i":123}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash
# Integration test: SELECT from a stage containing NDJSON files,
# including projection of variant fields, joins, and COPY INTO from
# a stage SELECT.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../../shell_env.sh
# Directory holding the shared test fixtures (json_sample.ndjson lives here).
DATADIR_PATH="$CURDIR/../../../../data/"

# Set up: an fs stage pointing at the fixture dir, and a one-column variant table.
echo "drop stage if exists data_fs;" | $MYSQL_CLIENT_CONNECT
echo "create stage data_fs url = 'fs://$DATADIR_PATH' FILE_FORMAT = (type = NDJSON)" | $MYSQL_CLIENT_CONNECT
echo "drop table if exists t;" | $MYSQL_CLIENT_CONNECT
echo "create table t (a variant)" | $MYSQL_CLIENT_CONNECT
echo "insert into table t values (1.1)" | $MYSQL_CLIENT_CONNECT

# Whole-row select: each NDJSON line surfaces as the variant column $1.
echo "select \$1 from @data_fs (files=>('json_sample.ndjson')) order by \$1;" | $MYSQL_CLIENT_CONNECT

# Project a field out of the variant row.
echo "select \$1:a as a from @data_fs (files=>('json_sample.ndjson')) order by a;" | $MYSQL_CLIENT_CONNECT

# Join a stage select (aliased t2) against an ordinary table.
echo "select t2.\$1:a, a from @data_fs (files=>('json_sample.ndjson')) as t2, t order by t2.\$1:a;" | $MYSQL_CLIENT_CONNECT

# COPY INTO driven by a SELECT over the stage, then verify the loaded rows.
echo "copy into t from (select \$1 from @data_fs t2) files=('json_sample.ndjson');" | $MYSQL_CLIENT_CONNECT
echo "select \$1 from t order by \$1" | $MYSQL_CLIENT_CONNECT

0 comments on commit dde523e

Please sign in to comment.