Skip to content

Commit

Permalink
IGNITE-23499 Integrate cancellation of compute job with sql kill hand…
Browse files Browse the repository at this point in the history
…ler.
  • Loading branch information
xtern committed Dec 13, 2024
1 parent 4294cf6 commit d62c624
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 31 deletions.
1 change: 1 addition & 0 deletions modules/compute/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation project(':ignite-placement-driver-api')
implementation project(':ignite-binary-tuple')
implementation project(':ignite-client-common')
implementation project(':ignite-sql-engine-api')
implementation project(':ignite-system-view-api')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType;
import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableViewInternal;
Expand Down Expand Up @@ -452,6 +454,29 @@ public CompletableFuture<byte[]> runReceiverAsync(byte[] payload, ClusterNode no
});
}

/**
 * Creates the {@link OperationKillHandler kill handler} that cancels a compute job by its identifier.
 *
 * <p>The returned handler reports the {@link CancellableOperationType#COMPUTE} operation type
 * and {@code false} from {@link OperationKillHandler#local()}.
 *
 * @return Kill handler for compute jobs.
 */
public OperationKillHandler killHandler() {
    return new OperationKillHandler() {
        @Override
        public CancellableOperationType type() {
            return CancellableOperationType.COMPUTE;
        }

        @Override
        public boolean local() {
            return false;
        }

        @Override
        public CompletableFuture<Boolean> cancelAsync(String operationId) {
            // UUID.fromString throws IllegalArgumentException when the identifier is not a valid UUID string.
            // A null cancellation outcome is reported as "not cancelled".
            return IgniteComputeImpl.this.cancelAsync(UUID.fromString(operationId))
                    .thenApply(Boolean.TRUE::equals);
        }
    };
}

@TestOnly
ComputeComponent computeComponent() {
return computeComponent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@
import org.apache.ignite.internal.sql.configuration.local.SqlNodeExtensionConfiguration;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.api.kill.KillHandlerRegistry;
import org.apache.ignite.internal.sql.engine.exec.kill.KillCommandHandler;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModule;
Expand Down Expand Up @@ -946,6 +947,8 @@ public class IgniteImpl implements Ignite {
metaStorageMgr.addElectionListener(catalogCompactionRunner::updateCoordinator);
this.catalogCompactionRunner = catalogCompactionRunner;

KillHandlerRegistry killHandlerRegistry = new KillCommandHandler(name, logicalTopologyService, clusterSvc.messagingService());

lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
params -> catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters) params).newLowWatermark()));

Expand Down Expand Up @@ -1083,7 +1086,7 @@ public class IgniteImpl implements Ignite {
txManager,
lowWatermark,
threadPoolsManager.commonScheduler(),
new KillCommandHandler(name, logicalTopologyService, clusterSvc.messagingService())
(KillCommandHandler) killHandlerRegistry
);

systemViewManager.register(qryEngine);
Expand Down Expand Up @@ -1123,6 +1126,8 @@ public class IgniteImpl implements Ignite {
clock
);

killHandlerRegistry.register(((IgniteComputeImpl) compute).killHandler());

authenticationManager = createAuthenticationManager();

ClientConnectorConfiguration clientConnectorConfiguration = nodeConfigRegistry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,58 @@

package org.apache.ignite.internal.sql.engine.kill;

import static org.apache.ignite.compute.JobStatus.CANCELED;
import static org.apache.ignite.compute.JobStatus.EXECUTING;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType.COMPUTE;
import static org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType.QUERY;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType;
import org.apache.ignite.internal.sql.engine.exec.fsm.QueryInfo;
import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.awaitility.Awaitility;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/**
* Integration tests for SQL '{@code KILL}' statement.
Expand All @@ -58,31 +80,40 @@ public void checkResourceLeak() {
assertThat(queryProcessor().openedCursors(), is(0));
}

@Test
public void killQueryMetadata() {
assertQuery("KILL QUERY '00000000-0000-0000-0000-000000000000'")
/**
 * Checks the result set metadata of the {@code KILL} statement for every operation type:
 * a single non-nullable {@code BOOLEAN} column named {@code APPLIED}.
 */
@ParameterizedTest
@EnumSource(CancellableOperationType.class)
public void killQueryMetadata(CancellableOperationType type) {
    String killStatement = format("KILL {} '00000000-0000-0000-0000-000000000000'", type);

    assertQuery(killStatement)
            .columnMetadata(new MetadataMatcher().name("APPLIED").type(ColumnType.BOOLEAN).nullable(false))
            .check();
}

@Test
public void killWithInvalidQueryIdentifier() {
@ParameterizedTest
@EnumSource(CancellableOperationType.class)
public void killWithInvalidIdentifier(CancellableOperationType type) {
// TODO https://issues.apache.org/jira/browse/IGNITE-23488 Remove assumption.
assumeTrue(type != CancellableOperationType.TRANSACTION, type + " not implemented yet");

SqlException err = assertThrowsSqlException(
SqlException.class,
Sql.RUNTIME_ERR,
"Invalid operation ID format [operationId=123, type=QUERY]",
() -> await(igniteSql().executeAsync(null, "KILL QUERY '123'"))
format("Invalid operation ID format [operationId=123, type={}]", type),
() -> await(igniteSql().executeAsync(null, format("KILL {} '123'", type)))
);

assertThat(err.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(err.getCause().getMessage(), equalTo("Invalid UUID string: 123"));
}

@Test
public void killNonExistentQuery() {
checkKillQuery(CLUSTER.aliveNode(), UUID.randomUUID(), false);
/**
 * Checks that a {@code KILL} statement targeting a random, unknown operation identifier
 * completes normally and reports that nothing was applied.
 */
@ParameterizedTest
@EnumSource(CancellableOperationType.class)
public void killNonExistentOperation(CancellableOperationType type) {
    // TODO https://issues.apache.org/jira/browse/IGNITE-23488 Remove assumption.
    assumeTrue(type != CancellableOperationType.TRANSACTION, type + " not implemented yet");

    UUID unknownOperationId = UUID.randomUUID();
    boolean applied = executeKill(CLUSTER.aliveNode(), type, unknownOperationId, false);

    assertThat(applied, is(false));
}

@Test
Expand All @@ -95,46 +126,140 @@ public void killQueryFromLocal() {
assertThat(queries.size(), is(1));
UUID targetQueryId = queries.get(0).id();

checkKillQuery(node, targetQueryId, true);
assertThat(executeKillSqlQuery(node, targetQueryId), is(true));

assertThat(runningQueries(), is(empty()));
expectQueryCancelled(new DrainCursor(cursor));

checkKillQuery(node, targetQueryId, false);
checkKillQuery(node, targetQueryId, true, true);
assertThat(executeKillSqlQuery(node, targetQueryId), is(false));
assertThat(executeKill(node, QUERY, targetQueryId, true), is(true));
}

/**
 * Checks that a compute job can be killed with {@code KILL COMPUTE} issued on the node that
 * submitted the job, and that repeated kill attempts behave as expected afterwards.
 */
@Test
public void killComputeJobFromLocal() {
    Ignite ignite = CLUSTER.aliveNode();

    JobDescriptor<Void, Void> descriptor = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
    JobExecution<Void> running = ignite.compute().submit(JobTarget.node(clusterNode(ignite)), descriptor, null);

    // The job must be running before it is killed.
    Awaitility.await().until(running::stateAsync, willBe(jobStateWithStatus(EXECUTING)));

    UUID jobId = await(running.idAsync());
    assertThat(jobId, not(nullValue()));

    boolean firstKillApplied = executeKillJob(ignite, jobId);
    assertThat(firstKillApplied, is(true));

    Awaitility.await().until(running::stateAsync, willBe(jobStateWithStatus(CANCELED)));

    // A second kill without NO WAIT is not applied any more.
    boolean secondKillApplied = executeKillJob(ignite, jobId);
    assertThat(secondKillApplied, is(false));

    // With NO WAIT the statement reports that it was applied.
    boolean noWaitKillApplied = executeKill(ignite, COMPUTE, jobId, true);
    assertThat(noWaitKillApplied, is(true));
}

/**
 * Checks that a query started on one node can be killed with {@code KILL QUERY} issued from another node,
 * both with and without the {@code NO WAIT} option.
 */
@Test
public void killQueryFromRemote() {
    Ignite local = CLUSTER.node(0);
    Ignite remote = CLUSTER.node(2);

    // Kill without NO WAIT.
    {
        AsyncSqlCursor<InternalSqlRow> cursor = executeQueryInternal(local, "SELECT 1");

        List<QueryInfo> queries = runningQueries();
        assertThat(queries.size(), is(1));
        UUID targetQueryId = queries.get(0).id();

        assertThat(executeKillSqlQuery(remote, targetQueryId), is(true));

        assertThat(runningQueries(), is(empty()));
        expectQueryCancelled(new DrainCursor(cursor));

        // Repeated kills are not applied, regardless of the node they are issued from.
        assertThat(executeKillSqlQuery(remote, targetQueryId), is(false));
        assertThat(executeKillSqlQuery(local, targetQueryId), is(false));
    }

    // No wait.
    {
        AsyncSqlCursor<InternalSqlRow> cursor = executeQueryInternal(local, "SELECT 1");

        List<QueryInfo> queries = runningQueries();
        assertThat(queries.size(), is(1));
        UUID targetQueryId = queries.get(0).id();

        assertThat(executeKill(remote, QUERY, targetQueryId, true), is(true));

        // The statement does not wait for the outcome here, so poll until the query is gone.
        Awaitility.await().untilAsserted(() -> assertThat(runningQueries(), is(empty())));
        expectQueryCancelled(new DrainCursor(cursor));
    }
}

checkKillQuery(remote, targetQueryId, false);
checkKillQuery(local, targetQueryId, false);
/**
 * Checks that compute jobs submitted from one node can be killed with {@code KILL COMPUTE} issued from another node.
 *
 * <p>Covers a single job killed without {@code NO WAIT}, a single job killed with {@code NO WAIT},
 * and jobs submitted as a broadcast to several nodes.
 */
@Test
public void killComputeJobFromRemote() {
    Ignite local = CLUSTER.node(0);
    Ignite remote = CLUSTER.node(2);

    // The same descriptor is used for every submission below.
    JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();

    // Single execution.
    {
        JobExecution<Void> execution = local.compute().submit(JobTarget.node(clusterNode(local)), job, null);

        Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));

        UUID jobId = await(execution.idAsync());
        assertThat(jobId, not(nullValue()));
        assertThat(executeKillJob(remote, jobId), is(true));

        // Poll for the terminal state, as killComputeJobFromLocal does, instead of asserting a single
        // snapshot that may still show a transitional state.
        Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));

        assertThat(executeKillJob(remote, jobId), is(false));
        assertThat(executeKillJob(local, jobId), is(false));
    }

    // Single execution with nowait.
    {
        JobExecution<Void> execution = local.compute().submit(JobTarget.node(clusterNode(local)), job, null);

        Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));

        UUID jobId = await(execution.idAsync());
        assertThat(jobId, not(nullValue()));
        assertThat(executeKill(remote, COMPUTE, jobId, true), is(true));

        Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));

        assertThat(executeKill(remote, COMPUTE, jobId, true), is(true));
    }

    // Multiple executions.
    {
        Map<ClusterNode, JobExecution<Void>> executions = local.compute().submitBroadcast(
                Set.of(clusterNode(CLUSTER.node(0)), clusterNode(CLUSTER.node(1))), job, null);

        executions.forEach((node, execution) -> {
            UUID jobId = await(execution.idAsync());
            assertThat(jobId, not(nullValue()));
            assertThat("Node=" + node.name(), executeKillJob(remote, jobId), is(true));

            // Same polling as above: wait until the job reports the terminal state.
            Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));

            assertThat("Node=" + node.name(), executeKillJob(remote, jobId), is(false));
            assertThat("Node=" + node.name(), executeKillJob(local, jobId), is(false));
        });
    }
}

private static void checkKillQuery(Ignite node, UUID queryId, boolean expectedResult) {
checkKillQuery(node, queryId, expectedResult, false);
/**
 * Executes {@code KILL QUERY} for the given query identifier on the given node, without the {@code NO WAIT} option.
 *
 * @param node Node used to execute the statement.
 * @param queryId Identifier of the query to kill.
 * @return Value of {@code wasApplied()} of the statement result.
 */
private static boolean executeKillSqlQuery(Ignite node, UUID queryId) {
    boolean applied = executeKill(node, QUERY, queryId, false);

    return applied;
}

private static void checkKillQuery(Ignite node, UUID queryId, boolean expectedResult, boolean noWait) {
String query = IgniteStringFormatter
.format("KILL QUERY '{}'{}", queryId, noWait ? " NO WAIT" : "");
/**
 * Executes {@code KILL COMPUTE} for the given job identifier on the given node, without the {@code NO WAIT} option.
 *
 * @param node Node used to execute the statement.
 * @param jobId Identifier of the compute job to kill.
 * @return Value of {@code wasApplied()} of the statement result.
 */
private static boolean executeKillJob(Ignite node, UUID jobId) {
    return executeKill(node, COMPUTE, jobId, false);
}

/**
 * Executes a {@code KILL} statement of the given type on the given node.
 *
 * <p>Asserts that the statement produces no row set and returns whether it was applied,
 * leaving the expectation about that value to the caller.
 *
 * @param node Node used to execute the statement.
 * @param type Type of the operation to kill; used as the keyword following {@code KILL}.
 * @param operationId Identifier of the operation to kill.
 * @param noWait Whether to append the {@code NO WAIT} option to the statement.
 * @return Value of {@code wasApplied()} of the statement result.
 */
private static boolean executeKill(Ignite node, CancellableOperationType type, UUID operationId, boolean noWait) {
    String query = format("KILL {} '{}'{}", type, operationId, noWait ? " NO WAIT" : "");

    try (ResultSet<SqlRow> res = node.sql().execute(null, query)) {
        assertThat(res.hasRowSet(), is(false));

        return res.wasApplied();
    }
}

Expand All @@ -157,4 +282,21 @@ private static AsyncSqlCursor<InternalSqlRow> executeQueryInternal(Ignite node,

return await(fut);
}

/** Compute job that keeps sleeping until its thread is interrupted and then returns {@code null}. */
private static class InfiniteJob implements ComputeJob<Void, Void> {
    @Override
    public @Nullable CompletableFuture<Void> executeAsync(JobExecutionContext context, @Nullable Void arg) {
        Thread worker = Thread.currentThread();

        for (;;) {
            if (worker.isInterrupted()) {
                return null;
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop.
                worker.interrupt();

                return null;
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public CompletableFuture<Boolean> handle(KillCommand cmd) {

CompletableFuture<Boolean> killFut = invokeCancel(handler, cmd.operationId());

if (killFut.isDone() || !cmd.noWait()) {
if (killFut.isCompletedExceptionally() || !cmd.noWait()) {
return killFut;
}

Expand Down

0 comments on commit d62c624

Please sign in to comment.