Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions be/src/olap/collect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ const RowCursor* CollectIterator::Level0Iterator::current_row() const {
return _current_row;
}

int32_t CollectIterator::Level0Iterator::version() const {
int64_t CollectIterator::Level0Iterator::version() const {
return _rs_reader->version().second;
}

Expand All @@ -172,8 +172,7 @@ OLAPStatus CollectIterator::Level0Iterator::_refresh_current_row() {
size_t pos = _row_block->pos();
_row_block->get_row(pos, &_row_cursor);
if (_row_block->block_status() == DEL_PARTIAL_SATISFIED &&
_reader->_delete_handler.is_filter_data(_rs_reader->version().second,
_row_cursor)) {
_reader->_delete_handler.is_filter_data(version(), _row_cursor)) {
_reader->_stats.rows_del_filtered++;
_row_block->pos_inc();
continue;
Expand Down Expand Up @@ -250,7 +249,7 @@ const RowCursor* CollectIterator::Level1Iterator::current_row() const {
return nullptr;
}

int32_t CollectIterator::Level1Iterator::version() const {
int64_t CollectIterator::Level1Iterator::version() const {
if (_cur_child != nullptr) {
return _cur_child->version();
}
Expand Down Expand Up @@ -283,7 +282,7 @@ inline OLAPStatus CollectIterator::Level1Iterator::_merge_next(const RowCursor**
bool* delete_flag) {
_heap->pop();
auto res = _cur_child->next(row, delete_flag);
if (res == OLAP_SUCCESS) {
if (LIKELY(res == OLAP_SUCCESS)) {
_heap->push(_cur_child);
_cur_child = _heap->top();
} else if (res == OLAP_ERR_DATA_EOF) {
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/collect_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class CollectIterator {

virtual const RowCursor* current_row() const = 0;

virtual int32_t version() const = 0;
virtual int64_t version() const = 0;

virtual OLAPStatus next(const RowCursor** row, bool* delete_flag) = 0;
virtual ~LevelIterator() = 0;
Expand Down Expand Up @@ -96,14 +96,13 @@ class CollectIterator {

const RowCursor* current_row() const;

int32_t version() const;
int64_t version() const;

OLAPStatus next(const RowCursor** row, bool* delete_flag);

~Level0Iterator();

private:
// refresh_current_row
OLAPStatus _refresh_current_row();

RowsetReaderSharedPtr _rs_reader;
Expand All @@ -125,7 +124,7 @@ class CollectIterator {

const RowCursor* current_row() const;

int32_t version() const;
int64_t version() const;

OLAPStatus next(const RowCursor** row, bool* delete_flag);

Expand Down
148 changes: 65 additions & 83 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ std::string DeleteConditionHandler::construct_sub_predicates(const TCondition& c
} else if (op == ">") {
op += ">";
}
string condition_str = "";
string condition_str;
if ("IS" == op) {
condition_str = condition.column_name + " " + op + " " + condition.condition_values[0];
} else {
Expand All @@ -110,80 +110,78 @@ std::string DeleteConditionHandler::construct_sub_predicates(const TCondition& c
}

bool DeleteConditionHandler::is_condition_value_valid(const TabletColumn& column,
const TCondition& cond,
const std::string& condition_op,
const string& value_str) {
bool valid_condition = false;
if ("IS" == condition_op && ("NULL" == value_str || "NOT NULL" == value_str)) {
return true;
}

FieldType field_type = column.type();
if ("IS" == cond.condition_op && ("NULL" == value_str || "NOT NULL" == value_str)) {
valid_condition = true;
} else if (field_type == OLAP_FIELD_TYPE_TINYINT) {
valid_condition = valid_signed_number<int8_t>(value_str);
} else if (field_type == OLAP_FIELD_TYPE_SMALLINT) {
valid_condition = valid_signed_number<int16_t>(value_str);
} else if (field_type == OLAP_FIELD_TYPE_INT) {
valid_condition = valid_signed_number<int32_t>(value_str);
} else if (field_type == OLAP_FIELD_TYPE_BIGINT) {
valid_condition = valid_signed_number<int64_t>(value_str);
} else if (field_type == OLAP_FIELD_TYPE_LARGEINT) {
valid_condition = valid_signed_number<int128_t>(value_str);
} else if (field_type == OLAP_FIELD_TYPE_UNSIGNED_TINYINT) {
valid_condition = valid_unsigned_number<uint8_t>(value_str);
} else if (field_type == OLAP_FIELD_TYPE_UNSIGNED_SMALLINT) {
valid_condition = valid_unsigned_number<uint16_t>(value_str);
} else if (field_type == OLAP_FIELD_TYPE_UNSIGNED_INT) {
valid_condition = valid_unsigned_number<uint32_t>(value_str);
} else if (field_type == OLAP_FIELD_TYPE_UNSIGNED_BIGINT) {
valid_condition = valid_unsigned_number<uint64_t>(value_str);
} else if (field_type == OLAP_FIELD_TYPE_DECIMAL) {
valid_condition = valid_decimal(value_str, column.precision(), column.frac());
} else if (field_type == OLAP_FIELD_TYPE_CHAR || field_type == OLAP_FIELD_TYPE_VARCHAR) {
if (value_str.size() <= column.length()) {
valid_condition = true;
}
} else if (field_type == OLAP_FIELD_TYPE_DATE || field_type == OLAP_FIELD_TYPE_DATETIME) {
valid_condition = valid_datetime(value_str);
} else if (field_type == OLAP_FIELD_TYPE_BOOL) {
valid_condition = valid_bool(value_str);
} else {
OLAP_LOG_WARNING("unknown field type. [type=%d]", field_type);
switch(field_type) {
case OLAP_FIELD_TYPE_TINYINT:
return valid_signed_number<int8_t>(value_str);
case OLAP_FIELD_TYPE_SMALLINT:
return valid_signed_number<int16_t>(value_str);
case OLAP_FIELD_TYPE_INT:
return valid_signed_number<int32_t>(value_str);
case OLAP_FIELD_TYPE_BIGINT:
return valid_signed_number<int64_t>(value_str);
case OLAP_FIELD_TYPE_LARGEINT:
return valid_signed_number<int128_t>(value_str);
case OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
return valid_unsigned_number<uint8_t>(value_str);
case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
return valid_unsigned_number<uint16_t>(value_str);
case OLAP_FIELD_TYPE_UNSIGNED_INT:
return valid_unsigned_number<uint32_t>(value_str);
case OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
return valid_unsigned_number<uint64_t>(value_str);
case OLAP_FIELD_TYPE_DECIMAL:
return valid_decimal(value_str, column.precision(), column.frac());
case OLAP_FIELD_TYPE_CHAR:
case OLAP_FIELD_TYPE_VARCHAR:
return value_str.size() <= column.length();
case OLAP_FIELD_TYPE_DATE:
case OLAP_FIELD_TYPE_DATETIME:
return valid_datetime(value_str);
case OLAP_FIELD_TYPE_BOOL:
return valid_bool(value_str);
default:
OLAP_LOG_WARNING("unknown field type. [type=%d]", field_type);
}
return valid_condition;
return false;
}

OLAPStatus DeleteConditionHandler::check_condition_valid(const TabletSchema& schema,
const TCondition& cond) {
// 检查指定列名的列是否存在
// Check whether the column exists
int32_t field_index = schema.field_index(cond.column_name);
if (field_index < 0) {
OLAP_LOG_WARNING("field is not existent. [field_index=%d]", field_index);
return OLAP_ERR_DELETE_INVALID_CONDITION;
}

// 检查指定的列是不是key,是不是float或double类型
// Delete condition should only applied on key columns or duplicate key table, and
// the condition column type should not be float or double.
const TabletColumn& column = schema.column(field_index);

if ((!column.is_key() && schema.keys_type() != KeysType::DUP_KEYS) ||
column.type() == OLAP_FIELD_TYPE_DOUBLE || column.type() == OLAP_FIELD_TYPE_FLOAT) {
LOG(WARNING) << "field is not key column, or storage model is not duplicate, or data type "
"is float or double.";
return OLAP_ERR_DELETE_INVALID_CONDITION;
}

// 检查删除条件中指定的过滤值是否符合每个类型自身的要求
// 1. 对于整数类型(int8,int16,in32,int64,uint8,uint16,uint32,uint64),检查是否溢出
// 2. 对于decimal类型,检查是否超过建表时指定的精度和标度
// 3. 对于date和datetime类型,检查指定的过滤值是否符合日期格式以及是否指定错误的值
// 4. 对于string和varchar类型,检查指定的过滤值是否超过建表时指定的长度
// Check operator and operands size are matched.
if ("*=" != cond.condition_op && "!*=" != cond.condition_op &&
cond.condition_values.size() != 1) {
OLAP_LOG_WARNING("invalid condition value size. [size=%ld]", cond.condition_values.size());
return OLAP_ERR_DELETE_INVALID_CONDITION;
}

for (int i = 0; i < cond.condition_values.size(); i++) {
const string& value_str = cond.condition_values[i];
if (!is_condition_value_valid(column, cond, value_str)) {
LOG(WARNING) << "invalid condition value. [value=" << value_str << "]";
// Check each operand is valid
for (const auto& condition_value : cond.condition_values) {
if (!is_condition_value_valid(column, cond.condition_op, condition_value)) {
LOG(WARNING) << "invalid condition value. [value=" << condition_value << "]";
return OLAP_ERR_DELETE_INVALID_CONDITION;
}
}
Expand Down Expand Up @@ -227,32 +225,29 @@ bool DeleteHandler::_parse_condition(const std::string& condition_str, TConditio
}

OLAPStatus DeleteHandler::init(const TabletSchema& schema,
const DelPredicateArray& delete_conditions, int32_t version) {
const DelPredicateArray& delete_conditions, int64_t version) {
DCHECK(!_is_inited) << "reinitialize delete handler.";
DCHECK(version >= 0) << "invalid parameters. version=" << version;

DelPredicateArray::const_iterator it = delete_conditions.begin();
for (; it != delete_conditions.end(); ++it) {
for (const auto& delete_condition : delete_conditions) {
// 跳过版本号大于version的过滤条件
if (it->version() > version) {
if (delete_condition.version() > version) {
continue;
}

DeleteConditions temp;
temp.filter_version = it->version();
temp.filter_version = delete_condition.version();
temp.del_cond = new (std::nothrow) Conditions();

if (temp.del_cond == nullptr) {
LOG(FATAL) << "fail to malloc Conditions. size=" << sizeof(Conditions);
return OLAP_ERR_MALLOC_ERROR;
}

temp.del_cond->set_tablet_schema(&schema);
for (int i = 0; i != it->sub_predicates_size(); ++i) {
for (const auto& sub_predicate : delete_condition.sub_predicates()) {
TCondition condition;
if (!_parse_condition(it->sub_predicates(i), &condition)) {
OLAP_LOG_WARNING("fail to parse condition. [condition=%s]",
it->sub_predicates(i).c_str());
if (!_parse_condition(sub_predicate, &condition)) {
OLAP_LOG_WARNING("fail to parse condition. [condition=%s]", sub_predicate.c_str());
return OLAP_ERR_DELETE_INVALID_PARAMETERS;
}

Expand All @@ -263,9 +258,8 @@ OLAPStatus DeleteHandler::init(const TabletSchema& schema,
}
}

for (int i = 0; i != it->in_predicates_size(); ++i) {
for (const auto& in_predicate : delete_condition.in_predicates()) {
TCondition condition;
const InPredicatePB& in_predicate = it->in_predicates(i);
condition.__set_column_name(in_predicate.column_name());
if (in_predicate.is_not_in()) {
condition.__set_condition_op("!*=");
Expand All @@ -290,32 +284,23 @@ OLAPStatus DeleteHandler::init(const TabletSchema& schema,
return OLAP_SUCCESS;
}

bool DeleteHandler::is_filter_data(const int32_t data_version, const RowCursor& row) const {
if (_del_conds.empty()) {
return false;
}

bool DeleteHandler::is_filter_data(const int64_t data_version, const RowCursor& row) const {
// 根据语义,存储在_del_conds的删除条件应该是OR关系
// 因此,只要数据符合其中一条过滤条件,则返回true
std::vector<DeleteConditions>::const_iterator it = _del_conds.begin();

for (; it != _del_conds.end(); ++it) {
if (data_version <= it->filter_version && it->del_cond->delete_conditions_eval(row)) {
for (const auto& del_cond : _del_conds) {
if (data_version <= del_cond.filter_version && del_cond.del_cond->delete_conditions_eval(row)) {
return true;
}
}

return false;
}

std::vector<int32_t> DeleteHandler::get_conds_version() {
std::vector<int32_t> conds_version;
std::vector<DeleteConditions>::const_iterator cond_iter = _del_conds.begin();

for (; cond_iter != _del_conds.end(); ++cond_iter) {
conds_version.push_back(cond_iter->filter_version);
std::vector<int64_t> DeleteHandler::get_conds_version() {
std::vector<int64_t> conds_version;
for (const auto& cond : _del_conds) {
conds_version.push_back(cond.filter_version);
}

return conds_version;
}

Expand All @@ -324,19 +309,16 @@ void DeleteHandler::finalize() {
return;
}

std::vector<DeleteConditions>::iterator it = _del_conds.begin();

for (; it != _del_conds.end(); ++it) {
it->del_cond->finalize();
delete it->del_cond;
for (auto& cond : _del_conds) {
cond.del_cond->finalize();
delete cond.del_cond;
}

_del_conds.clear();
_is_inited = false;
}

void DeleteHandler::get_delete_conditions_after_version(
int32_t version, std::vector<const Conditions*>* delete_conditions) const {
int64_t version, std::vector<const Conditions*>* delete_conditions) const {
for (auto& del_cond : _del_conds) {
if (del_cond.filter_version > version) {
delete_conditions->emplace_back(del_cond.del_cond);
Expand Down
Loading