Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/src/vec/columns/column_varbinary.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class ColumnVarbinary final : public COWHelper<IColumn, ColumnVarbinary> {

// Returns row n as a StringRef, produced by the stored value's to_string_ref().
// No bounds check on n is performed here.
StringRef get_data_at(size_t n) const override { return _data[n].to_string_ref(); }

// Forwards to _arena.alloc(length) and returns the resulting pointer.
// NOTE(review): the buffer's lifetime is presumably tied to _arena — confirm before caching the pointer.
char* alloc(size_t length) { return _arena.alloc(length); }

void insert(const Field& x) override {
auto value = vectorized::get<const doris::StringView&>(x);
insert_data(value.data(), value.size());
Expand Down Expand Up @@ -101,6 +103,12 @@ class ColumnVarbinary final : public COWHelper<IColumn, ColumnVarbinary> {

// Appends one default-constructed doris::StringView to _data.
void insert_default() override { _data.push_back(doris::StringView()); }

/// Compares row n of this column with row m of rhs_.
/// rhs_ is converted to ColumnVarbinary through assert_cast; the nan direction hint is unused.
/// The result is whatever compare() on the two stored values yields.
int compare_at(size_t n, size_t m, const IColumn& rhs_,
               int /*nan_direction_hint*/) const override {
    const auto& other = assert_cast<const ColumnVarbinary&>(rhs_);
    const auto& left_value = this->_data[n];
    const auto& right_value = other.get_data()[m];
    return left_value.compare(right_value);
}

// Drops the last n rows by resizing to size() - n.
// NOTE(review): nothing here checks n <= size(); the unsigned subtraction would wrap otherwise.
void pop_back(size_t n) override { resize(size() - n); }

StringRef serialize_value_into_arena(size_t n, Arena& arena,
Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/common/pod_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,11 @@ class PODArray : public PODArrayBase<sizeof(T), initial_bytes, TAllocator, pad_r

/// Moves c_end back by the byte size of one element. No emptiness check is done here.
void pop_back() { this->c_end -= this->byte_size(1); }

/// Drops the trailing n elements by moving c_end back by byte_size(n).
/// DCHECK_GE guards against n exceeding the current size().
void pop_back(size_t n) {
    DCHECK_GE(this->size(), n);
    const auto bytes_to_drop = this->byte_size(n);
    this->c_end -= bytes_to_drop;
}

/// Do not insert a piece of the array into itself: the resize may invalidate iterators that point into the array.
template <typename It1, typename It2, typename... TAllocatorParams>
void insert_prepare(It1 from_begin, It2 from_end, TAllocatorParams&&... allocator_params) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/common/string_view.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class StringView {
// Returns the stored size_ field.
uint32_t size() const { return size_; }
// True when size() is zero.
bool empty() const { return size() == 0; }

// Overwrites size_ with the given value; nothing else is touched by this setter.
// NOTE(review): callers presumably must ensure the stored bytes already match the new size — confirm.
void set_size(uint32_t size) { size_ = size; }

bool operator==(const StringView& other) const;
friend std::ostream& operator<<(std::ostream& os, const StringView& stringView) {
os.write(stringView.data(), stringView.size());
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/functions/function_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,7 @@ struct UnHexImpl {
static constexpr auto name = Name::name;
using ReturnType = DataTypeString;
using ColumnType = ColumnString;
static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_STRING;

static Status vector(const ColumnString::Chars& data, const ColumnString::Offsets& offsets,
ColumnString::Chars& dst_data, ColumnString::Offsets& dst_offsets) {
Expand Down Expand Up @@ -1081,6 +1082,7 @@ struct ToBase64Impl {
static constexpr auto name = "to_base64";
using ReturnType = DataTypeString;
using ColumnType = ColumnString;
static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_STRING;

static Status vector(const ColumnString::Chars& data, const ColumnString::Offsets& offsets,
ColumnString::Chars& dst_data, ColumnString::Offsets& dst_offsets) {
Expand Down
81 changes: 57 additions & 24 deletions be/src/vec/functions/function_totype.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_varbinary.h"
#include "vec/columns/column_vector.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_bitmap.h"
Expand Down Expand Up @@ -123,6 +124,13 @@ class FunctionUnaryToType : public IFunction {
block.replace_by_position(result, std::move(col_res));
return Status::OK();
}
} else if constexpr (Impl::PrimitiveTypeImpl == PrimitiveType::TYPE_VARBINARY) {
if (const auto* col = check_and_get_column<ColumnVarbinary>(column.get())) {
auto col_res = Impl::ReturnColumnType::create();
RETURN_IF_ERROR(Impl::vector(col->get_data(), col_res->get_data()));
block.replace_by_position(result, std::move(col_res));
return Status::OK();
}
}
return Status::RuntimeError("Illegal column {} of argument of function {}",
block.get_by_position(arguments[0]).column->get_name(),
Expand Down Expand Up @@ -465,11 +473,19 @@ class FunctionStringOperateToNullType : public IFunction {

auto& col_ptr = block.get_by_position(arguments[0]).column;

auto res = Impl::ColumnType::create();
if (const auto* col = check_and_get_column<ColumnString>(col_ptr.get())) {
auto col_res = Impl::ColumnType::create();
RETURN_IF_ERROR(Impl::vector(col->get_chars(), col->get_offsets(), col_res->get_chars(),
col_res->get_offsets(), null_map->get_data()));
if constexpr (std::is_same_v<typename Impl::ReturnType, DataTypeString>) {
RETURN_IF_ERROR(Impl::vector(col->get_chars(), col->get_offsets(),
col_res->get_chars(), col_res->get_offsets(),
null_map->get_data()));
} else if (std::is_same_v<typename Impl::ReturnType, DataTypeVarbinary>) {
RETURN_IF_ERROR(Impl::vector(col->get_chars(), col->get_offsets(), col_res.get(),
null_map->get_data()));
} else {
return Status::RuntimeError("Illegal returntype {} of argument of function {}",
col_res->get_name(), get_name());
}
block.replace_by_position(
result, ColumnNullable::create(std::move(col_res), std::move(null_map)));
} else {
Expand Down Expand Up @@ -506,31 +522,48 @@ class FunctionStringEncode : public IFunction {
if constexpr (is_allow_null) {
auto null_map = ColumnUInt8::create(input_rows_count, 0);
auto& null_map_data = null_map->get_data();
if (const auto* col = assert_cast<const ColumnString*>(col_ptr.get())) {
auto col_res = Impl::ColumnType::create();
RETURN_IF_ERROR(Impl::vector(col->get_chars(), col->get_offsets(),
col_res->get_chars(), col_res->get_offsets(),
&null_map_data));
block.get_by_position(result).column =
ColumnNullable::create(std::move(col_res), std::move(null_map));
} else {
return Status::RuntimeError("Illegal column {} of argument of function {}",
block.get_by_position(arguments[0]).column->get_name(),
get_name());
if constexpr (Impl::PrimitiveTypeImpl == PrimitiveType::TYPE_STRING) {
if (const auto* col = assert_cast<const ColumnString*>(col_ptr.get())) {
auto col_res = Impl::ColumnType::create();
RETURN_IF_ERROR(Impl::vector(col->get_chars(), col->get_offsets(),
col_res->get_chars(), col_res->get_offsets(),
&null_map_data));
block.get_by_position(result).column =
ColumnNullable::create(std::move(col_res), std::move(null_map));
return Status::OK();
}
} else if (Impl::PrimitiveTypeImpl == PrimitiveType::TYPE_VARBINARY) {
if (const auto* col = assert_cast<const ColumnVarbinary*>(col_ptr.get())) {
auto col_res = Impl::ColumnType::create();
RETURN_IF_ERROR(Impl::vector(col->get_data(), col_res->get_chars(),
col_res->get_offsets(), &null_map_data));
block.get_by_position(result).column =
ColumnNullable::create(std::move(col_res), std::move(null_map));
return Status::OK();
}
}
} else {
if (const auto* col = assert_cast<const ColumnString*>(col_ptr.get())) {
auto col_res = Impl::ColumnType::create();
RETURN_IF_ERROR(Impl::vector(col->get_chars(), col->get_offsets(),
col_res->get_chars(), col_res->get_offsets()));
block.replace_by_position(result, std::move(col_res));
} else {
return Status::RuntimeError("Illegal column {} of argument of function {}",
block.get_by_position(arguments[0]).column->get_name(),
get_name());
if constexpr (Impl::PrimitiveTypeImpl == PrimitiveType::TYPE_STRING) {
if (const auto* col = assert_cast<const ColumnString*>(col_ptr.get())) {
auto col_res = Impl::ColumnType::create();
RETURN_IF_ERROR(Impl::vector(col->get_chars(), col->get_offsets(),
col_res->get_chars(), col_res->get_offsets()));
block.replace_by_position(result, std::move(col_res));
return Status::OK();
}
} else if (Impl::PrimitiveTypeImpl == PrimitiveType::TYPE_VARBINARY) {
if (const auto* col = assert_cast<const ColumnVarbinary*>(col_ptr.get())) {
auto col_res = Impl::ColumnType::create();
RETURN_IF_ERROR(Impl::vector(col->get_data(), col_res->get_chars(),
col_res->get_offsets()));
block.replace_by_position(result, std::move(col_res));
return Status::OK();
}
}
}
return Status::OK();
return Status::RuntimeError("Illegal column {} of argument of function {}",
block.get_by_position(arguments[0]).column->get_name(),
get_name());
}
};
} // namespace doris::vectorized
136 changes: 124 additions & 12 deletions be/src/vec/functions/function_varbinary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
// specific language governing permissions and limitations
// under the License.

#include "vec/functions/function_varbinary.h"

#include <glog/logging.h>

#include <cstddef>
#include <memory>

#include "common/status.h"
#include "util/url_coding.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
Expand All @@ -32,10 +35,12 @@
#include "vec/data_types/data_type_varbinary.h"
#include "vec/functions/function.h"
#include "vec/functions/function_helpers.h"
#include "vec/functions/function_totype.h"
#include "vec/functions/simple_function_factory.h"
#include "vec/functions/string_hex_util.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"

class FunctionToBinary : public IFunction {
public:
Expand All @@ -59,30 +64,24 @@ class FunctionToBinary : public IFunction {
auto col_res = ColumnVarbinary::create();
const auto& data = col->get_chars();
const auto& offsets = col->get_offsets();
col_res->get_data().assign(input_rows_count, StringView());

std::array<char, string_hex::MAX_STACK_CIPHER_LEN> stack_buf;
std::vector<char> heap_buf;
for (int i = 0; i < input_rows_count; ++i) {
const auto* source = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
ColumnString::Offset srclen = offsets[i] - offsets[i - 1];

auto cipher_len = srclen / 2;
char* dst = nullptr;
if (cipher_len <= stack_buf.size()) {
dst = stack_buf.data();
} else {
heap_buf.resize(cipher_len);
dst = heap_buf.data();
}
int cipher_len = srclen / 2;
auto [cipher_inline, dst] = VarBinaryOP::alloc(col_res.get(), i, cipher_len);

int outlen = string_hex::hex_decode(source, srclen, dst);

// if empty string or decode failed, may return NULL
if (outlen == 0) {
null_map->get_data()[i] = 1;
col_res->insert_default();
continue;
}
col_res->insert_data(dst, outlen);
VarBinaryOP::check_and_insert_data(col_res->get_data()[i], dst,
cast_set<uint32_t>(outlen), cipher_inline);
}
block.replace_by_position(
result, ColumnNullable::create(std::move(col_res), std::move(null_map)));
Expand Down Expand Up @@ -143,11 +142,124 @@ class FunctionFromBinary : public IFunction {
}
};

/// Tag type carrying the function name string "length".
struct NameVarbinaryLength {
    static constexpr const char* name = "length";
};

/// Implementation policy that reports the size of each VARBINARY value as an Int32.
struct VarbinaryLengthImpl {
    using ReturnType = DataTypeInt32;
    using ReturnColumnType = ColumnInt32;
    static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_VARBINARY;

    /// The single argument type: DataTypeVarbinary.
    static DataTypes get_variadic_argument_types() {
        return {std::make_shared<DataTypeVarbinary>()};
    }

    /// Resizes res to data.size() and writes data[i].size() into res[i] for every row.
    /// Always returns Status::OK().
    static Status vector(const PaddedPODArray<doris::StringView>& data,
                         PaddedPODArray<Int32>& res) {
        const size_t num_rows = data.size();
        res.resize(num_rows);
        size_t row = 0;
        for (const auto& value : data) {
            res[row++] = value.size();
        }
        return Status::OK();
    }
};

// FunctionUnaryToType instantiated with VarbinaryLengthImpl and the NameVarbinaryLength name tag.
using FunctionBinaryLength = FunctionUnaryToType<VarbinaryLengthImpl, NameVarbinaryLength>;

/// Encodes every VARBINARY value as base64 text into a ColumnString's chars/offsets buffers.
struct ToBase64BinaryImpl {
    static constexpr auto name = "to_base64_binary";
    using ReturnType = DataTypeString;
    using ColumnType = ColumnString;
    static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_VARBINARY;

    /// @param data         input values, one StringView per row
    /// @param dst_data     output character buffer; resized to an upper bound, then trimmed
    /// @param dst_offsets  output offsets; dst_offsets[i] is the end offset of row i
    /// @return Status::OK() on completion
    static Status vector(const PaddedPODArray<doris::StringView>& data,
                         ColumnString::Chars& dst_data, ColumnString::Offsets& dst_offsets) {
        auto rows_count = data.size();
        dst_offsets.resize(rows_count);

        // Upper bound of the encoded size: 4 output chars per started group of 3 input bytes.
        size_t total_size = 0;
        for (size_t i = 0; i < rows_count; i++) {
            // size() is uint32_t; widen before the arithmetic so "+ 2" and "* 4" cannot
            // wrap around in 32 bits for very large values.
            total_size += 4 * ((static_cast<size_t>(data[i].size()) + 2) / 3);
        }
        ColumnString::check_chars_length(total_size, rows_count);
        dst_data.resize(total_size);
        auto* dst_data_ptr = dst_data.data();
        size_t offset = 0;

        for (size_t i = 0; i < rows_count; i++) {
            // Bind by reference; the value is only read.
            const auto& binary = data[i];
            auto binlen = binary.size();

            // Empty input yields an empty string: the end offset equals the current offset.
            if (UNLIKELY(binlen == 0)) {
                dst_offsets[i] = cast_set<uint32_t>(offset);
                continue;
            }

            auto outlen = doris::base64_encode(
                    reinterpret_cast<const unsigned char*>(binary.data()), binlen,
                    reinterpret_cast<unsigned char*>(dst_data_ptr + offset));

            offset += outlen;
            dst_offsets[i] = cast_set<uint32_t>(offset);
        }

        // Trim the unused tail of the upper-bound allocation.
        dst_data.pop_back(total_size - offset);

        return Status::OK();
    }
};

// FunctionStringEncode instantiated with ToBase64BinaryImpl.
// NOTE(review): the second argument `false` is presumably the is_allow_null flag — confirm against the template declaration.
using FunctionToBase64Binary = FunctionStringEncode<ToBase64BinaryImpl, false>;

/// Decodes base64 text (ColumnString chars/offsets) into a ColumnVarbinary.
/// Rows for which base64_decode reports a negative length are flagged in null_map.
struct FromBase64BinaryImpl {
    static constexpr auto name = "from_base64_binary";
    using ReturnType = DataTypeVarbinary;
    using ColumnType = ColumnVarbinary;

    /// @param data      input character buffer
    /// @param offsets   input end offsets, one per row
    /// @param res       output column; every row is first set to an empty StringView
    /// @param null_map  set to 1 for rows whose decode fails
    /// @return Status::OK() on completion
    static Status vector(const ColumnString::Chars& data, const ColumnString::Offsets& offsets,
                         ColumnVarbinary* res, NullMap& null_map) {
        auto num_rows = offsets.size();
        // Rows skipped below (empty input) or failing to decode keep this empty value.
        res->get_data().assign(num_rows, StringView());

        for (size_t row = 0; row < num_rows; row++) {
            const auto* encoded = reinterpret_cast<const char*>(&data[offsets[row - 1]]);
            ColumnString::Offset encoded_len = offsets[row] - offsets[row - 1];

            if (UNLIKELY(encoded_len == 0)) {
                continue;
            }

            // NOTE(review): this capacity assumes the decoder never writes more than
            // encoded_len / 4 * 3 bytes (e.g. for unpadded input or a trailing terminator)
            // — confirm against base64_decode and VarBinaryOP::alloc.
            int decoded_capacity = encoded_len / 4 * 3;
            auto [is_inline, out] = VarBinaryOP::alloc(res, row, decoded_capacity);

            auto decoded_len = doris::base64_decode(encoded, encoded_len, out);

            if (decoded_len >= 0) {
                VarBinaryOP::check_and_insert_data(res->get_data()[row], out,
                                                   cast_set<uint32_t>(decoded_len), is_inline);
            } else {
                null_map[row] = 1;
            }
        }

        return Status::OK();
    }
};

// FunctionStringOperateToNullType instantiated with FromBase64BinaryImpl.
using FunctionFromBase64Binary = FunctionStringOperateToNullType<FromBase64BinaryImpl>;

// Registers the binary (varbinary) functions with the given factory, then adds two aliases.
void register_function_binary(SimpleFunctionFactory& factory) {
factory.register_function<FunctionBinaryLength>();
factory.register_function<FunctionToBase64Binary>();
factory.register_function<FunctionFromBase64Binary>();
factory.register_function<FunctionSubBinary>();
factory.register_function<FunctionToBinary>();
factory.register_function<FunctionFromBinary>();
// NOTE(review): argument order is presumably (name, alias) — confirm against SimpleFunctionFactory::register_alias.
factory.register_alias("from_binary", "from_hex");
factory.register_alias("to_binary", "to_hex");
}

#include "common/compile_check_end.h"
} // namespace doris::vectorized
Loading
Loading