Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/olap/column_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ struct ColumnMapping {
int32_t ref_column;
// normally for default value. stores values for filters
WrapperField* default_value;
// materialize view transform function used in schema change
std::string materialized_function;
};

typedef std::vector<ColumnMapping> SchemaMapping;
Expand Down
237 changes: 225 additions & 12 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,11 @@ class RowBlockMerger {
std::priority_queue<MergeElement> _heap;
};

RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema,
const TabletSharedPtr& base_tablet) {
RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema) {
_schema_mapping.resize(tablet_schema.num_columns());
}

RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema,
const TabletSharedPtr& base_tablet,
const DeleteHandler& delete_handler) {
_schema_mapping.resize(tablet_schema.num_columns());
_delete_handler = delete_handler;
Expand Down Expand Up @@ -254,6 +252,139 @@ ConvertTypeResolver::ConvertTypeResolver() {

ConvertTypeResolver::~ConvertTypeResolver() {}

bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool) {
write_helper->set_not_null(field_idx);
BitmapValue bitmap;
if (!read_helper->is_null(ref_field_idx)) {
uint64_t origin_value;
char *src = read_helper->cell_ptr(ref_field_idx);
switch (ref_column.type()) {
case OLAP_FIELD_TYPE_TINYINT:
if (*(int8_t *) src < 0) {
LOG(WARNING) << "The input: " << *(int8_t *) src
<< " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
return false;
}
origin_value = *(int8_t *) src;
break;
case OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
origin_value = *(uint8_t *) src;
break;
case OLAP_FIELD_TYPE_SMALLINT:
if (*(int16_t *) src < 0) {
LOG(WARNING) << "The input: " << *(int16_t *) src
<< " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
return false;
}
origin_value = *(int16_t *) src;
break;
case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
origin_value = *(uint16_t *) src;
break;
case OLAP_FIELD_TYPE_INT:
if (*(int32_t *) src < 0) {
LOG(WARNING) << "The input: " << *(int32_t *) src
<< " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
return false;
}
origin_value = *(int32_t *) src;
break;
case OLAP_FIELD_TYPE_UNSIGNED_INT:
origin_value = *(uint32_t *) src;
break;
case OLAP_FIELD_TYPE_BIGINT:
if (*(int64_t *) src < 0) {
LOG(WARNING) << "The input: " << *(int64_t *) src
<< " is not valid, to_bitmap only support bigint value from 0 to 18446744073709551615 currently";
return false;
}
origin_value = *(int64_t *) src;
break;
case OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
origin_value = *(uint64_t *) src;
break;
default:
LOG(WARNING) << "the column type which was altered from was unsupported."
<< " from_type="
<< ref_column.type();
return false;
}
bitmap.add(origin_value);
}
char *buf = reinterpret_cast<char *>(mem_pool->allocate(bitmap.getSizeInBytes()));
Slice dst(buf, bitmap.getSizeInBytes());
bitmap.write(dst.data);
write_helper->set_field_content(field_idx, reinterpret_cast<char *>(&dst), mem_pool);
return true;
}

bool hll_hash(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool) {
write_helper->set_not_null(field_idx);
HyperLogLog hll;
if (!read_helper->is_null(ref_field_idx)) {
uint64_t hash_value;

switch (ref_column.type()) {
case OLAP_FIELD_TYPE_CHAR: {
int p = ref_column.length() - 1;
Slice* slice = reinterpret_cast<Slice*>(read_helper->cell_ptr(ref_field_idx));
char* buf = slice->data;
while (p >= 0 && buf[p] == '\0') {
p--;
}
slice->size = p + 1;
}

case OLAP_FIELD_TYPE_VARCHAR: {
Slice slice = *reinterpret_cast<Slice *>(read_helper->cell_ptr(ref_field_idx));
hash_value = HashUtil::murmur_hash64A(slice.data, slice.size, HashUtil::MURMUR_SEED);
break;
}
case OLAP_FIELD_TYPE_BOOL:
case OLAP_FIELD_TYPE_TINYINT:
case OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
case OLAP_FIELD_TYPE_SMALLINT:
case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
case OLAP_FIELD_TYPE_INT:
case OLAP_FIELD_TYPE_UNSIGNED_INT:
case OLAP_FIELD_TYPE_BIGINT:
case OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
case OLAP_FIELD_TYPE_LARGEINT:
case OLAP_FIELD_TYPE_FLOAT:
case OLAP_FIELD_TYPE_DOUBLE:
case OLAP_FIELD_TYPE_DISCRETE_DOUBLE:
case OLAP_FIELD_TYPE_DATE:
case OLAP_FIELD_TYPE_DATETIME: {
std::string ref_column_string = read_helper->column_schema(ref_field_idx)->type_info()->to_string(
read_helper->cell_ptr(ref_field_idx));
hash_value = HashUtil::murmur_hash64A(ref_column_string.c_str(), ref_column_string.length(), HashUtil::MURMUR_SEED);
break;
}
default:
LOG(WARNING) << "fail to hll hash type : " << ref_column.type();
return false;
}

hll.update(hash_value);
}
std::string buf;
buf.resize(hll.max_serialized_size());
buf.resize(hll.serialize((uint8_t *) buf.c_str()));
Slice dst(buf);
write_helper->set_field_content(field_idx, reinterpret_cast<char *>(&dst), mem_pool);
return true;
}

bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool) {
write_helper->set_not_null(field_idx);
int64_t count = read_helper->is_null(ref_field_idx) ? 0 : 1;
write_helper->set_field_content(field_idx, (char*)&count, mem_pool);
return true;
}

bool RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data_version,
RowBlock* mutable_block, uint64_t* filtered_rows) const {
if (mutable_block == nullptr) {
Expand Down Expand Up @@ -324,6 +455,36 @@ bool RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data_v
int32_t ref_column = _schema_mapping[i].ref_column;

if (_schema_mapping[i].ref_column >= 0) {
if (!_schema_mapping[i].materialized_function.empty()) {
bool (*_do_materialized_transform) (RowCursor*, RowCursor*, const TabletColumn&, int, int, MemPool* );
if (_schema_mapping[i].materialized_function == "to_bitmap") {
_do_materialized_transform = to_bitmap;
} else if (_schema_mapping[i].materialized_function == "hll_hash") {
_do_materialized_transform = hll_hash;
} else if (_schema_mapping[i].materialized_function == "count_field") {
_do_materialized_transform = count_field;
} else {
LOG(WARNING) << "error materialized view function : " << _schema_mapping[i].materialized_function;
return false;
}
VLOG(3) << "_schema_mapping[" << i << "].materialized_function : " << _schema_mapping[i].materialized_function;
for (size_t row_index = 0, new_row_index = 0;
row_index < ref_block->row_block_info().row_num; ++row_index) {
// No need row, need to be filter
if (need_filter_data && is_data_left_vec[row_index] == 0) {
continue;
}
mutable_block->get_row(new_row_index++, &write_helper);
ref_block->get_row(row_index, &read_helper);

if (!_do_materialized_transform(&read_helper, &write_helper,
ref_block->tablet_schema().column(ref_column), i, _schema_mapping[i].ref_column, mem_pool)) {
return false;
}
}
continue;
}

// new column will be assigned as referenced column
// check if the type of new column is equal to the older's.
FieldType reftype = ref_block->tablet_schema().column(ref_column).type();
Expand Down Expand Up @@ -1412,6 +1573,36 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
sc_params.new_tablet = new_tablet;
sc_params.ref_rowset_readers = rs_readers;
sc_params.delete_handler = delete_handler;
if (request.__isset.materialized_view_params) {
for (auto item : request.materialized_view_params) {
AlterMaterializedViewParam mv_param;
mv_param.column_name = item.column_name;
/*
* origin_column_name is always be set now,
* but origin_column_name may be not set in some materialized view function. eg:count(1)
*/
if (item.__isset.origin_column_name) {
mv_param.origin_column_name = item.origin_column_name;
}

/*
* TODO(lhy)
* Building the materialized view function for schema_change here based on defineExpr.
* This is a trick because the current storage layer does not support expression evaluation.
* We can refactor this part of the code until the uniform expression evaluates the logic.
* count distinct materialized view will set mv_expr with to_bitmap or hll_hash.
* count materialized view will set mv_expr with count.
*/
if (item.__isset.mv_expr) {
if (item.mv_expr.nodes[0].node_type == TExprNodeType::FUNCTION_CALL) {
mv_param.mv_expr = item.mv_expr.nodes[0].fn.name.function_name;
} else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) {
mv_param.mv_expr = "count_field";
}
}
sc_params.materialized_params_map.insert(std::make_pair(item.column_name, mv_param));
}
}

res = _convert_historical_rowsets(sc_params);
if (res != OLAP_SUCCESS) {
Expand Down Expand Up @@ -1453,12 +1644,17 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl

// a. 解析Alter请求,转换成内部的表示形式
// 不使用DELETE_DATA命令指定的删除条件
RowBlockChanger rb_changer(new_tablet->tablet_schema(), base_tablet);
RowBlockChanger rb_changer(new_tablet->tablet_schema());
bool sc_sorting = false;
bool sc_directly = false;

if (OLAP_SUCCESS !=
(res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly))) {
const std::unordered_map<std::string, AlterMaterializedViewParam> materialized_function_map;
if (OLAP_SUCCESS != (res = _parse_request(base_tablet,
new_tablet,
&rb_changer,
&sc_sorting,
&sc_directly,
materialized_function_map))) {
LOG(WARNING) << "failed to parse the request. res=" << res;
return res;
}
Expand Down Expand Up @@ -1659,16 +1855,15 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa

// change中增加了filter信息,在_parse_request中会设置filter的column信息
// 并在每次row block的change时,过滤一些数据
RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.base_tablet,
sc_params.delete_handler);
RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler);

bool sc_sorting = false;
bool sc_directly = false;
SchemaChange* sc_procedure = nullptr;

// a. 解析Alter请求,转换成内部的表示形式
OLAPStatus res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer,
&sc_sorting, &sc_directly);
OLAPStatus res = _parse_request(sc_params.base_tablet, sc_params.new_tablet,
&rb_changer, &sc_sorting, &sc_directly, sc_params.materialized_params_map);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to parse the request. res=" << res;
goto PROCESS_ALTER_EXIT;
Expand Down Expand Up @@ -1803,8 +1998,10 @@ PROCESS_ALTER_EXIT : {
// 分析column的mapping以及filter key的mapping
OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
TabletSharedPtr new_tablet,
RowBlockChanger* rb_changer, bool* sc_sorting,
bool* sc_directly) {
RowBlockChanger* rb_changer,
bool* sc_sorting,
bool* sc_directly,
const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map) {
OLAPStatus res = OLAP_SUCCESS;

// set column mapping
Expand All @@ -1830,6 +2027,22 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
continue;
}

if (materialized_function_map.find(column_name) != materialized_function_map.end()) {
AlterMaterializedViewParam mvParam = materialized_function_map.find(column_name)->second;
column_mapping->materialized_function = mvParam.mv_expr;
std::string origin_column_name = mvParam.origin_column_name;
int32_t column_index = base_tablet->field_index(origin_column_name);
if (column_index >= 0) {
column_mapping->ref_column = column_index;
continue;
} else {
LOG(WARNING) << "referenced column was missing. "
<< "[column=" << column_name
<< " referenced_column=" << column_index << "]";
return OLAP_ERR_CE_CMD_PARAMS_ERROR;
}
}

int32_t column_index = base_tablet->field_index(column_name);
if (column_index >= 0) {
column_mapping->ref_column = column_index;
Expand Down
28 changes: 22 additions & 6 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,18 @@ class RowBlock;
// defined in 'row_cursor.h'
class RowCursor;

bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool);
bool hll_hash(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool);
bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool);

class RowBlockChanger {
public:
RowBlockChanger(const TabletSchema& tablet_schema, const TabletSharedPtr& base_tablet,
const DeleteHandler& delete_handler);
RowBlockChanger(const TabletSchema& tablet_schema, const DeleteHandler& delete_handler);

RowBlockChanger(const TabletSchema& tablet_schema, const TabletSharedPtr& base_tablet);
RowBlockChanger(const TabletSchema& tablet_schema);

virtual ~RowBlockChanger();

Expand Down Expand Up @@ -194,12 +200,19 @@ class SchemaChangeHandler {
OLAPStatus _get_versions_to_be_changed(TabletSharedPtr base_tablet,
std::vector<Version>* versions_to_be_changed);

struct AlterMaterializedViewParam {
std::string column_name;
std::string origin_column_name;
std::string mv_expr;
};

struct SchemaChangeParams {
AlterTabletType alter_tablet_type;
TabletSharedPtr base_tablet;
TabletSharedPtr new_tablet;
std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
DeleteHandler delete_handler;
std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
};

// add alter task to base_tablet and new_tablet.
Expand All @@ -216,9 +229,12 @@ class SchemaChangeHandler {

static OLAPStatus _convert_historical_rowsets(const SchemaChangeParams& sc_params);

static OLAPStatus _parse_request(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
RowBlockChanger* rb_changer, bool* sc_sorting,
bool* sc_directly);
static OLAPStatus _parse_request(TabletSharedPtr base_tablet,
TabletSharedPtr new_tablet,
RowBlockChanger* rb_changer,
bool* sc_sorting,
bool* sc_directly,
const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map);

// 需要新建default_value时的初始化设置
static OLAPStatus _init_column_mapping(ColumnMapping* column_mapping,
Expand Down
Loading