From abd134faeace7e2685d27f169c3709ea1948d831 Mon Sep 17 00:00:00 2001 From: lujiashun Date: Fri, 3 Feb 2023 19:49:36 +0800 Subject: [PATCH] feat(tianmu): support in-completed column data during load data. (#1209) [summary] 1. support few column; 2. support too many column; 3. support the data of column is not completed,such as 3.1 such as start with the enclosed char, finished without enclosed char; 3.2 without line-termination string, but reach to the end of file; --- mysql-test/suite/tianmu/r/issue1209.result | 45 +++++++++ mysql-test/suite/tianmu/r/issue1263.result | 2 +- .../suite/tianmu/std_data/issue1209-1.txt | 1 + .../suite/tianmu/std_data/issue1209-2.txt | 1 + mysql-test/suite/tianmu/t/issue1209.test | 27 +++++ mysql-test/suite/tianmu/t/issue1263.test | 2 +- storage/tianmu/loader/load_parser.cpp | 17 +++- storage/tianmu/loader/parsing_strategy.cpp | 98 ++++++++++++------- storage/tianmu/loader/parsing_strategy.h | 6 +- 9 files changed, 155 insertions(+), 44 deletions(-) create mode 100644 mysql-test/suite/tianmu/r/issue1209.result create mode 100644 mysql-test/suite/tianmu/std_data/issue1209-1.txt create mode 100644 mysql-test/suite/tianmu/std_data/issue1209-2.txt create mode 100644 mysql-test/suite/tianmu/t/issue1209.test diff --git a/mysql-test/suite/tianmu/r/issue1209.result b/mysql-test/suite/tianmu/r/issue1209.result new file mode 100644 index 000000000..319ebb1e0 --- /dev/null +++ b/mysql-test/suite/tianmu/r/issue1209.result @@ -0,0 +1,45 @@ +DROP DATABASE IF EXISTS issue1209_test; +CREATE DATABASE issue1209_test; +USE issue1209_test; +CREATE TABLE `t1_tianmu` ( +`id` int(11) DEFAULT NULL, +`a` char(20) DEFAULT NULL, +`id2` int(11) DEFAULT NULL +) ENGINE=tianmu DEFAULT CHARSET=utf8mb4; +LOAD DATA LOCAL infile 'MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-1.txt' into table t1_tianmu FIELDS TERMINATED BY ',' LINES TERMINATED BY ';'; +Warnings: +Warning 1262 Row 2 was truncated; it contained more data than there were input columns +Warning 1261 Row 3 doesn't contain data for all columns +Warning 1261 Row 4 doesn't contain data for all columns +Warning 1261 Row 4 doesn't contain data for all columns +Warning 1366 Incorrect integer value: ' +' for column 'id' at row 5 +Warning 1261 Row 5 doesn't contain data for all columns +Warning 1261 Row 5 doesn't contain data for all columns +select * from t1_tianmu; +id a id2 +1 chai 6 +2 测试 3 +3 chayicha NULL +NULL NULL NULL +0 NULL NULL +truncate table t1_tianmu; +LOAD DATA LOCAL infile 'MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-2.txt' into table t1_tianmu FIELDS TERMINATED BY ',' enclosed by '"' LINES TERMINATED BY ';'; +Warnings: +Warning 1262 Row 2 was truncated; it contained more data than there were input columns +Warning 1261 Row 3 doesn't contain data for all columns +Warning 1261 Row 4 doesn't contain data for all columns +Warning 1261 Row 4 doesn't contain data for all columns +Warning 1261 Row 5 doesn't contain data for all columns +Warning 1261 Row 5 doesn't contain data for all columns +Warning 1261 Row 6 doesn't contain data for all columns +select * from t1_tianmu; +id a id2 +1 chai 7 +2 测试 8 +3 chayicha NULL +NULL NULL NULL +5 NULL NULL +4 "; + NULL +DROP DATABASE issue1209_test; diff --git a/mysql-test/suite/tianmu/r/issue1263.result b/mysql-test/suite/tianmu/r/issue1263.result index 63a3cbc0a..5c1becc77 100644 --- a/mysql-test/suite/tianmu/r/issue1263.result +++ b/mysql-test/suite/tianmu/r/issue1263.result @@ -113,7 +113,7 @@ CREATIONDATE=NULLif(@CREATIONDATE,''), MODIFIEDDATE=NULLif(@MODIFIEDDATE,''), ISACTIVE=NULLif(@ISACTIVE,'') ; -ERROR HY000: Incorrect integer value: ' line 49' for column 'OWNERID' at row 1 +ERROR 01000: Row 1 doesn't contain data for all columns select * from AD_PINSTANCE_LOG_DOUBLE_ENCLOSED; ID AD_CLIENT_ID AD_ORG_ID AD_PINSTANCE_ID P_DATE P_MSG OWNERID MODIFIERID CREATIONDATE MODIFIEDDATE ISACTIVE 765893 37 27 44221 2021-06-28 02:01:16 计算售罄率出错:ORA-04036: PGA memory used by the instance exceeds PGA_AGGREGATE_LIMITORA-06512: at "BOSNDS3.O2O_STORESALERATE_SET", line 49 diff --git a/mysql-test/suite/tianmu/std_data/issue1209-1.txt b/mysql-test/suite/tianmu/std_data/issue1209-1.txt new file mode 100644 index 000000000..03fd3b2eb --- /dev/null +++ b/mysql-test/suite/tianmu/std_data/issue1209-1.txt @@ -0,0 +1 @@ +1,chai,6;2,测试,3,4;3,chayicha;; diff --git a/mysql-test/suite/tianmu/std_data/issue1209-2.txt b/mysql-test/suite/tianmu/std_data/issue1209-2.txt new file mode 100644 index 000000000..e7a0131e9 --- /dev/null +++ b/mysql-test/suite/tianmu/std_data/issue1209-2.txt @@ -0,0 +1 @@ +1,"chai",7;2,"测试",8,9;3,"chayicha";;"5";"4","; diff --git a/mysql-test/suite/tianmu/t/issue1209.test b/mysql-test/suite/tianmu/t/issue1209.test new file mode 100644 index 000000000..eb3b60cf5 --- /dev/null +++ b/mysql-test/suite/tianmu/t/issue1209.test @@ -0,0 +1,27 @@ +--source include/have_tianmu.inc + +--disable_warnings +DROP DATABASE IF EXISTS issue1209_test; +--enable_warnings + +CREATE DATABASE issue1209_test; + +USE issue1209_test; + +CREATE TABLE `t1_tianmu` ( + `id` int(11) DEFAULT NULL, + `a` char(20) DEFAULT NULL, + `id2` int(11) DEFAULT NULL +) ENGINE=tianmu DEFAULT CHARSET=utf8mb4; + +--replace_result $MYSQL_TEST_DIR MYSQL_TEST_DIR +eval LOAD DATA LOCAL infile '$MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-1.txt' into table t1_tianmu FIELDS TERMINATED BY ',' LINES TERMINATED BY ';'; +select * from t1_tianmu; + +truncate table t1_tianmu; + +--replace_result $MYSQL_TEST_DIR MYSQL_TEST_DIR +eval LOAD DATA LOCAL infile '$MYSQL_TEST_DIR/suite/tianmu/std_data/issue1209-2.txt' into table t1_tianmu FIELDS TERMINATED BY ',' enclosed by '"' LINES TERMINATED BY ';'; +select * from t1_tianmu; + +DROP DATABASE issue1209_test; \ No newline at end of file diff --git a/mysql-test/suite/tianmu/t/issue1263.test b/mysql-test/suite/tianmu/t/issue1263.test index 2dfbb35d2..64675382c 100644 --- a/mysql-test/suite/tianmu/t/issue1263.test +++ b/mysql-test/suite/tianmu/t/issue1263.test @@ -93,7 +93,7 @@ ISACTIVE=NULLif(@ISACTIVE,'') select * from AD_PINSTANCE_LOG_DOUBLE_ENCLOSED; --replace_result $MYSQL_TEST_DIR MYSQL_TEST_DIR ---error 1366 +--error 1261 eval load data infile '$MYSQL_TEST_DIR/suite/tianmu/std_data/issue1263-3.txt' into table AD_PINSTANCE_LOG_DOUBLE_ENCLOSED FIELDS TERMINATED BY ',' ENCLOSED BY '"' diff --git a/storage/tianmu/loader/load_parser.cpp b/storage/tianmu/loader/load_parser.cpp index f74574526..1288ab042 100644 --- a/storage/tianmu/loader/load_parser.cpp +++ b/storage/tianmu/loader/load_parser.cpp @@ -88,8 +88,10 @@ bool LoadParser::MakeRow(std::vector &value_buffers) { int errorinfo; bool cont = true; + bool eof = false; + while (cont) { - switch (strategy_->GetOneRow(cur_ptr_, buf_end_ - cur_ptr_, value_buffers, rowsize, errorinfo)) { + switch (strategy_->GetOneRow(cur_ptr_, buf_end_ - cur_ptr_, value_buffers, rowsize, errorinfo, eof)) { case ParsingStrategy::ParseResult::EOB: if (mysql_bin_log.is_open()) binlog_loaded_block(read_buffer_.Buf(), cur_ptr_); @@ -98,10 +100,15 @@ bool LoadParser::MakeRow(std::vector &value_buffers) { buf_end_ = cur_ptr_ + read_buffer_.BufSize(); } else { // reaching the end of the buffer - if (cur_ptr_ != buf_end_) - rejecter_.ConsumeBadRow(cur_ptr_, buf_end_ - cur_ptr_, cur_row_ + 1, errorinfo == -1 ? -1 : errorinfo + 1); - cur_row_++; - cont = false; + if (cur_ptr_ != buf_end_) { + // rejecter_.ConsumeBadRow(cur_ptr_, buf_end_ - cur_ptr_, cur_row_ + 1, errorinfo == -1 ? -1 : errorinfo + + // 1); + // do not cousume the row, take this as the normal line + eof = true; + } else { + cur_row_++; + cont = false; + } } break; diff --git a/storage/tianmu/loader/parsing_strategy.cpp b/storage/tianmu/loader/parsing_strategy.cpp index aad10d187..c1a17e49d 100644 --- a/storage/tianmu/loader/parsing_strategy.cpp +++ b/storage/tianmu/loader/parsing_strategy.cpp @@ -186,11 +186,12 @@ inline bool TailsMatch(const char *s1, const char *s2, const size_t size) { inline ParsingStrategy::SearchResult ParsingStrategy::SearchUnescapedPatternNoEOL(const char *&ptr, const char *const buf_end, const std::string &pattern, + const std::string &line_termination, const std::vector &kmp_next) { const char *c_pattern = pattern.c_str(); size_t size = pattern.size(); - const char *c_eol = terminator_.c_str(); - size_t crlf = terminator_.size(); + const char *c_eol = line_termination.c_str(); + size_t crlf = line_termination.size(); const char *search_end = buf_end; if (size == 1) { @@ -315,12 +316,12 @@ class Field_tmp_nullability_guard { void ParsingStrategy::ReadField(const char *&ptr, const char *&val_beg, Item *&item, uint &index_of_field, std::vector> &vec_ptr_field, - uint &field_index_in_field_list, const CHARSET_INFO *char_info) { + uint &field_index_in_field_list, const CHARSET_INFO *char_info, bool completed_row) { bool is_enclosed = false; char *val_start{nullptr}; size_t val_len{0}; - if (string_qualifier_ && *val_beg == string_qualifier_) { + if (string_qualifier_ && *val_beg == string_qualifier_ && completed_row) { // first char is enclose char, skip it val_start = const_cast(val_beg) + 1; // skip the first and the last char which is encolose char @@ -407,8 +408,8 @@ void ParsingStrategy::ReadField(const char *&ptr, const char *&val_beg, Item *&i } ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, size_t size, - std::vector &record, uint &rowsize, - int &errorinfo) { + std::vector &record, uint &rowsize, int &errorinfo, + bool eof) { const char *buf_end = buf + size; if (!prepared_) { GetEOL(buf, buf_end); @@ -464,22 +465,39 @@ ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, s uint index_of_field{0}; // step2, fill the field list with data file content; + + // three condition for matching one field: + // (1) enclosed-char(optional) + content + enclosed-char(optional) + delimiter-str, return PATTERN_FOUND + // (2) enclosed-char(optional) + content + enclosed-char(optional) + termination_str, return END_OF_LINE + // (3) enclosed-char(optional) + content +(without enclosed-char(optional) + delimiter-str/termination_str), return + // END_OF_BUFFER + bool enclosed_column = false; + while ((item = it++)) { index++; - if (index == fields_vars.elements) - break; const char *val_beg = ptr; - if (string_qualifier_ && *ptr == string_qualifier_) { - row_incomplete = !SearchUnescapedPattern(++ptr, buf_end, enclose_delimiter_, kmp_next_enclose_delimiter_); + + enclosed_column = false; + if (string_qualifier_ && *ptr == string_qualifier_) + enclosed_column = true; + const std::string &delimitor = enclosed_column ? enclose_delimiter_ : delimiter_; + const std::string &line_termination = enclosed_column ? enclose_terminator_ : terminator_; + const std::vector &kmp_local = enclosed_column ? kmp_next_enclose_delimiter_ : kmp_next_delimiter_; + + if (enclosed_column) ++ptr; - } else { - SearchResult res = SearchUnescapedPatternNoEOL(ptr, buf_end, delimiter_, kmp_next_delimiter_); - if (res == SearchResult::END_OF_LINE) { - ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info); - continue; - } - row_incomplete = (res == SearchResult::END_OF_BUFFER); + + SearchResult res = SearchUnescapedPatternNoEOL(ptr, buf_end, delimitor, line_termination, kmp_local); + row_incomplete = (res == SearchResult::END_OF_BUFFER); + + if (row_incomplete && eof) { + // field is incompleted,and reach to the end of buffer, take the incompeted data as the field content; + ReadField(buf_end, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info, false); + ptr = buf_end; + row_incomplete = false; + ++item; + break; } if (row_incomplete) { @@ -487,35 +505,47 @@ ParsingStrategy::ParseResult ParsingStrategy::GetOneRow(const char *const buf, s goto end; } + if (enclosed_column) + ++ptr; ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info); - ptr += delimiter_.size(); - } + if (res == SearchResult::PATTERN_FOUND) { + ptr += delimiter_.size(); + } - if (!row_incomplete) { - // the last column - index++; - const char *val_beg = ptr; - if (string_qualifier_ && *ptr == string_qualifier_) { - row_incomplete = !SearchUnescapedPattern(++ptr, buf_end, enclose_terminator_, kmp_next_enclose_terminator_); - ++ptr; - } else - row_incomplete = !SearchUnescapedPattern(ptr, buf_end, terminator_, kmp_next_terminator_); + if (res == SearchResult::END_OF_LINE) { + ++item; + break; + } + } - if (!row_incomplete) { - ReadField(ptr, val_beg, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info); - } else { - errorinfo = index; - goto end; + if (item) { + while ((item = it++)) { + // field is few, warn if occurs; + ReadField(ptr, ptr, item, index_of_field, vec_ptr_field, field_index_in_field_list, char_info, false); + push_warning_printf(thd_, Sql_condition::SL_WARNING, ER_WARN_TOO_FEW_RECORDS, ER(ER_WARN_TOO_FEW_RECORDS), + thd_->get_stmt_da()->current_row_for_condition()); } + } - ptr += terminator_.size(); + if (!row_incomplete && !eof) { + const char *orig_ptr = ptr; + SearchResult res = SearchUnescapedPatternNoEOL(ptr, buf_end, terminator_, terminator_, kmp_next_terminator_); + // check too many records, warn if occurs + if (res != SearchResult::END_OF_BUFFER) { + if (orig_ptr != ptr) { + push_warning_printf(thd_, Sql_condition::SL_WARNING, ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS), + thd_->get_stmt_da()->current_row_for_condition()); + } + ptr += terminator_.size(); + } } if (thd_->killed) { row_data_error = true; goto end; } + it.rewind(); while ((item = it++)) { Item *real_item = item->real_item(); diff --git a/storage/tianmu/loader/parsing_strategy.h b/storage/tianmu/loader/parsing_strategy.h index 9759876f4..1a4c37d71 100644 --- a/storage/tianmu/loader/parsing_strategy.h +++ b/storage/tianmu/loader/parsing_strategy.h @@ -34,10 +34,10 @@ class ParsingStrategy final { ParsingStrategy(const system::IOParameters &iop, std::vector columns_collations); ~ParsingStrategy() {} ParseResult GetOneRow(const char *const buf, size_t size, std::vector &values, uint &rowsize, - int &errorinfo); + int &errorinfo, bool eof = false); void ReadField(const char *&ptr, const char *&val_beg, Item *&item, uint &index_of_field, std::vector> &vec_ptr_field, uint &field_index_in_field_list, - const CHARSET_INFO *char_info); + const CHARSET_INFO *char_info, bool completed_row = true); void SetTHD(THD *thd) { thd_ = thd; } THD *GetTHD() const { return thd_; }; @@ -79,7 +79,7 @@ class ParsingStrategy final { const std::vector &kmp_next); enum class SearchResult { PATTERN_FOUND, END_OF_BUFFER, END_OF_LINE }; SearchResult SearchUnescapedPatternNoEOL(const char *&ptr, const char *const buf_end, const std::string &pattern, - const std::vector &kmp_next); + const std::string &line_termination, const std::vector &kmp_next); void GetEOL(const char *const buf, const char *const buf_end); void GetValue(const char *const value_ptr, size_t value_size, ushort col, ValueCache &value);