Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23499 Integrate cancellation of compute job with sql kill handler. #4898

Merged
merged 9 commits into from
Dec 17, 2024
1 change: 1 addition & 0 deletions modules/compute/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation project(':ignite-placement-driver-api')
implementation project(':ignite-binary-tuple')
implementation project(':ignite-client-common')
implementation project(':ignite-sql-engine-api')
implementation project(':ignite-system-view-api')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType;
import org.apache.ignite.internal.sql.engine.api.kill.OperationKillHandler;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableViewInternal;
Expand Down Expand Up @@ -452,6 +454,29 @@ public CompletableFuture<byte[]> runReceiverAsync(byte[] payload, ClusterNode no
});
}

/** Returns a {@link OperationKillHandler kill handler} for the compute job. */
public OperationKillHandler killHandler() {
return new OperationKillHandler() {
@Override
public CompletableFuture<Boolean> cancelAsync(String operationId) {
UUID jobId = UUID.fromString(operationId);

return IgniteComputeImpl.this.cancelAsync(jobId)
.thenApply(res -> res != null ? res : Boolean.FALSE);
}

@Override
public boolean local() {
return false;
}

@Override
public CancellableOperationType type() {
return CancellableOperationType.COMPUTE;
}
};
}

@TestOnly
ComputeComponent computeComponent() {
return computeComponent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,8 @@ public class IgniteImpl implements Ignite {
metaStorageMgr.addElectionListener(catalogCompactionRunner::updateCoordinator);
this.catalogCompactionRunner = catalogCompactionRunner;

KillCommandHandler killCommandHandler = new KillCommandHandler(name, logicalTopologyService, clusterSvc.messagingService());

lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
params -> catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters) params).newLowWatermark()));

Expand Down Expand Up @@ -1083,7 +1085,7 @@ public class IgniteImpl implements Ignite {
txManager,
lowWatermark,
threadPoolsManager.commonScheduler(),
new KillCommandHandler(name, logicalTopologyService, clusterSvc.messagingService())
killCommandHandler
);

systemViewManager.register(qryEngine);
Expand Down Expand Up @@ -1123,6 +1125,8 @@ public class IgniteImpl implements Ignite {
clock
);

killCommandHandler.register(((IgniteComputeImpl) compute).killHandler());

authenticationManager = createAuthenticationManager();

ClientConnectorConfiguration clientConnectorConfiguration = nodeConfigRegistry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,58 @@

package org.apache.ignite.internal.sql.engine.kill;

import static org.apache.ignite.compute.JobStatus.CANCELED;
import static org.apache.ignite.compute.JobStatus.EXECUTING;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType.COMPUTE;
import static org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType.QUERY;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType;
import org.apache.ignite.internal.sql.engine.exec.fsm.QueryInfo;
import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.awaitility.Awaitility;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/**
* Integration tests for SQL '{@code KILL}' statement.
Expand All @@ -58,31 +80,40 @@ public void checkResourceLeak() {
assertThat(queryProcessor().openedCursors(), is(0));
}

@Test
public void killQueryMetadata() {
assertQuery("KILL QUERY '00000000-0000-0000-0000-000000000000'")
@ParameterizedTest
@EnumSource(CancellableOperationType.class)
public void killQueryMetadata(CancellableOperationType type) {
assertQuery(format("KILL {} '00000000-0000-0000-0000-000000000000'", type))
.columnMetadata(
new MetadataMatcher().name("APPLIED").type(ColumnType.BOOLEAN).nullable(false)
)
.check();
}

@Test
public void killWithInvalidQueryIdentifier() {
@ParameterizedTest
@EnumSource(CancellableOperationType.class)
public void killWithInvalidIdentifier(CancellableOperationType type) {
// TODO https://issues.apache.org/jira/browse/IGNITE-23488 Remove assumption.
assumeTrue(type != CancellableOperationType.TRANSACTION, type + " not implemented yet");

SqlException err = assertThrowsSqlException(
SqlException.class,
Sql.RUNTIME_ERR,
"Invalid operation ID format [operationId=123, type=QUERY]",
() -> await(igniteSql().executeAsync(null, "KILL QUERY '123'"))
format("Invalid operation ID format [operationId=123, type={}]", type),
() -> await(igniteSql().executeAsync(null, format("KILL {} '123'", type)))
);

assertThat(err.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(err.getCause().getMessage(), equalTo("Invalid UUID string: 123"));
}

@Test
public void killNonExistentQuery() {
checkKillQuery(CLUSTER.aliveNode(), UUID.randomUUID(), false);
@ParameterizedTest
@EnumSource(CancellableOperationType.class)
public void killNonExistentOperation(CancellableOperationType type) {
// TODO https://issues.apache.org/jira/browse/IGNITE-23488 Remove assumption.
assumeTrue(type != CancellableOperationType.TRANSACTION, type + " not implemented yet");

assertThat(executeKill(CLUSTER.aliveNode(), type, UUID.randomUUID(), false), is(false));
}

@Test
Expand All @@ -95,46 +126,141 @@ public void killQueryFromLocal() {
assertThat(queries.size(), is(1));
UUID targetQueryId = queries.get(0).id();

checkKillQuery(node, targetQueryId, true);
assertThat(executeKillSqlQuery(node, targetQueryId), is(true));

assertThat(runningQueries(), is(empty()));
expectQueryCancelled(new DrainCursor(cursor));

checkKillQuery(node, targetQueryId, false);
checkKillQuery(node, targetQueryId, true, true);
assertThat(executeKillSqlQuery(node, targetQueryId), is(false));
assertThat(executeKill(node, QUERY, targetQueryId, true), is(true));
}

@Test
public void killComputeJobFromLocal() {
Ignite node = CLUSTER.aliveNode();
JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
JobExecution<Void> execution = node.compute().submit(JobTarget.node(clusterNode(node)), job, null);

Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to wait status here ? let`s change this test - kill job immediately.

Copy link
Contributor Author

@xtern xtern Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, removed this condition.

why do we need to wait status here ?

I thought that we need to wait until task will be deployed and I assumed that if this is not done, there possible flaky failures (that we will cancel a non-existent task). But locally I don't see flaky failures so I removed this check here and in the killComputeJobFromRemote test.


UUID jobId = await(execution.idAsync());
assertThat(jobId, not(nullValue()));
assertThat(executeKillJob(node, jobId), is(true));

Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));

assertThat(executeKillJob(node, jobId), is(false));
assertThat(executeKill(node, COMPUTE, jobId, true), is(true));
}

@Test
public void killQueryFromRemote() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems we have no "real" tests for killing long-living queries ? WDYT, do we need such a tests ? Because we already have real infinite job in compute, but it not tested in sql.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean something that doesn't fit on a single page?
i.e. SELECT x FROM system_range(0, 100000)?

Ignite local = CLUSTER.node(0);
Ignite remote = CLUSTER.node(2);

AsyncSqlCursor<InternalSqlRow> cursor = executeQueryInternal(local, "SELECT 1");
{
AsyncSqlCursor<InternalSqlRow> cursor = executeQueryInternal(local, "SELECT 1");

List<QueryInfo> queries = runningQueries();
assertThat(queries.size(), is(1));
UUID targetQueryId = queries.get(0).id();
List<QueryInfo> queries = runningQueries();
assertThat(queries.size(), is(1));
UUID targetQueryId = queries.get(0).id();

checkKillQuery(remote, targetQueryId, true);
assertThat(executeKillSqlQuery(remote, targetQueryId), is(true));

assertThat(runningQueries(), is(empty()));
expectQueryCancelled(new DrainCursor(cursor));
assertThat(runningQueries(), is(empty()));
expectQueryCancelled(new DrainCursor(cursor));

assertThat(executeKillSqlQuery(remote, targetQueryId), is(false));
assertThat(executeKillSqlQuery(local, targetQueryId), is(false));
}

// No wait.
{
AsyncSqlCursor<InternalSqlRow> cursor = executeQueryInternal(local, "SELECT 1");

checkKillQuery(remote, targetQueryId, false);
checkKillQuery(local, targetQueryId, false);
List<QueryInfo> queries = runningQueries();
assertThat(queries.size(), is(1));
UUID targetQueryId = queries.get(0).id();

assertThat(executeKill(remote, QUERY, targetQueryId, true), is(true));

Awaitility.await().untilAsserted(() -> assertThat(runningQueries(), is(empty())));
expectQueryCancelled(new DrainCursor(cursor));
}
}

private static void checkKillQuery(Ignite node, UUID queryId, boolean expectedResult) {
checkKillQuery(node, queryId, expectedResult, false);
@Test
public void killComputeJobFromRemote() {
Ignite local = CLUSTER.node(0);
Ignite remote = CLUSTER.node(2);

// Single execution.
{
JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
JobExecution<Void> execution = local.compute().submit(JobTarget.node(clusterNode(local)), job, null);

Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));

UUID jobId = await(execution.idAsync());
assertThat(jobId, not(nullValue()));
assertThat(executeKillJob(remote, jobId), is(true));

Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));

assertThat(executeKillJob(remote, jobId), is(false));
assertThat(executeKillJob(local, jobId), is(false));
}

// Single execution with nowait.
{
JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
JobExecution<Void> execution = local.compute().submit(JobTarget.node(clusterNode(local)), job, null);

Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(EXECUTING)));

UUID jobId = await(execution.idAsync());
assertThat(jobId, not(nullValue()));
assertThat(executeKill(remote, COMPUTE, jobId, true), is(true));

Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));

assertThat(executeKill(remote, COMPUTE, jobId, true), is(true));
}

// Multiple executions.
{
JobDescriptor<Void, Void> job = JobDescriptor.builder(InfiniteJob.class).units(List.of()).build();
Map<ClusterNode, JobExecution<Void>> executions = local.compute().submitBroadcast(
Set.of(clusterNode(CLUSTER.node(0)), clusterNode(CLUSTER.node(1))), job, null);

executions.forEach((node, execution) -> {
UUID jobId = await(execution.idAsync());
assertThat(jobId, not(nullValue()));
assertThat("Node=" + node.name(), executeKillJob(remote, jobId), is(true));

Awaitility.await().until(execution::stateAsync, willBe(jobStateWithStatus(CANCELED)));

assertThat("Node=" + node.name(), executeKillJob(remote, jobId), is(false));
assertThat("Node=" + node.name(), executeKillJob(local, jobId), is(false));
});
}
}

private static void checkKillQuery(Ignite node, UUID queryId, boolean expectedResult, boolean noWait) {
String query = IgniteStringFormatter
.format("KILL QUERY '{}'{}", queryId, noWait ? " NO WAIT" : "");
private static boolean executeKillSqlQuery(Ignite node, UUID queryId) {
return executeKill(node, QUERY, queryId, false);
}

private static boolean executeKillJob(Ignite node, UUID jonId) {
return executeKill(node, COMPUTE, jonId, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static boolean executeKillJob(Ignite node, UUID jonId) {
return executeKill(node, COMPUTE, jonId, false);
private static boolean executeKillJob(Ignite node, UUID jobId) {
return executeKill(node, COMPUTE, jobId, false);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, thanks

}

private static boolean executeKill(Ignite node, CancellableOperationType type, UUID queryId, boolean noWait) {
String query = format("KILL {} '{}'{}", type, queryId, noWait ? " NO WAIT" : "");

try (ResultSet<SqlRow> res = node.sql().execute(null, query)) {
assertThat(res.hasRowSet(), is(false));
assertThat(res.wasApplied(), is(expectedResult));

return res.wasApplied();
}
}

Expand All @@ -157,4 +283,21 @@ private static AsyncSqlCursor<InternalSqlRow> executeQueryInternal(Ignite node,

return await(fut);
}

private static class InfiniteJob implements ComputeJob<Void, Void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid copy-paste ? can it be public ItComputeSystemViewTest.InfiniteJob ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thanks

@Override
public @Nullable CompletableFuture<Void> executeAsync(JobExecutionContext context, @Nullable Void arg) {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// No op, just return from loop
break;
}
}

return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public CompletableFuture<Boolean> handle(KillCommand cmd) {

CompletableFuture<Boolean> killFut = invokeCancel(handler, cmd.operationId());

if (killFut.isDone() || !cmd.noWait()) {
if (killFut.isCompletedExceptionally() || !cmd.noWait()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I miss smth here, if noWait == true - we must return immediatelly i.e. return CompletableFuture.completedFuture(true); otherwise need to return killFut, isn`t it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

if noWait == true - we must return immediatelly i.e. return CompletableFuture.completedFuture(true);

...except handling exception described in javadoc

@throws IllegalArgumentException If the operation identifier is not in the correct format.

For example ``KILL QUERY '123' NO WAIT` throws exception instead of returning true 🤔

return killFut;
}

Expand Down