Skip to content

Commit

Permalink
IGNITE-23499 Integrate cancellation of compute job with sql kill hand…
Browse files Browse the repository at this point in the history
…ler (#4898)
  • Loading branch information
xtern authored Dec 17, 2024
1 parent cd021ff commit 977f6c4
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 47 deletions.
1 change: 1 addition & 0 deletions modules/compute/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation project(':ignite-placement-driver-api')
implementation project(':ignite-binary-tuple')
implementation project(':ignite-client-common')
implementation project(':ignite-sql-engine-api')
implementation project(':ignite-system-view-api')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType;
import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableViewInternal;
Expand Down Expand Up @@ -452,6 +454,29 @@ public CompletableFuture<byte[]> runReceiverAsync(byte[] payload, ClusterNode no
});
}

/** Returns a {@link OperationKillHandler kill handler} for the compute job. */
public OperationKillHandler killHandler() {
return new OperationKillHandler() {
@Override
public CompletableFuture<Boolean> cancelAsync(String operationId) {
UUID jobId = UUID.fromString(operationId);

return IgniteComputeImpl.this.cancelAsync(jobId)
.thenApply(res -> res != null ? res : Boolean.FALSE);
}

@Override
public boolean local() {
return false;
}

@Override
public CancellableOperationType type() {
return CancellableOperationType.COMPUTE;
}
};
}

@TestOnly
ComputeComponent computeComponent() {
return computeComponent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,8 @@ public class IgniteImpl implements Ignite {
metaStorageMgr.addElectionListener(catalogCompactionRunner::updateCoordinator);
this.catalogCompactionRunner = catalogCompactionRunner;

KillCommandHandler killCommandHandler = new KillCommandHandler(name, logicalTopologyService, clusterSvc.messagingService());

lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
params -> catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters) params).newLowWatermark()));

Expand Down Expand Up @@ -1085,7 +1087,7 @@ public class IgniteImpl implements Ignite {
txManager,
lowWatermark,
threadPoolsManager.commonScheduler(),
new KillCommandHandler(name, logicalTopologyService, clusterSvc.messagingService())
killCommandHandler
);

systemViewManager.register(qryEngine);
Expand Down Expand Up @@ -1125,6 +1127,8 @@ public class IgniteImpl implements Ignite {
clock
);

killCommandHandler.register(((IgniteComputeImpl) compute).killHandler());

authenticationManager = createAuthenticationManager();

ClientConnectorConfiguration clientConnectorConfiguration = nodeConfigRegistry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public interface OperationKillHandler {
* be performed by the SQL engine.
*
* @return {@code True} if the handler can abort operations only on local node,
* {@code false} if the handler can abort operations across the entire cluster (in such a case .
* {@code false} if the handler can abort operations across the entire cluster.
*/
boolean local();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ public CompletableFuture<Void> reduceAsync(TaskExecutionContext taskContext, Map
}
}

private static class InfiniteJob implements ComputeJob<Void, Void> {
/** Infinite job. */
public static class InfiniteJob implements ComputeJob<Void, Void> {
@Override
public @Nullable CompletableFuture<Void> executeAsync(JobExecutionContext context, @Nullable Void arg) {
while (true) {
Expand Down
Loading

0 comments on commit 977f6c4

Please sign in to comment.