Skip to content

Commit

Permalink
[opt](delete) Delete job should retry for failure that is not `DELETE…
Browse files Browse the repository at this point in the history
…_INVALID_XXX` (#37834)

## Proposed changes

Fix #37363. A delete job should fail and abort when the failure is
`DELETE_INVALID_CONDITION` or `DELETE_INVALID_PARAMETERS`, and it should
retry on any other failure.
  • Loading branch information
bobhan1 authored Jul 17, 2024
1 parent 01eeff3 commit 15a6f84
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 29 deletions.
31 changes: 16 additions & 15 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
dp->param<std::string>("error_msg"));
})
if (conditions.empty()) {
return Status::Error<DELETE_INVALID_PARAMETERS>(
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid parameters for store_cond. condition_size={}", conditions.size());
}

Expand Down Expand Up @@ -127,7 +127,7 @@ Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
if (TCondition tmp; !DeleteHandler::parse_condition(condition_str, &tmp)) {
LOG(WARNING) << "failed to parse condition_str, condtion="
<< ThriftDebugString(condition);
return Status::Error<DELETE_INVALID_CONDITION>(
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"failed to parse condition_str, condtion={}", ThriftDebugString(condition));
}
VLOG_NOTICE << __PRETTY_FUNCTION__ << " condition_str: " << condition_str;
Expand Down Expand Up @@ -235,8 +235,8 @@ Status DeleteHandler::check_condition_valid(const TabletSchema& schema, const TC
// Check whether the column exists
int32_t field_index = schema.field_index(cond.column_name);
if (field_index < 0) {
return Status::Error<DELETE_INVALID_CONDITION>("field is not existent. [field_index={}]",
field_index);
return Status::Error<ErrorCode::INVALID_ARGUMENT>("field is not existent. [field_index={}]",
field_index);
}

// Delete conditions should only be applied to key columns or duplicate key tables, and
Expand All @@ -245,21 +245,21 @@ Status DeleteHandler::check_condition_valid(const TabletSchema& schema, const TC

if (column.type() == FieldType::OLAP_FIELD_TYPE_DOUBLE ||
column.type() == FieldType::OLAP_FIELD_TYPE_FLOAT) {
return Status::Error<DELETE_INVALID_CONDITION>("data type is float or double.");
return Status::Error<ErrorCode::INVALID_ARGUMENT>("data type is float or double.");
}

// Check operator and operands size are matched.
if ("*=" != cond.condition_op && "!*=" != cond.condition_op &&
cond.condition_values.size() != 1) {
return Status::Error<DELETE_INVALID_CONDITION>("invalid condition value size. [size={}]",
cond.condition_values.size());
return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid condition value size. [size={}]",
cond.condition_values.size());
}

// Check each operand is valid
for (const auto& condition_value : cond.condition_values) {
if (!is_condition_value_valid(column, cond.condition_op, condition_value)) {
return Status::Error<DELETE_INVALID_CONDITION>("invalid condition value. [value={}]",
condition_value);
return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid condition value. [value={}]",
condition_value);
}
}

Expand All @@ -273,23 +273,24 @@ Status DeleteHandler::check_condition_valid(const TabletSchema& schema, const TC
const auto& err_msg =
fmt::format("column id does not exists in table={}, schema version={},",
schema.table_id(), schema.schema_version());
return Status::Error<DELETE_INVALID_CONDITION>(err_msg);
return Status::Error<ErrorCode::INVALID_ARGUMENT>(err_msg);
}
if (!iequal(schema.column_by_uid(cond.column_unique_id).name(), cond.column_name)) {
const auto& err_msg = fmt::format(
"colum name={} does not belongs to column uid={}, which column name={}, "
"colum name={} does not belongs to column uid={}, which "
"column name={}, "
"delete_cond.column_name ={}",
cond.column_name, cond.column_unique_id,
schema.column_by_uid(cond.column_unique_id).name(), cond.column_name);
return Status::Error<DELETE_INVALID_CONDITION>(err_msg);
return Status::Error<ErrorCode::INVALID_ARGUMENT>(err_msg);
}

return Status::OK();
}

Status DeleteHandler::parse_condition(const DeleteSubPredicatePB& sub_cond, TCondition* condition) {
if (!sub_cond.has_column_name() || !sub_cond.has_op() || !sub_cond.has_cond_value()) {
return Status::Error<DELETE_INVALID_PARAMETERS>(
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"fail to parse condition. condition={} {} {}", sub_cond.column_name(),
sub_cond.op(), sub_cond.cond_value());
}
Expand Down Expand Up @@ -335,8 +336,8 @@ Status DeleteHandler::parse_condition(const std::string& condition_str, TConditi
<< "]";
}
if (!matched) {
return Status::Error<DELETE_INVALID_PARAMETERS>("fail to sub condition. condition={}",
condition_str);
return Status::Error<ErrorCode::INVALID_ARGUMENT>("fail to sub condition. condition={}",
condition_str);
}

condition->column_name = what[1].str();
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
}

std::shared_lock base_migration_rlock(tablet->get_migration_lock(), std::try_to_lock);
DBUG_EXECUTE_IF("PushHandler::_do_streaming_ingestion.try_lock_fail", {
return Status::Error<TRY_LOCK_FAILED>(
"PushHandler::_do_streaming_ingestion get lock failed");
})
if (!base_migration_rlock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED>(
"PushHandler::_do_streaming_ingestion get lock failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,10 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro

if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
if (pushTask.getPushType() == TPushType.DELETE) {
// DeleteHandler may return status code DELETE_INVALID_CONDITION and DELETE_INVALID_PARAMETERS,
// we don't need to retry if meet them.
// note that they will be converted to TStatusCode.INTERNAL_ERROR when being sent from be to fe
if (request.getTaskStatus().getStatusCode() == TStatusCode.INTERNAL_ERROR) {
// We don't need to retry if the returned status code is DELETE_INVALID_CONDITION
// or DELETE_INVALID_PARAMETERS.
// Note that both are converted to TStatusCode.INVALID_ARGUMENT when sent from BE to FE.
if (request.getTaskStatus().getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString());
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
false -9999782574499444.2 -25
true 99.9 234

-- !sql --
true 99.9 234

Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,33 @@ suite("test_delete_from_timeout","nonConcurrent") {
GetDebugPoint().clearDebugPointsForAllBEs()

try {
sql "insert into ${tableName} values(1, 99.9, 234);"
sql "insert into ${tableName} values(1, 99.9, 234), (false, -9999782574499444.2, -25);"
qt_sql "select * from ${tableName} order by col1, col2, col3;"

GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
[error_code: -1900 /* DELETE_INVALID_CONDITION */, error_msg: "data type is float or double."])
[error_code: 33 /* INVALID_ARGUMENT */, error_msg: "invalid parameters for store_cond. condition_size=1"])
test {
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
exception "data type is float or double."
exception "invalid parameters for store_cond. condition_size=1"
}

GetDebugPoint().clearDebugPointsForAllBEs()

GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
[error_code: -1903 /* DELETE_INVALID_PARAMETERS */, error_msg: "invalid parameters for store_cond. condition_size=1"])
test {
sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
exception "invalid parameters for store_cond. condition_size=1"
GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")

t1 = Thread.start {
sleep(15000)
GetDebugPoint().disableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
}

sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
t1.join()
qt_sql "select * from ${tableName} order by col1, col2, col3;"

} catch (Exception e) {
logger.info(e.getMessage())
AssertTrue(false)
assertTrue(false)
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure")
GetDebugPoint().clearDebugPointsForAllBEs()
}
}

0 comments on commit 15a6f84

Please sign in to comment.