Skip to content

Commit

Permalink
[improve](ut)add data_type_array ut test and serde test (apache#46063)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
This PR:
1. Adds unit-test coverage for data_type_array
2. Adds unit-test coverage for data_type_array_serde
  • Loading branch information
amorynan authored Jan 9, 2025
1 parent 394caaf commit f527910
Show file tree
Hide file tree
Showing 10 changed files with 798 additions and 65 deletions.
2 changes: 1 addition & 1 deletion be/src/util/arrow/block_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ Status FromBlockConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) {
_timezone_obj);
} catch (std::exception& e) {
return Status::InternalError(
"Fail to convert block data to arrow data, tyep: {}, name: {}, error: {}",
"Fail to convert block data to arrow data, type: {}, name: {}, error: {}",
_cur_type->get_name(), e.what());
}
arrow_st = _cur_builder->Finish(&_arrays[_cur_field_idx]);
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/data_types/data_type_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ bool DataTypeArray::equals(const IDataType& rhs) const {
nested->equals(*static_cast<const DataTypeArray&>(rhs).nested);
}

// Strip the nullable wrapper from the nested type first; otherwise a nullable nested array is not recognized and the result is always 1
size_t DataTypeArray::get_number_of_dimensions() const {
const DataTypeArray* nested_array = typeid_cast<const DataTypeArray*>(nested.get());
const DataTypeArray* nested_array =
typeid_cast<const DataTypeArray*>(remove_nullable(nested).get());
if (!nested_array) return 1;
return 1 +
nested_array
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/data_types/serde/data_type_decimal_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <arrow/util/decimal.h>

#include "arrow/type.h"
#include "common/consts.h"
#include "vec/columns/column_decimal.h"
#include "vec/common/arithmetic_overflow.h"
#include "vec/core/types.h"
Expand Down Expand Up @@ -109,7 +110,6 @@ void DataTypeDecimalSerDe<T>::write_column_to_arrow(const IColumn& column, const
checkArrowStatus(builder.Append(value), column.get_name(),
array_builder->type()->name());
}
// TODO: decimal256
} else if constexpr (std::is_same_v<T, Decimal128V3>) {
std::shared_ptr<arrow::DataType> s_decimal_ptr =
std::make_shared<arrow::Decimal128Type>(38, col.get_scale());
Expand Down Expand Up @@ -202,7 +202,8 @@ void DataTypeDecimalSerDe<T>::read_column_from_arrow(IColumn& column,
column_data.emplace_back(*reinterpret_cast<const T*>(concrete_array->Value(value_i)));
}
} else {
LOG(WARNING) << "Unsuppoted convertion to decimal from " << column.get_name();
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"read_column_from_arrow with type " + column.get_name());
}
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/data_types/serde/data_type_decimal_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ Status DataTypeDecimalSerDe<T>::write_column_to_pb(const IColumn& column, PValue
return Status::OK();
}

// TODO: decimal256
template <typename T>
Status DataTypeDecimalSerDe<T>::read_column_from_pb(IColumn& column, const PValues& arg) const {
if constexpr (std::is_same_v<T, Decimal<Int128>> || std::is_same_v<T, Decimal128V3> ||
std::is_same_v<T, Decimal256> || std::is_same_v<T, Decimal<Int32>>) {
std::is_same_v<T, Decimal256> || std::is_same_v<T, Decimal<Int32>> ||
std::is_same_v<T, Decimal<Int64>>) {
auto old_column_size = column.size();
column.resize(old_column_size + arg.bytes_value_size());
auto& data = reinterpret_cast<ColumnDecimal<T>&>(column).get_data();
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/data_types/serde/data_type_number_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,

// only for largeint(int128) type
if (arrow_array->type_id() == arrow::Type::STRING) {
auto concrete_array = dynamic_cast<const arrow::StringArray*>(arrow_array);
const auto* concrete_array = dynamic_cast<const arrow::StringArray*>(arrow_array);
std::shared_ptr<arrow::Buffer> buffer = concrete_array->value_data();

for (size_t offset_i = start; offset_i < end; ++offset_i) {
Expand All @@ -226,6 +226,9 @@ void DataTypeNumberSerDe<T>::read_column_from_arrow(IColumn& column,
std::string(rb.position(), rb.count()).c_str());
}
col_data.emplace_back(val);
} else {
// insert default value
col_data.emplace_back(Int128());
}
}
return;
Expand Down
61 changes: 44 additions & 17 deletions be/test/vec/data_types/common_data_type_serder_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,14 @@
#include <fstream>
#include <iostream>

#include "olap/schema.h"
#include "runtime/descriptors.cpp"
#include "arrow/type.h"
#include "runtime/descriptors.h"
#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
#include "vec/columns/columns_number.h"
#include "vec/core/field.h"
#include "vec/core/sort_block.h"
#include "vec/core/sort_description.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/utils/arrow_column_to_doris_column.h"

// this test covers the data type serialize and deserialize functions
Expand Down Expand Up @@ -151,16 +143,14 @@ class CommonDataTypeSerdeTest : public ::testing::Test {

while (std::getline(file, line)) {
std::stringstream lineStream(line);
// std::cout << "whole : " << lineStream.str() << std::endl;
std::string value;
int l_idx = 0;
int c_idx = 0;
std::vector<string> row;
std::vector<std::string> row;
while (std::getline(lineStream, value, spliter)) {
if (idxes.contains(l_idx)) {
if (!value.starts_with("//") && idxes.contains(l_idx)) {
// load csv data
Slice string_slice(value.data(), value.size());
std::cout << "origin : " << string_slice << std::endl;
Status st;
// deserialize data
if constexpr (is_hive_format) {
Expand Down Expand Up @@ -210,16 +200,14 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
if (generate_res_file) {
// generate res
auto pos = file_path.find_last_of(".");
string hive_format = is_hive_format ? "_hive" : "";
std::string hive_format = is_hive_format ? "_hive" : "";
std::string res_file = file_path.substr(0, pos) + hive_format + "_serde_res.csv";
std::ofstream res_f(res_file);
if (!res_f.is_open()) {
throw std::ios_base::failure("Failed to open file." + res_file);
}
for (size_t r = 0; r < assert_str_cols[0]->size(); ++r) {
for (size_t c = 0; c < assert_str_cols.size(); ++c) {
std::cout << assert_str_cols[c]->get_data_at(r).to_string() << spliter
<< std::endl;
res_f << assert_str_cols[c]->get_data_at(r).to_string() << spliter;
}
res_f << std::endl;
Expand All @@ -230,6 +218,8 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
}

// standard hive text ser-deserialize assert function
// pb serde is currently only used by RPCFncall and fold_constant_executor, which only write column data to pb values,
// i.e. they just call write_column_to_pb
static void assert_pb_format(MutableColumns& load_cols, DataTypeSerDeSPtrs serders) {
for (size_t i = 0; i < load_cols.size(); ++i) {
auto& col = load_cols[i];
Expand Down Expand Up @@ -263,6 +253,21 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
static void assert_jsonb_format(MutableColumns& load_cols, DataTypeSerDeSPtrs serders) {
Arena pool;
auto jsonb_column = ColumnString::create(); // jsonb column
// maybe these load_cols has different size, so we keep it same
size_t max_row_size = load_cols[0]->size();
for (size_t i = 1; i < load_cols.size(); ++i) {
if (load_cols[i]->size() > max_row_size) {
max_row_size = load_cols[i]->size();
}
}
// keep same rows
for (size_t i = 0; i < load_cols.size(); ++i) {
if (load_cols[i]->size() < max_row_size) {
load_cols[i]->insert_many_defaults(max_row_size - load_cols[i]->size());
} else if (load_cols[i]->size() > max_row_size) {
load_cols[i]->resize(max_row_size);
}
}
jsonb_column->reserve(load_cols[0]->size());
MutableColumns assert_cols;
for (size_t i = 0; i < load_cols.size(); ++i) {
Expand Down Expand Up @@ -322,6 +327,21 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
DataTypes types) {
// make a block to write to arrow
auto block = std::make_shared<Block>();
// maybe these load_cols has different size, so we keep it same
size_t max_row_size = load_cols[0]->size();
for (size_t i = 1; i < load_cols.size(); ++i) {
if (load_cols[i]->size() > max_row_size) {
max_row_size = load_cols[i]->size();
}
}
// keep same rows
for (size_t i = 0; i < load_cols.size(); ++i) {
if (load_cols[i]->size() < max_row_size) {
load_cols[i]->insert_many_defaults(max_row_size - load_cols[i]->size());
} else if (load_cols[i]->size() > max_row_size) {
load_cols[i]->resize(max_row_size);
}
}
for (size_t i = 0; i < load_cols.size(); ++i) {
auto& col = load_cols[i];
block->insert(ColumnWithTypeAndName(std::move(col), types[i], types[i]->get_name()));
Expand All @@ -330,13 +350,13 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
std::cout << "block: " << block->dump_structure() << std::endl;
std::shared_ptr<arrow::Schema> block_arrow_schema;
EXPECT_EQ(get_arrow_schema_from_block(*block, &block_arrow_schema, "UTC"), Status::OK());
std::cout << "schema: " << block_arrow_schema->ToString(true) << std::endl;
// convert block to arrow
std::shared_ptr<arrow::RecordBatch> result;
cctz::time_zone _timezone_obj; //default UTC
Status stt = convert_to_arrow_batch(*block, block_arrow_schema,
arrow::default_memory_pool(), &result, _timezone_obj);
EXPECT_EQ(Status::OK(), stt) << "convert block to arrow failed" << stt.to_string();

// deserialize arrow to block
auto assert_block = block->clone_empty();
auto rows = block->rows();
Expand All @@ -347,15 +367,22 @@ class CommonDataTypeSerdeTest : public ::testing::Test {
array.get(), 0, column_with_type_and_name.column,
column_with_type_and_name.type, rows, _timezone_obj);
// do check data
std::cout << "arrow_column_to_doris_column done: "
<< column_with_type_and_name.column->get_name()
<< " with column size: " << column_with_type_and_name.column->size()
<< std::endl;
std::cout << assert_block.dump_structure() << std::endl;
EXPECT_EQ(Status::OK(), ret) << "convert arrow to block failed" << ret.to_string();
auto& col = block->get_by_position(i).column;
auto& assert_col = column_with_type_and_name.column;
EXPECT_EQ(assert_col->size(), col->size());
for (size_t j = 0; j < col->size(); ++j) {
auto cell = col->operator[](j);
auto assert_cell = assert_col->operator[](j);
EXPECT_EQ(cell, assert_cell) << "column: " << col->get_name() << " row: " << j;
}
}
std::cout << "assert block: " << assert_block.dump_structure() << std::endl;
}

// assert rapidjson format
Expand Down
Loading

0 comments on commit f527910

Please sign in to comment.