Skip to content

Commit

Permalink
fix: invalid data (Expected to have char ',', got 'Some(';')' at pos …
Browse files Browse the repository at this point in the history
…31). (#12727)

* swallow the ; encoding

* add test case for values with trailing string

* extract the fast row reader

* extract FastValuesDecoder

* pass async fallback handler

* add constructor

* add the test folder for fast_values

* use fallback fn

* use async trait instead of fn

* factor out the code in pipeline_builder

* fix lint

* make lint

* make lint

* add one testcase

* add tests

* fix lint

* rename to parse_fallback
  • Loading branch information
flaneur2020 authored Sep 8, 2023
1 parent e122a1f commit 57b45ef
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 223 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ fn test_statement() {
r#"select 'stringwith"doublequote'"#,
r#"select '๐Ÿฆˆ'"#,
r#"insert into t (c1, c2) values (1, 2), (3, 4);"#,
r#"insert into t (c1, c2) values (1, 2); "#,
r#"insert into table t format json;"#,
r#"insert into table t select * from t2;"#,
r#"select parse_json('{"k1": [0, 1, 2]}').k1[0];"#,
Expand Down
41 changes: 41 additions & 0 deletions src/query/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6691,6 +6691,47 @@ Insert(
)


---------- Input ----------
insert into t (c1, c2) values (1, 2);
---------- Output ---------
INSERT INTO t (c1, c2) VALUES (1, 2);
---------- AST ------------
Insert(
InsertStmt {
hints: None,
catalog: None,
database: None,
table: Identifier {
name: "t",
quote: None,
span: Some(
12..13,
),
},
columns: [
Identifier {
name: "c1",
quote: None,
span: Some(
15..17,
),
},
Identifier {
name: "c2",
quote: None,
span: Some(
19..21,
),
},
],
source: Values {
rest_str: "(1, 2); ",
},
overwrite: false,
},
)


---------- Input ----------
insert into table t format json;
---------- Output ---------
Expand Down
4 changes: 4 additions & 0 deletions src/query/formats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ doctest = false
test = false

[dependencies] # In alphabetical order
aho-corasick = { version = "1.0.1" }
async-trait = "0.1.57"
bstr = "1.0.1"
chrono-tz = { workspace = true }
lexical-core = "0.8.5"
match-template = "0.0.1"
micromarshal = "0.4.0"
num = "0.4.0"
once_cell = "1.15.0"
ordered-float = { workspace = true }
roaring = { version = "0.10.1", features = ["serde"] }
serde_json = { workspace = true }
Expand All @@ -35,6 +38,7 @@ storages-common-table-meta = { path = "../storages/common/table-meta" }

[dev-dependencies]
common-arrow = { path = "../../common/arrow" }
tokio = { workspace = true }

pretty_assertions = "1.3.0"

Expand Down
199 changes: 199 additions & 0 deletions src/query/formats/src/field_decoder/fast_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use std::collections::HashSet;
use std::collections::VecDeque;
use std::io::BufRead;
use std::io::Cursor;
use std::ops::Not;

use aho_corasick::AhoCorasick;
use bstr::ByteSlice;
use common_arrow::arrow::bitmap::MutableBitmap;
use common_exception::ErrorCode;
Expand All @@ -38,6 +40,7 @@ use common_expression::types::NumberColumnBuilder;
use common_expression::with_decimal_type;
use common_expression::with_number_mapped_type;
use common_expression::ColumnBuilder;
use common_expression::Scalar;
use common_io::constants::FALSE_BYTES_LOWER;
use common_io::constants::INF_BYTES_LOWER;
use common_io::constants::NAN_BYTES_LOWER;
Expand All @@ -53,6 +56,7 @@ use common_io::prelude::FormatSettings;
use jsonb::parse_value;
use lexical_core::FromLexical;
use num::cast::AsPrimitive;
use once_cell::sync::Lazy;

use crate::CommonSettings;
use crate::FieldDecoder;
Expand Down Expand Up @@ -434,3 +438,198 @@ impl FastFieldDecoderValues {
Ok(())
}
}

/// Row-by-row decoder for the textual payload of an `INSERT ... VALUES`
/// statement, e.g. `(1, 2), (3, 4);`.
///
/// Each field is first read with `field_decoder`; when that does not work for
/// a row, the raw text of the row is handed to a [`FastValuesDecodeFallback`].
pub struct FastValuesDecoder<'a> {
/// Decoder used to read every individual field of a row.
field_decoder: &'a FastFieldDecoderValues,
/// Cursor over the bytes of the input text.
reader: Cursor<&'a [u8]>,
/// Number of `(` occurrences found in the input when the decoder was built.
estimated_rows: usize,
/// Byte offsets of every `'` and `\` in the input, in ascending order.
/// Offsets lying before the start of the current row are dropped as rows
/// are parsed.
positions: VecDeque<usize>,
}

/// Slow-path hook used by [`FastValuesDecoder`] for rows it cannot decode
/// field by field.
#[async_trait::async_trait]
pub trait FastValuesDecodeFallback {
/// Parses the raw text of one row (starting at its opening `(`) and returns
/// the row's values; the decoder pushes them onto the column builders in
/// order, one scalar per column.
async fn parse_fallback(&self, sql: &str) -> Result<Vec<Scalar>>;
}

// Tokens searched for up front in the VALUES text: `(`, `'` and `\`.
// The order matters: `FastValuesDecoder::new` treats pattern 0 (`(`) as a row
// marker that is only counted, while the offsets of the other two patterns
// are recorded.
static PATTERNS: &[&str] = &["(", "'", "\\"];

// Multi-pattern matcher over `PATTERNS`, built lazily on first use.
static INSERT_TOKEN_FINDER: Lazy<AhoCorasick> = Lazy::new(|| AhoCorasick::new(PATTERNS).unwrap());

impl<'a> FastValuesDecoder<'a> {
    /// Builds a decoder over `data`, the textual payload of a `VALUES` clause.
    ///
    /// The input is scanned once up front: every `(` bumps the row estimate,
    /// and the byte offset of every `'` and `\` is recorded in `positions`
    /// for later use by the field decoder.
    pub fn new(data: &'a str, field_decoder: &'a FastFieldDecoderValues) -> Self {
        let mut estimated_rows = 0;
        let mut positions = VecDeque::new();
        for mat in INSERT_TOKEN_FINDER.find_iter(data) {
            // Pattern 0 is `(`: it only contributes to the row estimate.
            if mat.pattern() == 0.into() {
                estimated_rows += 1;
                continue;
            }
            positions.push_back(mat.start());
        }
        let reader = Cursor::new(data.as_bytes());
        FastValuesDecoder {
            reader,
            estimated_rows,
            positions,
            field_decoder,
        }
    }

    /// Returns the number of `(` characters found in the input.
    ///
    /// This is only an estimate of the row count: parentheses inside string
    /// literals or nested expressions are counted as well.
    pub fn estimated_rows(&self) -> usize {
        self.estimated_rows
    }

    /// Decodes every row of the input and appends the values to `columns`.
    ///
    /// Rows are separated by `,`. Parsing stops at the end of the input or at
    /// the first `;` that follows a row; anything after that `;` is ignored.
    /// Rows that cannot be decoded field by field are delegated to
    /// `fallback_fn`.
    ///
    /// # Errors
    ///
    /// Returns an error when a row separator is missing, when a row does not
    /// start with `(`, or when the fallback parser fails.
    pub async fn parse(
        &mut self,
        columns: &mut [ColumnBuilder],
        fallback_fn: &impl FastValuesDecodeFallback,
    ) -> Result<()> {
        // A flag instead of an ever-growing row counter: only "is this the
        // first row?" matters, and a flag cannot overflow.
        let mut is_first_row = true;
        loop {
            let _ = self.reader.ignore_white_spaces();
            if self.reader.eof() {
                break;
            }

            // Not the first row
            if !is_first_row {
                // A `;` after a row terminates the statement.
                if self.reader.ignore_byte(b';') {
                    break;
                }
                self.reader.must_ignore_byte(b',')?;
            }
            is_first_row = false;

            self.parse_next_row(columns, fallback_fn).await?;
        }
        Ok(())
    }

    /// Decodes a single `( ... )` row and pushes one value onto each builder
    /// in `columns`.
    ///
    /// Every field is first read with the field decoder. If reading fails, or
    /// the field is not followed by the expected delimiter (`,` between
    /// fields, `)` after the last one), the values already pushed for this
    /// row are popped and the raw text of the whole row is passed to
    /// `fallback` instead.
    ///
    /// # Errors
    ///
    /// Returns an error when the row does not start with `(`, when the
    /// fallback parser fails, when the row text is not valid UTF-8, or when
    /// the fallback returns a number of values different from the number of
    /// columns.
    async fn parse_next_row(
        &mut self,
        columns: &mut [ColumnBuilder],
        fallback: &impl FastValuesDecodeFallback,
    ) -> Result<()> {
        let _ = self.reader.ignore_white_spaces();
        let col_size = columns.len();
        let start_pos_of_row = self.reader.checkpoint();

        // Start of the row --- '('
        if !self.reader.ignore_byte(b'(') {
            return Err(ErrorCode::BadDataValueType(
                "Must start with parentheses".to_string(),
            ));
        }
        // Ignore the positions in the previous row.
        while let Some(pos) = self.positions.front() {
            if *pos < start_pos_of_row as usize {
                self.positions.pop_front();
            } else {
                break;
            }
        }

        for col_idx in 0..col_size {
            let _ = self.reader.ignore_white_spaces();
            let col_end = if col_idx + 1 == col_size { b')' } else { b',' };

            let col = columns
                .get_mut(col_idx)
                .ok_or_else(|| ErrorCode::Internal("ColumnBuilder is None"))?;

            // `pop_count` is the number of leading columns that already
            // received a value for this row and must be undone on fallback.
            let (need_fallback, pop_count) = self
                .field_decoder
                .read_field(col, &mut self.reader, &mut self.positions)
                .map(|_| {
                    let _ = self.reader.ignore_white_spaces();
                    let need_fallback = self.reader.ignore_byte(col_end).not();
                    (need_fallback, col_idx + 1)
                })
                .unwrap_or((true, col_idx));

            // ColumnBuilder and expr-parser both will eat the end ')' of the row.
            if need_fallback {
                for built in columns.iter_mut().take(pop_count) {
                    built.pop();
                }
                // rollback to start position of the row
                self.reader.rollback(start_pos_of_row + 1);
                skip_to_next_row(&mut self.reader, 1)?;
                let end_pos_of_row = self.reader.position();

                // Parse from expression and append all columns.
                self.reader.set_position(start_pos_of_row);
                let row_len = end_pos_of_row - start_pos_of_row;
                let buf = &self.reader.remaining_slice()[..row_len as usize];

                // Report malformed text as an error instead of panicking.
                let sql = std::str::from_utf8(buf).map_err(|e| {
                    ErrorCode::BadDataValueType(format!(
                        "Row text is not valid UTF-8: {}",
                        e
                    ))
                })?;
                let values = fallback.parse_fallback(sql).await?;

                // `zip` would silently stop at the shorter side and leave the
                // column builders with different lengths, so reject a
                // mismatch explicitly.
                if values.len() != col_size {
                    return Err(ErrorCode::BadDataValueType(format!(
                        "Fallback parser returned {} values for a row with {} columns",
                        values.len(),
                        col_size
                    )));
                }

                for (col, scalar) in columns.iter_mut().zip(values) {
                    col.push(scalar.as_ref());
                }
                self.reader.set_position(end_pos_of_row);
                return Ok(());
            }
        }

        Ok(())
    }
}

/// Advances `reader` past the remainder of the current row.
///
/// ```text
/// Values |(xxx), (yyy), (zzz)
/// ```
///
/// `balance` is the number of currently open parentheses. Bytes are consumed
/// until that many unmatched `)` have been seen (nested `(` raise the count),
/// or until the input is exhausted. Parentheses inside single-quoted sections
/// are not counted, and a special byte directly following a backslash is
/// skipped without being interpreted. Leading whitespace is ignored first.
///
/// Always returns `Ok(())`, even when the input ends before the row is closed.
pub fn skip_to_next_row<R: AsRef<[u8]>>(reader: &mut Cursor<R>, mut balance: i32) -> Result<()> {
    let _ = reader.ignore_white_spaces();

    let mut in_quotes = false;
    let mut after_backslash = false;

    while balance > 0 {
        let remaining = reader.remaining_slice();
        if remaining.is_empty() {
            break;
        }
        let remaining_len = remaining.len();

        // Find the next byte that can affect the state, together with its offset.
        let hit = remaining
            .iter()
            .position(|&b| matches!(b, b'(' | b')' | b'\\' | b'\''))
            .map(|idx| (idx, remaining[idx]));

        match hit {
            None => {
                // Nothing of interest left: swallow the rest of the input.
                after_backslash = false;
                reader.consume(remaining_len);
            }
            Some((idx, byte)) => {
                reader.consume(idx + 1);

                // Only a byte immediately after the backslash is escaped.
                let is_escaped = after_backslash && idx == 0;
                after_backslash = false;
                if is_escaped {
                    continue;
                }

                match byte {
                    b'\\' => after_backslash = true,
                    b'\'' => in_quotes = !in_quotes,
                    b')' if !in_quotes => balance -= 1,
                    b'(' if !in_quotes => balance += 1,
                    _ => {}
                }
            }
        }
    }
    Ok(())
}
2 changes: 2 additions & 0 deletions src/query/formats/src/field_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use std::any::Any;

pub use csv::FieldDecoderCSV;
pub use fast_values::FastFieldDecoderValues;
pub use fast_values::FastValuesDecodeFallback;
pub use fast_values::FastValuesDecoder;
pub use json_ast::FieldJsonAstDecoder;
pub use row_based::FieldDecoderRowBased;
pub use tsv::FieldDecoderTSV;
Expand Down
Loading

1 comment on commit 57b45ef

@vercel
Copy link

@vercel vercel bot commented on 57b45ef Sep 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend.vercel.app
databend.rs
databend-git-main-databend.vercel.app

Please sign in to comment.