Skip to content

Commit

Permalink
fix regression test
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Jun 21, 2022
1 parent b88632a commit 5413fab
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 245 deletions.
30 changes: 13 additions & 17 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
for (auto slot : slots) {
pindex->add_columns(slot->col_name());
}
for (auto column : columns) {
column->to_schema_pb(pindex->add_columns_desc());
}
}

Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
Expand Down Expand Up @@ -59,13 +62,13 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
}
index->slots.emplace_back(it->second);
}
for (auto& pcolumn_desc : p_index.columns_desc()) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_pb(pcolumn_desc);
index->columns.emplace_back(tc);
}
_indexes.emplace_back(index);
}
for (auto& pcolumn : pschema.columns()) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_pb(pcolumn);
_columns.emplace_back(tc);
}

std::sort(_indexes.begin(), _indexes.end(),
[](const OlapTableIndexSchema* lhs, const OlapTableIndexSchema* rhs) {
Expand Down Expand Up @@ -99,13 +102,13 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
}
index->slots.emplace_back(it->second);
}
for (auto& tcolumn_desc : t_index.columns_desc) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
index->columns.emplace_back(tc);
}
_indexes.emplace_back(index);
}
for (auto& tcolumn : tschema.columns) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn);
_columns.emplace_back(tc);
}

std::sort(_indexes.begin(), _indexes.end(),
[](const OlapTableIndexSchema* lhs, const OlapTableIndexSchema* rhs) {
Expand All @@ -125,18 +128,11 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
for (auto index : _indexes) {
index->to_protobuf(pschema->add_indexes());
}
for (auto column : _columns) {
column->to_schema_pb(pschema->add_columns());
}
}

std::string OlapTableSchemaParam::debug_string() const {
std::stringstream ss;
ss << "tuple_desc=" << _tuple_desc->debug_string();
ss << "columns: ";
for (auto column : _columns) {
ss << column->unique_id() << ":" << column->name() << " ";
}
return ss.str();
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct OlapTableIndexSchema {
int64_t index_id;
std::vector<SlotDescriptor*> slots;
int32_t schema_hash;
std::vector<TabletColumn*> columns;

void to_protobuf(POlapTableIndexSchema* pindex) const;
};
Expand All @@ -59,7 +60,6 @@ class OlapTableSchemaParam {
int64_t version() const { return _version; }

TupleDescriptor* tuple_desc() const { return _tuple_desc; }
const std::vector<TabletColumn*>& columns() const { return _columns; }
const std::vector<OlapTableIndexSchema*>& indexes() const { return _indexes; }

void to_protobuf(POlapTableSchemaParam* pschema) const;
Expand All @@ -83,7 +83,6 @@ class OlapTableSchemaParam {
TupleDescriptor* _tuple_desc = nullptr;
mutable POlapTableSchemaParam* _proto_schema = nullptr;
std::vector<OlapTableIndexSchema*> _indexes;
std::vector<TabletColumn*> _columns;
mutable ObjectPool _obj_pool;
};

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
const TabletSchema& ori_tablet_schema) {
*_tablet_schema = ori_tablet_schema;
//new tablet schame if new table
if (ptable_schema_param.columns_size() != 0 &&
ptable_schema_param.columns(0).unique_id() >= 0) {
if (ptable_schema_param.indexes(0).columns_desc_size() != 0 &&
ptable_schema_param.indexes(0).columns_desc(0).unique_id() >= 0) {
_tablet_schema->build_current_tablet_schema(index_id, ptable_schema_param,
ori_tablet_schema);
}
Expand Down
38 changes: 23 additions & 15 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,12 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
DeleteHandler delete_handler;
std::vector<ColumnId> return_columns;
auto base_tablet_schema = base_tablet->tablet_schema();
if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) {
base_tablet_schema.clear_columns();
for (const auto& column : request.columns) {
base_tablet_schema.append_column(TabletColumn(column));
}
}

// begin to find deltas to convert from base tablet to new tablet so that
// obtain base tablet and new tablet's push lock and header write lock to prevent loading data
Expand Down Expand Up @@ -1469,16 +1475,17 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
reader_params.tablet = base_tablet;
reader_params.reader_type = READER_ALTER_TABLE;
reader_params.rs_readers = rs_readers;
const auto& schema = base_tablet->tablet_schema();
reader_params.tablet_schema = &base_tablet_schema;
const auto& schema = base_tablet_schema;
reader_params.return_columns.resize(schema.num_columns());
std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0);
reader_params.origin_return_columns = &reader_params.return_columns;
reader_params.version = {0, end_version};
// BlockReader::init will call base_tablet->get_header_lock(), but this lock we already get at outer layer, so we just call TabletReader::init
RETURN_NOT_OK(reader.TabletReader::init(reader_params));

res = delete_handler.init(base_tablet->tablet_schema(),
base_tablet->delete_predicates(), end_version, &reader);
res = delete_handler.init(base_tablet_schema, base_tablet->delete_predicates(),
end_version, &reader);
if (!res) {
LOG(WARNING) << "init delete handler failed. base_tablet="
<< base_tablet->full_name() << ", end_version=" << end_version;
Expand Down Expand Up @@ -1509,6 +1516,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
sc_params.new_tablet = new_tablet;
sc_params.ref_rowset_readers = rs_readers;
sc_params.delete_handler = &delete_handler;
sc_params.base_tablet_schema = &base_tablet_schema;
if (request.__isset.materialized_view_params) {
for (auto item : request.materialized_view_params) {
AlterMaterializedViewParam mv_param;
Expand Down Expand Up @@ -1601,7 +1609,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,

const std::unordered_map<std::string, AlterMaterializedViewParam> materialized_function_map;
if (res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly,
materialized_function_map, desc_tbl);
materialized_function_map, desc_tbl, nullptr);
!res) {
LOG(WARNING) << "failed to parse the request. res=" << res;
return res;
Expand Down Expand Up @@ -1723,9 +1731,9 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
bool sc_directly = false;

// a.Parse the Alter request and convert it into an internal representation
Status res =
_parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer, &sc_sorting,
&sc_directly, sc_params.materialized_params_map, *sc_params.desc_tbl);
Status res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer,
&sc_sorting, &sc_directly, sc_params.materialized_params_map,
*sc_params.desc_tbl, sc_params.base_tablet_schema);

auto process_alter_exit = [&]() -> Status {
{
Expand Down Expand Up @@ -1829,7 +1837,7 @@ Status SchemaChangeHandler::_parse_request(
bool* sc_sorting, bool* sc_directly,
const std::unordered_map<std::string, AlterMaterializedViewParam>&
materialized_function_map,
DescriptorTbl desc_tbl) {
DescriptorTbl desc_tbl, const TabletSchema* base_tablet_schema) {
// set column mapping
for (int i = 0, new_schema_size = new_tablet->tablet_schema().num_columns();
i < new_schema_size; ++i) {
Expand All @@ -1838,7 +1846,7 @@ Status SchemaChangeHandler::_parse_request(
ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i);

if (new_column.has_reference_column()) {
int32_t column_index = base_tablet->field_index(new_column.referenced_column());
int32_t column_index = base_tablet_schema->field_index(new_column.referenced_column());

if (column_index < 0) {
LOG(WARNING) << "referenced column was missing. "
Expand All @@ -1857,7 +1865,7 @@ Status SchemaChangeHandler::_parse_request(
auto mvParam = materialized_function_map.find(column_name)->second;
column_mapping->materialized_function = mvParam.mv_expr;
column_mapping->expr = mvParam.expr;
int32_t column_index = base_tablet->field_index(mvParam.origin_column_name);
int32_t column_index = base_tablet_schema->field_index(mvParam.origin_column_name);
if (column_index >= 0) {
column_mapping->ref_column = column_index;
continue;
Expand All @@ -1869,7 +1877,7 @@ Status SchemaChangeHandler::_parse_request(
}
}

int32_t column_index = base_tablet->field_index(column_name);
int32_t column_index = base_tablet_schema->field_index(column_name);
if (column_index >= 0) {
column_mapping->ref_column = column_index;
continue;
Expand All @@ -1878,7 +1886,7 @@ Status SchemaChangeHandler::_parse_request(
// Newly added column go here
column_mapping->ref_column = -1;

if (i < base_tablet->num_short_key_columns()) {
if (i < base_tablet_schema->num_short_key_columns()) {
*sc_directly = true;
}
RETURN_IF_ERROR(
Expand Down Expand Up @@ -1907,7 +1915,7 @@ Status SchemaChangeHandler::_parse_request(
}
}

const TabletSchema& ref_tablet_schema = base_tablet->tablet_schema();
const TabletSchema& ref_tablet_schema = *base_tablet_schema;
const TabletSchema& new_tablet_schema = new_tablet->tablet_schema();
if (ref_tablet_schema.keys_type() != new_tablet_schema.keys_type()) {
// only when base table is dup and mv is agg
Expand All @@ -1924,15 +1932,15 @@ Status SchemaChangeHandler::_parse_request(
// old keys: A B C D
// new keys: A B
if (new_tablet_schema.keys_type() != KeysType::DUP_KEYS &&
new_tablet->num_key_columns() < base_tablet->num_key_columns()) {
new_tablet->num_key_columns() < base_tablet_schema->num_key_columns()) {
// this is a table with aggregate key type, and num of key columns in new schema
// is less, which means the data in new tablet should be more aggregated.
// so we use sorting schema change to sort and merge the data.
*sc_sorting = true;
return Status::OK();
}

if (base_tablet->num_short_key_columns() != new_tablet->num_short_key_columns()) {
if (base_tablet_schema->num_short_key_columns() != new_tablet->num_short_key_columns()) {
// the number of short_keys changed, can't do linked schema change
*sc_directly = true;
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ class SchemaChangeHandler {
AlterTabletType alter_tablet_type;
TabletSharedPtr base_tablet;
TabletSharedPtr new_tablet;
TabletSchema* base_tablet_schema = nullptr;
std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
DeleteHandler* delete_handler = nullptr;
std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
Expand All @@ -271,7 +272,7 @@ class SchemaChangeHandler {
RowBlockChanger* rb_changer, bool* sc_sorting, bool* sc_directly,
const std::unordered_map<std::string, AlterMaterializedViewParam>&
materialized_function_map,
DescriptorTbl desc_tbl);
DescriptorTbl desc_tbl, const TabletSchema* base_tablet_schema);

// Initialization Settings for creating a default value
static Status _init_column_mapping(ColumnMapping* column_mapping,
Expand Down
45 changes: 16 additions & 29 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
void TabletColumn::to_schema_pb(ColumnPB* column) const {
column->set_unique_id(_unique_id);
column->set_name(_col_name);
column->set_col_unique_id(_col_unique_id);
column->set_type(get_string_by_field_type(_type));
column->set_is_key(_is_key);
column->set_is_nullable(_is_nullable);
Expand Down Expand Up @@ -520,42 +521,28 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id,
_cols.clear();
_field_name_to_index.clear();

std::unordered_set<int32_t> col_unqiue_id_set;
for (const POlapTableIndexSchema& index : ptable_schema_param.indexes()) {
if (index.id() == index_id) {
std::unordered_set<std::string> col_name_set;
std::for_each(index.columns().begin(), index.columns().end(),
[&](std::string name) { col_name_set.insert(std::move(name)); });
for (size_t i = 0; i < ptable_schema_param.columns_size(); i++) {
if (col_name_set.find(ptable_schema_param.columns(i).name()) !=
col_name_set.end()) {
col_unqiue_id_set.insert(ptable_schema_param.columns(i).unique_id());
for (auto& pcolumn : index.columns_desc()) {
TabletColumn column;
column.init_from_pb(pcolumn);
if (column.is_key()) {
_num_key_columns++;
}
if (column.is_nullable()) {
_num_null_columns++;
}
if (column.is_bf_column()) {
has_bf_columns = true;
}
_field_name_to_index[column.name()] = _num_columns;
_field_id_to_index[column.col_unique_id()] = _num_columns;
_cols.emplace_back(std::move(column));
_num_columns++;
}
break;
}
}

for (auto& pcolumn : ptable_schema_param.columns()) {
if (col_unqiue_id_set.find(pcolumn.unique_id()) == col_unqiue_id_set.end()) {
continue;
}
TabletColumn column;
column.init_from_pb(pcolumn);
if (column.is_key()) {
_num_key_columns++;
}
if (column.is_nullable()) {
_num_null_columns++;
}
if (column.is_bf_column()) {
has_bf_columns = true;
}
_field_name_to_index[column.name()] = _num_columns;
_field_id_to_index[column.col_unique_id()] = _num_columns;
_cols.emplace_back(std::move(column));
_num_columns++;
}
if (has_bf_columns) {
_has_bf_fpp = true;
_bf_fpp = ori_tablet_schema.bloom_filter_fpp();
Expand Down
Loading

0 comments on commit 5413fab

Please sign in to comment.