diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java index 321dd1e5b74..b905ae9a0b3 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeErrorsBaseTest.java @@ -127,7 +127,7 @@ void broadcastAsync() { // When broadcast a job CompletableFuture> executionFut = compute().submitAsync( BroadcastJobTarget.nodes(nodes), - JobDescriptor.builder(InteractiveJobs.interactiveJobName()).build(), + InteractiveJobs.interactiveJobDescriptor(), null ); diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java index 6f28ba3ccc0..51034fe8692 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -40,7 +41,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; 
+import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.compute.BroadcastExecution; import org.apache.ignite.compute.BroadcastJobTarget; @@ -72,7 +73,6 @@ *

The logic is that if we run the job on the remote node and this node has left the logical topology then we should restart a job on * another node. This is not true for broadcast and local jobs. They should not be restarted. */ -@SuppressWarnings("resource") public abstract class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest { /** * Map from node name to node index in {@link super#cluster}. @@ -136,6 +136,9 @@ void remoteExecutionWorkerShutdown() throws Exception { // And. execution.assertExecuting(); + // TODO https://issues.apache.org/jira/browse/IGNITE-24353 + // assertThat(execution.node().name(), is(workerNodeName)); + // And save state BEFORE worker has failed. long createTimeBeforeFail = execution.createTimeMillis(); long startTimeBeforeFail = execution.startTimeMillis(); @@ -154,6 +157,9 @@ void remoteExecutionWorkerShutdown() throws Exception { String failoverWorker = InteractiveJobs.globalJob().currentWorkerName(); assertThat(remoteWorkerCandidates, hasItem(failoverWorker)); + // TODO https://issues.apache.org/jira/browse/IGNITE-24353 + // assertThat(execution.node().name(), is(failoverWorker)); + // And check create time was not changed but start time changed. assertThat(execution.createTimeMillis(), equalTo(createTimeBeforeFail)); assertThat(execution.startTimeMillis(), greaterThan(startTimeBeforeFail)); @@ -226,15 +232,15 @@ void broadcastExecutionWorkerShutdown() { InteractiveJobs.initChannels(allNodeNames()); // When start broadcast job. 
- CompletableFuture> executionFut = compute(entryNode).submitAsync( + CompletableFuture> executionFut = compute(entryNode).submitAsync( BroadcastJobTarget.nodes(clusterNode(0), clusterNode(1), clusterNode(2)), - JobDescriptor.builder(InteractiveJobs.interactiveJobName()).build(), + InteractiveJobs.interactiveJobDescriptor(), null ); assertThat(executionFut, willCompleteSuccessfully()); - BroadcastExecution broadcastExecution = executionFut.join(); - Collection> executions = broadcastExecution.executions(); + BroadcastExecution broadcastExecution = executionFut.join(); + Collection> executions = broadcastExecution.executions(); // Then all three jobs are alive. assertThat(executions, hasSize(3)); @@ -245,7 +251,7 @@ void broadcastExecutionWorkerShutdown() { // When stop one of workers. String stoppedNodeName = node(1).name(); - stopNode(node(1)); + stopNode(1); // Then two jobs are alive. executions.forEach(execution -> { @@ -266,6 +272,62 @@ void broadcastExecutionWorkerShutdown() { AllInteractiveJobsApi.assertEachCalledOnce(); } + @Test + void partitionedBroadcastExecutionWorkerShutdown() { + // Prepare communication channels. + InteractiveJobs.initChannels(allNodeNames()); + + // Given table with replicas == 3 and partitions == 1. + createReplicatedTestTableWithOneRow(); + // And the node holding the primary replica of the table's single partition. + ClusterNode primaryReplica = getPrimaryReplica(node(0)); + String firstWorkerName = primaryReplica.name(); + + // When start broadcast job on any node that is not primary replica. + Ignite entryNode = anyNodeExcept(primaryReplica); + CompletableFuture> executionFut = compute(entryNode).submitAsync( + BroadcastJobTarget.table(TABLE_NAME), + InteractiveJobs.interactiveJobDescriptor(), + null + ); + + assertThat(executionFut, willCompleteSuccessfully()); + BroadcastExecution broadcastExecution = executionFut.join(); + Collection> executions = broadcastExecution.executions(); + + // Then single job is alive. 
+ assertThat(executions, hasSize(1)); + + JobExecution execution = executions.stream().findFirst().orElseThrow(); + + InteractiveJobs.byNode(primaryReplica).assertAlive(); + TestingJobExecution testingJobExecution = new TestingJobExecution<>(execution); + testingJobExecution.assertExecuting(); + + // And it is running on primary replica node. + assertThat(execution.node().name(), equalTo(firstWorkerName)); + + // When stop worker node. + stopNode(primaryReplica); + + // Get new primary replica + primaryReplica = getPrimaryReplica(entryNode); + String failoverNodeName = primaryReplica.name(); + // Which is not the same node as before. + assertThat(failoverNodeName, not(equalTo(firstWorkerName))); + + // And execution is running on the new primary replica. This will implicitly wait for the job to actually run on the new node. + InteractiveJobs.byNode(primaryReplica).assertAlive(); + testingJobExecution.assertExecuting(); + + // And the same execution object points to the new job + // TODO https://issues.apache.org/jira/browse/IGNITE-24353 + // assertThat(execution.node().name(), equalTo(failoverNodeName)); + + InteractiveJobs.all().finishReturnPartitionNumber(); + assertThat(execution.resultAsync(), willBe("0")); + } + @Test void cancelRemoteExecutionOnRestartedJob() throws Exception { // Given entry node. 
@@ -345,31 +407,27 @@ void colocatedExecutionWorkerShutdown() throws Exception { private ClusterNode getPrimaryReplica(Ignite node) { IgniteImpl igniteImpl = unwrapIgniteImpl(node); - try { - HybridClock clock = igniteImpl.clock(); - TableImpl table = unwrapTableImpl(node.tables().table(TABLE_NAME)); - TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), table.partitionId(Tuple.create(1).set("K", 1))); + HybridClock clock = igniteImpl.clock(); + TableImpl table = unwrapTableImpl(node.tables().table(TABLE_NAME)); + TablePartitionId tablePartitionId = new TablePartitionId(table.tableId(), table.partitionId(Tuple.create(1).set("K", 1))); - ReplicaMeta replicaMeta = igniteImpl.placementDriver().getPrimaryReplica(tablePartitionId, clock.now()).get(); - if (replicaMeta == null || replicaMeta.getLeaseholder() == null) { - throw new RuntimeException("Can not find primary replica for partition."); - } + CompletableFuture replicaFuture = igniteImpl.placementDriver() + .awaitPrimaryReplica(tablePartitionId, clock.now(), 30, TimeUnit.SECONDS); - return unwrapIgniteImpl(nodeByName(replicaMeta.getLeaseholder())).node(); + assertThat(replicaFuture, willCompleteSuccessfully()); + ReplicaMeta replicaMeta = replicaFuture.join(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); + if (replicaMeta == null || replicaMeta.getLeaseholder() == null) { + throw new RuntimeException("Can not find primary replica for partition."); } + + return clusterNode(nodeByName(replicaMeta.getLeaseholder())); } private void stopNode(ClusterNode clusterNode) { stopNode(clusterNode.name()); } - private void stopNode(Ignite ignite) { - stopNode(ignite.name()); - } - private Ignite anyNodeExcept(ClusterNode except) { String candidateName = allNodeNames() .stream() diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java index 4cc1697a17c..1f1b3fd60e6 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java @@ -32,7 +32,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.JobExecutionContext; +import org.apache.ignite.internal.table.partition.HashPartition; import org.apache.ignite.network.ClusterNode; /** @@ -122,6 +124,10 @@ public static String interactiveJobName() { return InteractiveJob.class.getName(); } + public static JobDescriptor interactiveJobDescriptor() { + return JobDescriptor.builder(interactiveJobName()).build(); + } + /** * Signals that are sent by test code to the jobs. */ @@ -146,6 +152,11 @@ public enum Signal { */ RETURN_WORKER_NAME, + /** + * Ask job to complete and return partition number. + */ + RETURN_PARTITION_ID, + /** * Signal to the job to continue running and send current worker name to the response channel. */ @@ -165,10 +176,10 @@ private static Signal listenSignal() { } @Override - public CompletableFuture executeAsync(JobExecutionContext context, String args) { + public CompletableFuture executeAsync(JobExecutionContext context, String arg) { RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet(); - offerArgsAsSignals(args); + offerArgAsSignal(arg); try { while (true) { @@ -196,11 +207,11 @@ public CompletableFuture executeAsync(JobExecutionContext context, Strin } /** - * If any of the args are strings, convert them to signals and offer them to the job. + * If argument is a string, convert it to signal and offer to the job. * - * @param arg Job args. + * @param arg Job arg. 
*/ - private static void offerArgsAsSignals(String arg) { + private static void offerArgAsSignal(String arg) { if (arg == null) { return; } @@ -216,7 +227,7 @@ private static void offerArgsAsSignals(String arg) { * Interactive job that communicates via {@link #NODE_CHANNELS} and {@link #NODE_SIGNALS}. Also, keeps track of how many times it was * executed via {@link #RUNNING_INTERACTIVE_JOBS_CNT}. */ - private static class InteractiveJob implements ComputeJob { + private static class InteractiveJob implements ComputeJob { private static Signal listenSignal(BlockingQueue channel) { try { return channel.take(); @@ -226,7 +237,7 @@ private static Signal listenSignal(BlockingQueue channel) { } @Override - public CompletableFuture executeAsync(JobExecutionContext context, Object... args) { + public CompletableFuture executeAsync(JobExecutionContext context, String arg) { RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet(); try { @@ -247,6 +258,8 @@ public CompletableFuture executeAsync(JobExecutionContext context, Objec return completedFuture("Done"); case RETURN_WORKER_NAME: return completedFuture(workerNodeName); + case RETURN_PARTITION_ID: + return completedFuture(Integer.toString(((HashPartition) context.partition()).partitionId())); case GET_WORKER_NAME: NODE_CHANNELS.get(workerNodeName).add(workerNodeName); break; @@ -355,6 +368,13 @@ public void finishReturnWorkerNames() { sendTerminalSignal(Signal.RETURN_WORKER_NAME); } + /** + * Finishes all {@link InteractiveJob}s by returning partition number. + */ + public void finishReturnPartitionNumber() { + sendTerminalSignal(Signal.RETURN_PARTITION_ID); + } + /** * Finishes all {@link InteractiveJob}s by returning worker node names. 
*/ diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java index 8aec9bb9c8e..eed993692f3 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -32,7 +33,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.compute.JobDescriptor; import org.apache.ignite.compute.task.MapReduceJob; import org.apache.ignite.compute.task.MapReduceTask; import org.apache.ignite.compute.task.TaskExecutionContext; @@ -148,7 +148,7 @@ private static void offerArgsAsSignals(Object arg) { /** * Interactive map reduce task that communicates via {@link #GLOBAL_CHANNEL} and {@link #GLOBAL_SIGNALS}. */ - private static class GlobalInteractiveMapReduceTask implements MapReduceTask> { + private static class GlobalInteractiveMapReduceTask implements MapReduceTask> { // When listening for signal is interrupted, if this flag is true, then corresponding method will throw exception, // otherwise it will clean the interrupted status. 
private boolean throwExceptionOnInterruption = true; @@ -169,7 +169,7 @@ private Signal listenSignal() { } @Override - public CompletableFuture>> splitAsync(TaskExecutionContext context, String args) { + public CompletableFuture>> splitAsync(TaskExecutionContext context, String args) { RUNNING_GLOBAL_SPLIT_CNT.incrementAndGet(); offerArgsAsSignals(args); @@ -188,8 +188,8 @@ public CompletableFuture>> splitAsync(TaskExec break; case SPLIT_RETURN_ALL_NODES: return completedFuture(context.ignite().clusterNodes().stream().map(node -> - MapReduceJob.builder() - .jobDescriptor(JobDescriptor.builder(InteractiveJobs.interactiveJobName()).build()) + MapReduceJob.builder() + .jobDescriptor(InteractiveJobs.interactiveJobDescriptor()) .nodes(Set.of(node)) .build() ).collect(toList())); @@ -208,7 +208,7 @@ public CompletableFuture>> splitAsync(TaskExec } @Override - public CompletableFuture> reduceAsync(TaskExecutionContext context, Map results) { + public CompletableFuture> reduceAsync(TaskExecutionContext context, Map results) { RUNNING_GLOBAL_REDUCE_CNT.incrementAndGet(); try { while (true) { @@ -220,9 +220,7 @@ public CompletableFuture> reduceAsync(TaskExecutionContext context, GLOBAL_CHANNEL.offer(ACK); break; case REDUCE_RETURN: - return completedFuture(results.values().stream() - .map(String.class::cast) - .collect(toList())); + return completedFuture(new ArrayList<>(results.values())); case CHECK_CANCEL: if (context.isCancelled()) { throw new RuntimeException("Task is cancelled"); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java index 9df4f1fcb4a..953b60158eb 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java @@ -200,16 +200,17 @@ public CompletableFuture> submitAsync( } else if (target 
instanceof TableJobTarget) { TableJobTarget tableJobTarget = (TableJobTarget) target; return requiredTable(tableJobTarget.tableName()) - .thenCompose(table -> table.partitionManager().primaryReplicasAsync()) - .thenCompose(replicas -> toBroadcastExecution(replicas.entrySet().stream() - .map(entry -> submitForBroadcast( - entry.getValue(), - entry.getKey(), - descriptor, - argHolder, - cancellationToken - ))) - ); + .thenCompose(table -> table.partitionManager().primaryReplicasAsync() + .thenCompose(replicas -> toBroadcastExecution(replicas.entrySet().stream() + .map(entry -> submitForBroadcast( + entry.getValue(), + entry.getKey(), + table.tableId(), + descriptor, + argHolder, + cancellationToken + ))) + )); } throw new IllegalArgumentException("Unsupported job target: " + target); @@ -244,12 +245,17 @@ private CompletableFuture> submitForBroadcast( ) { ExecutionOptions options = ExecutionOptions.from(descriptor.options()); - return submitForBroadcast(node, descriptor, options, argHolder, cancellationToken); + // No failover nodes for broadcast. We use failover here in order to complete futures with exceptions if worker node has left the + // cluster. 
+ NextWorkerSelector nextWorkerSelector = CompletableFutures::nullCompletedFuture; + + return submitForBroadcast(node, descriptor, options, nextWorkerSelector, argHolder, cancellationToken); } private CompletableFuture> submitForBroadcast( ClusterNode node, - @Nullable Partition partition, + Partition partition, + int tableId, JobDescriptor descriptor, @Nullable ComputeJobDataHolder argHolder, @Nullable CancellationToken cancellationToken @@ -260,14 +266,18 @@ private CompletableFuture> submitForBroadcast( .partition(partition) .build(); - // TODO https://issues.apache.org/jira/browse/IGNITE-24027 - return submitForBroadcast(node, descriptor, options, argHolder, cancellationToken); + PartitionNextWorkerSelector nextWorkerSelector = new PartitionNextWorkerSelector( + placementDriver, topologyService, clock, + tableId, partition + ); + return submitForBroadcast(node, descriptor, options, nextWorkerSelector, argHolder, cancellationToken); } private CompletableFuture> submitForBroadcast( ClusterNode node, JobDescriptor descriptor, ExecutionOptions options, + NextWorkerSelector nextWorkerSelector, @Nullable ComputeJobDataHolder argHolder, @Nullable CancellationToken cancellationToken ) { @@ -275,12 +285,10 @@ private CompletableFuture> submitForBroadcast( return failedFuture(new NodeNotFoundException(Set.of(node.name()))); } - // No failover nodes for broadcast. We use failover here in order to complete futures with exceptions - // if worker node has left the cluster. 
return unmarshalResult( executeOnOneNodeWithFailover( node, - CompletableFutures::nullCompletedFuture, + nextWorkerSelector, descriptor.units(), descriptor.jobClassName(), options, @@ -451,17 +459,17 @@ public CompletableFuture> submitPartitionedIn @Nullable ComputeJobDataHolder arg, @Nullable CancellationToken cancellationToken ) { + HashPartition partition = new HashPartition(partitionId); ExecutionOptions options = ExecutionOptions.builder() .priority(jobExecutionOptions.priority()) .maxRetries(jobExecutionOptions.maxRetries()) - .partition(new HashPartition(partitionId)) + .partition(partition) .build(); return primaryReplicaForPartition(table, partitionId) .thenCompose(primaryNode -> executeOnOneNodeWithFailover( primaryNode, - // TODO https://issues.apache.org/jira/browse/IGNITE-24027 - CompletableFutures::nullCompletedFuture, + new PartitionNextWorkerSelector(placementDriver, topologyService, clock, table.tableId(), partition), units, jobClassName, options, arg, cancellationToken )); } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/NextColocatedWorkerSelector.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/NextColocatedWorkerSelector.java index 29fb3bc1c1b..4041d57b28b 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/NextColocatedWorkerSelector.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/NextColocatedWorkerSelector.java @@ -17,42 +17,29 @@ package org.apache.ignite.internal.compute; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.network.TopologyService; import org.apache.ignite.internal.placementdriver.PlacementDriver; -import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.table.TableViewInternal; -import 
org.apache.ignite.network.ClusterNode; import org.apache.ignite.table.Tuple; import org.apache.ignite.table.mapper.Mapper; import org.jetbrains.annotations.Nullable; /** - * Next worker selector that returns primary replica node for next worker. If there is no such node (we lost the majority, for example) the - * {@code CompletableFuture.completedFuture(null)} will be returned. + * Next worker selector that returns a node that holds a specified key as a next worker. If there is no such node (we lost the majority, for + * example) the {@code CompletableFuture.completedFuture(null)} will be returned. * * @param type of the key for the colocated table. */ -public class NextColocatedWorkerSelector implements NextWorkerSelector { - private static final int PRIMARY_REPLICA_ASK_CLOCK_ADDITION_MILLIS = 10_000; - - private static final int AWAIT_FOR_PRIMARY_REPLICA_SECONDS = 15; - - private final PlacementDriver placementDriver; - - private final TopologyService topologyService; - - private final HybridClock clock; - +class NextColocatedWorkerSelector extends PrimaryReplicaNextWorkerSelector { @Nullable private final K key; @Nullable private final Mapper keyMapper; + @Nullable private final Tuple tuple; private final TableViewInternal table; @@ -63,7 +50,8 @@ public class NextColocatedWorkerSelector implements NextWorkerSelector { HybridClock clock, TableViewInternal table, K key, - Mapper keyMapper) { + Mapper keyMapper + ) { this(placementDriver, topologyService, clock, table, key, keyMapper, null); } @@ -72,7 +60,8 @@ public class NextColocatedWorkerSelector implements NextWorkerSelector { TopologyService topologyService, HybridClock clock, TableViewInternal table, - Tuple tuple) { + Tuple tuple + ) { this(placementDriver, topologyService, clock, table, null, null, tuple); } @@ -83,33 +72,17 @@ private NextColocatedWorkerSelector( TableViewInternal table, @Nullable K key, @Nullable Mapper keyMapper, - @Nullable Tuple tuple) { - this.placementDriver = placementDriver; - 
this.topologyService = topologyService; + @Nullable Tuple tuple + ) { + super(placementDriver, topologyService, clock); this.table = table; - this.clock = clock; this.key = key; this.keyMapper = keyMapper; this.tuple = tuple; } - private CompletableFuture tryToFindPrimaryReplica(TablePartitionId tablePartitionId) { - return placementDriver.awaitPrimaryReplica( - tablePartitionId, - clock.now().addPhysicalTime(PRIMARY_REPLICA_ASK_CLOCK_ADDITION_MILLIS), - AWAIT_FOR_PRIMARY_REPLICA_SECONDS, - TimeUnit.SECONDS - ).thenApply(ReplicaMeta::getLeaseholderId) - .thenApply(topologyService::getById); - } - @Override - public CompletableFuture next() { - TablePartitionId tablePartitionId = tablePartitionId(); - return tryToFindPrimaryReplica(tablePartitionId); - } - - private TablePartitionId tablePartitionId() { + protected TablePartitionId tablePartitionId() { if (key != null && keyMapper != null) { return new TablePartitionId(table.tableId(), table.partitionId(key, keyMapper)); } else { diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/PartitionNextWorkerSelector.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/PartitionNextWorkerSelector.java new file mode 100644 index 00000000000..f6db5bb7209 --- /dev/null +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/PartitionNextWorkerSelector.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.compute; + +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.network.TopologyService; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.partition.HashPartition; +import org.apache.ignite.table.partition.Partition; + +/** + * Next worker selector that returns node that holds a primary replica for the specified partition as a next worker. If there is no such + * node (we lost the majority, for example) the {@code CompletableFuture.completedFuture(null)} will be returned. 
+ */ +class PartitionNextWorkerSelector extends PrimaryReplicaNextWorkerSelector { + private final TablePartitionId tablePartitionId; + + PartitionNextWorkerSelector( + PlacementDriver placementDriver, + TopologyService topologyService, + HybridClock clock, + int tableId, + Partition partition + ) { + super(placementDriver, topologyService, clock); + + this.tablePartitionId = new TablePartitionId(tableId, ((HashPartition) partition).partitionId()); + } + + @Override + protected TablePartitionId tablePartitionId() { + return tablePartitionId; + } +} diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/PrimaryReplicaNextWorkerSelector.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/PrimaryReplicaNextWorkerSelector.java new file mode 100644 index 00000000000..23ddf89b31a --- /dev/null +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/PrimaryReplicaNextWorkerSelector.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.compute; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.network.TopologyService; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.ReplicaMeta; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.network.ClusterNode; + +/** + * Next worker selector that returns a node that holds a primary replica for the partition specified in a subclass as a next worker. If + * there is no such node (we lost the majority, for example) the {@code CompletableFuture.completedFuture(null)} will be returned. + */ +abstract class PrimaryReplicaNextWorkerSelector implements NextWorkerSelector { + private static final int PRIMARY_REPLICA_ASK_CLOCK_ADDITION_MILLIS = 10_000; + + private static final int AWAIT_FOR_PRIMARY_REPLICA_SECONDS = 15; + + private final PlacementDriver placementDriver; + + private final TopologyService topologyService; + + private final HybridClock clock; + + PrimaryReplicaNextWorkerSelector( + PlacementDriver placementDriver, + TopologyService topologyService, + HybridClock clock + ) { + this.placementDriver = placementDriver; + this.topologyService = topologyService; + this.clock = clock; + } + + @Override + public CompletableFuture next() { + return placementDriver.awaitPrimaryReplica( + tablePartitionId(), + clock.now().addPhysicalTime(PRIMARY_REPLICA_ASK_CLOCK_ADDITION_MILLIS), + AWAIT_FOR_PRIMARY_REPLICA_SECONDS, + TimeUnit.SECONDS + ) + .thenApply(ReplicaMeta::getLeaseholderId) + .thenApply(topologyService::getById); + } + + protected abstract TablePartitionId tablePartitionId(); +}