Skip to content

Commit

Permalink
[Feature](Variant) support variant load (apache#26572)
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon authored and wsjz committed Nov 19, 2023
1 parent f90f501 commit fe2648f
Show file tree
Hide file tree
Showing 105 changed files with 3,174 additions and 1,056 deletions.
5 changes: 3 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,9 @@ DEFINE_Bool(enable_workload_group_for_scan, "false");
// Will remove after fully test.
DEFINE_Bool(enable_index_apply_preds_except_leafnode_of_andnode, "true");

DEFINE_mBool(enable_flatten_nested_for_variant, "false");
DEFINE_mDouble(ratio_of_defaults_as_sparse_column, "0.95");

// block file cache
DEFINE_Bool(enable_file_cache, "false");
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
Expand Down Expand Up @@ -1013,8 +1016,6 @@ DEFINE_Int32(max_depth_in_bkd_tree, "32");
DEFINE_Bool(inverted_index_compaction_enable, "false");
// use num_broadcast_buffer blocks as buffer to do broadcast
DEFINE_Int32(num_broadcast_buffer, "32");
// semi-structure configs
DEFINE_Bool(enable_parse_multi_dimession_array, "false");

// max depth of expression tree allowed.
DEFINE_Int32(max_depth_of_expr_tree, "600");
Expand Down
8 changes: 6 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1055,8 +1055,6 @@ DECLARE_Int32(max_depth_in_bkd_tree);
DECLARE_Bool(inverted_index_compaction_enable);
// use num_broadcast_buffer blocks as buffer to do broadcast
DECLARE_Int32(num_broadcast_buffer);
// semi-structure configs
DECLARE_Bool(enable_parse_multi_dimession_array);

// max depth of expression tree allowed.
DECLARE_Int32(max_depth_of_expr_tree);
Expand Down Expand Up @@ -1134,6 +1132,12 @@ DECLARE_mInt64(lookup_connection_cache_bytes_limit);

// level of compression when using LZ4_HC, whose default value is LZ4HC_CLEVEL_DEFAULT
DECLARE_mInt64(LZ4_HC_compression_level);
// Whether flatten nested arrays in variant column
// Notice: TEST ONLY
DECLARE_mBool(enable_flatten_nested_for_variant);
// Threshold of a column as sparse column
// Notice: TEST ONLY
DECLARE_mDouble(ratio_of_defaults_as_sparse_column);

DECLARE_mBool(enable_merge_on_write_correctness_check);

Expand Down
31 changes: 31 additions & 0 deletions be/src/exprs/json_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <re2/re2.h>
#include <simdjson/simdjson.h> // IWYU pragma: keep
#include <stdlib.h>
Expand Down Expand Up @@ -316,4 +318,33 @@ Status JsonFunctions::extract_from_object(simdjson::ondemand::object& obj,
return Status::OK();
}

// Serializes `value` to JSON text using a rapidjson::Writer over a StringBuffer
// and returns the text as a std::string.
std::string JsonFunctions::print_json_value(const rapidjson::Value& value) {
    // A freshly constructed StringBuffer is already empty; no Clear() needed.
    rapidjson::StringBuffer buffer;
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
    value.Accept(writer);
    // Construct from the known length instead of re-scanning for the terminator.
    return std::string(buffer.GetString(), buffer.GetSize());
}

// Recursively merges the members of `src_object` into `dst_object`.
//
// For every member of `src_object`:
//   - missing in `dst_object`: the member (name and value) is added to `dst_object`;
//   - present and both values are objects: the two objects are merged recursively;
//   - present and the destination value is null: it is replaced by the source value;
//   - otherwise the existing destination value is kept.
//
// Nothing happens unless both arguments are JSON objects.
//
// NOTE: rapidjson assignment and AddMember(Value&, Value&, Allocator&) use move
// semantics, so names/values taken from `src_object` are left as null afterwards;
// callers must treat `src_object` as consumed.
// `allocator` is passed to AddMember for newly added members.
void JsonFunctions::merge_objects(rapidjson::Value& dst_object, rapidjson::Value& src_object,
                                  rapidjson::Document::AllocatorType& allocator) {
    // FindMember/AddMember/MemberBegin are only valid on object values.
    if (!src_object.IsObject() || !dst_object.IsObject()) {
        return;
    }
    for (auto src_it = src_object.MemberBegin(); src_it != src_object.MemberEnd(); ++src_it) {
        auto dst_it = dst_object.FindMember(src_it->name);
        if (dst_it == dst_object.MemberEnd()) {
            // New key: move the whole member over.
            dst_object.AddMember(src_it->name, src_it->value, allocator);
            continue;
        }
        if (src_it->value.IsObject() && dst_it->value.IsObject()) {
            // Only recurse when both sides are objects; recursing into a scalar or
            // null destination would call FindMember on a non-object.
            merge_objects(dst_it->value, src_it->value, allocator);
        } else if (dst_it->value.IsNull()) {
            // A null placeholder in dst is overwritten by whatever src holds.
            dst_it->value = src_it->value;
        }
    }
}

} // namespace doris
7 changes: 7 additions & 0 deletions be/src/exprs/json_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ class JsonFunctions {
static Status extract_from_object(simdjson::ondemand::object& obj,
const std::vector<JsonPath>& jsonpath,
simdjson::ondemand::value* value) noexcept;
// src:    {"a" : {"b" : {"c" : 1}}, "e" : 123}
// dst:    {"a" : {"b" : {"d" : 1}}}
// merged: {"a" : {"b" : {"c" : 1, "d" : 1}}, "e" : 123}
static void merge_objects(rapidjson::Value& dst_object, rapidjson::Value& src_object,
rapidjson::Document::AllocatorType& allocator);

static std::string print_json_value(const rapidjson::Value& value);

private:
static rapidjson::Value* match_value(const std::vector<JsonPath>& parsed_paths,
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "olap/tablet_schema_cache.h"
#include "util/doris_metrics.h"
#include "vec/common/schema_util.h"

namespace doris {
using namespace ErrorCode;
Expand Down Expand Up @@ -65,4 +66,14 @@ void BaseTablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema
}
}

void BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_schema) {
std::lock_guard wrlock(_meta_lock);
auto final_schema = std::make_shared<TabletSchema>();
CHECK(_max_version_schema->schema_version() >= update_schema->schema_version());
vectorized::schema_util::get_least_common_schema({_max_version_schema, update_schema},
final_schema);
_max_version_schema = final_schema;
VLOG_DEBUG << "dump updated tablet schema: " << final_schema->dump_structure();
}

} /* namespace doris */
2 changes: 2 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class BaseTablet {

void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);

void update_by_least_common_schema(const TabletSchemaSPtr& update_schema);

TabletSchemaSPtr tablet_schema() const {
std::shared_lock rlock(_meta_lock);
return _max_version_schema;
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ void Compaction::build_basic_info() {
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
_cur_tablet_schema =
_tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema();
_cur_tablet_schema = _tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
}

bool Compaction::handle_ordered_data_compaction() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ Status TabletReader::init_reader_params_and_create_block(
std::transform(input_rowsets.begin(), input_rowsets.end(), rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
TabletSchemaSPtr read_tablet_schema =
tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema();
tablet->tablet_schema_with_merged_max_schema_version(rowset_metas);
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(*read_tablet_schema);

Expand Down
140 changes: 137 additions & 3 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#include "util/time.h"
#include "vec/columns/column.h"
#include "vec/columns/column_object.h"
#include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
#include "vec/common/schema_util.h" // variant column
#include "vec/core/block.h"
#include "vec/data_types/data_type_factory.hpp"

Expand Down Expand Up @@ -123,8 +123,6 @@ Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context)
}
_rowset_meta->set_tablet_uid(_context.tablet_uid);
_rowset_meta->set_tablet_schema(_context.tablet_schema);
_context.schema_change_recorder =
std::make_shared<vectorized::schema_util::LocalSchemaChangeRecorder>();
_context.segment_collector = std::make_shared<SegmentCollectorT<BetaRowsetWriter>>(this);
_context.file_writer_creator = std::make_shared<FileWriterCreatorT<BetaRowsetWriter>>(this);
RETURN_IF_ERROR(_segment_creator.init(_context));
Expand Down Expand Up @@ -446,6 +444,10 @@ Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segmen
}

TabletSchemaSPtr flush_schema;
if (_context.tablet_schema->num_variant_columns() > 0) {
// Unfold variant column
RETURN_IF_ERROR(expand_variant_to_subcolumns(*block, flush_schema));
}
{
SCOPED_RAW_TIMER(&_segment_writer_ns);
RETURN_IF_ERROR(
Expand Down Expand Up @@ -522,6 +524,11 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
}

// update rowset meta tablet schema if tablet schema updated
if (_context.tablet_schema->num_variant_columns() > 0) {
_rowset_meta->set_tablet_schema(_context.tablet_schema);
}

RETURN_NOT_OK_STATUS_WITH_WARN(
RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, _rowset_meta,
&rowset),
Expand All @@ -544,6 +551,24 @@ bool BetaRowsetWriter::_is_segment_overlapping(
return false;
}

// Folds `flush_schema` into the writer's _context.tablet_schema. Used when variant
// columns are encountered, before commit_txn.
// Eg. rowset schema:          A(int), B(float), C(int), D(int)
//     _tablet->tablet_schema: A(bigint), B(double)
//  => update_schema:          A(bigint), B(double), C(int), D(int)
// The whole update runs under _context.schema_lock.
void BetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
    std::lock_guard guard(*(_context.schema_lock));

    auto update_schema = std::make_shared<TabletSchema>();
    vectorized::schema_util::get_least_common_schema({_context.tablet_schema, flush_schema},
                                                     update_schema);
    // The merged schema can never have fewer columns than the schema just flushed.
    CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
            << "Rowset merge schema columns count is " << update_schema->num_columns()
            << ", but flush_schema is larger " << flush_schema->num_columns()
            << " update_schema: " << update_schema->dump_structure()
            << " flush_schema: " << flush_schema->dump_structure();

    _context.tablet_schema = std::move(update_schema);
    VLOG_DEBUG << "dump rs schema: " << _context.tablet_schema->dump_structure();
}

void BetaRowsetWriter::_build_rowset_meta_with_spec_field(
RowsetMetaSharedPtr rowset_meta, const RowsetMetaSharedPtr& spec_rowset_meta) {
rowset_meta->set_num_rows(spec_rowset_meta->num_rows());
Expand Down Expand Up @@ -751,4 +776,113 @@ Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(
return Status::OK();
}

// Expands every variant column of `block` into its extracted subcolumns and builds
// the matching schema in `flush_schema`.
//
// Early exits that leave `block` and `flush_schema` untouched:
//   - the block has no rows -> OK;
//   - partial update: InvalidArgument if any updated column id refers to a variant
//     column; otherwise no variant positions are collected and OK is returned;
//   - no variant column in _context.tablet_schema -> OK.
//
// On success:
//   - `flush_schema` is a copy of _context.tablet_schema with one column appended per
//     non-root subcolumn, and the variant columns' path info set to their root path;
//   - `block` carries the same appended columns, and each variant column is replaced
//     by a new ColumnObject that holds only the root subcolumn;
//   - _context.tablet_schema is updated through update_rowset_schema(flush_schema).
//
// A doris::Exception thrown while parsing/finalizing/encoding the variant columns is
// logged and converted into Status::InternalError.
Status BetaRowsetWriter::expand_variant_to_subcolumns(vectorized::Block& block,
TabletSchemaSPtr& flush_schema) {
size_t num_rows = block.rows();
if (num_rows == 0) {
return Status::OK();
}

// Positions (in _context.tablet_schema / block) of the variant columns to expand
std::vector<int> variant_column_pos;
if (is_partial_update()) {
// check columns that used to do partial updates should not include variant
for (int i : get_partial_update_info()->update_cids) {
if (_context.tablet_schema->columns()[i].is_variant_type()) {
return Status::InvalidArgument("Not implement partial updates for variant");
}
}
} else {
for (int i = 0; i < _context.tablet_schema->columns().size(); ++i) {
if (_context.tablet_schema->columns()[i].is_variant_type()) {
variant_column_pos.push_back(i);
}
}
}

// Nothing to expand (always the case on the partial-update path above)
if (variant_column_pos.empty()) {
return Status::OK();
}

try {
// Parse each variant column from raw string column
vectorized::schema_util::parse_variant_columns(block, variant_column_pos);
vectorized::schema_util::finalize_variant_columns(block, variant_column_pos,
false /*not ignore sparse*/);
vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_column_pos);
} catch (const doris::Exception& e) {
// TODO more graceful, max_filter_ratio
LOG(WARNING) << "encounter execption " << e.to_string();
return Status::InternalError(e.to_string());
}

// The flushed block consists of two parts: the static columns followed by the
// extracted (dynamic) columns
// |  static  |  extracted  |
// The static ones are the original _context.tablet_schema columns
flush_schema = std::make_shared<TabletSchema>();
flush_schema->copy_from(*_context.tablet_schema);
// `block` is moved into flush_block here and swapped back just before returning
vectorized::Block flush_block(std::move(block));

// Appends one extracted subcolumn to both flush_schema and flush_block, named
// "<variant column name, lower case>.<subcolumn path>" and typed with the
// subcolumn's least common type.
// If column already exist in original tablet schema, then we pick common type
// and cast column to common type, and modify tablet column to common type,
// otherwise it's a new column, we should add to frontend
auto append_column = [&](const TabletColumn& parent_variant, auto& column_entry_from_object) {
const std::string& column_name =
parent_variant.name_lower_case() + "." + column_entry_from_object->path.get_path();
const vectorized::DataTypePtr& final_data_type_from_object =
column_entry_from_object->data.get_least_common_type();
TabletColumn tablet_column;
vectorized::PathInDataBuilder full_path_builder;
auto full_path = full_path_builder.append(parent_variant.name_lower_case(), false)
.append(column_entry_from_object->path.get_parts(), false)
.build();
// NOTE(review): unique_id = -1 presumably marks a column without an assigned id yet — confirm
vectorized::schema_util::get_column_by_type(
final_data_type_from_object, column_name, tablet_column,
vectorized::schema_util::ExtraInfo {.unique_id = -1,
.parent_unique_id = parent_variant.unique_id(),
.path_info = full_path});
flush_schema->append_column(std::move(tablet_column));
flush_block.insert({column_entry_from_object->data.get_finalized_column_ptr()->get_ptr(),
final_data_type_from_object, column_name});
};

// 1. Flatten variant column into flat columns, append flatten columns to the back of original Block and TabletSchema
// those columns are extracted columns, leave none extracted columns remain in original variant column, which is
// JSONB format at present.
// 2. Collect columns that need to be added or modified when data type changes or new columns encountered
for (size_t i = 0; i < variant_column_pos.size(); ++i) {
size_t variant_pos = variant_column_pos[i];
vectorized::ColumnObject& object_column = assert_cast<vectorized::ColumnObject&>(
flush_block.get_by_position(variant_pos).column->assume_mutable_ref());
const TabletColumn& parent_column = _context.tablet_schema->columns()[variant_pos];
CHECK(object_column.is_finalized());
// NOTE(review): `root` stays null if no subcolumn has an empty path, and it is
// dereferenced below without a check — confirm a finalized ColumnObject always
// contains a root entry.
std::shared_ptr<vectorized::ColumnObject::Subcolumns::Node> root;
for (auto& entry : object_column.get_subcolumns()) {
if (entry->path.empty()) {
// root
root = entry;
continue;
}
append_column(parent_column, entry);
}
// Create new variant column and set root column
auto obj = vectorized::ColumnObject::create(true, false);
// '{}' indicates a root path
static_cast<vectorized::ColumnObject*>(obj.get())->add_sub_column(
{}, root->data.get_finalized_column_ptr()->assume_mutable(),
root->data.get_least_common_type());
flush_block.get_by_position(variant_pos).column = obj->get_ptr();
vectorized::PathInDataBuilder full_root_path_builder;
auto full_root_path =
full_root_path_builder.append(parent_column.name_lower_case(), false).build();
flush_schema->mutable_columns()[variant_pos].set_path_info(full_root_path);
VLOG_DEBUG << "set root_path : " << full_root_path.get_path();
}
// Merge the expanded schema into the writer's tablet schema
update_rowset_schema(flush_schema);
// Hand the expanded block back to the caller
block.swap(flush_block);
VLOG_DEBUG << "dump block: " << block.dump_data();
VLOG_DEBUG << "dump flush schema: " << flush_schema->dump_structure();
return Status::OK();
}

} // namespace doris
10 changes: 10 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class BetaRowsetWriter : public RowsetWriter {
return _context.partial_update_info && _context.partial_update_info->is_partial_update;
}

// Read-only access to this writer's RowsetWriterContext.
const RowsetWriterContext& context() const override { return _context; }

private:
Status _create_file_writer(std::string path, io::FileWriterPtr& file_writer);
Status _check_segment_number_limit();
Expand All @@ -163,6 +165,14 @@ class BetaRowsetWriter : public RowsetWriter {
Status _rename_compacted_segment_plain(uint64_t seg_id);
Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id);

// Unfold variant column to Block
// Eg. [A | B | C | (D, E, F)]
// After unfold block structure changed to -> [A | B | C | D | E | F]
// The expanded D, E, F is dynamic part of the block
// The flushed Block columns should match exactly from the same type of frontend meta
Status expand_variant_to_subcolumns(vectorized::Block& block, TabletSchemaSPtr& flush_schema);
void update_rowset_schema(TabletSchemaSPtr flush_schema);

// build a tmp rowset for load segment to calc delete_bitmap
// for this segment
RowsetSharedPtr _build_tmp();
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class BetaRowsetWriterV2 : public RowsetWriter {

// Returns the load id stored in the writer context.
PUniqueId load_id() override { return _context.load_id; }

// Read-only access to this writer's RowsetWriterContext.
const RowsetWriterContext& context() const override { return _context; }

// Returns the version stored in the writer context.
Version version() override { return _context.version; }

// Returns the row count reported by the segment creator (num_rows_written()).
int64_t num_rows() const override { return _segment_creator.num_rows_written(); }
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ class RowsetWriter {
virtual std::shared_ptr<PartialUpdateInfo> get_partial_update_info() = 0;

virtual bool is_partial_update() = 0;
virtual const RowsetWriterContext& context() const = 0;

private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
Expand Down
9 changes: 5 additions & 4 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ struct RowsetWriterContext {
version(Version(0, 0)),
txn_id(0),
tablet_uid(0, 0),
segments_overlap(OVERLAP_UNKNOWN) {
segments_overlap(OVERLAP_UNKNOWN),
schema_lock(new std::mutex) {
load_id.set_hi(0);
load_id.set_lo(0);
}
Expand Down Expand Up @@ -90,9 +91,6 @@ struct RowsetWriterContext {
std::set<int32_t> skip_inverted_index;
DataWriteType write_type = DataWriteType::TYPE_DEFAULT;
BaseTabletSPtr tablet = nullptr;
// for tracing local schema change record
std::shared_ptr<vectorized::schema_util::LocalSchemaChangeRecorder> schema_change_recorder =
nullptr;

std::shared_ptr<MowContext> mow_context;
std::shared_ptr<FileWriterCreator> file_writer_creator;
Expand All @@ -110,6 +108,9 @@ struct RowsetWriterContext {
std::shared_ptr<PartialUpdateInfo> partial_update_info;

bool is_transient_rowset_writer = false;
// In the semi-structured scenario tablet_schema will be updated concurrently;
// this lock needs to be held when updating. Use shared_ptr to avoid deleting the copy constructor
std::shared_ptr<std::mutex> schema_lock;
};

} // namespace doris
Loading

0 comments on commit fe2648f

Please sign in to comment.