Skip to content

Commit

Permalink
[FIX](complextype)fix struct nested complex collection type and and r…
Browse files Browse the repository at this point in the history
…egresstest (#26973)
  • Loading branch information
amorynan authored Nov 20, 2023
1 parent 840f3b6 commit c0f22e8
Show file tree
Hide file tree
Showing 11 changed files with 14,588 additions and 92 deletions.
1 change: 0 additions & 1 deletion be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB&
case FieldType::OLAP_FIELD_TYPE_STRUCT: {
// not support empty struct
DCHECK(meta.children_columns_size() >= 1);
num_rows = meta.children_columns(0).num_rows();
// create struct column reader
std::unique_ptr<ColumnReader> struct_reader(
new ColumnReader(opts, meta, num_rows, file_reader));
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ Status StructColumnWriter::finish() {
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->finish());
}
_opts.meta->set_num_rows(get_next_rowid());
return Status::OK();
}

Expand Down Expand Up @@ -993,6 +994,7 @@ Status ArrayColumnWriter::finish() {
RETURN_IF_ERROR(_null_writer->finish());
}
RETURN_IF_ERROR(_item_writer->finish());
_opts.meta->set_num_rows(get_next_rowid());
return Status::OK();
}

Expand Down Expand Up @@ -1091,6 +1093,7 @@ Status MapColumnWriter::finish() {
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->finish());
}
_opts.meta->set_num_rows(get_next_rowid());
return Status::OK();
}

Expand Down
188 changes: 103 additions & 85 deletions be/src/vec/data_types/serde/data_type_struct_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ Status DataTypeStructSerDe::serialize_one_cell_to_json(const IColumn& column, in
bw.write('}');
return Status::OK();
}

Status DataTypeStructSerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
if (slice.empty()) {
Expand Down Expand Up @@ -94,106 +93,125 @@ Status DataTypeStructSerDe::deserialize_one_cell_from_json(IColumn& column, Slic
}
return Status::OK();
}

ReadBuffer rb(slice.data, slice.size);
++rb.position();
// remove '{' '}'
slice.remove_prefix(1);
slice.remove_suffix(1);
slice.trim_prefix();

bool is_explicit_names = false;
std::vector<std::string> field_names;
std::vector<ReadBuffer> field_rbs;
std::vector<size_t> field_pos;

while (!rb.eof()) {
StringRef slot(rb.position(), rb.count());
bool has_quota = false;
bool is_name = false;
if (!next_slot_from_string(rb, slot, is_name, has_quota)) {
return Status::InvalidArgument("Cannot read struct field from text '{}'",
slot.to_string());
}
if (is_name) {
std::string name = slot.to_string();
if (!next_slot_from_string(rb, slot, is_name, has_quota)) {
return Status::InvalidArgument("Cannot read struct field from text '{}'",
slot.to_string());
}
ReadBuffer field_rb(const_cast<char*>(slot.data), slot.size);
field_names.push_back(name);
field_rbs.push_back(field_rb);

if (!is_explicit_names) {
is_explicit_names = true;
int nested_level = 0;
bool has_quote = false;
int start_pos = 0;
size_t slice_size = slice.size;
bool key_added = false;
int idx = 0;
char quote_char = 0;

auto elem_size = elemSerDeSPtrs.size();
int field_pos = 0;

for (; idx < slice_size; ++idx) {
char c = slice[idx];
if (c == '"' || c == '\'') {
if (!has_quote) {
quote_char = c;
has_quote = !has_quote;
} else if (has_quote && quote_char == c) {
quote_char = 0;
has_quote = !has_quote;
}
} else {
ReadBuffer field_rb(const_cast<char*>(slot.data), slot.size);
field_rbs.push_back(field_rb);
}
}

// TODO: should we support insert default field value when actual field number is less than
// schema field number?
if (field_rbs.size() != elemSerDeSPtrs.size()) {
std::string cmp_str = field_rbs.size() > elemSerDeSPtrs.size() ? "more" : "less";
return Status::InvalidArgument(
"Actual struct field number {} is {} than schema field number {}.",
field_rbs.size(), cmp_str, elemSerDeSPtrs.size());
}

if (is_explicit_names) {
if (field_names.size() != field_rbs.size()) {
return Status::InvalidArgument(
"Struct field name number {} is not equal to field number {}.",
field_names.size(), field_rbs.size());
}
std::unordered_set<std::string> name_set;
for (size_t i = 0; i < field_names.size(); i++) {
// check duplicate fields
auto ret = name_set.insert(field_names[i]);
if (!ret.second) {
return Status::InvalidArgument("Struct field name {} is duplicate with others.",
field_names[i]);
} else if (c == '\\' && idx + 1 < slice_size) { //escaped
++idx;
} else if (!has_quote && (c == '[' || c == '{')) {
++nested_level;
} else if (!has_quote && (c == ']' || c == '}')) {
--nested_level;
} else if (!has_quote && nested_level == 0 && c == options.map_key_delim && !key_added) {
// if meet map_key_delimiter and not in quote, we can make it as key elem.
if (idx == start_pos) {
continue;
}
// check name valid
auto idx = try_get_position_by_name(field_names[i]);
if (idx == std::nullopt) {
Slice next(slice.data + start_pos, idx - start_pos);
next.trim_prefix();
next.trim_quote();
// check field_name
if (elemNames[field_pos] != next) {
// we should do column revert if error
for (size_t j = 0; j < field_pos; j++) {
struct_column.get_column(j).pop_back(1);
}
return Status::InvalidArgument("Cannot find struct field name {} in schema.",
field_names[i]);
next.to_string());
}
field_pos.push_back(idx.value());
}
} else {
for (size_t i = 0; i < field_rbs.size(); i++) {
field_pos.push_back(i);
// skip delimiter
start_pos = idx + 1;
is_explicit_names = true;
key_added = true;
} else if (!has_quote && nested_level == 0 && c == options.collection_delim &&
(key_added || !is_explicit_names)) {
// if meet collection_delimiter and not in quote, we can make it as value elem
if (idx == start_pos) {
continue;
}
Slice next(slice.data + start_pos, idx - start_pos);
next.trim_prefix();
if (field_pos > elem_size) {
// we should do column revert if error
for (size_t j = 0; j < field_pos; j++) {
struct_column.get_column(j).pop_back(1);
}
return Status::InvalidArgument(
"Actual struct field number is more than schema field number {}.",
field_pos, elem_size);
}
if (Status st = elemSerDeSPtrs[field_pos]->deserialize_one_cell_from_json(
struct_column.get_column(field_pos), next, options);
st != Status::OK()) {
// we should do column revert if error
for (size_t j = 0; j < field_pos; j++) {
struct_column.get_column(j).pop_back(1);
}
return st;
}
// skip delimiter
start_pos = idx + 1;
// reset key_added
key_added = false;
++field_pos;
}
}

for (size_t idx = 0; idx < elemSerDeSPtrs.size(); idx++) {
auto field_rb = field_rbs[field_pos[idx]];
// handle empty element
if (field_rb.count() == 0) {
struct_column.get_column(idx).insert_default();
continue;
}
// handle null element
if (field_rb.count() == 4 && strncmp(field_rb.position(), NULL_IN_COMPLEX_TYPE.c_str(),
strlen(NULL_IN_COMPLEX_TYPE.c_str())) == 0) {
auto& nested_null_col =
reinterpret_cast<ColumnNullable&>(struct_column.get_column(idx));
nested_null_col.insert_null_elements(1);
continue;
// for last value elem
if (!has_quote && nested_level == 0 && idx == slice_size && idx != start_pos &&
(key_added || !is_explicit_names)) {
Slice next(slice.data + start_pos, idx - start_pos);
next.trim_prefix();
if (field_pos > elem_size) {
// we should do column revert if error
for (size_t j = 0; j < field_pos; j++) {
struct_column.get_column(j).pop_back(1);
}
return Status::InvalidArgument(
"Actual struct field number is more than schema field number {}.", field_pos,
elem_size);
}
Slice element_slice(field_rb.position(), field_rb.count());
auto st = elemSerDeSPtrs[idx]->deserialize_one_cell_from_json(struct_column.get_column(idx),
element_slice, options);
if (!st.ok()) {
if (Status st = elemSerDeSPtrs[field_pos]->deserialize_one_cell_from_json(
struct_column.get_column(field_pos), next, options);
st != Status::OK()) {
// we should do column revert if error
for (size_t j = 0; j < idx; j++) {
for (size_t j = 0; j < field_pos; j++) {
struct_column.get_column(j).pop_back(1);
}
return st;
}
++field_pos;
}

// check stuff:
if (field_pos < elem_size) {
return Status::InvalidArgument(
"Actual struct field number {} is less than schema field number {}.", field_pos,
elem_size);
}
return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/data_types/serde/data_type_struct_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class DataTypeStructSerDe : public DataTypeSerDe {
size_t str_len = 1;
// search until next '"' or '\''
while (str_len < rb.count() && *(rb.position() + str_len) != str_sep) {
if (*(rb.position() + str_len) == '\\' && str_len + 1 < rb.count()) {
++str_len;
}
++str_len;
}
// invalid string
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions regression-test/data/load_p0/stream_load/test_stream_load.out
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@
3 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": null, "f8": null, "f9": null, "f10": 1.100000}
4 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
5 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
6 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
7 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
8 {"f1": 1, "f2": null, "f3": null, "f4": null, "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
6 \N
7 \N
8 \N
9 {"f1": null, "f2": null, "f3": null, "f4": null, "f5": null, "f6": null, "f7": null, "f8": null, "f9": null, "f10": null}
10 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
11 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": null, "f8": null, "f9": null, "f10": 1.100000}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@
3 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": null, "f8": null, "f9": null, "f10": 1.100000}
4 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
5 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
6 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
7 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
8 {"f1": 1, "f2": null, "f3": null, "f4": null, "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
6 \N
7 \N
8 \N
9 {"f1": null, "f2": null, "f3": null, "f4": null, "f5": null, "f6": null, "f7": null, "f8": null, "f9": null, "f10": null}
10 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": "2023-02-26 17:58:00", "f8": 1.01, "f9": 3.1415926, "f10": 1.100000}
11 {"f1": 1, "f2": 100, "f3": 100000, "f4": "a", "f5": "doris", "f6": "2023-02-26", "f7": null, "f8": null, "f9": null, "f10": 1.100000}
Expand Down
Loading

0 comments on commit c0f22e8

Please sign in to comment.