Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions be/src/runtime/jsonb_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ struct JsonBinaryValue {
size_t len = 0;
JsonbParser parser;

JsonBinaryValue() : ptr(nullptr), len(0) {}
JsonBinaryValue(char* ptr, int len) {
JsonBinaryValue() = default;
JsonBinaryValue(char* ptr, size_t len) {
static_cast<void>(from_json_string(const_cast<const char*>(ptr), len));
}
JsonBinaryValue(const std::string& s) {
static_cast<void>(from_json_string(s.c_str(), s.length()));
}
JsonBinaryValue(const char* ptr, int len) { static_cast<void>(from_json_string(ptr, len)); }

const char* value() { return ptr; }
const char* value() const { return ptr; }

size_t size() { return len; }
size_t size() const { return len; }

void replace(char* ptr, int len) {
void replace(const char* ptr, int len) {
this->ptr = ptr;
this->len = len;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class ColumnStr final : public COWHelper<IColumn, ColumnStr<T>> {
check_chars_length(new_size, old_size + 1);

chars.resize(new_size);
DCHECK(s.data != nullptr);
DCHECK(chars.data() != nullptr);
memcpy(chars.data() + old_size, s.data, size_to_append);
offsets.push_back(new_size);
sanity_check_simple();
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/common/string_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

namespace doris::vectorized {

// Stores and commits data. Data only takes effect on the underlying ColumnString after commit.
// Each commit adds one row to _data.
class BufferWritable final {
public:
explicit BufferWritable(ColumnString& vector)
Expand Down Expand Up @@ -64,6 +66,7 @@ class BufferWritable final {
using VectorBufferWriter = BufferWritable;
using BufferWriter = BufferWritable;

// Note: the read methods consume the buffer.
class BufferReadable {
public:
explicit BufferReadable(StringRef& ref) : _data(ref.data) {}
Expand Down
7 changes: 1 addition & 6 deletions be/src/vec/core/field.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@
#include "vec/io/io_helper.h"
#include "vec/io/var_int.h"

namespace doris {
namespace vectorized {
namespace doris::vectorized {
class BufferReadable;
class BufferWritable;
} // namespace vectorized
} // namespace doris

namespace doris::vectorized {

void read_binary(Array& x, BufferReadable& buf) {
size_t size;
Expand Down
62 changes: 32 additions & 30 deletions be/src/vec/core/field.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,59 +156,61 @@ DEFINE_FIELD_VECTOR(Map);

using VariantMap = std::map<PathInData, Field>;

// TODO: reconsider whether this is really needed; it only saves one pointer compared to std::string.
// Not a POD type, so only read/write_json_binary can be used, not read/write_binary.
class JsonbField {
public:
JsonbField() = default;
~JsonbField() = default; // unique_ptr will handle cleanup automatically

JsonbField(const char* ptr, uint32_t len) : size(len) {
data = new char[size];
JsonbField(const char* ptr, size_t len) : size(len) {
data = std::make_unique<char[]>(size);
if (!data) {
LOG(FATAL) << "new data buffer failed, size: " << size;
}
memcpy(data, ptr, size);
if (size > 0) {
memcpy(data.get(), ptr, size);
}
}

JsonbField(const JsonbField& x) : size(x.size) {
data = new char[size];
data = std::make_unique<char[]>(size);
if (!data) {
LOG(FATAL) << "new data buffer failed, size: " << size;
}
memcpy(data, x.data, size);
if (size > 0) {
memcpy(data.get(), x.data.get(), size);
}
}

JsonbField(JsonbField&& x) : data(x.data), size(x.size) {
x.data = nullptr;
x.size = 0;
}
JsonbField(JsonbField&& x) noexcept : data(std::move(x.data)), size(x.size) { x.size = 0; }

// Needed because dispatch covers every storage type, although it is not actually used at present.
JsonbField& operator=(const JsonbField& x) {
data = new char[size];
if (!data) {
LOG(FATAL) << "new data buffer failed, size: " << size;
if (this != &x) {
data = std::make_unique<char[]>(x.size);
if (!data) {
LOG(FATAL) << "new data buffer failed, size: " << x.size;
}
if (x.size > 0) {
memcpy(data.get(), x.data.get(), x.size);
}
size = x.size;
}
memcpy(data, x.data, size);
return *this;
}

JsonbField& operator=(JsonbField&& x) {
if (data) {
delete[] data;
JsonbField& operator=(JsonbField&& x) noexcept {
if (this != &x) {
data = std::move(x.data);
size = x.size;
x.size = 0;
}
data = x.data;
size = x.size;
x.data = nullptr;
x.size = 0;
return *this;
}

~JsonbField() {
if (data) {
delete[] data;
}
}

const char* get_value() const { return data; }
uint32_t get_size() const { return size; }
const char* get_value() const { return data.get(); }
size_t get_size() const { return size; }

bool operator<(const JsonbField& r) const {
LOG(FATAL) << "comparing between JsonbField is not supported";
Expand Down Expand Up @@ -246,8 +248,8 @@ class JsonbField {
}

private:
char* data = nullptr;
uint32_t size = 0;
std::unique_ptr<char[]> data = nullptr;
size_t size = 0;
};

template <typename T>
Expand Down
25 changes: 14 additions & 11 deletions be/src/vec/data_types/data_type_jsonb.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
#pragma once

#include <gen_cpp/Types_types.h>
#include <stddef.h>
#include <stdint.h>

#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>

Expand All @@ -36,15 +36,13 @@
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/data_types/serde/data_type_string_serde.h"

namespace doris {
namespace vectorized {
namespace doris::vectorized {
#include "common/compile_check_begin.h"

class BufferWritable;
class IColumn;
class ReadBuffer;
} // namespace vectorized
} // namespace doris

namespace doris::vectorized {
class DataTypeJsonb final : public IDataType {
public:
using ColumnType = ColumnString;
Expand All @@ -68,10 +66,13 @@ class DataTypeJsonb final : public IDataType {

MutableColumnPtr create_column() const override;

virtual Field get_default() const override {
Field get_default() const override {
std::string default_json = "null";
JsonBinaryValue binary_val(default_json.c_str(), default_json.size());
return JsonbField(binary_val.value(), binary_val.size());
// convert default_json to binary
JsonBinaryValue binary_val(default_json.c_str(), static_cast<Int32>(default_json.size()));
// Throw exception if default_json.size() is larger than INT32_MAX
// JsonbField keeps its own memory
return JsonbField(binary_val.value(), static_cast<UInt32>(binary_val.size()));
}

Field get_field(const TExprNode& node) const override {
Expand Down Expand Up @@ -100,4 +101,6 @@ class DataTypeJsonb final : public IDataType {
private:
DataTypeString data_type_string;
};
} // namespace doris::vectorized

#include "common/compile_check_end.h"
} // namespace doris::vectorized
12 changes: 7 additions & 5 deletions be/src/vec/io/io_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "vec/common/string_buffer.hpp"
#include "vec/common/string_ref.h"
#include "vec/common/uint128.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
#include "vec/io/reader_buffer.h"
#include "vec/io/var_int.h"
Expand Down Expand Up @@ -126,7 +127,7 @@ inline void write_string_binary(const char* s, BufferWritable& buf) {
write_string_binary(StringRef {std::string(s)}, buf);
}

inline void write_json_binary(JsonbField s, BufferWritable& buf) {
inline void write_json_binary(const JsonbField& s, BufferWritable& buf) {
write_string_binary(StringRef {s.get_value(), s.get_size()}, buf);
}

Expand Down Expand Up @@ -200,13 +201,14 @@ inline StringRef read_string_binary_into(Arena& arena, BufferReadable& buf) {
char* data = arena.alloc(size);
buf.read(data, size);

return StringRef(data, size);
return {data, size};
}

inline void read_json_binary(JsonbField val, BufferReadable& buf,
inline void read_json_binary(JsonbField& val, BufferReadable& buf,
size_t MAX_JSON_SIZE = DEFAULT_MAX_JSON_SIZE) {
StringRef jrf = StringRef {val.get_value(), val.get_size()};
read_string_binary(jrf, buf);
StringRef result;
read_string_binary(result, buf);
val = JsonbField(result.data, result.size);
}

template <typename Type>
Expand Down
35 changes: 26 additions & 9 deletions be/src/vec/io/var_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,35 +93,44 @@ inline void read_var_uint(UInt64& x, std::istream& istr) {
for (size_t i = 0; i < 9; ++i) {
UInt64 byte = istr.get();
x |= (byte & 0x7F) << (7 * i);
if (!(byte & 0x80)) return;
if (!(byte & 0x80)) {
return;
}
}
}

/// Writes `x` to `ostr` as a base-128 varint, least-significant group first.
///
/// Each output byte carries 7 bits of `x`; the high bit (0x80) is set when the
/// remaining value is still larger than 7 bits. At least one byte is always
/// written and at most 9, so only the low 63 bits of `x` are encoded as data.
///
/// @param x     value to encode
/// @param ostr  destination stream; receives between 1 and 9 bytes via put()
inline void write_var_uint(UInt64 x, std::ostream& ostr) {
    constexpr size_t kMaxBytes = 9;
    size_t written = 0;
    do {
        // Decide on the continuation flag before shifting the group out.
        const bool has_more = x > 0x7F;
        uint8_t group = x & 0x7F;
        if (has_more) {
            group |= 0x80;
        }
        ostr.put(group);

        x >>= 7;
        ++written;
    } while (x != 0 && written < kMaxBytes);
}

// TODO: provide a real implementation in the future
/// Reads an unsigned varint from `buf` in a length-prefixed layout: one byte
/// holding the payload length, followed by that many payload bytes. Each
/// payload byte contributes its low 7 bits, least-significant group first;
/// a clear high bit (0x80) marks the last group.
///
/// @param x    receives the decoded value; reset to 0 before decoding
/// @param buf  source buffer; the length byte and `len` payload bytes are consumed
inline void read_var_uint(UInt64& x, BufferReadable& buf) {
    x = 0;
    // The first byte holds the number of payload bytes that follow.
    uint8_t len = 0;
    buf.read(reinterpret_cast<char*>(&len), 1);
    auto ref = buf.read(len);

    // Decode at most `len` bytes (and never more than 9), so a short or
    // malformed record cannot make us read past the bytes just consumed.
    const size_t limit = len < 9 ? len : 9;
    const char* bytes = ref.data;
    for (size_t i = 0; i < limit; ++i) {
        // Go through uint8_t so a byte >= 0x80 is not sign-extended.
        const UInt64 byte = static_cast<uint8_t>(bytes[i]);
        x |= (byte & 0x7F) << (7 * i);

        if (!(byte & 0x80)) {
            return;
        }
    }
}

Expand All @@ -130,12 +139,16 @@ inline void write_var_uint(UInt64 x, BufferWritable& ostr) {
uint8_t i = 0;
while (i < 9) {
uint8_t byte = x & 0x7F;
if (x > 0x7F) byte |= 0x80;
if (x > 0x7F) {
byte |= 0x80;
}

bytes[i++] = byte;

x >>= 7;
if (!x) break;
if (!x) {
break;
}
}
ostr.write((char*)&i, 1);
ostr.write(bytes, i);
Expand All @@ -144,13 +157,17 @@ inline void write_var_uint(UInt64 x, BufferWritable& ostr) {
inline char* write_var_uint(UInt64 x, char* ostr) {
for (size_t i = 0; i < 9; ++i) {
uint8_t byte = x & 0x7F;
if (x > 0x7F) byte |= 0x80;
if (x > 0x7F) {
byte |= 0x80;
}

*ostr = byte;
++ostr;

x >>= 7;
if (!x) return ostr;
if (!x) {
return ostr;
}
}

return ostr;
Expand Down
Loading
Loading