diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java index 1fe7258f6a625..8635391b28b07 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java @@ -66,8 +66,4 @@ public void stop() { each.stop(); } } - - @Override - public void close() { - } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java index 613e47b69e4fe..8f177233abe1d 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java @@ -71,8 +71,4 @@ public void stop() { public InventoryTaskProgress getTaskProgress() { return new InventoryTaskProgress(position.get()); } - - @Override - public void close() { - } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java index 8438044489d50..1a1d09259db8e 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java @@ -19,14 +19,13 @@ import org.apache.shardingsphere.data.pipeline.core.task.progress.TaskProgress; -import java.io.Closeable; import java.util.Collection; import java.util.concurrent.CompletableFuture; /** * Pipeline task interface. */ -public interface PipelineTask extends Closeable { +public interface PipelineTask { /** * Start task. @@ -53,9 +52,4 @@ public interface PipelineTask extends Closeable { * @return task progress */ TaskProgress getTaskProgress(); - - /** - * Close. - */ - void close(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java index 280c7345f10be..0de96b8b09851 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/TaskExecuteCallback.java @@ -19,7 +19,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.IOUtils; import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback; /** @@ -39,6 +38,5 @@ public void onSuccess() { public void onFailure(final Throwable throwable) { log.error("onFailure, task ID={}", task.getTaskId(), throwable); task.stop(); - IOUtils.closeQuietly(task); } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java index 062946f7e6052..48ba7523af37b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java @@ -35,7 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; -import org.apache.shardingsphere.infra.util.close.QuietlyCloser; import java.util.Collection; import java.util.LinkedList; @@ -120,14 +119,8 @@ private void updateJobItemStatus(final JobStatus jobStatus) { @Override public void stop() { jobItemContext.setStopping(true); - for (PipelineTask each : inventoryTasks) { - each.stop(); - QuietlyCloser.close(each); - } - for (PipelineTask each : incrementalTasks) { - each.stop(); - QuietlyCloser.close(each); - } + inventoryTasks.forEach(PipelineTask::stop); + incrementalTasks.forEach(PipelineTask::stop); } private final class InventoryTaskExecuteCallback implements ExecuteCallback { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java index f3700cbb2de4a..0d18ccf4b601d 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCIncrementalTask.java @@ -70,8 +70,4 @@ public void stop() { importer.stop(); } } - - @Override - public void close() { - } } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java index 2226ae090445e..6e3eb9066ea2b 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java @@ -82,8 +82,4 @@ public void stop() { public InventoryTaskProgress getTaskProgress() { return new InventoryTaskProgress(position.get()); } - - @Override - public void close() { - } } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java index 0e8f7d52a931c..015d3e6666af8 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java @@ -17,9 +17,8 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.task; -import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext; -import org.apache.shardingsphere.infra.util.close.QuietlyCloser; +import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; @@ -56,11 +55,9 @@ public void stop() { jobItemContext.setStopping(true); for (PipelineTask each : inventoryTasks) { each.stop(); - QuietlyCloser.close(each); } for (PipelineTask each : incrementalTasks) { each.stop(); - QuietlyCloser.close(each); } } } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java index b7bfbf6942f3d..3dcf4159bcd48 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java @@ -78,7 +78,6 @@ void assertGetProgress() throws SQLException, ExecutionException, InterruptedExc PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), mock(Importer.class), position); CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS); assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyIngestPosition.class)); - inventoryTask.close(); } private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {