diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 862efa5f04..30f1af5a72 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -374,7 +374,7 @@ private class TableOptimizingProcess implements OptimizingProcess { public TaskRuntime poll() { lock.lock(); try { - return status != Status.CLOSED || status != Status.FAILED ? taskQueue.poll() : null; + return status != Status.CLOSED && status != Status.FAILED ? taskQueue.poll() : null; } finally { lock.unlock(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java index aacdaa78d9..8ef5830639 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java @@ -90,7 +90,7 @@ public void complete(OptimizerThread thread, OptimizingTaskResult result) { void reset() { invokeConsistency( () -> { - statusMachine.accept(Status.PLANNED); + statusMachine.accept(Status.SCHEDULED); startTime = AmoroServiceConstants.INVALID_TIME; endTime = AmoroServiceConstants.INVALID_TIME; token = null; @@ -98,6 +98,7 @@ void reset() { failReason = null; taskDescriptor.reset(); future.reset(); + statusMachine.accept(Status.PLANNED); // The cost time should not be reset since it is the total cost time of all runs. persistTaskRuntime(); }); @@ -265,8 +266,8 @@ public TaskQuota getCurrentQuota() { .put(Status.SCHEDULED, ImmutableSet.of(Status.PLANNED, Status.ACKED, Status.CANCELED)) .put( Status.ACKED, - ImmutableSet.of(Status.PLANNED, Status.SUCCESS, Status.FAILED, Status.CANCELED)) - .put(Status.FAILED, ImmutableSet.of(Status.PLANNED)) + ImmutableSet.of(Status.SCHEDULED, Status.SUCCESS, Status.FAILED, Status.CANCELED)) + .put(Status.FAILED, ImmutableSet.of(Status.SCHEDULED)) .put(Status.CANCELED, ImmutableSet.of()) .put(Status.SUCCESS, ImmutableSet.of()) .build(); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java index 24ce7c0366..057083ee32 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java @@ -173,12 +173,12 @@ public void testRetryTask() { OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta); // 1.poll task - TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); + TaskRuntime task = queue.pollTask(MAX_POLLING_TIME); Assert.assertNotNull(task); for (int i = 0; i < TableProperties.SELF_OPTIMIZING_EXECUTE_RETRY_NUMBER_DEFAULT; i++) { queue.retryTask(task); - TaskRuntime retryTask = queue.pollTask(MAX_POLLING_TIME); + TaskRuntime retryTask = queue.pollTask(MAX_POLLING_TIME); Assert.assertEquals(retryTask.getTaskId(), task.getTaskId()); retryTask.schedule(optimizerThread); retryTask.ack(optimizerThread); @@ -189,7 +189,7 @@ public void testRetryTask() { } queue.retryTask(task); - TaskRuntime retryTask = queue.pollTask(MAX_POLLING_TIME); + TaskRuntime retryTask = queue.pollTask(MAX_POLLING_TIME); Assert.assertEquals(retryTask.getTaskId(), task.getTaskId()); retryTask.schedule(optimizerThread); retryTask.ack(optimizerThread);