IGNITE-24027 Implement failover for partitioned broadcast (#5136)
valepakh authored Feb 4, 2025
1 parent 48a2246 commit 546dd95
Showing 8 changed files with 272 additions and 98 deletions.
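
In short: a job broadcast with BroadcastJobTarget.table(...) runs one execution per table partition, on the node that currently holds that partition's primary replica, and this commit makes such an execution fail over to the new primary replica when its node leaves the cluster. A minimal sketch of the call shape exercised by the tests in this diff; the table name and job class are placeholders, not part of the commit:

// Sketch only: "MY_TABLE" and MyJob are hypothetical.
CompletableFuture<BroadcastExecution<String>> fut = ignite.compute().submitAsync(
        BroadcastJobTarget.table("MY_TABLE"),  // one job per partition, on its primary replica
        JobDescriptor.<String, String>builder(MyJob.class.getName()).build(),
        null  // job argument
);
// If a primary-replica node stops, the affected job restarts on the new primary replica.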
@@ -127,7 +127,7 @@ void broadcastAsync() {
// When broadcast a job
CompletableFuture<BroadcastExecution<String>> executionFut = compute().submitAsync(
BroadcastJobTarget.nodes(nodes),
JobDescriptor.<Object[], String>builder(InteractiveJobs.interactiveJobName()).build(),
InteractiveJobs.interactiveJobDescriptor(),
null
);

@@ -22,6 +22,7 @@
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -40,7 +41,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.BroadcastExecution;
import org.apache.ignite.compute.BroadcastJobTarget;
@@ -72,7 +73,6 @@
* <p>The logic is that if we run the job on the remote node and this node has left the logical topology then we should restart a job on
* another node. This is not true for broadcast and local jobs. They should not be restarted.
*/
@SuppressWarnings("resource")
public abstract class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest {
/**
* Map from node name to node index in {@link super#cluster}.
@@ -136,6 +136,9 @@ void remoteExecutionWorkerShutdown() throws Exception {
// And.
execution.assertExecuting();

// TODO https://issues.apache.org/jira/browse/IGNITE-24353
// assertThat(execution.node().name(), is(workerNodeName));

// And save state BEFORE worker has failed.
long createTimeBeforeFail = execution.createTimeMillis();
long startTimeBeforeFail = execution.startTimeMillis();
@@ -154,6 +157,9 @@ void remoteExecutionWorkerShutdown() throws Exception {
String failoverWorker = InteractiveJobs.globalJob().currentWorkerName();
assertThat(remoteWorkerCandidates, hasItem(failoverWorker));

// TODO https://issues.apache.org/jira/browse/IGNITE-24353
// assertThat(execution.node().name(), is(failoverWorker));

// And check create time was not changed but start time changed.
assertThat(execution.createTimeMillis(), equalTo(createTimeBeforeFail));
assertThat(execution.startTimeMillis(), greaterThan(startTimeBeforeFail));
@@ -226,15 +232,15 @@ void broadcastExecutionWorkerShutdown() {
InteractiveJobs.initChannels(allNodeNames());

// When start broadcast job.
CompletableFuture<BroadcastExecution<Object>> executionFut = compute(entryNode).submitAsync(
CompletableFuture<BroadcastExecution<String>> executionFut = compute(entryNode).submitAsync(
BroadcastJobTarget.nodes(clusterNode(0), clusterNode(1), clusterNode(2)),
JobDescriptor.builder(InteractiveJobs.interactiveJobName()).build(),
InteractiveJobs.interactiveJobDescriptor(),
null
);

assertThat(executionFut, willCompleteSuccessfully());
BroadcastExecution<Object> broadcastExecution = executionFut.join();
Collection<JobExecution<Object>> executions = broadcastExecution.executions();
BroadcastExecution<String> broadcastExecution = executionFut.join();
Collection<JobExecution<String>> executions = broadcastExecution.executions();

// Then all three jobs are alive.
assertThat(executions, hasSize(3));
@@ -245,7 +251,7 @@

// When stop one of workers.
String stoppedNodeName = node(1).name();
stopNode(node(1));
stopNode(1);

// Then two jobs are alive.
executions.forEach(execution -> {
@@ -266,6 +272,62 @@
AllInteractiveJobsApi.assertEachCalledOnce();
}

@Test
void partitionedBroadcastExecutionWorkerShutdown() {
// Prepare communication channels.
InteractiveJobs.initChannels(allNodeNames());

// Given table with replicas == 3 and partitions == 1.
createReplicatedTestTableWithOneRow();
// And the primary replica of its single partition.
ClusterNode primaryReplica = getPrimaryReplica(node(0));
String firstWorkerName = primaryReplica.name();

// When start broadcast job on any node that is not primary replica.
Ignite entryNode = anyNodeExcept(primaryReplica);
CompletableFuture<BroadcastExecution<String>> executionFut = compute(entryNode).submitAsync(
BroadcastJobTarget.table(TABLE_NAME),
InteractiveJobs.interactiveJobDescriptor(),
null
);

assertThat(executionFut, willCompleteSuccessfully());
BroadcastExecution<String> broadcastExecution = executionFut.join();
Collection<JobExecution<String>> executions = broadcastExecution.executions();

// Then single job is alive.
assertThat(executions, hasSize(1));

JobExecution<String> execution = executions.stream().findFirst().orElseThrow();

InteractiveJobs.byNode(primaryReplica).assertAlive();
TestingJobExecution<String> testingJobExecution = new TestingJobExecution<>(execution);
testingJobExecution.assertExecuting();

// And it is running on primary replica node.
assertThat(execution.node().name(), equalTo(firstWorkerName));

// When stop worker node.
stopNode(primaryReplica);

// Get new primary replica
primaryReplica = getPrimaryReplica(entryNode);
String failoverNodeName = primaryReplica.name();
// Which is not the same node as before.
assertThat(failoverNodeName, not(equalTo(firstWorkerName)));

// And execution is running on the new primary replica. This will implicitly wait for the job to actually run on the new node.
InteractiveJobs.byNode(primaryReplica).assertAlive();
testingJobExecution.assertExecuting();

// And the same execution object points to the new job
// TODO https://issues.apache.org/jira/browse/IGNITE-24353
// assertThat(execution.node().name(), equalTo(failoverNodeName));

InteractiveJobs.all().finishReturnPartitionNumber();
assertThat(execution.resultAsync(), willBe("0"));
}
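
The final willBe("0") assertion holds because the job reports the partition it serves, and the table has a single partition, so the only possible id is 0. The partition is exposed through the job's JobExecutionContext; here is a sketch of the job-side logic, mirroring the RETURN_PARTITION_ID branch added to InteractiveJob further down (the HashPartition cast is taken from that diff; the method body here is illustrative):

// Illustrative job body: complete with the id of the partition this execution serves.
@Override
public CompletableFuture<String> executeAsync(JobExecutionContext context, String arg) {
    int partitionId = ((HashPartition) context.partition()).partitionId();
    return completedFuture(Integer.toString(partitionId));
}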

@Test
void cancelRemoteExecutionOnRestartedJob() throws Exception {
// Given entry node.
@@ -345,31 +407,27 @@ void colocatedExecutionWorkerShutdown() throws Exception {
private ClusterNode getPrimaryReplica(Ignite node) {
IgniteImpl igniteImpl = unwrapIgniteImpl(node);

try {
HybridClock clock = igniteImpl.clock();
TableImpl table = unwrapTableImpl(node.tables().table(TABLE_NAME));
TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), table.partitionId(Tuple.create(1).set("K", 1)));
HybridClock clock = igniteImpl.clock();
TableImpl table = unwrapTableImpl(node.tables().table(TABLE_NAME));
TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), table.partitionId(Tuple.create(1).set("K", 1)));

ReplicaMeta replicaMeta = igniteImpl.placementDriver().getPrimaryReplica(tablePartitionId, clock.now()).get();
if (replicaMeta == null || replicaMeta.getLeaseholder() == null) {
throw new RuntimeException("Can not find primary replica for partition.");
}
CompletableFuture<ReplicaMeta> replicaFuture = igniteImpl.placementDriver()
.awaitPrimaryReplica(tablePartitionId, clock.now(), 30, TimeUnit.SECONDS);

return unwrapIgniteImpl(nodeByName(replicaMeta.getLeaseholder())).node();
assertThat(replicaFuture, willCompleteSuccessfully());
ReplicaMeta replicaMeta = replicaFuture.join();

} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
if (replicaMeta == null || replicaMeta.getLeaseholder() == null) {
throw new RuntimeException("Can not find primary replica for partition.");
}

return clusterNode(nodeByName(replicaMeta.getLeaseholder()));
}
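
The refactored helper above replaces getPrimaryReplica(...).get() with awaitPrimaryReplica(...) and a 30-second timeout, so it waits for a lease to be assigned instead of failing when none exists yet, and the try/catch for InterruptedException/ExecutionException becomes unnecessary. The new test uses it in a resolve-then-stop pattern:

// Usage in partitionedBroadcastExecutionWorkerShutdown above:
ClusterNode primaryReplica = getPrimaryReplica(node(0));  // resolve the current leaseholder
stopNode(primaryReplica);                                 // stop it to force failover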

private void stopNode(ClusterNode clusterNode) {
stopNode(clusterNode.name());
}

private void stopNode(Ignite ignite) {
stopNode(ignite.name());
}

private Ignite anyNodeExcept(ClusterNode except) {
String candidateName = allNodeNames()
.stream()
@@ -32,7 +32,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.internal.table.partition.HashPartition;
import org.apache.ignite.network.ClusterNode;

/**
@@ -122,6 +124,10 @@ public static String interactiveJobName() {
return InteractiveJob.class.getName();
}

public static JobDescriptor<String, String> interactiveJobDescriptor() {
return JobDescriptor.<String, String>builder(interactiveJobName()).build();
}
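
This helper centralizes the typed descriptor that call sites previously built inline, which is what lets the tests above narrow BroadcastExecution<Object> to BroadcastExecution<String>. A call site then reads as in the ItWorkerShutdownTest hunks (compute and nodes come from the surrounding test):

CompletableFuture<BroadcastExecution<String>> executionFut = compute.submitAsync(
        BroadcastJobTarget.nodes(nodes),
        InteractiveJobs.interactiveJobDescriptor(),
        null
);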

/**
* Signals that are sent by test code to the jobs.
*/
@@ -146,6 +152,11 @@ public enum Signal {
*/
RETURN_WORKER_NAME,

/**
* Ask job to complete and return partition number.
*/
RETURN_PARTITION_ID,

/**
* Signal to the job to continue running and send current worker name to the response channel.
*/
@@ -165,10 +176,10 @@ private static Signal listenSignal() {
}

@Override
public CompletableFuture<String> executeAsync(JobExecutionContext context, String args) {
public CompletableFuture<String> executeAsync(JobExecutionContext context, String arg) {
RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet();

offerArgsAsSignals(args);
offerArgAsSignal(arg);

try {
while (true) {
@@ -196,11 +207,11 @@ public CompletableFuture<String> executeAsync(JobExecutionContext context, Strin
}

/**
* If any of the args are strings, convert them to signals and offer them to the job.
* If the argument is a string, convert it to a signal and offer it to the job.
*
* @param arg Job args.
* @param arg Job arg.
*/
private static void offerArgsAsSignals(String arg) {
private static void offerArgAsSignal(String arg) {
if (arg == null) {
return;
}
Expand All @@ -216,7 +227,7 @@ private static void offerArgsAsSignals(String arg) {
* Interactive job that communicates via {@link #NODE_CHANNELS} and {@link #NODE_SIGNALS}. Also, keeps track of how many times it was
* executed via {@link #RUNNING_INTERACTIVE_JOBS_CNT}.
*/
private static class InteractiveJob implements ComputeJob<Object[], String> {
private static class InteractiveJob implements ComputeJob<String, String> {
private static Signal listenSignal(BlockingQueue<Signal> channel) {
try {
return channel.take();
@@ -226,7 +237,7 @@ private static Signal listenSignal(BlockingQueue<Signal> channel) {
}

@Override
public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) {
public CompletableFuture<String> executeAsync(JobExecutionContext context, String arg) {
RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet();

try {
@@ -247,6 +258,8 @@ public CompletableFuture<String> executeAsync(JobExecutionContext context, Objec
return completedFuture("Done");
case RETURN_WORKER_NAME:
return completedFuture(workerNodeName);
case RETURN_PARTITION_ID:
return completedFuture(Integer.toString(((HashPartition) context.partition()).partitionId()));
case GET_WORKER_NAME:
NODE_CHANNELS.get(workerNodeName).add(workerNodeName);
break;
@@ -355,6 +368,13 @@ public void finishReturnWorkerNames() {
sendTerminalSignal(Signal.RETURN_WORKER_NAME);
}

/**
* Finishes all {@link InteractiveJob}s by returning partition number.
*/
public void finishReturnPartitionNumber() {
sendTerminalSignal(Signal.RETURN_PARTITION_ID);
}
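
Like the other finish* helpers, finishReturnPartitionNumber() sends a terminal signal that unblocks every running InteractiveJob and completes each one with its partition id as a string. The partitioned-broadcast test above ends with exactly this handshake:

// Release the job, then assert the single-partition table reports partition "0".
InteractiveJobs.all().finishReturnPartitionNumber();
assertThat(execution.resultAsync(), willBe("0"));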

/**
* Finishes all {@link InteractiveJob}s by returning worker node names.
*/
@@ -23,6 +23,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,7 +33,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
@@ -148,7 +148,7 @@ private static void offerArgsAsSignals(Object arg) {
/**
* Interactive map reduce task that communicates via {@link #GLOBAL_CHANNEL} and {@link #GLOBAL_SIGNALS}.
*/
private static class GlobalInteractiveMapReduceTask implements MapReduceTask<String, Object, Object, List<String>> {
private static class GlobalInteractiveMapReduceTask implements MapReduceTask<String, String, String, List<String>> {
// When listening for signal is interrupted, if this flag is true, then corresponding method will throw exception,
// otherwise it will clean the interrupted status.
private boolean throwExceptionOnInterruption = true;
@@ -169,7 +169,7 @@ private Signal listenSignal() {
}

@Override
public CompletableFuture<List<MapReduceJob<Object, Object>>> splitAsync(TaskExecutionContext context, String args) {
public CompletableFuture<List<MapReduceJob<String, String>>> splitAsync(TaskExecutionContext context, String args) {
RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet();

offerArgsAsSignals(args);
Expand All @@ -188,8 +188,8 @@ public CompletableFuture<List<MapReduceJob<Object, Object>>> splitAsync(TaskExec
break;
case SPLIT_RETURN_ALL_NODES:
return completedFuture(context.ignite().clusterNodes().stream().map(node ->
MapReduceJob.builder()
.jobDescriptor(JobDescriptor.builder(InteractiveJobs.interactiveJobName()).build())
MapReduceJob.<String, String>builder()
.jobDescriptor(InteractiveJobs.interactiveJobDescriptor())
.nodes(Set.of(node))
.build()
).collect(toList()));
Expand All @@ -208,7 +208,7 @@ public CompletableFuture<List<MapReduceJob<Object, Object>>> splitAsync(TaskExec
}

@Override
public CompletableFuture<List<String>> reduceAsync(TaskExecutionContext context, Map<UUID, Object> results) {
public CompletableFuture<List<String>> reduceAsync(TaskExecutionContext context, Map<UUID, String> results) {
RUNNING_GLOBAL_REDUCE_CNT.incrementAndGet();
try {
while (true) {
Expand All @@ -220,9 +220,7 @@ public CompletableFuture<List<String>> reduceAsync(TaskExecutionContext context,
GLOBAL_CHANNEL.offer(ACK);
break;
case REDUCE_RETURN:
return completedFuture(results.values().stream()
.map(String.class::cast)
.collect(toList()));
return completedFuture(new ArrayList<>(results.values()));
case CHECK_CANCEL:
if (context.isCancelled()) {
throw new RuntimeException("Task is cancelled");