diff --git a/Cargo.lock b/Cargo.lock index 8cb119d38969d..ef46c2499f8dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1982,6 +1982,8 @@ dependencies = [ name = "common-formats" version = "0.1.0" dependencies = [ + "aho-corasick", + "async-trait", "bstr 1.6.2", "chrono-tz", "common-arrow", @@ -1995,12 +1997,14 @@ dependencies = [ "match-template", "micromarshal", "num", + "once_cell", "ordered-float 3.7.0", "pretty_assertions", "roaring", "serde_json", "storages-common-blocks", "storages-common-table-meta", + "tokio", ] [[package]] @@ -3773,7 +3777,6 @@ name = "databend-query" version = "0.1.0" dependencies = [ "aggregating-index", - "aho-corasick", "arrow-array", "arrow-cast", "arrow-flight", diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index aa72ee0af10b1..70b74deaf07b1 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -163,6 +163,7 @@ fn test_statement() { r#"select 'stringwith"doublequote'"#, r#"select '🦈'"#, r#"insert into t (c1, c2) values (1, 2), (3, 4);"#, + r#"insert into t (c1, c2) values (1, 2); "#, r#"insert into table t format json;"#, r#"insert into table t select * from t2;"#, r#"select parse_json('{"k1": [0, 1, 2]}').k1[0];"#, diff --git a/src/query/ast/tests/it/testdata/statement.txt b/src/query/ast/tests/it/testdata/statement.txt index 2aff0f7fdee27..2f537e497a66d 100644 --- a/src/query/ast/tests/it/testdata/statement.txt +++ b/src/query/ast/tests/it/testdata/statement.txt @@ -6691,6 +6691,47 @@ Insert( ) +---------- Input ---------- +insert into t (c1, c2) values (1, 2); +---------- Output --------- +INSERT INTO t (c1, c2) VALUES (1, 2); +---------- AST ------------ +Insert( + InsertStmt { + hints: None, + catalog: None, + database: None, + table: Identifier { + name: "t", + quote: None, + span: Some( + 12..13, + ), + }, + columns: [ + Identifier { + name: "c1", + quote: None, + span: Some( + 15..17, + ), + }, + Identifier { + name: "c2", + quote: None, + span: Some( + 19..21, + ), + }, + ], + source: Values { + rest_str: "(1, 2); ", + }, + overwrite: false, + }, +) + + ---------- Input ---------- insert into table t format json; ---------- Output --------- diff --git a/src/query/formats/Cargo.toml b/src/query/formats/Cargo.toml index a58d2f1db923a..248abb0939197 100644 --- a/src/query/formats/Cargo.toml +++ b/src/query/formats/Cargo.toml @@ -11,12 +11,15 @@ doctest = false test = false [dependencies] # In alphabetical order +aho-corasick = { version = "1.0.1" } +async-trait = "0.1.57" bstr = "1.0.1" chrono-tz = { workspace = true } lexical-core = "0.8.5" match-template = "0.0.1" micromarshal = "0.4.0" num = "0.4.0" +once_cell = "1.15.0" ordered-float = { workspace = true } roaring = { version = "0.10.1", features = ["serde"] } serde_json = { workspace = true } @@ -35,6 +38,7 @@ storages-common-table-meta = { path = "../storages/common/table-meta" } [dev-dependencies] common-arrow = { path = "../../common/arrow" } +tokio = { workspace = true } pretty_assertions = "1.3.0" diff --git a/src/query/formats/src/field_decoder/fast_values.rs b/src/query/formats/src/field_decoder/fast_values.rs index f42786b3e8e59..87f655f3bce0b 100644 --- a/src/query/formats/src/field_decoder/fast_values.rs +++ b/src/query/formats/src/field_decoder/fast_values.rs @@ -17,7 +17,9 @@ use std::collections::HashSet; use std::collections::VecDeque; use std::io::BufRead; use std::io::Cursor; +use std::ops::Not; +use aho_corasick::AhoCorasick; use bstr::ByteSlice; use common_arrow::arrow::bitmap::MutableBitmap; use common_exception::ErrorCode; @@ -38,6 +40,7 @@ use common_expression::types::NumberColumnBuilder; use common_expression::with_decimal_type; use common_expression::with_number_mapped_type; use common_expression::ColumnBuilder; +use common_expression::Scalar; use common_io::constants::FALSE_BYTES_LOWER; use common_io::constants::INF_BYTES_LOWER; use common_io::constants::NAN_BYTES_LOWER; @@ -53,6 +56,7 @@ use common_io::prelude::FormatSettings; use jsonb::parse_value; use lexical_core::FromLexical; use num::cast::AsPrimitive; +use once_cell::sync::Lazy; use crate::CommonSettings; use crate::FieldDecoder; @@ -434,3 +438,198 @@ impl FastFieldDecoderValues { Ok(()) } } + +pub struct FastValuesDecoder<'a> { + field_decoder: &'a FastFieldDecoderValues, + reader: Cursor<&'a [u8]>, + estimated_rows: usize, + positions: VecDeque, +} + +#[async_trait::async_trait] +pub trait FastValuesDecodeFallback { + async fn parse_fallback(&self, sql: &str) -> Result>; +} + +// Pre-generate the positions of `(`, `'` and `\` +static PATTERNS: &[&str] = &["(", "'", "\\"]; + +static INSERT_TOKEN_FINDER: Lazy = Lazy::new(|| AhoCorasick::new(PATTERNS).unwrap()); + +impl<'a> FastValuesDecoder<'a> { + pub fn new(data: &'a str, field_decoder: &'a FastFieldDecoderValues) -> Self { + let mut estimated_rows = 0; + let mut positions = VecDeque::new(); + for mat in INSERT_TOKEN_FINDER.find_iter(data) { + if mat.pattern() == 0.into() { + estimated_rows += 1; + continue; + } + positions.push_back(mat.start()); + } + let reader = Cursor::new(data.as_bytes()); + FastValuesDecoder { + reader, + estimated_rows, + positions, + field_decoder, + } + } + + pub fn estimated_rows(&self) -> usize { + self.estimated_rows + } + + pub async fn parse( + &mut self, + columns: &mut [ColumnBuilder], + fallback_fn: &impl FastValuesDecodeFallback, + ) -> Result<()> { + for row in 0.. { + let _ = self.reader.ignore_white_spaces(); + if self.reader.eof() { + break; + } + + // Not the first row + if row != 0 { + if self.reader.ignore_byte(b';') { + break; + } + self.reader.must_ignore_byte(b',')?; + } + + self.parse_next_row(columns, fallback_fn).await?; + } + Ok(()) + } + + async fn parse_next_row( + &mut self, + columns: &mut [ColumnBuilder], + fallback: &impl FastValuesDecodeFallback, + ) -> Result<()> { + let _ = self.reader.ignore_white_spaces(); + let col_size = columns.len(); + let start_pos_of_row = self.reader.checkpoint(); + + // Start of the row --- '(' + if !self.reader.ignore_byte(b'(') { + return Err(ErrorCode::BadDataValueType( + "Must start with parentheses".to_string(), + )); + } + // Ignore the positions in the previous row. + while let Some(pos) = self.positions.front() { + if *pos < start_pos_of_row as usize { + self.positions.pop_front(); + } else { + break; + } + } + + for col_idx in 0..col_size { + let _ = self.reader.ignore_white_spaces(); + let col_end = if col_idx + 1 == col_size { b')' } else { b',' }; + + let col = columns + .get_mut(col_idx) + .ok_or_else(|| ErrorCode::Internal("ColumnBuilder is None"))?; + + let (need_fallback, pop_count) = self + .field_decoder + .read_field(col, &mut self.reader, &mut self.positions) + .map(|_| { + let _ = self.reader.ignore_white_spaces(); + let need_fallback = self.reader.ignore_byte(col_end).not(); + (need_fallback, col_idx + 1) + }) + .unwrap_or((true, col_idx)); + + // ColumnBuilder and expr-parser both will eat the end ')' of the row. + if need_fallback { + for col in columns.iter_mut().take(pop_count) { + col.pop(); + } + // rollback to start position of the row + self.reader.rollback(start_pos_of_row + 1); + skip_to_next_row(&mut self.reader, 1)?; + let end_pos_of_row = self.reader.position(); + + // Parse from expression and append all columns. + self.reader.set_position(start_pos_of_row); + let row_len = end_pos_of_row - start_pos_of_row; + let buf = &self.reader.remaining_slice()[..row_len as usize]; + + let sql = std::str::from_utf8(buf).unwrap(); + let values = fallback.parse_fallback(sql).await?; + + for (col, scalar) in columns.iter_mut().zip(values) { + col.push(scalar.as_ref()); + } + self.reader.set_position(end_pos_of_row); + return Ok(()); + } + } + + Ok(()) + } +} + +// Values |(xxx), (yyy), (zzz) +pub fn skip_to_next_row>(reader: &mut Cursor, mut balance: i32) -> Result<()> { + let _ = reader.ignore_white_spaces(); + + let mut quoted = false; + let mut escaped = false; + + while balance > 0 { + let buffer = reader.remaining_slice(); + if buffer.is_empty() { + break; + } + + let size = buffer.len(); + + let it = buffer + .iter() + .position(|&c| c == b'(' || c == b')' || c == b'\\' || c == b'\''); + + if let Some(it) = it { + let c = buffer[it]; + reader.consume(it + 1); + + if it == 0 && escaped { + escaped = false; + continue; + } + escaped = false; + + match c { + b'\\' => { + escaped = true; + continue; + } + b'\'' => { + quoted ^= true; + continue; + } + b')' => { + if !quoted { + balance -= 1; + } + } + b'(' => { + if !quoted { + balance += 1; + } + } + _ => {} + } + } else { + escaped = false; + reader.consume(size); + } + } + Ok(()) +} diff --git a/src/query/formats/src/field_decoder/mod.rs b/src/query/formats/src/field_decoder/mod.rs index 6176e4cf59e8e..573dfc1d892af 100644 --- a/src/query/formats/src/field_decoder/mod.rs +++ b/src/query/formats/src/field_decoder/mod.rs @@ -24,6 +24,8 @@ use std::any::Any; pub use csv::FieldDecoderCSV; pub use fast_values::FastFieldDecoderValues; +pub use fast_values::FastValuesDecodeFallback; +pub use fast_values::FastValuesDecoder; pub use json_ast::FieldJsonAstDecoder; pub use row_based::FieldDecoderRowBased; pub use tsv::FieldDecoderTSV; diff --git a/src/query/formats/tests/it/field_decoder/fast_values.rs b/src/query/formats/tests/it/field_decoder/fast_values.rs new file mode 100644 index 0000000000000..af8c5951f7cfd --- /dev/null +++ b/src/query/formats/tests/it/field_decoder/fast_values.rs @@ -0,0 +1,135 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::types::DataType; +use common_expression::types::NumberDataType; +use common_expression::ColumnBuilder; +use common_expression::DataBlock; +use common_expression::Scalar; +use common_formats::FastFieldDecoderValues; +use common_formats::FastValuesDecodeFallback; +use common_formats::FastValuesDecoder; +use common_io::prelude::FormatSettings; + +struct DummyFastValuesDecodeFallback {} + +#[async_trait::async_trait] +impl FastValuesDecodeFallback for DummyFastValuesDecodeFallback { + async fn parse_fallback(&self, _data: &str) -> Result> { + Err(ErrorCode::Unimplemented("fallback".to_string())) + } +} + +#[tokio::test] +async fn test_fast_values_decoder_multi() -> Result<()> { + struct Test { + data: &'static str, + column_types: Vec, + output: Result<&'static str>, + } + + let tests = vec![ + Test { + data: "(0, 1, 2), (3,4,5)", + column_types: vec![ + DataType::Number(NumberDataType::Int16), + DataType::Number(NumberDataType::Int16), + DataType::Number(NumberDataType::Int16), + ], + output: Ok( + "+----------+----------+----------+\n| Column 0 | Column 1 | Column 2 |\n+----------+----------+----------+\n| 0 | 1 | 2 |\n| 3 | 4 | 5 |\n+----------+----------+----------+", + ), + }, + Test { + data: "(0, 1, 2), (3,4,5), ", + column_types: vec![ + DataType::Number(NumberDataType::Int16), + DataType::Number(NumberDataType::Int16), + DataType::Number(NumberDataType::Int16), + ], + output: Err(ErrorCode::BadDataValueType( + "Must start with parentheses".to_string(), + )), + }, + Test { + data: "('', '', '')", + column_types: vec![ + DataType::Number(NumberDataType::Int16), + DataType::Number(NumberDataType::Int16), + DataType::Number(NumberDataType::Int16), + ], + output: Err(ErrorCode::Unimplemented("fallback".to_string())), + }, + Test { + data: "( 1, '', '2022-10-01')", + column_types: vec![ + DataType::Number(NumberDataType::Int16), + DataType::String, + DataType::Date, + ], + output: Ok( + "+----------+----------+--------------+\n| Column 0 | Column 1 | Column 2 |\n+----------+----------+--------------+\n| 1 | '' | '2022-10-01' |\n+----------+----------+--------------+", + ), + }, + Test { + data: "(1, 2, 3), (1, 1, 1), (1, 1, 1);", + column_types: vec![ + DataType::Number(NumberDataType::Int16), + DataType::Number(NumberDataType::Int16), + DataType::Number(NumberDataType::Int16), + ], + output: Ok( + "+----------+----------+----------+\n| Column 0 | Column 1 | Column 2 |\n+----------+----------+----------+\n| 1 | 2 | 3 |\n| 1 | 1 | 1 |\n| 1 | 1 | 1 |\n+----------+----------+----------+", + ), + }, + Test { + data: "(1, 2, 3), (1, 1, 1), (1, 1, 1); ", + column_types: vec![ + DataType::Number(NumberDataType::Int16), + DataType::Number(NumberDataType::Int16), + DataType::Number(NumberDataType::Int16), + ], + output: Ok( + "+----------+----------+----------+\n| Column 0 | Column 1 | Column 2 |\n+----------+----------+----------+\n| 1 | 2 | 3 |\n| 1 | 1 | 1 |\n| 1 | 1 | 1 |\n+----------+----------+----------+", + ), + }, + ]; + + for tt in tests { + let field_decoder = FastFieldDecoderValues::create_for_insert(FormatSettings::default()); + let mut values_decoder = FastValuesDecoder::new(tt.data, &field_decoder); + let fallback = DummyFastValuesDecodeFallback {}; + let mut columns = tt + .column_types + .into_iter() + .map(|dt| ColumnBuilder::with_capacity(&dt, values_decoder.estimated_rows())) + .collect::>(); + let result = values_decoder.parse(&mut columns, &fallback).await; + match tt.output { + Err(err) => { + assert!(result.is_err()); + assert_eq!(err.to_string(), result.unwrap_err().to_string()) + } + Ok(want) => { + let columns = columns.into_iter().map(|cb| cb.build()).collect::>(); + let got = DataBlock::new_from_columns(columns); + assert!(result.is_ok(), "{:?}", result); + assert_eq!(got.to_string(), want.to_string()) + } + } + } + Ok(()) +} diff --git a/src/query/formats/tests/it/field_decoder/mod.rs b/src/query/formats/tests/it/field_decoder/mod.rs new file mode 100644 index 0000000000000..a18d591e43458 --- /dev/null +++ b/src/query/formats/tests/it/field_decoder/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod fast_values; diff --git a/src/query/formats/tests/it/main.rs b/src/query/formats/tests/it/main.rs index 3324a2121920a..e59cc9149a119 100644 --- a/src/query/formats/tests/it/main.rs +++ b/src/query/formats/tests/it/main.rs @@ -19,6 +19,7 @@ use common_formats::ClickhouseFormatType; use common_formats::FileFormatOptionsExt; use common_settings::Settings; +mod field_decoder; mod field_encoder; mod output_format_json_each_row; mod output_format_tcsv; diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 10fdc575fdcb3..329e38a8208bf 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -102,7 +102,6 @@ virtual-column = { path = "../ee-features/virtual-column" } # GitHub dependencies # Crates.io dependencies -aho-corasick = { version = "1.0.1" } arrow-array = { version = "46.0.0" } arrow-flight = { version = "46.0.0", features = ["flight-sql-experimental", "tls"] } arrow-ipc = { version = "46.0.0" } diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index e5977e30e78df..08b782f2f82de 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -13,16 +13,11 @@ // limitations under the License. use std::collections::HashMap; -use std::collections::VecDeque; use std::convert::TryFrom; -use std::io::BufRead; -use std::io::Cursor; -use std::ops::Not; use std::sync::Arc; use std::sync::Mutex; use std::time::Instant; -use aho_corasick::AhoCorasick; use async_channel::Receiver; use common_ast::parser::parse_comma_separated_exprs; use common_ast::parser::tokenize_sql; @@ -42,13 +37,14 @@ use common_expression::DataSchema; use common_expression::DataSchemaRef; use common_expression::FunctionContext; use common_expression::HashMethodKind; +use common_expression::Scalar; use common_expression::SortColumnDescription; use common_formats::FastFieldDecoderValues; +use common_formats::FastValuesDecodeFallback; +use common_formats::FastValuesDecoder; use common_functions::aggregates::AggregateFunctionFactory; use common_functions::aggregates::AggregateFunctionRef; use common_functions::BUILTIN_FUNCTIONS; -use common_io::cursor_ext::ReadBytesExt; -use common_io::cursor_ext::ReadCheckPointExt; use common_pipeline_core::pipe::Pipe; use common_pipeline_core::pipe::PipeItem; use common_pipeline_core::processors::port::InputPort; @@ -126,7 +122,6 @@ use common_storages_fuse::operations::FillInternalColumnProcessor; use common_storages_fuse::operations::TransformSerializeBlock; use common_storages_fuse::FuseTable; use common_storages_stage::StageTable; -use once_cell::sync::Lazy; use parking_lot::RwLock; use super::processors::transforms::FrameBound; @@ -2126,11 +2121,6 @@ impl PipelineBuilder { } } -// Pre-generate the positions of `(`, `'` and `\` -static PATTERNS: &[&str] = &["(", "'", "\\"]; - -static INSERT_TOKEN_FINDER: Lazy = Lazy::new(|| AhoCorasick::new(PATTERNS).unwrap()); - pub struct ValueSource { data: String, ctx: Arc, @@ -2153,26 +2143,54 @@ impl AsyncSource for ValueSource { return Ok(None); } - // Use the number of '(' to estimate the number of rows - let mut estimated_rows = 0; - let mut positions = VecDeque::new(); - for mat in INSERT_TOKEN_FINDER.find_iter(&self.data) { - if mat.pattern() == 0.into() { - estimated_rows += 1; - continue; - } - positions.push_back(mat.start()); - } + let format = self.ctx.get_format_settings()?; + let field_decoder = FastFieldDecoderValues::create_for_insert(format); - let mut reader = Cursor::new(self.data.as_bytes()); - let block = self - .read(estimated_rows, &mut reader, &mut positions) - .await?; + let mut values_decoder = FastValuesDecoder::new(&self.data, &field_decoder); + let estimated_rows = values_decoder.estimated_rows(); + + let mut columns = self + .schema + .fields() + .iter() + .map(|f| ColumnBuilder::with_capacity(f.data_type(), estimated_rows)) + .collect::>(); + + values_decoder.parse(&mut columns, self).await?; + + let columns = columns + .into_iter() + .map(|col| col.build()) + .collect::>(); + let block = DataBlock::new_from_columns(columns); self.is_finished = true; Ok(Some(block)) } } +#[async_trait::async_trait] +impl FastValuesDecodeFallback for ValueSource { + async fn parse_fallback(&self, sql: &str) -> Result> { + let settings = self.ctx.get_settings(); + let sql_dialect = settings.get_sql_dialect()?; + let tokens = tokenize_sql(sql)?; + let mut bind_context = self.bind_context.clone(); + let metadata = self.metadata.clone(); + + let exprs = parse_comma_separated_exprs(&tokens[1..tokens.len()], sql_dialect)?; + let values = bind_context + .exprs_to_scalar( + exprs, + &self.schema, + self.ctx.clone(), + &self.name_resolution_ctx, + metadata, + ) + .await?; + Ok(values) + } +} + impl ValueSource { pub fn new( data: String, @@ -2193,198 +2211,4 @@ impl ValueSource { is_finished: false, } } - - #[async_backtrace::framed] - pub async fn read>( - &self, - estimated_rows: usize, - reader: &mut Cursor, - positions: &mut VecDeque, - ) -> Result { - let mut columns = self - .schema - .fields() - .iter() - .map(|f| ColumnBuilder::with_capacity(f.data_type(), estimated_rows)) - .collect::>(); - - let mut bind_context = self.bind_context.clone(); - - let format = self.ctx.get_format_settings()?; - let field_decoder = FastFieldDecoderValues::create_for_insert(format); - - for row in 0.. { - let _ = reader.ignore_white_spaces(); - if reader.eof() { - break; - } - // Not the first row - if row != 0 { - reader.must_ignore_byte(b',')?; - } - - self.parse_next_row( - &field_decoder, - reader, - &mut columns, - positions, - &mut bind_context, - self.metadata.clone(), - ) - .await?; - } - - let columns = columns - .into_iter() - .map(|col| col.build()) - .collect::>(); - Ok(DataBlock::new_from_columns(columns)) - } - - /// Parse single row value, like ('111', 222, 1 + 1) - #[async_backtrace::framed] - async fn parse_next_row>( - &self, - field_decoder: &FastFieldDecoderValues, - reader: &mut Cursor, - columns: &mut [ColumnBuilder], - positions: &mut VecDeque, - bind_context: &mut BindContext, - metadata: MetadataRef, - ) -> Result<()> { - let _ = reader.ignore_white_spaces(); - let col_size = columns.len(); - let start_pos_of_row = reader.checkpoint(); - - // Start of the row --- '(' - if !reader.ignore_byte(b'(') { - return Err(ErrorCode::BadDataValueType( - "Must start with parentheses".to_string(), - )); - } - // Ignore the positions in the previous row. - while let Some(pos) = positions.front() { - if *pos < start_pos_of_row as usize { - positions.pop_front(); - } else { - break; - } - } - - for col_idx in 0..col_size { - let _ = reader.ignore_white_spaces(); - let col_end = if col_idx + 1 == col_size { b')' } else { b',' }; - - let col = columns - .get_mut(col_idx) - .ok_or_else(|| ErrorCode::Internal("ColumnBuilder is None"))?; - - let (need_fallback, pop_count) = field_decoder - .read_field(col, reader, positions) - .map(|_| { - let _ = reader.ignore_white_spaces(); - let need_fallback = reader.ignore_byte(col_end).not(); - (need_fallback, col_idx + 1) - }) - .unwrap_or((true, col_idx)); - - // ColumnBuilder and expr-parser both will eat the end ')' of the row. - if need_fallback { - for col in columns.iter_mut().take(pop_count) { - col.pop(); - } - // rollback to start position of the row - reader.rollback(start_pos_of_row + 1); - skip_to_next_row(reader, 1)?; - let end_pos_of_row = reader.position(); - - // Parse from expression and append all columns. - reader.set_position(start_pos_of_row); - let row_len = end_pos_of_row - start_pos_of_row; - let buf = &reader.remaining_slice()[..row_len as usize]; - - let sql = std::str::from_utf8(buf).unwrap(); - let settings = self.ctx.get_settings(); - let sql_dialect = settings.get_sql_dialect()?; - let tokens = tokenize_sql(sql)?; - let exprs = parse_comma_separated_exprs(&tokens[1..tokens.len()], sql_dialect)?; - - let values = bind_context - .exprs_to_scalar( - exprs, - &self.schema, - self.ctx.clone(), - &self.name_resolution_ctx, - metadata, - ) - .await?; - - for (col, scalar) in columns.iter_mut().zip(values) { - col.push(scalar.as_ref()); - } - reader.set_position(end_pos_of_row); - return Ok(()); - } - } - - Ok(()) - } -} - -// Values |(xxx), (yyy), (zzz) -pub fn skip_to_next_row>(reader: &mut Cursor, mut balance: i32) -> Result<()> { - let _ = reader.ignore_white_spaces(); - - let mut quoted = false; - let mut escaped = false; - - while balance > 0 { - let buffer = reader.remaining_slice(); - if buffer.is_empty() { - break; - } - - let size = buffer.len(); - - let it = buffer - .iter() - .position(|&c| c == b'(' || c == b')' || c == b'\\' || c == b'\''); - - if let Some(it) = it { - let c = buffer[it]; - reader.consume(it + 1); - - if it == 0 && escaped { - escaped = false; - continue; - } - escaped = false; - - match c { - b'\\' => { - escaped = true; - continue; - } - b'\'' => { - quoted ^= true; - continue; - } - b')' => { - if !quoted { - balance -= 1; - } - } - b'(' => { - if !quoted { - balance += 1; - } - } - _ => {} - } - } else { - escaped = false; - reader.consume(size); - } - } - Ok(()) }