Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ Status VariantColumnReader::_new_iterator_with_flat_leaves(ColumnIteratorUPtr* i
const auto* node =
target_col.has_path_info() ? _subcolumn_readers->find_leaf(relative_path) : nullptr;
if (!node) {
if (relative_path.get_path() == SPARSE_COLUMN_PATH) {
if (relative_path.get_path() == SPARSE_COLUMN_PATH && _sparse_column_reader != nullptr) {
// read sparse column and filter extracted columns in subcolumn_path_map
std::unique_ptr<ColumnIterator> inner_iter;
RETURN_IF_ERROR(_sparse_column_reader->new_iterator(&inner_iter, nullptr));
Expand Down
66 changes: 60 additions & 6 deletions be/src/vec/common/schema_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,14 @@ Status VariantCompactionUtil::check_path_stats(const std::vector<RowsetSharedPtr
if (output->tablet_schema()->num_variant_columns() == 0) {
return Status::OK();
}
// check no extended schema in input rowsets
for (const auto& rowset : intputs) {
for (const auto& column : rowset->tablet_schema()->columns()) {
if (column->is_extracted_column()) {
return Status::OK();
}
}
}
// check no extended schema in output rowset
for (const auto& column : output->tablet_schema()->columns()) {
if (column->is_extracted_column()) {
Expand Down Expand Up @@ -829,6 +837,12 @@ Status VariantCompactionUtil::check_path_stats(const std::vector<RowsetSharedPtr
// in this case, input stats is accurate, so we check the stats size and stats value
else {
for (const auto& [path, size] : stats) {
if (original_uid_to_path_stats.at(uid).find(path) ==
original_uid_to_path_stats.at(uid).end()) {
return Status::InternalError(
"Path stats not found for uid {}, path {}, tablet_id {}", uid, path,
tablet->tablet_id());
}
if (original_uid_to_path_stats.at(uid).at(path) != size) {
return Status::InternalError(
"Path stats not match for uid {} with path `{}`, input size {}, output "
Expand Down Expand Up @@ -900,7 +914,7 @@ Status VariantCompactionUtil::get_compaction_nested_columns(
return Status::OK();
}

void VariantCompactionUtil::get_compaction_subcolumns(
void VariantCompactionUtil::get_compaction_subcolumns_from_subpaths(
TabletSchema::PathsSetInfo& paths_set_info, const TabletColumnPtr parent_column,
const TabletSchemaSPtr& target, const PathToDataTypes& path_to_data_types,
const std::unordered_set<std::string>& sparse_paths, TabletSchemaSPtr& output_schema) {
Expand Down Expand Up @@ -966,6 +980,34 @@ void VariantCompactionUtil::get_compaction_subcolumns(
}
}

// Appends one subcolumn to `output_schema` for every usable entry of
// `path_to_data_types`, and records the indexes derived for that subcolumn in
// `paths_set_info.subcolumn_indexes` (keyed by the path relative to the variant).
//
// An entry is skipped when it has no data types, when its path is empty, or when
// the path has a nested part.
//
// paths_set_info     - receives the per-path index lists.
// parent_column      - the variant column the subcolumns belong to.
// target             - schema used to look up the parent's inverted indexes.
// path_to_data_types - candidate data types collected for each path.
// output_schema      - schema the generated subcolumns are appended to.
void VariantCompactionUtil::get_compaction_subcolumns_from_data_types(
        TabletSchema::PathsSetInfo& paths_set_info, const TabletColumnPtr parent_column,
        const TabletSchemaSPtr& target, const PathToDataTypes& path_to_data_types,
        TabletSchemaSPtr& output_schema) {
    const auto& parent_indexes = target->inverted_indexs(parent_column->unique_id());

    for (const auto& [sub_path, candidate_types] : path_to_data_types) {
        const bool usable =
                !candidate_types.empty() && !sub_path.empty() && !sub_path.has_nested_part();
        if (!usable) {
            continue;
        }

        // Collapse all candidate types of this path into a single column type.
        // NOTE(review): presumably the least common supertype with a JSONB
        // fallback, judging by the helper's name -- confirm against its definition.
        DataTypePtr merged_type;
        get_least_supertype_jsonb(candidate_types, &merged_type);

        // The subcolumn is named "<lower-cased parent name>.<relative path>", and
        // the same string is used as its path info.
        const auto& leaf_path = sub_path.get_path();
        auto full_name = parent_column->name_lower_case() + "." + leaf_path;
        TabletColumn extracted = get_column_by_type(
                merged_type, full_name,
                vectorized::schema_util::ExtraInfo {.unique_id = -1,
                                                    .parent_unique_id = parent_column->unique_id(),
                                                    .path_info = PathInData(full_name)});
        vectorized::schema_util::inherit_column_attributes(*parent_column, extracted);

        // Derive the subcolumn's indexes from the parent's inverted indexes.
        TabletIndexes inherited_indexes;
        vectorized::schema_util::inherit_index(parent_indexes, inherited_indexes, extracted);
        paths_set_info.subcolumn_indexes.emplace(leaf_path, std::move(inherited_indexes));

        output_schema->append_column(extracted);
        VLOG_DEBUG << "append sub column " << leaf_path << " data type "
                   << merged_type->get_name();
    }
}

// Build the temporary schema for compaction
// 1. aggregate path stats and data types from all rowsets
// 2. append typed columns and nested columns to the output schema
Expand All @@ -986,7 +1028,9 @@ Status VariantCompactionUtil::get_extended_compaction_schema(
output_schema->shawdow_copy_without_columns(*target);
std::unordered_map<int32_t, TabletSchema::PathsSetInfo> uid_to_paths_set_info;
for (const TabletColumnPtr& column : target->columns()) {
output_schema->append_column(*column);
if (!column->is_extracted_column()) {
output_schema->append_column(*column);
}
if (!column->is_variant_type()) {
continue;
}
Expand All @@ -1008,10 +1052,20 @@ Status VariantCompactionUtil::get_extended_compaction_schema(
uid_to_paths_set_info[column->unique_id()]);

// 4. append subcolumns
get_compaction_subcolumns(
uid_to_paths_set_info[column->unique_id()], column, target,
uid_to_variant_extended_info[column->unique_id()].path_to_data_types,
uid_to_variant_extended_info[column->unique_id()].sparse_paths, output_schema);
if (column->variant_max_subcolumns_count() > 0 || !column->get_sub_columns().empty()) {
get_compaction_subcolumns_from_subpaths(
uid_to_paths_set_info[column->unique_id()], column, target,
uid_to_variant_extended_info[column->unique_id()].path_to_data_types,
uid_to_variant_extended_info[column->unique_id()].sparse_paths, output_schema);
}
// variant_max_subcolumns_count == 0 and no typed paths are materialized:
// this means that all subcolumns are materialized (possibly from old data)
else {
get_compaction_subcolumns_from_data_types(
uid_to_paths_set_info[column->unique_id()], column, target,
uid_to_variant_extended_info[column->unique_id()].path_to_data_types,
output_schema);
}

// append sparse column
TabletColumn sparse_column = create_sparse_column(*column);
Expand Down
15 changes: 9 additions & 6 deletions be/src/vec/common/schema_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,15 @@ class VariantCompactionUtil {
segment_v2::VariantStatisticsPB* stats, size_t row_pos,
size_t num_rows);

static void get_compaction_subcolumns(TabletSchema::PathsSetInfo& paths_set_info,
const TabletColumnPtr parent_column,
const TabletSchemaSPtr& target,
const PathToDataTypes& path_to_data_types,
const std::unordered_set<std::string>& sparse_paths,
TabletSchemaSPtr& output_schema);
static void get_compaction_subcolumns_from_subpaths(
TabletSchema::PathsSetInfo& paths_set_info, const TabletColumnPtr parent_column,
const TabletSchemaSPtr& target, const PathToDataTypes& path_to_data_types,
const std::unordered_set<std::string>& sparse_paths, TabletSchemaSPtr& output_schema);

static void get_compaction_subcolumns_from_data_types(
TabletSchema::PathsSetInfo& paths_set_info, const TabletColumnPtr parent_column,
const TabletSchemaSPtr& target, const PathToDataTypes& path_to_data_types,
TabletSchemaSPtr& output_schema);

static Status get_compaction_typed_columns(const TabletSchemaSPtr& target,
const std::unordered_set<std::string>& typed_paths,
Expand Down
70 changes: 60 additions & 10 deletions be/test/vec/common/schema_util_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ using namespace doris;
class SchemaUtilTest : public testing::Test {
public:
SchemaUtilTest() = default;
virtual ~SchemaUtilTest() = default;
~SchemaUtilTest() override = default;
};

void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, int64_t index_id,
Expand Down Expand Up @@ -1452,7 +1452,7 @@ TEST_F(SchemaUtilTest, get_compaction_nested_columns) {
EXPECT_FALSE(st2.ok());
}

TEST_F(SchemaUtilTest, get_compaction_subcolumns) {
TEST_F(SchemaUtilTest, get_compaction_subcolumns_from_subpaths) {
TabletColumn variant;
variant.set_unique_id(30);
variant.set_variant_max_subcolumns_count(3);
Expand All @@ -1470,7 +1470,7 @@ TEST_F(SchemaUtilTest, get_compaction_subcolumns) {
std::unordered_set<std::string> sparse_paths;
TabletSchemaSPtr output_schema = std::make_shared<TabletSchema>();

schema_util::VariantCompactionUtil::get_compaction_subcolumns(
schema_util::VariantCompactionUtil::get_compaction_subcolumns_from_subpaths(
paths_set_info, parent_column, schema, path_to_data_types, sparse_paths, output_schema);
EXPECT_EQ(output_schema->num_columns(), 2);
for (const auto& column : output_schema->columns()) {
Expand All @@ -1483,7 +1483,7 @@ TEST_F(SchemaUtilTest, get_compaction_subcolumns) {
std::make_shared<vectorized::DataTypeInt32>()};
path_to_data_types[vectorized::PathInData("b")] = {
std::make_shared<vectorized::DataTypeString>()};
schema_util::VariantCompactionUtil::get_compaction_subcolumns(
schema_util::VariantCompactionUtil::get_compaction_subcolumns_from_subpaths(
paths_set_info, parent_column, schema, path_to_data_types, sparse_paths, output_schema);
EXPECT_EQ(output_schema->num_columns(), 2);
bool found_int = false, found_str = false;
Expand All @@ -1500,7 +1500,7 @@ TEST_F(SchemaUtilTest, get_compaction_subcolumns) {

output_schema = std::make_shared<TabletSchema>();
sparse_paths.insert("a");
schema_util::VariantCompactionUtil::get_compaction_subcolumns(
schema_util::VariantCompactionUtil::get_compaction_subcolumns_from_subpaths(
paths_set_info, parent_column, schema, path_to_data_types, sparse_paths, output_schema);
EXPECT_EQ(output_schema->num_columns(), 2);
for (const auto& column : output_schema->columns()) {
Expand All @@ -1517,7 +1517,7 @@ TEST_F(SchemaUtilTest, get_compaction_subcolumns) {
for (int i = 0; i < config::variant_max_sparse_column_statistics_size + 1; ++i) {
sparse_paths.insert("dummy" + std::to_string(i));
}
schema_util::VariantCompactionUtil::get_compaction_subcolumns(
schema_util::VariantCompactionUtil::get_compaction_subcolumns_from_subpaths(
paths_set_info, parent_column, schema, path_to_data_types, sparse_paths, output_schema);
EXPECT_EQ(output_schema->num_columns(), 2);
for (const auto& column : output_schema->columns()) {
Expand Down Expand Up @@ -1554,7 +1554,7 @@ TEST_F(SchemaUtilTest, get_compaction_subcolumns_advanced) {
std::unordered_set<std::string> sparse_paths;
TabletSchemaSPtr output_schema = std::make_shared<TabletSchema>();

schema_util::VariantCompactionUtil::get_compaction_subcolumns(
schema_util::VariantCompactionUtil::get_compaction_subcolumns_from_subpaths(
paths_set_info, parent_column, schema, path_to_data_types, sparse_paths, output_schema);
EXPECT_EQ(output_schema->num_columns(), 4);
for (const auto& column : output_schema->columns()) {
Expand All @@ -1571,7 +1571,7 @@ TEST_F(SchemaUtilTest, get_compaction_subcolumns_advanced) {
std::make_shared<vectorized::DataTypeInt32>()};
path_to_data_types[vectorized::PathInData("b")] = {
std::make_shared<vectorized::DataTypeString>()};
schema_util::VariantCompactionUtil::get_compaction_subcolumns(
schema_util::VariantCompactionUtil::get_compaction_subcolumns_from_subpaths(
paths_set_info, parent_column, schema, path_to_data_types, sparse_paths, output_schema);
EXPECT_EQ(output_schema->num_columns(), 4);
bool found_int = false, found_str = false;
Expand All @@ -1590,7 +1590,7 @@ TEST_F(SchemaUtilTest, get_compaction_subcolumns_advanced) {

output_schema = std::make_shared<TabletSchema>();
sparse_paths.insert("a");
schema_util::VariantCompactionUtil::get_compaction_subcolumns(
schema_util::VariantCompactionUtil::get_compaction_subcolumns_from_subpaths(
paths_set_info, parent_column, schema, path_to_data_types, sparse_paths, output_schema);
EXPECT_EQ(output_schema->num_columns(), 4);
for (const auto& column : output_schema->columns()) {
Expand All @@ -1609,7 +1609,7 @@ TEST_F(SchemaUtilTest, get_compaction_subcolumns_advanced) {
for (int i = 0; i < config::variant_max_sparse_column_statistics_size + 1; ++i) {
sparse_paths.insert("dummy" + std::to_string(i));
}
schema_util::VariantCompactionUtil::get_compaction_subcolumns(
schema_util::VariantCompactionUtil::get_compaction_subcolumns_from_subpaths(
paths_set_info, parent_column, schema, path_to_data_types, sparse_paths, output_schema);
EXPECT_EQ(output_schema->num_columns(), 4);
for (const auto& column : output_schema->columns()) {
Expand All @@ -1621,6 +1621,56 @@ TEST_F(SchemaUtilTest, get_compaction_subcolumns_advanced) {
}
}

// Checks that get_compaction_subcolumns_from_data_types appends one subcolumn per
// path, using the merged data type of that path, and records one inherited index
// for each path in paths_set_info.subcolumn_indexes.
TEST_F(SchemaUtilTest, get_compaction_subcolumns_from_data_types) {
    // Target schema: a single VARIANT column "v1" that carries an inverted index.
    TabletSchemaPB schema_pb;
    schema_pb.set_keys_type(KeysType::DUP_KEYS);
    construct_column(schema_pb.add_column(), schema_pb.add_index(), 20000, "v_index_alpha", 1,
                     "VARIANT", "v1", IndexType::INVERTED);

    TabletSchemaSPtr target = std::make_shared<TabletSchema>();
    target->init_from_pb(schema_pb);
    TabletColumnPtr parent_column = target->columns().front();

    // Path -> candidate data types.
    doris::vectorized::schema_util::PathToDataTypes path_to_data_types;
    // a: {Int32, Int64} -> BIGINT
    path_to_data_types[vectorized::PathInData("a")] = {
            std::make_shared<vectorized::DataTypeInt32>(),
            std::make_shared<vectorized::DataTypeInt64>()};
    // b: {String} -> STRING
    path_to_data_types[vectorized::PathInData("b")] = {
            std::make_shared<vectorized::DataTypeString>()};

    TabletSchema::PathsSetInfo paths_set_info;
    TabletSchemaSPtr output_schema = std::make_shared<TabletSchema>();

    schema_util::VariantCompactionUtil::get_compaction_subcolumns_from_data_types(
            paths_set_info, parent_column, target, path_to_data_types, output_schema);

    EXPECT_EQ(output_schema->num_columns(), 2);

    // Shared expectations for a generated subcolumn: its type, its parent id and
    // its path info (which equals the full column name).
    auto expect_subcolumn = [](const auto& col, FieldType expected_type,
                               const char* expected_path) {
        EXPECT_EQ(col->type(), expected_type);
        EXPECT_EQ(col->parent_unique_id(), 1);
        EXPECT_EQ(col->path_info_ptr()->get_path(), expected_path);
    };

    bool saw_a = false;
    bool saw_b = false;
    for (const auto& col : output_schema->columns()) {
        const auto& col_name = col->name();
        if (col_name == "v1.a") {
            saw_a = true;
            expect_subcolumn(col, FieldType::OLAP_FIELD_TYPE_BIGINT, "v1.a");
        } else if (col_name == "v1.b") {
            saw_b = true;
            expect_subcolumn(col, FieldType::OLAP_FIELD_TYPE_STRING, "v1.b");
        }
    }
    EXPECT_TRUE(saw_a);
    EXPECT_TRUE(saw_b);

    // Both relative paths must be registered, each with exactly one index.
    auto& recorded_indexes = paths_set_info.subcolumn_indexes;
    for (const char* key : {"a", "b"}) {
        auto it = recorded_indexes.find(key);
        ASSERT_TRUE(it != recorded_indexes.end());
        EXPECT_EQ(it->second.size(), 1);
    }
}

// Test has_different_structure_in_same_path function indirectly through check_variant_has_no_ambiguous_paths
TEST_F(SchemaUtilTest, has_different_structure_in_same_path_indirect) {
// Test case 1: Same structure and same length - should not detect ambiguity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1059,10 +1059,6 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
throw new DdlException("Not supporting alter table add generated columns.");
}

if (newColumn.getType().isVariantType() && olapTable.hasVariantColumns()) {
checkAddVariantColumnAllowed(olapTable, newColumn);
}

/*
* add new column to indexes.
* UNIQUE:
Expand Down Expand Up @@ -3458,20 +3454,4 @@ private void checkOrder(List<Column> targetIndexSchema, List<String> orderedColN
nameSet.add(colName);
}
}

/**
 * Rejects adding a variant column whose variant_max_subcolumns_count is
 * inconsistent with the variant columns already in the table's base schema.
 *
 * <p>A DdlException is thrown when the new column's count is 0 while an existing
 * variant column's count is non-zero, or when the new column's count is greater
 * than 0 while an existing variant column's count is 0.
 *
 * @param olapTable table whose base schema is scanned for variant columns
 * @param newColumn the variant column about to be added
 * @throws DdlException if the counts are inconsistent as described above
 */
private void checkAddVariantColumnAllowed(OlapTable olapTable, Column newColumn) throws DdlException {
    final int newCount = newColumn.getVariantMaxSubcolumnsCount();
    for (Column existing : olapTable.getBaseSchema()) {
        // Only existing variant columns take part in the consistency check.
        if (!existing.getType().isVariantType()) {
            continue;
        }
        final int existingCount = existing.getVariantMaxSubcolumnsCount();
        final boolean newZeroButExistingNonZero = newCount == 0 && existingCount != 0;
        final boolean newPositiveButExistingZero = newCount > 0 && existingCount == 0;
        if (newZeroButExistingNonZero || newPositiveButExistingZero) {
            throw new DdlException("The variant_max_subcolumns_count must either be 0 in all columns"
                    + " or greater than 0 in all columns");
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,7 @@ public void validate(ConnectContext ctx) {
Preconditions.checkState(!Strings.isNullOrEmpty(ctlName), "catalog name is null or empty");
Preconditions.checkState(!Strings.isNullOrEmpty(dbName), "database name is null or empty");

//check datatype: datev1, decimalv2, variant
boolean allZero = false;
boolean allPositive = false;
//check datatype: datev1, decimalv2
for (ColumnDefinition columnDef : columns) {
String columnNameUpperCase = columnDef.getName().toUpperCase();
if (columnNameUpperCase.startsWith("__DORIS_")) {
Expand All @@ -361,20 +359,6 @@ public void validate(ConnectContext ctx) {
"Disable to create table of `VARIANT` type column named with a `.` character: "
+ columnNameUpperCase);
}
VariantType variantType = (VariantType) columnDef.getType();
if (variantType.getVariantMaxSubcolumnsCount() == 0) {
allZero = true;
if (allPositive) {
throw new AnalysisException("The variant_max_subcolumns_count must either be 0"
+ " in all columns, or greater than 0 in all columns");
}
} else {
allPositive = true;
if (allZero) {
throw new AnalysisException("The variant_max_subcolumns_count must either be 0"
+ " in all columns, or greater than 0 in all columns");
}
}
}
if (columnDef.getType().isDateType() && Config.disable_datev1) {
throw new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ var variant<PROPERTIES ("variant_max_subcolumns_count" = "10")> Yes false \N NON
id bigint Yes true \N
var variant<PROPERTIES ("variant_max_subcolumns_count" = "10")> Yes false \N NONE
var2 variant<PROPERTIES ("variant_max_subcolumns_count" = "15")> Yes false \N NONE
var3 variant<PROPERTIES ("variant_max_subcolumns_count" = "10")> Yes false \N NONE
var3 variant Yes false \N NONE

Loading
Loading