Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions be/src/util/arrow/row_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/type.h>
#include <arrow/type_fwd.h>
#include <glog/logging.h>
#include <stdint.h>

Expand Down Expand Up @@ -84,12 +85,10 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
case TYPE_LARGEINT:
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_HLL:
case TYPE_DATE:
case TYPE_DATETIME:
case TYPE_STRING:
case TYPE_JSONB:
case TYPE_OBJECT:
*result = arrow::utf8();
break;
case TYPE_DATEV2:
Expand Down Expand Up @@ -150,6 +149,12 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
*result = arrow::utf8();
break;
}
case TYPE_QUANTILE_STATE:
case TYPE_OBJECT:
case TYPE_HLL: {
*result = arrow::binary();
break;
}
default:
return Status::InvalidArgument("Unknown primitive type({}) convert to Arrow type",
type.type);
Expand Down
59 changes: 56 additions & 3 deletions be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "data_type_bitmap_serde.h"

#include <arrow/array/builder_binary.h>
#include <gen_cpp/types.pb.h>

#include <string>
Expand All @@ -27,12 +28,36 @@
#include "vec/columns/column_const.h"
#include "vec/common/arena.h"
#include "vec/common/assert_cast.h"
#include "vec/data_types/serde/data_type_nullable_serde.h"

namespace doris {

namespace vectorized {
class IColumn;

// Column-level JSON serialization entry point for bitmap columns.
// The entire body is the SERIALIZE_COLUMN_TO_JSON() macro, whose definition is not
// visible in this file.
// NOTE(review): presumably the macro walks rows [start_idx, end_idx) and calls
// serialize_one_cell_to_json() for each, referring to the parameters by the exact
// names used here (column, start_idx, end_idx, bw, options) -- confirm against the
// macro definition before renaming any parameter.
Status DataTypeBitMapSerDe::serialize_column_to_json(const IColumn& column, int start_idx,
int end_idx, BufferWritable& bw,
FormatOptions& options) const {
SERIALIZE_COLUMN_TO_JSON();
}

/// Writes a NULL placeholder for a bitmap cell into `bw` and returns Status::OK().
///
/// The cell content itself is never read: `column`, `row_num` and `options` are
/// unused, and the only thing that influences the output is `_nesting_level`.
///   - nesting level < 2 (ordinary column): the CSV-style NULL marker (\N)
///   - nesting level >= 2 (inside a nested type): the JSON-style NULL marker (null)
Status DataTypeBitMapSerDe::serialize_one_cell_to_json(const IColumn& column, int row_num,
                                                       BufferWritable& bw,
                                                       FormatOptions& options) const {
    const bool inside_nested_type = _nesting_level >= 2;

    if (!inside_nested_type) {
        // Ordinary (top-level) type: NULL is represented the CSV way.
        const char* csv_null = DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str();
        const auto csv_null_len = strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str());
        bw.write(csv_null, csv_null_len);
        return Status::OK();
    }

    // Nested type: NULL is represented the way the JSON format does it.
    const char* nested_null = DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str();
    const auto nested_null_len = strlen(NULL_IN_COMPLEX_TYPE.c_str());
    bw.write(nested_null, nested_null_len);
    return Status::OK();
}

Status DataTypeBitMapSerDe::deserialize_column_from_json_vector(
IColumn& column, std::vector<Slice>& slices, int* num_deserialized,
const FormatOptions& options) const {
Expand Down Expand Up @@ -95,6 +120,26 @@ void DataTypeBitMapSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr
result.writeEndBinary();
}

/// Appends rows [start, end) of a bitmap column to an Arrow binary builder.
///
/// @param column         source column; must be a ColumnBitmap (checked by assert_cast)
/// @param null_map       optional per-row null flags; a non-zero entry appends an Arrow null
/// @param array_builder  destination; must be an arrow::BinaryBuilder (checked by assert_cast)
/// @param start          first row index to write
/// @param end            one past the last row index to write
/// @param ctz            unused by this serde
///
/// Each non-null row is written into a temporary buffer of getSizeInBytes() bytes
/// via BitmapValue::write_to() and appended as one binary value. Arrow failures are
/// routed through checkArrowStatus() together with the column and Arrow type names.
void DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const NullMap* null_map,
                                                arrow::ArrayBuilder* array_builder, int start,
                                                int end, const cctz::time_zone& ctz) const {
    const auto& bitmap_column = assert_cast<const ColumnBitmap&>(column);
    auto& binary_builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);

    // Same int -> size_t conversion the index loop would otherwise perform implicitly.
    const size_t first_row = start;
    const size_t row_limit = end;

    for (size_t row = first_row; row < row_limit; ++row) {
        const bool row_is_null = null_map && (*null_map)[row];
        if (row_is_null) {
            checkArrowStatus(binary_builder.AppendNull(), column.get_name(),
                             array_builder->type()->name());
            continue;
        }

        // NOTE(review): the const_cast is kept from the original; presumably the
        // BitmapValue size/serialize calls are not const-qualified -- confirm.
        auto& bitmap = const_cast<BitmapValue&>(bitmap_column.get_element(row));

        // Scratch buffer sized for the serialized form; write_to() fills it in place.
        std::string serialized(bitmap.getSizeInBytes(), '0');
        bitmap.write_to(serialized.data());

        const auto append_status =
                binary_builder.Append(serialized.data(), static_cast<int>(serialized.size()));
        checkArrowStatus(append_status, column.get_name(), array_builder->type()->name());
    }
}

void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const {
auto& col = reinterpret_cast<ColumnBitmap&>(column);
auto blob = static_cast<const JsonbBlobVal*>(arg);
Expand Down Expand Up @@ -147,11 +192,19 @@ Status DataTypeBitMapSerDe::write_column_to_orc(const std::string& timezone, con
auto& col_data = assert_cast<const ColumnBitmap&>(column);
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
const auto& ele = col_data.get_data_at(row_id);
cur_batch->data[row_id] = const_cast<char*>(ele.data);
cur_batch->length[row_id] = ele.size;
auto bitmap_value = const_cast<BitmapValue&>(col_data.get_element(row_id));
size_t len = bitmap_value.getSizeInBytes();

REALLOC_MEMORY_FOR_ORC_WRITER()

bitmap_value.write_to(const_cast<char*>(bufferRef.data) + offset);
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
}

Expand Down
13 changes: 3 additions & 10 deletions be/src/vec/data_types/serde/data_type_bitmap_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,10 @@ class DataTypeBitMapSerDe : public DataTypeSerDe {
// Forwards the nesting level (default 1) to the DataTypeSerDe base; no other state is set up.
DataTypeBitMapSerDe(int nesting_level = 1) : DataTypeSerDe(nesting_level) {}

Status serialize_one_cell_to_json(const IColumn& column, int row_num, BufferWritable& bw,
FormatOptions& options) const override {
return Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name());
}
FormatOptions& options) const override;

Status serialize_column_to_json(const IColumn& column, int start_idx, int end_idx,
BufferWritable& bw, FormatOptions& options) const override {
return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name());
}
BufferWritable& bw, FormatOptions& options) const override;

Status deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const override;
Expand All @@ -63,10 +59,7 @@ class DataTypeBitMapSerDe : public DataTypeSerDe {

void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
arrow::ArrayBuilder* array_builder, int start, int end,
const cctz::time_zone& ctz) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_arrow with type " + column.get_name());
}
const cctz::time_zone& ctz) const override;

void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
int end, const cctz::time_zone& ctz) const override {
Expand Down
20 changes: 2 additions & 18 deletions be/src/vec/data_types/serde/data_type_date64_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,7 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con
auto& col_data = static_cast<const ColumnVector<Int64>&>(column).get_data();
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
const size_t begin_off = offset;
INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 0) {
Expand All @@ -309,18 +300,11 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con

REALLOC_MEMORY_FOR_ORC_WRITER()

cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
size_t data_off = 0;
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off;
data_off += cur_batch->length[row_id];
}
}

buffer_list.emplace_back(bufferRef);
cur_batch->numElements = end - start;
return Status::OK();
}
Expand Down
48 changes: 23 additions & 25 deletions be/src/vec/data_types/serde/data_type_hll_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <stddef.h>
#include <stdint.h>

#include <memory>
#include <string>

#include "arrow/array/builder_binary.h"
Expand All @@ -47,28 +48,17 @@ Status DataTypeHLLSerDe::serialize_column_to_json(const IColumn& column, int sta
Status DataTypeHLLSerDe::serialize_one_cell_to_json(const IColumn& column, int row_num,
BufferWritable& bw,
FormatOptions& options) const {
if (!options._output_object_data) {
/**
* For null values in ordinary types, we use \N to represent them;
* for null values in nested types, we use null to represent them, just like the json format.
*/
if (_nesting_level >= 2) {
bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
strlen(NULL_IN_COMPLEX_TYPE.c_str()));
} else {
bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
}
return Status::OK();
/**
* For null values in ordinary types, we use \N to represent them;
* for null values in nested types, we use null to represent them, just like the json format.
*/
if (_nesting_level >= 2) {
bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
strlen(NULL_IN_COMPLEX_TYPE.c_str()));
} else {
bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
}
auto col_row = check_column_const_set_readability(column, row_num);
ColumnPtr ptr = col_row.first;
row_num = col_row.second;
auto& data = const_cast<HyperLogLog&>(assert_cast<const ColumnHLL&>(*ptr).get_element(row_num));
std::unique_ptr<char[]> buf =
std::make_unique_for_overwrite<char[]>(data.max_serialized_size());
size_t size = data.serialize((uint8*)buf.get());
bw.write(buf.get(), size);
return Status::OK();
}

Expand Down Expand Up @@ -137,7 +127,7 @@ void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const NullMa
arrow::ArrayBuilder* array_builder, int start, int end,
const cctz::time_zone& ctz) const {
const auto& col = assert_cast<const ColumnHLL&>(column);
auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
for (size_t string_i = start; string_i < end; ++string_i) {
if (null_map && (*null_map)[string_i]) {
checkArrowStatus(builder.AppendNull(), column.get_name(),
Expand Down Expand Up @@ -195,11 +185,19 @@ Status DataTypeHLLSerDe::write_column_to_orc(const std::string& timezone, const
auto& col_data = assert_cast<const ColumnHLL&>(column);
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
const auto& ele = col_data.get_data_at(row_id);
cur_batch->data[row_id] = const_cast<char*>(ele.data);
cur_batch->length[row_id] = ele.size;
auto hll_value = const_cast<HyperLogLog&>(col_data.get_element(row_id));
size_t len = hll_value.max_serialized_size();

REALLOC_MEMORY_FOR_ORC_WRITER()

hll_value.serialize((uint8_t*)(bufferRef.data) + offset);
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
}

Expand Down
37 changes: 11 additions & 26 deletions be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,38 +182,23 @@ Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone, const
int end, std::vector<StringRef>& buffer_list) const {
const auto& col_data = assert_cast<const ColumnIPv6&>(column).get_data();
orc::StringVectorBatch* cur_batch = assert_cast<orc::StringVectorBatch*>(orc_col_batch);
char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
const size_t begin_off = offset;

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 0) {
continue;
}
std::string ipv6_str = IPv6Value::to_string(col_data[row_id]);
size_t len = ipv6_str.size();

REALLOC_MEMORY_FOR_ORC_WRITER()
INIT_MEMORY_FOR_ORC_WRITER()

strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str());
offset += len;
cur_batch->length[row_id] = len;
}
size_t data_off = 0;
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off;
data_off += cur_batch->length[row_id];
std::string ipv6_str = IPv6Value::to_string(col_data[row_id]);
size_t len = ipv6_str.size();

REALLOC_MEMORY_FOR_ORC_WRITER()

strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str());
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
}
buffer_list.emplace_back(bufferRef);

cur_batch->numElements = end - start;
return Status::OK();
}
Expand Down
28 changes: 27 additions & 1 deletion be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

#include <cstddef>
#include <cstdint>
#include <memory>

#include "arrow/array/builder_binary.h"
#include "common/exception.h"
#include "common/status.h"
Expand Down Expand Up @@ -136,7 +140,29 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone, cons
const NullMap* null_map,
orc::ColumnVectorBatch* orc_col_batch, int start,
int end, std::vector<StringRef>& buffer_list) const {
return Status::NotSupported("write_column_to_orc with type [{}]", column.get_name());
auto* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
const auto& string_column = assert_cast<const ColumnString&>(column);

INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
std::string_view string_ref = string_column.get_data_at(row_id).to_string_view();
auto serialized_value = std::make_unique<std::string>(
JsonbToJson::jsonb_to_json_string(string_ref.data(), string_ref.size()));
auto len = serialized_value->size();

REALLOC_MEMORY_FOR_ORC_WRITER()

memcpy(const_cast<char*>(bufferRef.data) + offset, serialized_value->data(), len);
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
}

cur_batch->numElements = end - start;
return Status::OK();
}

void convert_jsonb_to_rapidjson(const JsonbValue& val, rapidjson::Value& target,
Expand Down
Loading
Loading