Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](serde) Optimize the filling of fixed values into block columns without repeated deserialization. #37377

Merged
merged 5 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,25 @@ Status DataTypeDateTimeV2SerDe::write_column_to_orc(const std::string& timezone,
return Status::OK();
}

// Fills `column` with `rows` copies of the single value encoded in `slice`.
//
// The text is parsed exactly once via deserialize_one_cell_from_json(); the
// resulting cell is then replicated `rows - 1` more times.
//
// @param column            destination column, appended to in place.
// @param slice             textual representation of the value.
// @param rows              number of rows to produce; values <= 0 produce nothing.
// @param num_deserialized  out: number of rows appended (0 on failure).
// @param options           format options forwarded to the single-cell parser.
// @return the parser's error status on failure, otherwise Status::OK().
Status DataTypeDateTimeV2SerDe::deserialize_column_from_fixed_json(
        IColumn& column, Slice& slice, int rows, int* num_deserialized,
        const FormatOptions& options) const {
    // Without this guard a non-positive `rows` would still append one cell and
    // then ask for a negative number of repetitions.
    if (rows <= 0) {
        *num_deserialized = 0;
        return Status::OK();
    }

    Status st = deserialize_one_cell_from_json(column, slice, options);
    if (!st.ok()) {
        // Mirror the base-class default: report zero rows on a parse failure.
        *num_deserialized = 0;
        return st;
    }

    // Qualified call: use this class's replication routine directly.
    DataTypeDateTimeV2SerDe::insert_column_last_value_multiple_times(column, rows - 1);
    *num_deserialized = rows;
    return Status::OK();
}

// Appends `times` additional copies of the last element of `column`.
//
// `column` is treated as a ColumnVector<UInt64> and is expected to hold at
// least one element. A non-positive `times` is a no-op.
void DataTypeDateTimeV2SerDe::insert_column_last_value_multiple_times(IColumn& column,
                                                                      int times) const {
    // A negative count must not reach insert_many_vals(), where it could be
    // reinterpreted as a very large unsigned length.
    if (times <= 0) {
        return;
    }
    auto& col = static_cast<ColumnVector<UInt64>&>(column);
    // Copy the value out before appending to the same column.
    const UInt64 last_value = col.get_element(col.size() - 1);
    col.insert_many_vals(last_value, times);
}

} // namespace doris::vectorized
5 changes: 5 additions & 0 deletions be/src/vec/data_types/serde/data_type_datetimev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class DataTypeDateTimeV2SerDe : public DataTypeNumberSerDe<UInt64> {
int start, int end,
std::vector<StringRef>& buffer_list) const override;

// Parses `slice` once and appends the resulting value to `column` so that
// `rows` rows are produced; the count is reported through `num_deserialized`.
Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override;
// Re-appends the last element of `column` (a ColumnVector<UInt64>) `times` times.
void insert_column_last_value_multiple_times(IColumn& column, int times) const override;

private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
Expand Down
21 changes: 21 additions & 0 deletions be/src/vec/data_types/serde/data_type_datev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,26 @@ Status DataTypeDateV2SerDe::write_column_to_orc(const std::string& timezone, con
return Status::OK();
}

// Fills `column` with `rows` copies of the single value encoded in `slice`.
//
// The value is parsed once with deserialize_one_cell_from_json() and the
// freshly appended cell is then duplicated `rows - 1` times.
//
// @param column            destination column, appended to in place.
// @param slice             textual representation of the value.
// @param rows              number of rows to produce; values <= 0 produce nothing.
// @param num_deserialized  out: number of rows appended (0 on failure).
// @param options           format options forwarded to the single-cell parser.
// @return the parser's error status on failure, otherwise Status::OK().
Status DataTypeDateV2SerDe::deserialize_column_from_fixed_json(IColumn& column, Slice& slice,
                                                               int rows, int* num_deserialized,
                                                               const FormatOptions& options) const {
    // Nothing to do for an empty request; this also keeps `rows - 1` from
    // going negative below.
    if (rows <= 0) {
        *num_deserialized = 0;
        return Status::OK();
    }

    Status st = deserialize_one_cell_from_json(column, slice, options);
    if (!st.ok()) {
        // Same failure contract as the base-class default implementation.
        *num_deserialized = 0;
        return st;
    }

    DataTypeDateV2SerDe::insert_column_last_value_multiple_times(column, rows - 1);
    *num_deserialized = rows;
    return Status::OK();
}

// Appends `times` additional copies of the last element of `column`.
//
// `column` is treated as a ColumnVector<UInt32> and is expected to hold at
// least one element. A non-positive `times` is a no-op.
void DataTypeDateV2SerDe::insert_column_last_value_multiple_times(IColumn& column,
                                                                  int times) const {
    // Reject non-positive counts before they are handed to insert_many_vals().
    if (times <= 0) {
        return;
    }
    auto& col = static_cast<ColumnVector<UInt32>&>(column);
    // Copy the value out before appending to the same column.
    const UInt32 last_value = col.get_element(col.size() - 1);
    col.insert_many_vals(last_value, times);
}

} // namespace vectorized
} // namespace doris
6 changes: 6 additions & 0 deletions be/src/vec/data_types/serde/data_type_datev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class DataTypeDateV2SerDe : public DataTypeNumberSerDe<UInt32> {
int start, int end,
std::vector<StringRef>& buffer_list) const override;

// Parses `slice` once and appends the resulting value to `column` so that
// `rows` rows are produced; the count is reported through `num_deserialized`.
Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override;

// Re-appends the last element of `column` (a ColumnVector<UInt32>) `times` times.
void insert_column_last_value_multiple_times(IColumn& column, int times) const override;

private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
Expand Down
26 changes: 26 additions & 0 deletions be/src/vec/data_types/serde/data_type_decimal_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,32 @@ Status DataTypeDecimalSerDe<T>::write_column_to_orc(const std::string& timezone,
}
return Status::OK();
}
// Fills `column` with `rows` copies of the single decimal value encoded in
// `slice`.
//
// The text is parsed once with deserialize_one_cell_from_json(); the new cell
// is then replicated `rows - 1` times.
//
// @param column            destination column, appended to in place.
// @param slice             textual representation of the value.
// @param rows              number of rows to produce; values <= 0 produce nothing.
// @param num_deserialized  out: number of rows appended (0 on failure).
// @param options           format options forwarded to the single-cell parser.
// @return the parser's error status on failure, otherwise Status::OK().
template <typename T>
Status DataTypeDecimalSerDe<T>::deserialize_column_from_fixed_json(
        IColumn& column, Slice& slice, int rows, int* num_deserialized,
        const FormatOptions& options) const {
    // A non-positive `rows` must not append the parsed cell at all; otherwise
    // the column would gain one row while `rows` (<= 0) is reported.
    if (rows <= 0) {
        *num_deserialized = 0;
        return Status::OK();
    }

    Status st = deserialize_one_cell_from_json(column, slice, options);
    if (!st.ok()) {
        // Same failure contract as the base-class default implementation.
        *num_deserialized = 0;
        return st;
    }

    DataTypeDecimalSerDe::insert_column_last_value_multiple_times(column, rows - 1);
    *num_deserialized = rows;
    return Status::OK();
}

// Duplicates the final element of a decimal column.
//
// `column` is treated as a ColumnDecimal<T>; its last element is read once and
// then appended `times` times (no appends when `times` is not positive).
template <typename T>
void DataTypeDecimalSerDe<T>::insert_column_last_value_multiple_times(IColumn& column,
                                                                      int times) const {
    auto& decimal_col = static_cast<ColumnDecimal<T>&>(column);

    // Read the last element before any append happens.
    const auto last_index = decimal_col.size() - 1;
    T last_value = decimal_col.get_element(last_index);

    for (int remaining = times; remaining > 0; --remaining) {
        decimal_col.insert_value(last_value);
    }
}

template class DataTypeDecimalSerDe<Decimal32>;
template class DataTypeDecimalSerDe<Decimal64>;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/data_types/serde/data_type_decimal_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ class DataTypeDecimalSerDe : public DataTypeSerDe {
int start, int end,
std::vector<StringRef>& buffer_list) const override;

// Parses `slice` once and appends the resulting decimal to `column` so that
// `rows` rows are produced; the count is reported through `num_deserialized`.
Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override;

// Re-appends the last element of `column` (a ColumnDecimal<T>) `times` times.
void insert_column_last_value_multiple_times(IColumn& column, int times) const override;

private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
Expand Down
20 changes: 20 additions & 0 deletions be/src/vec/data_types/serde/data_type_nullable_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,26 @@ Status DataTypeNullableSerDe::deserialize_column_from_hive_text_vector(
return Status::OK();
}

// Fills a nullable `column` with `rows` copies of the single value encoded in
// `slice`.
//
// One cell is parsed with deserialize_one_cell_from_json(), which appends to
// both the null map and the nested column. The remaining `rows - 1` entries
// are produced by extending the null map with the same flag and asking the
// nested serde to repeat the nested column's last value.
//
// @param column            destination ColumnNullable, appended to in place.
// @param slice             textual representation of the value.
// @param rows              number of rows to produce; values <= 0 produce nothing.
// @param num_deserialized  out: number of rows appended (0 on failure).
// @param options           format options forwarded to the single-cell parser.
// @return the parser's error status on failure, otherwise Status::OK().
Status DataTypeNullableSerDe::deserialize_column_from_fixed_json(
        IColumn& column, Slice& slice, int rows, int* num_deserialized,
        const FormatOptions& options) const {
    // Without this guard a non-positive `rows` would append one cell and then
    // shrink the null map / request a negative repeat count.
    if (rows <= 0) {
        *num_deserialized = 0;
        return Status::OK();
    }

    auto& col = static_cast<ColumnNullable&>(column);
    Status st = deserialize_one_cell_from_json(column, slice, options);
    if (!st.ok()) {
        // Same failure contract as the base-class default implementation.
        *num_deserialized = 0;
        return st;
    }

    if (rows > 1) {
        auto& null_map = col.get_null_map_data();
        auto& nested_column = col.get_nested_column();
        const size_t extra = static_cast<size_t>(rows - 1);

        // Copy the flag first: a reference to back() must not be handed to a
        // call that may reallocate the buffer it points into.
        const auto last_flag = null_map.back();
        // The original call passed `rows` as the new total length, which is
        // only right for a column that was empty on entry. Growing relative to
        // the current size keeps the null map in step with the nested column
        // when earlier rows already exist.
        null_map.resize_fill(null_map.size() + extra, last_flag);
        nested_serde->insert_column_last_value_multiple_times(nested_column, rows - 1);
    }

    *num_deserialized = rows;
    return Status::OK();
}

Status DataTypeNullableSerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
auto& null_column = assert_cast<ColumnNullable&>(column);
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/data_types/serde/data_type_nullable_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class DataTypeNullableSerDe : public DataTypeSerDe {
int* num_deserialized,
const FormatOptions& options) const override;

// Parses `slice` once into the nullable `column`, then repeats the null flag
// and (through the nested serde) the nested value to produce `rows` rows; the
// count is reported through `num_deserialized`.
Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override;
Status deserialize_one_cell_from_hive_text(
IColumn& column, Slice& slice, const FormatOptions& options,
int hive_text_complex_type_delimiter_level = 1) const override;
Expand Down
22 changes: 22 additions & 0 deletions be/src/vec/data_types/serde/data_type_number_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,28 @@ void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
const auto* raw_data = reinterpret_cast<const T*>(buffer->data()) + start;
col_data.insert(raw_data, raw_data + row_count);
}
// Fills `column` with `rows` copies of the single numeric value encoded in
// `slice`.
//
// The text is parsed once with deserialize_one_cell_from_json(); the new cell
// is then replicated `rows - 1` times.
//
// @param column            destination column, appended to in place.
// @param slice             textual representation of the value.
// @param rows              number of rows to produce; values <= 0 produce nothing.
// @param num_deserialized  out: number of rows appended (0 on failure).
// @param options           format options forwarded to the single-cell parser.
// @return the parser's error status on failure, otherwise Status::OK().
template <typename T>
Status DataTypeNumberSerDe<T>::deserialize_column_from_fixed_json(
        IColumn& column, Slice& slice, int rows, int* num_deserialized,
        const FormatOptions& options) const {
    // Without this guard a non-positive `rows` would still append one cell and
    // then ask for a negative number of repetitions.
    if (rows <= 0) {
        *num_deserialized = 0;
        return Status::OK();
    }

    Status st = deserialize_one_cell_from_json(column, slice, options);
    if (!st.ok()) {
        // Same failure contract as the base-class default implementation.
        *num_deserialized = 0;
        return st;
    }

    DataTypeNumberSerDe::insert_column_last_value_multiple_times(column, rows - 1);
    *num_deserialized = rows;
    return Status::OK();
}

// Appends `times` additional copies of the last element of `column`.
//
// `column` is treated as a ColumnVector<T> and is expected to hold at least
// one element. A non-positive `times` is a no-op.
template <typename T>
void DataTypeNumberSerDe<T>::insert_column_last_value_multiple_times(IColumn& column,
                                                                     int times) const {
    // Reject non-positive counts before they are handed to insert_many_vals().
    if (times <= 0) {
        return;
    }
    auto& col = static_cast<ColumnVector<T>&>(column);
    // Copy the value out before appending to the same column.
    const T last_value = col.get_element(col.size() - 1);
    col.insert_many_vals(last_value, times);
}

template <typename T>
template <bool is_binary_format>
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/data_types/serde/data_type_number_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ class DataTypeNumberSerDe : public DataTypeSerDe {
int* num_deserialized,
const FormatOptions& options) const override;

// Parses `slice` once and appends the resulting number to `column` so that
// `rows` rows are produced; the count is reported through `num_deserialized`.
Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
int* num_deserialized,
const FormatOptions& options) const override;

// Re-appends the last element of `column` (a ColumnVector<T>) `times` times.
void insert_column_last_value_multiple_times(IColumn& column, int times) const override;

Status write_column_to_pb(const IColumn& column, PValues& result, int start,
int end) const override;
Status read_column_from_pb(IColumn& column, const PValues& arg) const override;
Expand Down
21 changes: 21 additions & 0 deletions be/src/vec/data_types/serde/data_type_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@ class DataTypeSerDe {
virtual Status deserialize_column_from_json_vector(IColumn& column, std::vector<Slice>& slices,
int* num_deserialized,
const FormatOptions& options) const = 0;
// Deserialize a fixed value: parse `slice` once, then repeat the resulting
// cell so that `rows` rows are appended to `column`.
//
// @param column            destination column, appended to in place.
// @param slice             textual representation of the value.
// @param rows              number of rows to produce; values <= 0 produce nothing.
// @param num_deserialized  out: number of rows appended (0 on failure).
// @param options           format options forwarded to the single-cell parser.
// @return the parser's error status on failure, otherwise Status::OK().
virtual Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
                                                  int* num_deserialized,
                                                  const FormatOptions& options) const {
    // Without this guard a non-positive `rows` would still append one cell and
    // then ask for a negative number of repetitions.
    if (rows <= 0) {
        *num_deserialized = 0;
        return Status::OK();
    }
    Status st = deserialize_one_cell_from_json(column, slice, options);
    if (!st.ok()) {
        *num_deserialized = 0;
        return st;
    }
    insert_column_last_value_multiple_times(column, rows - 1);
    *num_deserialized = rows;
    return Status::OK();
}
// Append the last value of `column` to its end `times` more times.
// A non-positive `times` is a no-op; `column` is expected to be non-empty.
virtual void insert_column_last_value_multiple_times(IColumn& column, int times) const {
    // Skip the clone entirely when nothing is to be appended, and keep a
    // negative count away from insert_many_from().
    if (times <= 0) {
        return;
    }
    // If you try to simplify this operation by using
    // `column.insert_many_from(column, column.size() - 1, times);`
    // you are likely to get incorrect data results, so the last value is first
    // copied into a separate single-row column and repeated from there.
    MutableColumnPtr dum_col = column.clone_empty();
    dum_col->insert_from(column, column.size() - 1);
    column.insert_many_from(*dum_col, 0, times);
}

virtual Status deserialize_one_cell_from_hive_text(
IColumn& column, Slice& slice, const FormatOptions& options,
Expand Down
25 changes: 25 additions & 0 deletions be/src/vec/data_types/serde/data_type_string_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,31 @@ class DataTypeStringSerDeBase : public DataTypeSerDe {
}
return Status::OK();
}

// Fills `column` with `rows` copies of the single string encoded in `slice`.
//
// The text is parsed once with deserialize_one_cell_from_json(); the new cell
// is then replicated `rows - 1` times.
//
// @param column            destination column, appended to in place.
// @param slice             textual representation of the value.
// @param rows              number of rows to produce; values <= 0 produce nothing.
// @param num_deserialized  out: number of rows appended (0 on failure).
// @param options           format options forwarded to the single-cell parser.
// @return the parser's error status on failure, otherwise Status::OK().
Status deserialize_column_from_fixed_json(IColumn& column, Slice& slice, int rows,
                                          int* num_deserialized,
                                          const FormatOptions& options) const override {
    // Without this guard a non-positive `rows` would still append one cell and
    // then ask for a negative number of repetitions.
    if (rows <= 0) {
        *num_deserialized = 0;
        return Status::OK();
    }

    Status st = deserialize_one_cell_from_json(column, slice, options);
    if (!st.ok()) {
        // Same failure contract as the base-class default implementation.
        *num_deserialized = 0;
        return st;
    }

    DataTypeStringSerDeBase::insert_column_last_value_multiple_times(column, rows - 1);
    *num_deserialized = rows;
    return Status::OK();
}

// Appends `times` additional copies of the last string of `column`.
//
// `column` is treated as a ColumnString and is expected to hold at least one
// element. A non-positive `times` is a no-op.
void insert_column_last_value_multiple_times(IColumn& column, int times) const override {
    // A negative count would otherwise be converted to a huge element count
    // for the std::vector below.
    if (times <= 0) {
        return;
    }
    auto& col = static_cast<ColumnString&>(column);

    // Take an owning copy of the last string: the StringRefs handed to
    // insert_many_strings() must not point into the column being appended to.
    const StringRef last = col.get_data_at(col.size() - 1);
    const String copy(last.data, last.size);
    std::vector<StringRef> refs(static_cast<size_t>(times), {copy.data(), copy.size()});

    col.insert_many_strings(refs.data(), refs.size());
}

Status read_column_from_pb(IColumn& column, const PValues& arg) const override {
auto& column_dest = assert_cast<ColumnType&>(column);
column_dest.reserve(column_dest.size() + arg.string_value_size());
Expand Down
9 changes: 3 additions & 6 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -935,13 +935,10 @@ Status OrcReader::_fill_partition_columns(
auto& [value, slot_desc] = kv.second;
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
vector<Slice> slices(rows);
for (int i = 0; i < rows; i++) {
slices[i] = {value.data(), value.size()};
}
int num_deserialized = 0;
if (_text_serde->deserialize_column_from_json_vector(*col_ptr, slices, &num_deserialized,
_text_formatOptions) != Status::OK()) {
if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
&num_deserialized,
_text_formatOptions) != Status::OK()) {
return Status::InternalError("Failed to fill partition column: {}={}",
slot_desc->col_name(), value);
}
Expand Down
9 changes: 3 additions & 6 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -631,13 +631,10 @@ Status RowGroupReader::_fill_partition_columns(
auto& [value, slot_desc] = kv.second;
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
vector<Slice> slices(rows);
for (int i = 0; i < rows; i++) {
slices[i] = {value.data(), value.size()};
}
int num_deserialized = 0;
if (_text_serde->deserialize_column_from_json_vector(*col_ptr, slices, &num_deserialized,
_text_formatOptions) != Status::OK()) {
if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
&num_deserialized,
_text_formatOptions) != Status::OK()) {
return Status::InternalError("Failed to fill partition column: {}={}",
slot_desc->col_name(), value);
}
Expand Down
9 changes: 3 additions & 6 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,13 +450,10 @@ Status VFileScanner::_fill_columns_from_path(size_t rows) {
auto& [value, slot_desc] = kv.second;
auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
vector<Slice> slices(rows);
for (int i = 0; i < rows; i++) {
slices[i] = {value.data(), value.size()};
}
int num_deserialized = 0;
if (_text_serde->deserialize_column_from_json_vector(*col_ptr, slices, &num_deserialized,
_text_formatOptions) != Status::OK()) {
if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows,
&num_deserialized,
_text_formatOptions) != Status::OK()) {
return Status::InternalError("Failed to fill partition column: {}={}",
slot_desc->col_name(), value);
}
Expand Down
Loading