IGNITE-24027 Implement failover for partitioned broadcast #5136

Merged 2 commits on Feb 4, 2025.

Changes from all commits
@@ -127,7 +127,7 @@ void broadcastAsync() {
// When broadcast a job
CompletableFuture<BroadcastExecution<String>> executionFut = compute().submitAsync(
BroadcastJobTarget.nodes(nodes),
JobDescriptor.<Object[], String>builder(InteractiveJobs.interactiveJobName()).build(),
InteractiveJobs.interactiveJobDescriptor(),
null
);

@@ -22,6 +22,7 @@
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -40,7 +41,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.BroadcastExecution;
import org.apache.ignite.compute.BroadcastJobTarget;
@@ -72,7 +73,6 @@
* <p>The logic is that if we run the job on the remote node and this node has left the logical topology then we should restart a job on
* another node. This is not true for broadcast and local jobs. They should not be restarted.
*/
@SuppressWarnings("resource")
public abstract class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest {
/**
* Map from node name to node index in {@link super#cluster}.
@@ -136,6 +136,9 @@ void remoteExecutionWorkerShutdown() throws Exception {
// And.
execution.assertExecuting();

// TODO https://issues.apache.org/jira/browse/IGNITE-24353
// assertThat(execution.node().name(), is(workerNodeName));

// And save state BEFORE worker has failed.
long createTimeBeforeFail = execution.createTimeMillis();
long startTimeBeforeFail = execution.startTimeMillis();
@@ -154,6 +157,9 @@
String failoverWorker = InteractiveJobs.globalJob().currentWorkerName();
assertThat(remoteWorkerCandidates, hasItem(failoverWorker));

// TODO https://issues.apache.org/jira/browse/IGNITE-24353
// assertThat(execution.node().name(), is(failoverWorker));

// And check create time was not changed but start time changed.
assertThat(execution.createTimeMillis(), equalTo(createTimeBeforeFail));
assertThat(execution.startTimeMillis(), greaterThan(startTimeBeforeFail));
@@ -226,15 +232,15 @@ void broadcastExecutionWorkerShutdown() {
InteractiveJobs.initChannels(allNodeNames());

// When start broadcast job.
CompletableFuture<BroadcastExecution<Object>> executionFut = compute(entryNode).submitAsync(
CompletableFuture<BroadcastExecution<String>> executionFut = compute(entryNode).submitAsync(
BroadcastJobTarget.nodes(clusterNode(0), clusterNode(1), clusterNode(2)),
JobDescriptor.builder(InteractiveJobs.interactiveJobName()).build(),
InteractiveJobs.interactiveJobDescriptor(),
null
);

assertThat(executionFut, willCompleteSuccessfully());
BroadcastExecution<Object> broadcastExecution = executionFut.join();
Collection<JobExecution<Object>> executions = broadcastExecution.executions();
BroadcastExecution<String> broadcastExecution = executionFut.join();
Collection<JobExecution<String>> executions = broadcastExecution.executions();

// Then all three jobs are alive.
assertThat(executions, hasSize(3));
@@ -245,7 +251,7 @@

// When stop one of workers.
String stoppedNodeName = node(1).name();
stopNode(node(1));
stopNode(1);

// Then two jobs are alive.
executions.forEach(execution -> {
@@ -266,6 +272,62 @@
AllInteractiveJobsApi.assertEachCalledOnce();
}

@Test
void partitionedBroadcastExecutionWorkerShutdown() {
// Prepare communication channels.
InteractiveJobs.initChannels(allNodeNames());

// Given table with replicas == 3 and partitions == 1.
createReplicatedTestTableWithOneRow();
// And partition leader for partition 1.
ClusterNode primaryReplica = getPrimaryReplica(node(0));
String firstWorkerName = primaryReplica.name();

// When start broadcast job on any node that is not primary replica.
Ignite entryNode = anyNodeExcept(primaryReplica);
CompletableFuture<BroadcastExecution<String>> executionFut = compute(entryNode).submitAsync(
BroadcastJobTarget.table(TABLE_NAME),
InteractiveJobs.interactiveJobDescriptor(),
null
);

assertThat(executionFut, willCompleteSuccessfully());
BroadcastExecution<String> broadcastExecution = executionFut.join();
Collection<JobExecution<String>> executions = broadcastExecution.executions();

// Then single job is alive.
assertThat(executions, hasSize(1));

JobExecution<String> execution = executions.stream().findFirst().orElseThrow();

InteractiveJobs.byNode(primaryReplica).assertAlive();
TestingJobExecution<String> testingJobExecution = new TestingJobExecution<>(execution);
testingJobExecution.assertExecuting();

// And it is running on primary replica node.
assertThat(execution.node().name(), equalTo(firstWorkerName));

// When stop worker node.
stopNode(primaryReplica);

// Get new primary replica
primaryReplica = getPrimaryReplica(entryNode);
String failoverNodeName = primaryReplica.name();
// Which is not the same node as before.
assertThat(failoverNodeName, not(equalTo(firstWorkerName)));

// And execution is running on the new primary replica. This will implicitly wait for the job to actually run on the new node.
InteractiveJobs.byNode(primaryReplica).assertAlive();
testingJobExecution.assertExecuting();

// And the same execution object points to the new job
// TODO https://issues.apache.org/jira/browse/IGNITE-24353
// assertThat(execution.node().name(), equalTo(failoverNodeName));

[Review comment from a Contributor: It would be useful to check after it if the job can finish and the result is as expected.]

InteractiveJobs.all().finishReturnPartitionNumber();
assertThat(execution.resultAsync(), willBe("0"));
}

@Test
void cancelRemoteExecutionOnRestartedJob() throws Exception {
// Given entry node.
@@ -345,31 +407,27 @@ void colocatedExecutionWorkerShutdown() throws Exception {
private ClusterNode getPrimaryReplica(Ignite node) {
IgniteImpl igniteImpl = unwrapIgniteImpl(node);

try {
HybridClock clock = igniteImpl.clock();
TableImpl table = unwrapTableImpl(node.tables().table(TABLE_NAME));
TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), table.partitionId(Tuple.create(1).set("K", 1)));
HybridClock clock = igniteImpl.clock();
TableImpl table = unwrapTableImpl(node.tables().table(TABLE_NAME));
TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), table.partitionId(Tuple.create(1).set("K", 1)));

ReplicaMeta replicaMeta = igniteImpl.placementDriver().getPrimaryReplica(tablePartitionId, clock.now()).get();
if (replicaMeta == null || replicaMeta.getLeaseholder() == null) {
throw new RuntimeException("Can not find primary replica for partition.");
}
CompletableFuture<ReplicaMeta> replicaFuture = igniteImpl.placementDriver()
.awaitPrimaryReplica(tablePartitionId, clock.now(), 30, TimeUnit.SECONDS);

return unwrapIgniteImpl(nodeByName(replicaMeta.getLeaseholder())).node();
assertThat(replicaFuture, willCompleteSuccessfully());
ReplicaMeta replicaMeta = replicaFuture.join();

} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
if (replicaMeta == null || replicaMeta.getLeaseholder() == null) {
throw new RuntimeException("Can not find primary replica for partition.");
}

return clusterNode(nodeByName(replicaMeta.getLeaseholder()));
}

private void stopNode(ClusterNode clusterNode) {
stopNode(clusterNode.name());
}

private void stopNode(Ignite ignite) {
stopNode(ignite.name());
}

private Ignite anyNodeExcept(ClusterNode except) {
String candidateName = allNodeNames()
.stream()
@@ -32,7 +32,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.internal.table.partition.HashPartition;
import org.apache.ignite.network.ClusterNode;

/**
@@ -122,6 +124,10 @@ public static String interactiveJobName() {
return InteractiveJob.class.getName();
}

public static JobDescriptor<String, String> interactiveJobDescriptor() {
return JobDescriptor.<String, String>builder(interactiveJobName()).build();
}

/**
* Signals that are sent by test code to the jobs.
*/
@@ -146,6 +152,11 @@ public enum Signal {
*/
RETURN_WORKER_NAME,

/**
* Ask job to complete and return partition number.
*/
RETURN_PARTITION_ID,

/**
* Signal to the job to continue running and send current worker name to the response channel.
*/
@@ -165,10 +176,10 @@
}

@Override
public CompletableFuture<String> executeAsync(JobExecutionContext context, String args) {
public CompletableFuture<String> executeAsync(JobExecutionContext context, String arg) {
RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet();

offerArgsAsSignals(args);
offerArgAsSignal(arg);

try {
while (true) {
@@ -196,11 +207,11 @@ public CompletableFuture<String> executeAsync(JobExecutionContext context, Strin
}

/**
* If any of the args are strings, convert them to signals and offer them to the job.
* If argument is a string, convert it to signal and offer to the job.
*
* @param arg Job args.
* @param arg Job arg.
*/
private static void offerArgsAsSignals(String arg) {
private static void offerArgAsSignal(String arg) {
if (arg == null) {
return;
}
@@ -216,7 +227,7 @@ private static void offerArgsAsSignals(String arg) {
* Interactive job that communicates via {@link #NODE_CHANNELS} and {@link #NODE_SIGNALS}. Also, keeps track of how many times it was
* executed via {@link #RUNNING_INTERACTIVE_JOBS_CNT}.
*/
private static class InteractiveJob implements ComputeJob<Object[], String> {
private static class InteractiveJob implements ComputeJob<String, String> {
private static Signal listenSignal(BlockingQueue<Signal> channel) {
try {
return channel.take();
@@ -226,7 +237,7 @@ private static Signal listenSignal(BlockingQueue<Signal> channel) {
}

@Override
public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) {
public CompletableFuture<String> executeAsync(JobExecutionContext context, String arg) {
RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet();

try {
@@ -247,6 +258,8 @@ public CompletableFuture<String> executeAsync(JobExecutionContext context, Objec
return completedFuture("Done");
case RETURN_WORKER_NAME:
return completedFuture(workerNodeName);
case RETURN_PARTITION_ID:
return completedFuture(Integer.toString(((HashPartition) context.partition()).partitionId()));
case GET_WORKER_NAME:
NODE_CHANNELS.get(workerNodeName).add(workerNodeName);
break;
@@ -355,6 +368,13 @@ public void finishReturnWorkerNames() {
sendTerminalSignal(Signal.RETURN_WORKER_NAME);
}

/**
* Finishes all {@link InteractiveJob}s by returning partition number.
*/
public void finishReturnPartitionNumber() {
sendTerminalSignal(Signal.RETURN_PARTITION_ID);
}

/**
* Finishes all {@link InteractiveJob}s by returning worker node names.
*/
@@ -23,6 +23,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,7 +33,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
@@ -148,7 +148,7 @@ private static void offerArgsAsSignals(Object arg) {
/**
* Interactive map reduce task that communicates via {@link #GLOBAL_CHANNEL} and {@link #GLOBAL_SIGNALS}.
*/
private static class GlobalInteractiveMapReduceTask implements MapReduceTask<String, Object, Object, List<String>> {
private static class GlobalInteractiveMapReduceTask implements MapReduceTask<String, String, String, List<String>> {
// When listening for signal is interrupted, if this flag is true, then corresponding method will throw exception,
// otherwise it will clean the interrupted status.
private boolean throwExceptionOnInterruption = true;
@@ -169,7 +169,7 @@
}

@Override
public CompletableFuture<List<MapReduceJob<Object, Object>>> splitAsync(TaskExecutionContext context, String args) {
public CompletableFuture<List<MapReduceJob<String, String>>> splitAsync(TaskExecutionContext context, String args) {
RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet();

offerArgsAsSignals(args);
@@ -188,8 +188,8 @@ public CompletableFuture<List<MapReduceJob<Object, Object>>> splitAsync(TaskExec
break;
case SPLIT_RETURN_ALL_NODES:
return completedFuture(context.ignite().clusterNodes().stream().map(node ->
MapReduceJob.builder()
.jobDescriptor(JobDescriptor.builder(InteractiveJobs.interactiveJobName()).build())
MapReduceJob.<String, String>builder()
.jobDescriptor(InteractiveJobs.interactiveJobDescriptor())
.nodes(Set.of(node))
.build()
).collect(toList()));
@@ -208,7 +208,7 @@ public CompletableFuture<List<MapReduceJob<Object, Object>>> splitAsync(TaskExec
}

@Override
public CompletableFuture<List<String>> reduceAsync(TaskExecutionContext context, Map<UUID, Object> results) {
public CompletableFuture<List<String>> reduceAsync(TaskExecutionContext context, Map<UUID, String> results) {
RUNNING_GLOBAL_REDUCE_CNT.incrementAndGet();
try {
while (true) {
@@ -220,9 +220,7 @@ public CompletableFuture<List<String>> reduceAsync(TaskExecutionContext context,
GLOBAL_CHANNEL.offer(ACK);
break;
case REDUCE_RETURN:
return completedFuture(results.values().stream()
.map(String.class::cast)
.collect(toList()));
return completedFuture(new ArrayList<>(results.values()));
case CHECK_CANCEL:
if (context.isCancelled()) {
throw new RuntimeException("Task is cancelled");
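
For context on the behavior these tests exercise, below is a minimal client-side sketch of a partitioned broadcast. The BroadcastJobTarget.table, JobDescriptor.builder, BroadcastExecution.executions, JobExecution.node, and JobExecution.resultAsync calls are the ones used in this diff; the job class, table name, ignite.compute() entry point, and context.ignite() accessor are illustrative assumptions, not part of the PR.

    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.compute.BroadcastExecution;
    import org.apache.ignite.compute.BroadcastJobTarget;
    import org.apache.ignite.compute.ComputeJob;
    import org.apache.ignite.compute.JobDescriptor;
    import org.apache.ignite.compute.JobExecution;
    import org.apache.ignite.compute.JobExecutionContext;

    class PartitionedBroadcastSketch {

        /** Illustrative job: one instance runs for each partition of the target table. */
        public static class WhereDidIRunJob implements ComputeJob<String, String> {
            @Override
            public CompletableFuture<String> executeAsync(JobExecutionContext context, String arg) {
                // context.partition() identifies the partition this instance serves
                // (the tests above cast it to the internal HashPartition to read the numeric id).
                return CompletableFuture.completedFuture(context.ignite().name());
            }
        }

        static void broadcastOverPartitions(Ignite ignite, String tableName) {
            CompletableFuture<BroadcastExecution<String>> fut = ignite.compute().submitAsync(
                    // One job per partition, placed on each partition's primary replica.
                    // With this PR, a job whose node leaves the topology is restarted
                    // on the partition's new primary replica instead of failing.
                    BroadcastJobTarget.table(tableName),
                    JobDescriptor.<String, String>builder(WhereDidIRunJob.class.getName()).build(),
                    null);

            Collection<JobExecution<String>> executions = fut.join().executions();
            for (JobExecution<String> execution : executions) {
                System.out.println(execution.node().name() + " -> " + execution.resultAsync().join());
            }
        }
    }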