Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve](serde) support text serde for nested type-array/map #22738

Merged
merged 5 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions be/src/util/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,54 @@ struct Slice {
size -= n;
}

/// Shrink this slice by discarding its trailing "n" bytes.
///
/// @pre n <= size
///
/// @note Only the length of the slice is reduced; the start of the
///   slice and the underlying bytes are left untouched.
///
/// @param [in] n
///   Number of bytes to drop from the end of the slice.
void remove_suffix(size_t n) {
    assert(n <= size);
    size = size - n;
}

/// Remove leading space characters (' ') from this slice.
///
/// Takes no arguments: every leading ' ' byte is skipped until the slice
/// is empty or starts with a different byte.
///
/// @note Only the base and bounds of the slice are changed;
///   the data is not modified. Only the ' ' character is stripped;
///   tabs and other whitespace are left in place.
void trim_prefix() {
    // Each iteration moves the start of the view forward by one byte and
    // shrinks the length to match, so data[0] is always the current first byte.
    while (size > 0 && data[0] == ' ') {
        data += 1;
        size -= 1;
    }
}

/// Remove a matching pair of quote characters ('"' or '\'') wrapping this slice.
///
/// The slice is changed only when its first and last bytes are the same
/// quote character; otherwise it is left exactly as it was.
///
/// @note Only the base and bounds of the slice are changed;
///   the data is not modified.
void trim_quote() {
    // A quoted value needs at least the two quote bytes. size == 2 is the
    // empty quoted string ("" or '') and trims down to an empty slice.
    if (size < 2) {
        return;
    }
    const char first = data[0];
    const char last = data[size - 1];
    if ((first == '"' || first == '\'') && first == last) {
        // Skip the opening quote and drop both quote bytes from the length.
        data += 1;
        size -= 2;
    }
}
/// Truncate the slice to the given number of bytes.
///
/// @pre n <= size
Expand Down
110 changes: 110 additions & 0 deletions be/src/vec/data_types/serde/data_type_array_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,116 @@ namespace doris {
namespace vectorized {
class Arena;

// Text-serializes a range of rows of an array column into `bw`.
//
// The whole body is the SERIALIZE_COLUMN_TO_TEXT() macro, which is defined outside this
// file. It presumably walks rows start_idx..end_idx, emitting each through
// serialize_one_cell_to_text() with options.field_delim between rows -- TODO confirm
// against the macro definition.
void DataTypeArraySerDe::serialize_column_to_text(const IColumn& column, int start_idx, int end_idx,
BufferWritable& bw,
FormatOptions& options) const {
SERIALIZE_COLUMN_TO_TEXT()
}

// Writes one array row as text: '[' followed by the row's nested elements and a closing ']'.
//
// @param column   array column (or a wrapper resolved by check_column_const_set_readability)
// @param row_num  index of the row to write
// @param bw       output buffer the text is appended to
// @param options  format options; field_delim is overwritten by this call (see below)
void DataTypeArraySerDe::serialize_one_cell_to_text(const IColumn& column, int row_num,
                                                    BufferWritable& bw,
                                                    FormatOptions& options) const {
    // The helper returns a (column pointer, row index) pair that replaces the incoming
    // column/row_num -- presumably it resolves const columns; confirm against the helper.
    auto resolved = check_column_const_set_readability(column, row_num);
    ColumnPtr column_ptr = resolved.first;
    row_num = resolved.second;

    auto& array_column = assert_cast<const ColumnArray&>(*column_ptr);
    auto& offsets = array_column.get_offsets();
    // The row's elements occupy [offsets[row_num - 1], offsets[row_num]) in the nested column.
    // NOTE(review): for row_num == 0 this reads offsets[-1]; assumes the offsets container
    // tolerates that index -- confirm.
    size_t first_elem = offsets[row_num - 1];
    size_t past_last_elem = offsets[row_num];

    const IColumn& nested_column = array_column.get_data();

    bw.write("[", 1);
    // Inside an array the elements are separated by the collection delimiter followed by a
    // space, so the nested serializer's field delimiter is replaced accordingly.
    // NOTE(review): options is modified in place and not restored, so the caller still sees
    // this field_delim after the call returns.
    options.field_delim = options.collection_delim;
    options.field_delim += " ";
    nested_serde->serialize_column_to_text(nested_column, first_elem, past_last_elem, bw, options);
    bw.write("]", 1);
}

// Deserializes a batch of array cells, one per entry of `slices`, appending to `column`.
//
// @param column            destination array column
// @param slices            text of each cell; must not be empty
// @param num_deserialized  optional in/out counter. On entry a positive value limits how many
//                          slices are processed (clamped to slices.size()); null or
//                          non-positive means "all". On success it receives the number of
//                          cells processed; on failure it receives the 1-based position of
//                          the failing cell.
// @param options           format options forwarded to deserialize_one_cell_from_text
// @return the first non-OK status produced by a cell, otherwise Status::OK()
Status DataTypeArraySerDe::deserialize_column_from_text_vector(IColumn& column,
                                                               std::vector<Slice>& slices,
                                                               int* num_deserialized,
                                                               const FormatOptions& options) const {
    DCHECK(!slices.empty());
    const int total = static_cast<int>(slices.size());
    int end = total;
    // Never trust a caller-supplied limit beyond the vector bounds.
    if (num_deserialized != nullptr && *num_deserialized > 0 && *num_deserialized < total) {
        end = *num_deserialized;
    }

    for (int i = 0; i < end; ++i) {
        if (Status st = deserialize_one_cell_from_text(column, slices[i], options);
            st != Status::OK()) {
            // The pointer is optional (it is null-checked above), so guard the write too.
            // NOTE(review): i + 1 counts the failing cell as well; kept as in the original.
            if (num_deserialized != nullptr) {
                *num_deserialized = i + 1;
            }
            return st;
        }
    }
    // Report the processed count on success too: the array cell deserializer in this file
    // reads this counter after the call to compute the next offset.
    if (num_deserialized != nullptr) {
        *num_deserialized = end;
    }
    return Status::OK();
}

// Parses one array cell of the form "[item, item, ...]" and appends it to `column`.
//
// The text between the brackets is split on options.collection_delim. Delimiters that appear
// inside a quoted string or inside a nested '[...]' / '{...}' are not split points. The
// resulting item slices are handed to the nested serde, and one offset is appended for the row.
//
// @param column   destination ColumnArray; its nested column is expected to be nullable
// @param slice    cell text; modified in place (brackets and leading spaces are stripped)
// @param options  collection_delim selects the item separator; converted_from_string makes
//                 each item lose one pair of wrapping quotes
// @return InvalidArgument when the text is empty or not wrapped in '[' ... ']', otherwise the
//         status returned by the nested serde
Status DataTypeArraySerDe::deserialize_one_cell_from_text(IColumn& column, Slice& slice,
                                                          const FormatOptions& options) const {
    DCHECK(!slice.empty());
    // DCHECK is a debug-only check; without this guard a release build would index an
    // empty slice below.
    if (slice.empty()) {
        return Status::InvalidArgument("Array text is empty, expected '[...]'");
    }
    auto& array_column = assert_cast<ColumnArray&>(column);
    auto& offsets = array_column.get_offsets();
    IColumn& nested_column = array_column.get_data();
    DCHECK(nested_column.is_nullable());
    if (slice[0] != '[') {
        return Status::InvalidArgument("Array does not start with '[' character, found '{}'",
                                       slice[0]);
    }
    if (slice[slice.size - 1] != ']') {
        return Status::InvalidArgument("Array does not end with ']' character, found '{}'",
                                       slice[slice.size - 1]);
    }
    // empty array []: repeat the previous offset so the row has zero elements.
    if (slice.size == 2) {
        offsets.push_back(offsets.back());
        return Status::OK();
    }
    slice.remove_prefix(1);
    slice.remove_suffix(1);

    // Splitting rules:
    // 1. cut at collection_delim, except when it appears inside a quoted string;
    // 2. track bracket/brace depth so delimiters of nested complex values are skipped.
    int nested_level = 0;
    // Quote character that opened the current string, or 0 when outside a string. Tracking
    // the actual character keeps an apostrophe inside a "..." string (or a '"' inside a
    // '...' string) from ending the string early.
    char open_quote = 0;
    std::vector<Slice> slices;
    slice.trim_prefix();
    // Start with the whole remaining text as the first item; it is cut down when a delimiter
    // is found, so the last item needs no special handling after the loop.
    slices.emplace_back(slice);
    const size_t slice_size = slice.size;
    for (size_t idx = 0; idx < slice_size; ++idx) {
        const char c = slice[idx];
        if (open_quote != 0) {
            // Inside a string: only the matching quote character is significant.
            if (c == open_quote) {
                open_quote = 0;
            }
            continue;
        }
        if (c == '"' || c == '\'') {
            open_quote = c;
        } else if (c == '[' || c == '{') {
            ++nested_level;
        } else if (c == ']' || c == '}') {
            --nested_level;
        } else if (nested_level == 0 && c == options.collection_delim) {
            // Top-level delimiter: end the current item just before it.
            slices.back().remove_suffix(slice_size - idx);
            // The next item provisionally spans from after the delimiter to the end of the
            // text (the underlying data is not modified, so it can be viewed directly).
            Slice next(slice.data + idx + 1, slice_size - idx - 1);
            next.trim_prefix();
            if (options.converted_from_string) slices.back().trim_quote();
            slices.emplace_back(next);
        }
    }

    if (options.converted_from_string) slices.back().trim_quote();

    int elem_deserialized = 0;
    Status st = nested_serde->deserialize_column_from_text_vector(nested_column, slices,
                                                                  &elem_deserialized, options);
    // The offset is appended even when the nested serde reports an error, using whatever
    // element count it reported.
    offsets.emplace_back(offsets.back() + elem_deserialized);
    return st;
}

void DataTypeArraySerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result,
Arena* mem_pool, int32_t col_id,
int row_num) const {
Expand Down
23 changes: 13 additions & 10 deletions be/src/vec/data_types/serde/data_type_array_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,25 @@ class DataTypeArraySerDe : public DataTypeSerDe {
DataTypeArraySerDe(const DataTypeSerDeSPtr& _nested_serde) : nested_serde(_nested_serde) {}

void serialize_one_cell_to_text(const IColumn& column, int row_num, BufferWritable& bw,
const FormatOptions& options) const override {
LOG(FATAL) << "Not support serialize array column to buffer";
}
FormatOptions& options) const override;

Status deserialize_one_cell_from_text(IColumn& column, ReadBuffer& rb,
const FormatOptions& options) const override {
LOG(FATAL) << "Not support deserialize from buffer to array";
return Status::NotSupported("Not support deserialize from buffer to array");
}
void serialize_column_to_text(const IColumn& column, int start_idx, int end_idx,
BufferWritable& bw, FormatOptions& options) const override;

Status deserialize_one_cell_from_text(IColumn& column, Slice& slice,
const FormatOptions& options) const override;

Status deserialize_column_from_text_vector(IColumn& column, std::vector<Slice>& slices,
int* num_deserialized,
const FormatOptions& options) const override;
Status write_column_to_pb(const IColumn& column, PValues& result, int start,
int end) const override {
LOG(FATAL) << "Not support write array column to pb";
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_pb with type " + column.get_name());
}
Status read_column_from_pb(IColumn& column, const PValues& arg) const override {
LOG(FATAL) << "Not support read from pb to array";
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_pb with type " + column.get_name());
}

void write_one_cell_to_jsonb(const IColumn& column, JsonbWriter& result, Arena* mem_pool,
Expand Down
28 changes: 21 additions & 7 deletions be/src/vec/data_types/serde/data_type_bitmap_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,26 @@ class Arena;
class DataTypeBitMapSerDe : public DataTypeSerDe {
public:
void serialize_one_cell_to_text(const IColumn& column, int row_num, BufferWritable& bw,
const FormatOptions& options) const override {
LOG(FATAL) << "Not support serialize bitmap column to buffer";
FormatOptions& options) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_pb with type " + column.get_name());
}

Status deserialize_one_cell_from_text(IColumn& column, ReadBuffer& rb,
void serialize_column_to_text(const IColumn& column, int start_idx, int end_idx,
BufferWritable& bw, FormatOptions& options) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_pb with type " + column.get_name());
}
Status deserialize_one_cell_from_text(IColumn& column, Slice& slice,
const FormatOptions& options) const override {
LOG(FATAL) << "Not support deserialize from buffer to bitmap";
return Status::NotSupported("Not support deserialize from buffer to bitmap");
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_pb with type " + column.get_name());
}
Status deserialize_column_from_text_vector(IColumn& column, std::vector<Slice>& slices,
int* num_deserialized,
const FormatOptions& options) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_pb with type " + column.get_name());
}

Status write_column_to_pb(const IColumn& column, PValues& result, int start,
Expand All @@ -55,11 +67,13 @@ class DataTypeBitMapSerDe : public DataTypeSerDe {
void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
arrow::ArrayBuilder* array_builder, int start,
int end) const override {
LOG(FATAL) << "Not support write bitmap column to arrow";
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_pb with type " + column.get_name());
}
void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
int end, const cctz::time_zone& ctz) const override {
LOG(FATAL) << "Not support read bitmap column from arrow";
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_pb with type " + column.get_name());
}

Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
Expand Down
44 changes: 34 additions & 10 deletions be/src/vec/data_types/serde/data_type_date64_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@
namespace doris {
namespace vectorized {

// Text-serializes a range of rows of a date column into `bw`.
//
// The whole body is the SERIALIZE_COLUMN_TO_TEXT() macro, which is defined outside this
// file. It presumably walks rows start_idx..end_idx and emits each one through
// serialize_one_cell_to_text() -- TODO confirm against the macro definition.
void DataTypeDate64SerDe::serialize_column_to_text(const IColumn& column, int start_idx,
int end_idx, BufferWritable& bw,
FormatOptions& options) const {
SERIALIZE_COLUMN_TO_TEXT();
}

void DataTypeDate64SerDe::serialize_one_cell_to_text(const IColumn& column, int row_num,
BufferWritable& bw,
const FormatOptions& options) const {
FormatOptions& options) const {
auto result = check_column_const_set_readability(column, row_num);
ColumnPtr ptr = result.first;
row_num = result.second;
Expand All @@ -54,33 +60,45 @@ void DataTypeDate64SerDe::serialize_one_cell_to_text(const IColumn& column, int
char* pos = value.to_string(buf);
bw.write(buf, pos - buf - 1);
}
bw.commit();
}

Status DataTypeDate64SerDe::deserialize_one_cell_from_text(IColumn& column, ReadBuffer& rb,
// Deserializes a batch of date cells, one per entry of `slices`, into `column`.
//
// The per-cell work is done by the DESERIALIZE_COLUMN_FROM_TEXT_VECTOR() macro, which is
// defined outside this file. It presumably calls deserialize_one_cell_from_text() for each
// slice, returns early on the first failure and maintains *num_deserialized -- TODO confirm
// against the macro definition. If the macro falls through, Status::OK() is returned.
Status DataTypeDate64SerDe::deserialize_column_from_text_vector(
IColumn& column, std::vector<Slice>& slices, int* num_deserialized,
const FormatOptions& options) const {
DESERIALIZE_COLUMN_FROM_TEXT_VECTOR()
return Status::OK();
}

Status DataTypeDate64SerDe::deserialize_one_cell_from_text(IColumn& column, Slice& slice,
const FormatOptions& options) const {
auto& column_data = assert_cast<ColumnInt64&>(column);
Int64 val = 0;
if (options.date_olap_format) {
tm time_tm;
char* res = strptime(rb.position(), "%Y-%m-%d", &time_tm);
char* res = strptime(slice.data, "%Y-%m-%d", &time_tm);
if (nullptr != res) {
val = (time_tm.tm_year + 1900) * 16 * 32 + (time_tm.tm_mon + 1) * 32 + time_tm.tm_mday;
} else {
// 1400 - 01 - 01
val = 716833;
}
} else if (!read_date_text_impl<Int64>(val, rb)) {
} else if (ReadBuffer rb(slice.data, slice.size); !read_date_text_impl<Int64>(val, rb)) {
return Status::InvalidArgument("parse date fail, string: '{}'",
std::string(rb.position(), rb.count()).c_str());
}
column_data.insert_value(val);
return Status::OK();
}

// Text-serializes a range of rows of a datetime column into `bw`.
//
// The whole body is the SERIALIZE_COLUMN_TO_TEXT() macro, which is defined outside this
// file. It presumably walks rows start_idx..end_idx and emits each one through
// serialize_one_cell_to_text() -- TODO confirm against the macro definition.
void DataTypeDateTimeSerDe::serialize_column_to_text(const IColumn& column, int start_idx,
int end_idx, BufferWritable& bw,
FormatOptions& options) const {
SERIALIZE_COLUMN_TO_TEXT()
}

void DataTypeDateTimeSerDe::serialize_one_cell_to_text(const IColumn& column, int row_num,
BufferWritable& bw,
const FormatOptions& options) const {
FormatOptions& options) const {
auto result = check_column_const_set_readability(column, row_num);
ColumnPtr ptr = result.first;
row_num = result.second;
Expand Down Expand Up @@ -109,16 +127,22 @@ void DataTypeDateTimeSerDe::serialize_one_cell_to_text(const IColumn& column, in
char* pos = value.to_string(buf);
bw.write(buf, pos - buf - 1);
}
bw.commit();
}

Status DataTypeDateTimeSerDe::deserialize_one_cell_from_text(IColumn& column, ReadBuffer& rb,
// Deserializes a batch of datetime cells, one per entry of `slices`, into `column`.
//
// The per-cell work is done by the DESERIALIZE_COLUMN_FROM_TEXT_VECTOR() macro, which is
// defined outside this file. It presumably calls deserialize_one_cell_from_text() for each
// slice, returns early on the first failure and maintains *num_deserialized -- TODO confirm
// against the macro definition. If the macro falls through, Status::OK() is returned.
Status DataTypeDateTimeSerDe::deserialize_column_from_text_vector(
IColumn& column, std::vector<Slice>& slices, int* num_deserialized,
const FormatOptions& options) const {
DESERIALIZE_COLUMN_FROM_TEXT_VECTOR()
return Status::OK();
}

Status DataTypeDateTimeSerDe::deserialize_one_cell_from_text(IColumn& column, Slice& slice,
const FormatOptions& options) const {
auto& column_data = assert_cast<ColumnInt64&>(column);
Int64 val = 0;
if (options.date_olap_format) {
tm time_tm;
char* res = strptime(rb.position(), "%Y-%m-%d %H:%M:%S", &time_tm);
char* res = strptime(slice.data, "%Y-%m-%d %H:%M:%S", &time_tm);
if (nullptr != res) {
val = ((time_tm.tm_year + 1900) * 10000L + (time_tm.tm_mon + 1) * 100L +
time_tm.tm_mday) *
Expand All @@ -128,7 +152,7 @@ Status DataTypeDateTimeSerDe::deserialize_one_cell_from_text(IColumn& column, Re
// 1400 - 01 - 01
val = 14000101000000L;
}
} else if (!read_datetime_text_impl<Int64>(val, rb)) {
} else if (ReadBuffer rb(slice.data, slice.size); !read_datetime_text_impl<Int64>(val, rb)) {
return Status::InvalidArgument("parse datetime fail, string: '{}'",
std::string(rb.position(), rb.count()).c_str());
}
Expand Down
22 changes: 17 additions & 5 deletions be/src/vec/data_types/serde/data_type_date64_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ class Arena;

class DataTypeDate64SerDe : public DataTypeNumberSerDe<Int64> {
void serialize_one_cell_to_text(const IColumn& column, int row_num, BufferWritable& bw,
const FormatOptions& options) const override;

Status deserialize_one_cell_from_text(IColumn& column, ReadBuffer& rb,
FormatOptions& options) const override;
void serialize_column_to_text(const IColumn& column, int start_idx, int end_idx,
BufferWritable& bw, FormatOptions& options) const override;
Status deserialize_one_cell_from_text(IColumn& column, Slice& slice,
const FormatOptions& options) const override;

Status deserialize_column_from_text_vector(IColumn& column, std::vector<Slice>& slices,
int* num_deserialized,
const FormatOptions& options) const override;

void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
arrow::ArrayBuilder* array_builder, int start,
int end) const override;
Expand All @@ -64,11 +70,17 @@ class DataTypeDate64SerDe : public DataTypeNumberSerDe<Int64> {
};

class DataTypeDateTimeSerDe : public DataTypeDate64SerDe {
void serialize_column_to_text(const IColumn& column, int start_idx, int end_idx,
BufferWritable& bw, FormatOptions& options) const override;

void serialize_one_cell_to_text(const IColumn& column, int row_num, BufferWritable& bw,
const FormatOptions& options) const override;
FormatOptions& options) const override;

Status deserialize_one_cell_from_text(IColumn& column, ReadBuffer& rb,
Status deserialize_one_cell_from_text(IColumn& column, Slice& slice,
const FormatOptions& options) const override;
Status deserialize_column_from_text_vector(IColumn& column, std::vector<Slice>& slices,
int* num_deserialized,
const FormatOptions& options) const override;
};
} // namespace vectorized
} // namespace doris
Loading