Skip to content

Commit

Permalink
Remove useless PipelineTask.close() (#33392)
Browse files Browse the repository at this point in the history
* Remove useless PipelineTask.close()
  • Loading branch information
terrymanu authored Oct 24, 2024
1 parent 07da0a1 commit 2ecb1fa
Show file tree
Hide file tree
Showing 9 changed files with 4 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,4 @@ public void stop() {
each.stop();
}
}

@Override
public void close() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,4 @@ public void stop() {
public InventoryTaskProgress getTaskProgress() {
return new InventoryTaskProgress(position.get());
}

@Override
public void close() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@

import org.apache.shardingsphere.data.pipeline.core.task.progress.TaskProgress;

import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
* Pipeline task interface.
*/
public interface PipelineTask extends Closeable {
public interface PipelineTask {

/**
* Start task.
Expand All @@ -53,9 +52,4 @@ public interface PipelineTask extends Closeable {
* @return task progress
*/
TaskProgress getTaskProgress();

/**
* Close.
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;

/**
Expand All @@ -39,6 +38,5 @@ public void onSuccess() {
public void onFailure(final Throwable throwable) {
log.error("onFailure, task ID={}", task.getTaskId(), throwable);
task.stop();
IOUtils.closeQuietly(task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.util.Collection;
import java.util.LinkedList;
Expand Down Expand Up @@ -120,14 +119,8 @@ private void updateJobItemStatus(final JobStatus jobStatus) {
@Override
public void stop() {
jobItemContext.setStopping(true);
for (PipelineTask each : inventoryTasks) {
each.stop();
QuietlyCloser.close(each);
}
for (PipelineTask each : incrementalTasks) {
each.stop();
QuietlyCloser.close(each);
}
inventoryTasks.forEach(PipelineTask::stop);
incrementalTasks.forEach(PipelineTask::stop);
}

private final class InventoryTaskExecuteCallback implements ExecuteCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,4 @@ public void stop() {
importer.stop();
}
}

@Override
public void close() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,4 @@ public void stop() {
public InventoryTaskProgress getTaskProgress() {
return new InventoryTaskProgress(position.get());
}

@Override
public void close() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

package org.apache.shardingsphere.data.pipeline.cdc.core.task;

import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;

Expand Down Expand Up @@ -56,11 +55,9 @@ public void stop() {
jobItemContext.setStopping(true);
for (PipelineTask each : inventoryTasks) {
each.stop();
QuietlyCloser.close(each);
}
for (PipelineTask each : incrementalTasks) {
each.stop();
QuietlyCloser.close(each);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ void assertGetProgress() throws SQLException, ExecutionException, InterruptedExc
PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), mock(Importer.class), position);
CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
assertThat(inventoryTask.getTaskProgress().getPosition(), instanceOf(IntegerPrimaryKeyIngestPosition.class));
inventoryTask.close();
}

private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
Expand Down

0 comments on commit 2ecb1fa

Please sign in to comment.