Skip to content

Commit

Permalink
feat(tianmu): support incomplete column data during LOAD DATA. (#1209)
Browse files Browse the repository at this point in the history
  • Loading branch information
lujiashun committed Feb 4, 2023
1 parent 5dce241 commit 66920b9
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 15 deletions.
31 changes: 31 additions & 0 deletions mysql-test/suite/tianmu/r/issue1209.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
DROP DATABASE IF EXISTS issue1209_test;
CREATE DATABASE issue1209_test;
USE issue1209_test;
CREATE TABLE `t1` (
`id` int(11) DEFAULT NULL,
`a` char(20) DEFAULT NULL
) ENGINE=tianmu DEFAULT CHARSET=utf8mb4;
LOAD DATA LOCAL infile 'MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-1.txt' into table t1 FIELDS TERMINATED BY ',' LINES TERMINATED BY ';';
Warnings:
Warning 1366 Incorrect integer value: '
' for column 'id' at row 4
select * from t1;
id a
1 chai
2 测试
3 chayicha
0 NULL
truncate table t1;
LOAD DATA LOCAL infile 'MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-2.txt' into table t1 FIELDS TERMINATED BY ',' enclosed by '"' LINES TERMINATED BY ';';
Warnings:
Warning 1265 Data truncated for column 'id' at row 5
Warning 1265 Data truncated for column 'id' at row 5
select * from t1;
id a
1 chai
2 测试
3 chayicha
NULL NULL
5 ";

DROP DATABASE issue1209_test;
1 change: 1 addition & 0 deletions mysql-test/suite/tianmu/std_data/issue1209-1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1,chai;2,测试;3,chayicha;
1 change: 1 addition & 0 deletions mysql-test/suite/tianmu/std_data/issue1209-2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"1","chai";"2","测试";"3","chayicha";;"5";"4",";
26 changes: 26 additions & 0 deletions mysql-test/suite/tianmu/t/issue1209.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Regression test for issue #1209: LOAD DATA into a Tianmu table when the
# input file contains incomplete rows (missing columns or an unterminated
# enclosed field). The expected output is recorded in r/issue1209.result.

# Pull in the Tianmu prerequisite include.
--source include/have_tianmu.inc

# Start from a clean database. Warnings are disabled around the cleanup so
# that a missing database does not add output to the result file.
--disable_warnings
DROP DATABASE IF EXISTS issue1209_test;
--enable_warnings

CREATE DATABASE issue1209_test;

USE issue1209_test;

# Two nullable columns: an integer and a short string.
CREATE TABLE `t1` (
`id` int(11) DEFAULT NULL,
`a` char(20) DEFAULT NULL
) ENGINE=tianmu DEFAULT CHARSET=utf8mb4;

# Case 1: unenclosed fields, ',' between fields and ';' between rows.
# std_data/issue1209-1.txt holds three complete rows; the recorded result
# additionally expects a fourth row (0, NULL) plus warning 1366, produced
# from the bytes that follow the last ';'.
# replace_result masks the absolute test directory in the echoed statement
# so that the result file is location independent.
--replace_result $MYSQL_TEST_DIR MYSQL_TEST_DIR
eval LOAD DATA LOCAL infile '$MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-1.txt' into table t1 FIELDS TERMINATED BY ',' LINES TERMINATED BY ';';
select * from t1;

# Case 2: same delimiters, but fields are enclosed by '"'.
# std_data/issue1209-2.txt contains, after three complete rows, an empty
# record (';;'), a record with only the first field ('"5";') and a final
# record whose enclosed second field is never closed ('"4",";').
truncate table t1;
--replace_result $MYSQL_TEST_DIR MYSQL_TEST_DIR
eval LOAD DATA LOCAL infile '$MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-2.txt' into table t1 FIELDS TERMINATED BY ',' enclosed by '"' LINES TERMINATED BY ';';
select * from t1;


# Clean up everything the test created.
DROP DATABASE issue1209_test;
16 changes: 11 additions & 5 deletions storage/tianmu/loader/load_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ bool LoadParser::MakeRow(std::vector<ValueCache> &value_buffers) {
int errorinfo;

bool cont = true;
bool eof = false;
while (cont) {
switch (strategy_->GetOneRow(cur_ptr_, buf_end_ - cur_ptr_, value_buffers, rowsize, errorinfo)) {
switch (strategy_->GetOneRow(cur_ptr_, buf_end_ - cur_ptr_, value_buffers, rowsize, errorinfo, eof)) {
case ParsingStrategy::ParseResult::EOB:
if (mysql_bin_log.is_open())
binlog_loaded_block(read_buffer_.Buf(), cur_ptr_);
Expand All @@ -98,10 +99,15 @@ bool LoadParser::MakeRow(std::vector<ValueCache> &value_buffers) {
buf_end_ = cur_ptr_ + read_buffer_.BufSize();
} else {
// reaching the end of the buffer
if (cur_ptr_ != buf_end_)
rejecter_.ConsumeBadRow(cur_ptr_, buf_end_ - cur_ptr_, cur_row_ + 1, errorinfo == -1 ? -1 : errorinfo + 1);
cur_row_++;
cont = false;
if (cur_ptr_ != buf_end_) {
// rejecter_.ConsumeBadRow(cur_ptr_, buf_end_ - cur_ptr_, cur_row_ + 1, errorinfo == -1 ? -1 : errorinfo +
// 1);
// do not consume the row as a bad row; treat the remaining data as a normal line
eof = true;
} else {
cur_row_++;
cont = false;
}
}
break;

Expand Down
23 changes: 15 additions & 8 deletions storage/tianmu/loader/parsing_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,12 @@ class Field_tmp_nullability_guard {

void ParsingStrategy::ReadField(const char *&ptr, const char *&val_beg, Item *&item, uint &index_of_field,
std::vector<std::pair<const char *, size_t>> &vec_ptr_field,
uint &field_index_in_field_list, const CHARSET_INFO *char_info) {
uint &field_index_in_field_list, const CHARSET_INFO *char_info, bool completed_row) {
bool is_enclosed = false;
char *val_start{nullptr};
size_t val_len{0};

if (string_qualifier_ && *val_beg == string_qualifier_) {
if (string_qualifier_ && *val_beg == string_qualifier_ && completed_row) {
// first char is enclose char, skip it
val_start = const_cast<char *>(val_beg) + 1;
// skip the first and the last char, which are the enclosing chars
Expand Down Expand Up @@ -383,8 +383,8 @@ void ParsingStrategy::ReadField(const char *&ptr, const char *&val_beg, Item *&i
}

ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, size_t size,
std::vector<ValueCache> &record, uint &rowsize,
int &errorinfo) {
std::vector<ValueCache> &record, uint &rowsize, int &errorinfo,
bool eof) {
const char *buf_end = buf + size;
if (!prepared_) {
GetEOL(buf, buf_end);
Expand Down Expand Up @@ -449,10 +449,15 @@ ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, s
if (string_qualifier_ && *ptr == string_qualifier_) {
row_incomplete = !SearchUnescapedPattern(++ptr, buf_end, enclose_delimiter_, kmp_next_enclose_delimiter_);
++ptr;
if (row_incomplete && eof) {
row_incomplete = false;
ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info, false);
continue;
}
} else {
SearchResult res = SearchUnescapedPatternNoEOL(ptr, buf_end, delimiter_, kmp_next_delimiter_);
if (res == SearchResult::END_OF_LINE) {
ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info);
if (res == SearchResult::END_OF_LINE || eof) {
ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info, false);
continue;
}
row_incomplete = (res == SearchResult::END_OF_BUFFER);
Expand Down Expand Up @@ -480,12 +485,14 @@ ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, s

if (!row_incomplete) {
ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info);
ptr += terminator_.size();
} else if (eof) {
ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info, false);
row_incomplete = false;
} else {
errorinfo = index;
goto end;
}

ptr += terminator_.size();
}

if (thd_->killed) {
Expand Down
4 changes: 2 additions & 2 deletions storage/tianmu/loader/parsing_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ class ParsingStrategy final {
ParsingStrategy(const system::IOParameters &iop, std::vector<uchar> columns_collations);
~ParsingStrategy() {}
ParseResult GetOneRow(const char *const buf, size_t size, std::vector<ValueCache> &values, uint &rowsize,
int &errorinfo);
int &errorinfo, bool eof = false);
void ReadField(const char *&ptr, const char *&val_beg, Item *&item, uint &index_of_field,
std::vector<std::pair<const char *, size_t>> &vec_ptr_field, uint &field_index_in_field_list,
const CHARSET_INFO *char_info);
const CHARSET_INFO *char_info, bool completed_row = true);
void SetTHD(THD *thd) { thd_ = thd; }
THD *GetTHD() const { return thd_; };

Expand Down

0 comments on commit 66920b9

Please sign in to comment.