diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index e97639b7d95045..07c6b5371e7083 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -122,12 +122,10 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR std::shared_lock base_migration_rlock(tablet->get_migration_lock(), std::try_to_lock); DBUG_EXECUTE_IF("PushHandler::_do_streaming_ingestion.try_lock_fail", { - return Status::Error( - "PushHandler::_do_streaming_ingestion get lock failed"); + return Status::ObtainLockFailed("PushHandler::_do_streaming_ingestion get lock failed"); }) if (!base_migration_rlock.owns_lock()) { - return Status::Error( - "PushHandler::_do_streaming_ingestion get lock failed"); + return Status::ObtainLockFailed("PushHandler::_do_streaming_ingestion get lock failed"); } PUniqueId load_id; load_id.set_hi(0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 964bb493d4b83a..d7fecf826aa498 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -33,6 +33,7 @@ import org.apache.doris.cloud.master.CloudReportHandler; import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.Status; import org.apache.doris.load.DeleteJob; import org.apache.doris.load.loadv2.IngestionLoadJob; import org.apache.doris.system.Backend; @@ -319,18 +320,31 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro long backendId = pushTask.getBackendId(); long signature = task.getSignature(); long transactionId = ((PushTask) task).getTransactionId(); + long tableId = pushTask.getTableId(); + long partitionId = pushTask.getPartitionId(); + long pushIndexId = pushTask.getIndexId(); + long pushTabletId = pushTask.getTabletId(); if 
(request.getTaskStatus().getStatusCode() != TStatusCode.OK) { if (pushTask.getPushType() == TPushType.DELETE) { // we don't need to retry if the returned status code is DELETE_INVALID_CONDITION // or DELETE_INVALID_PARAMETERS // note that they will be converted to TStatusCode.INVALID_ARGUMENT when being sent from be to fe - if (request.getTaskStatus().getStatusCode() == TStatusCode.INVALID_ARGUMENT) { - pushTask.countDownToZero(request.getTaskStatus().getStatusCode(), - task.getBackendId() + ": " + request.getTaskStatus().getErrorMsgs().toString()); - AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); - LOG.warn("finish push replica error: {}", request.getTaskStatus().getErrorMsgs().toString()); + TStatus taskStatus = request.getTaskStatus(); + String msg = task.getBackendId() + ": " + taskStatus.getErrorMsgs().toString(); + LOG.warn("finish push replica, signature={}, error: {}", + signature, taskStatus.getErrorMsgs().toString()); + if (taskStatus.getStatusCode() == TStatusCode.OBTAIN_LOCK_FAILED) { + // retry if obtain lock failed + return; + } + if (taskStatus.getStatusCode() == TStatusCode.INVALID_ARGUMENT) { + pushTask.countDownToZero(taskStatus.getStatusCode(), msg); + } else { + pushTask.countDownLatchWithStatus(backendId, pushTabletId, + new Status(taskStatus.getStatusCode(), msg)); } + AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); } return; } @@ -344,10 +358,6 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro return; } - long tableId = pushTask.getTableId(); - long partitionId = pushTask.getPartitionId(); - long pushIndexId = pushTask.getIndexId(); - long pushTabletId = pushTask.getTabletId(); // push finish type: // numOfFinishTabletInfos tabletId schemaHash // Normal: 1 / / @@ -445,7 +455,7 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) thro AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature); LOG.warn("finish push 
replica error", e); if (pushTask.getPushType() == TPushType.DELETE) { - pushTask.countDownLatch(backendId, pushTabletId); + pushTask.countDownLatchWithStatus(backendId, pushTabletId, Status.CANCELLED); } } finally { olapTable.writeUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java index 0dadef4dee1d2e..af98e5bd2b3484 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java @@ -220,6 +220,18 @@ public void countDownLatch(long backendId, long tabletId) { } } + public void countDownLatchWithStatus(long backendId, long tabletId, Status st) { /* Marks the (backendId, tabletId) pair on this task's latch via markedCountDownWithStatus, handing it the status st. */ + if (this.latch == null) { /* no latch attached to this task: nothing to count down */ + return; + } + if (latch.markedCountDownWithStatus(backendId, tabletId, st)) { /* the boolean result is used only to gate the debug log below */ + if (LOG.isDebugEnabled()) { + LOG.debug("pushTask current latch count with status: {}. backend: {}, tablet:{}, st::{}", + latch.getCount(), backendId, tabletId, st); + } + } + } + // call this always means one of tasks is failed.
count down to zero to finish entire task public void countDownToZero(TStatusCode code, String errMsg) { if (this.latch != null) { diff --git a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy index b15309891da067..769fc88b053987 100644 --- a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy +++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy @@ -46,6 +46,16 @@ suite("test_delete_from_timeout","nonConcurrent") { GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure", + [error_code: -235 /* TOO MANY VERSIONS */, error_msg: "too many versions"]) + + test { + sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """ + exception "too many versions" + } + + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail") def t1 = Thread.start {