Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22435 Add JobTarget interface #3950

Merged
merged 55 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
bf5abb8
IGNITE-22435 Add JobTarget interface
ptupitsyn Jun 18, 2024
2a53da1
IGNITE-22435 Add JobTarget interface
ptupitsyn Jun 18, 2024
f592173
wip execution targets
ptupitsyn Jun 18, 2024
8f654c0
wip execution targets
ptupitsyn Jun 18, 2024
e7b6768
wip execution targets
ptupitsyn Jun 18, 2024
3613755
Fix IgniteComputeImpl
ptupitsyn Jun 18, 2024
370583d
Fix AntiHijackIgniteCompute
ptupitsyn Jun 18, 2024
05cd0b1
Fix FakeCompute
ptupitsyn Jun 18, 2024
5588769
Fix ClientCompute
ptupitsyn Jun 18, 2024
fbc8530
wip ClientCompute
ptupitsyn Jun 18, 2024
1952fc7
ClientCompute done
ptupitsyn Jun 18, 2024
b078ca5
wip IgniteComputeImpl
ptupitsyn Jun 18, 2024
5dc1789
wip IgniteComputeImpl
ptupitsyn Jun 18, 2024
a51ff68
IgniteComputeImpl cleanup done
ptupitsyn Jun 18, 2024
88c3aac
removing old methods
ptupitsyn Jun 18, 2024
99b9229
removing old methods
ptupitsyn Jun 18, 2024
5f9e4fd
fixing tests
ptupitsyn Jun 18, 2024
f1ff573
fixing tests
ptupitsyn Jun 18, 2024
a7c0a59
Remove all old methods - compilation broken
ptupitsyn Jun 18, 2024
4105a71
Remove all old methods - compilation broken - fixing tests
ptupitsyn Jun 18, 2024
9f85251
wip fixing tests
ptupitsyn Jun 19, 2024
2f7f520
wip fixing tests
ptupitsyn Jun 19, 2024
7b8392c
ItThinClientComputeTest fixed
ptupitsyn Jun 19, 2024
37829a2
fixing client tests
ptupitsyn Jun 19, 2024
8eab1d7
fix FakeCompute
ptupitsyn Jun 19, 2024
84f1766
Fix client tests
ptupitsyn Jun 19, 2024
ce2bfdd
Fix embedded tests
ptupitsyn Jun 19, 2024
18a01f0
wip
ptupitsyn Jun 19, 2024
53527ff
fixing checkstyle
ptupitsyn Jun 19, 2024
9780a40
Fix IgniteComputeImplTest
ptupitsyn Jun 19, 2024
bc7889f
Merge branch 'refs/heads/main' into ignite-22435
ptupitsyn Jun 19, 2024
b46471d
Fix merge
ptupitsyn Jun 19, 2024
ae790b4
Fix javadoc
ptupitsyn Jun 19, 2024
1c17876
Fix javadoc
ptupitsyn Jun 19, 2024
2f1b3fc
Merge branch 'refs/heads/main' into ignite-22435
ptupitsyn Jun 20, 2024
03c150a
Update modules/client/src/main/java/org/apache/ignite/internal/client…
ptupitsyn Jun 20, 2024
51e646f
Merge remote-tracking branch 'fork/ignite-22435' into ignite-22435
ptupitsyn Jun 20, 2024
315b57d
ClientCompute cleanup
ptupitsyn Jun 20, 2024
8967721
Update modules/client/src/test/java/org/apache/ignite/client/fakes/Fa…
ptupitsyn Jun 20, 2024
3396589
Update modules/compute/src/integrationTest/java/org/apache/ignite/int…
ptupitsyn Jun 20, 2024
d8e2da3
Update modules/compute/src/main/java/org/apache/ignite/internal/compu…
ptupitsyn Jun 20, 2024
a678027
Rename job targets
ptupitsyn Jun 20, 2024
e907ed2
wip naming
ptupitsyn Jun 20, 2024
ca89251
wip naming
ptupitsyn Jun 20, 2024
f69abad
Update modules/api/src/main/java/org/apache/ignite/compute/JobTarget.…
ptupitsyn Jun 20, 2024
cb3d974
Update modules/api/src/main/java/org/apache/ignite/compute/JobTarget.…
ptupitsyn Jun 20, 2024
15b3f92
Update modules/api/src/main/java/org/apache/ignite/compute/JobTarget.…
ptupitsyn Jun 20, 2024
305ee22
Update modules/api/src/main/java/org/apache/ignite/compute/JobTarget.…
ptupitsyn Jun 20, 2024
cd5e750
Update modules/api/src/main/java/org/apache/ignite/compute/JobTarget.…
ptupitsyn Jun 20, 2024
f31c643
Update modules/api/src/main/java/org/apache/ignite/compute/JobTarget.…
ptupitsyn Jun 20, 2024
18a4a7b
Update modules/api/src/main/java/org/apache/ignite/compute/JobTarget.…
ptupitsyn Jun 20, 2024
0dab8c8
Update modules/client/src/main/java/org/apache/ignite/internal/client…
ptupitsyn Jun 20, 2024
5cbf114
Fix checkstyle
ptupitsyn Jun 20, 2024
45afbbf
Fix checkstyle
ptupitsyn Jun 20, 2024
553977f
fix javadoc
ptupitsyn Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.compute;

import java.util.Objects;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.jetbrains.annotations.Nullable;

/**
* Colocated job execution target.
*/
public class ColocatedExecutionTarget implements JobTarget {
private final String tableName;

private final Object key;

private final @Nullable Mapper<?> keyMapper;

ColocatedExecutionTarget(String tableName, Object key, @Nullable Mapper<?> keyMapper) {
Objects.requireNonNull(tableName);
Objects.requireNonNull(key);

if (keyMapper == null && !(key instanceof Tuple)) {
throw new IllegalArgumentException("Key must be an instance of Tuple when keyMapper is not provided.");
}

this.tableName = tableName;
this.key = key;
this.keyMapper = keyMapper;
}

public String tableName() {
return tableName;
}

public Object key() {
return key;
}

public @Nullable Mapper<?> keyMapper() {
return keyMapper;
}
}
136 changes: 9 additions & 127 deletions modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;

/**
* Provides the ability to execute Compute jobs.
Expand All @@ -44,13 +42,13 @@ public interface IgniteCompute {
* Submits a {@link ComputeJob} of the given class for an execution on a single node from a set of candidate nodes.
*
* @param <R> Job result type.
* @param nodes Candidate nodes; the job will be executed on one of them.
* @param target Execution target.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @return Job execution object.
*/
<R> JobExecution<R> submit(
Set<ClusterNode> nodes,
JobTarget target,
JobDescriptor descriptor,
Object... args
);
Expand All @@ -60,151 +58,35 @@ <R> JobExecution<R> submit(
* {@code submit(...).resultAsync()}.
*
* @param <R> Job result type.
* @param nodes Candidate nodes; the job will be executed on one of them.
* @param target Execution target.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @return Job result future.
*/
default <R> CompletableFuture<R> executeAsync(
Set<ClusterNode> nodes,
JobTarget target,
JobDescriptor descriptor,
Object... args
) {
return this.<R>submit(nodes, descriptor, args).resultAsync();
return this.<R>submit(target, descriptor, args).resultAsync();
}

/**
* Executes a {@link ComputeJob} of the given class on a single node from a set of candidate nodes.
*
* @param <R> Job result type
* @param nodes Candidate nodes; the job will be executed on one of them.
* @param target Execution target.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @return Job result.
* @throws ComputeException If there is any problem executing the job.
*/
<R> R execute(
Set<ClusterNode> nodes,
JobDescriptor descriptor,
Object... args
);

/**
* Submits a job of the given class for the execution on the node where the given key is located. The node is a leader of the
* corresponding RAFT group.
*
* @param tableName Name of the table whose key is used to determine the node to execute the job on.
* @param key Key that identifies the node to execute the job on.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @param <R> Job result type.
* @return Job execution object.
*/
<R> JobExecution<R> submitColocated(
String tableName,
Tuple key,
JobTarget target,
JobDescriptor descriptor,
Object... args
);

/**
* Submits a job of the given class for the execution on the node where the given key is located. The node is a leader of the
* corresponding RAFT group.
*
* @param tableName Name of the table whose key is used to determine the node to execute the job on.
* @param key Key that identifies the node to execute the job on.
* @param keyMapper Mapper used to map the key to a binary representation.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @param <R> Job result type.
* @return Job execution object.
*/
<K, R> JobExecution<R> submitColocated(
String tableName,
K key,
Mapper<K> keyMapper,
JobDescriptor descriptor,
Object... args
);

/**
* Submits a job of the given class for the execution on the node where the given key is located. The node is a leader of the
* corresponding RAFT group. A shortcut for {@code submitColocated(...).resultAsync()}.
*
* @param tableName Name of the table whose key is used to determine the node to execute the job on.
* @param key Key that identifies the node to execute the job on.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @param <R> Job result type.
* @return Job result future.
*/
default <R> CompletableFuture<R> executeColocatedAsync(
String tableName,
Tuple key,
JobDescriptor descriptor,
Object... args
) {
return this.<R>submitColocated(tableName, key, descriptor, args).resultAsync();
}

/**
* Submits a job of the given class for the execution on the node where the given key is located. The node is a leader of the
* corresponding RAFT group. A shortcut for {@code submitColocated(...).resultAsync()}.
*
* @param tableName Name of the table whose key is used to determine the node to execute the job on.
* @param key Key that identifies the node to execute the job on.
* @param keyMapper Mapper used to map the key to a binary representation.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @param <R> Job result type.
* @return Job result future.
*/
default <K, R> CompletableFuture<R> executeColocatedAsync(
String tableName,
K key,
Mapper<K> keyMapper,
JobDescriptor descriptor,
Object... args
) {
return this.<K, R>submitColocated(tableName, key, keyMapper, descriptor, args).resultAsync();
}

/**
* Executes a job of the given class on the node where the given key is located. The node is a leader of the corresponding RAFT group.
*
* @param <R> Job result type.
* @param tableName Name of the table whose key is used to determine the node to execute the job on.
* @param key Key that identifies the node to execute the job on.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @return Job result.
* @throws ComputeException If there is any problem executing the job.
*/
<R> R executeColocated(
String tableName,
Tuple key,
JobDescriptor descriptor,
Object... args);

/**
* Executes a job of the given class on the node where the given key is located. The node is a leader of the corresponding RAFT group.
*
* @param <R> Job result type.
* @param tableName Name of the table whose key is used to determine the node to execute the job on.
* @param key Key that identifies the node to execute the job on.
* @param keyMapper Mapper used to map the key to a binary representation.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @return Job result.
* @throws ComputeException If there is any problem executing the job.
*/
<K, R> R executeColocated(
String tableName,
K key,
Mapper<K> keyMapper,
JobDescriptor descriptor,
Object... args);

/**
* Submits a {@link ComputeJob} of the given class for an execution on all nodes in the given node set.
*
Expand Down Expand Up @@ -235,7 +117,7 @@ default <R> CompletableFuture<Map<ClusterNode, R>> executeBroadcastAsync(
Object... args
) {
Map<ClusterNode, CompletableFuture<R>> futures = nodes.stream()
.collect(toMap(identity(), node -> this.executeAsync(Set.of(node), descriptor, args)));
.collect(toMap(identity(), node -> this.executeAsync(JobTarget.node(node), descriptor, args)));

return allOf(futures.values().toArray(CompletableFuture[]::new))
.thenApply(ignored -> {
Expand Down Expand Up @@ -268,7 +150,7 @@ default <R> Map<ClusterNode, R> executeBroadcast(
Map<ClusterNode, R> map = new HashMap<>();

for (ClusterNode node : nodes) {
map.put(node, execute(Set.of(node), descriptor, args));
map.put(node, execute(JobTarget.node(node), descriptor, args));
}

return map;
Expand Down
92 changes: 92 additions & 0 deletions modules/api/src/main/java/org/apache/ignite/compute/JobTarget.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.compute;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;

/**
* Job execution target.
ptupitsyn marked this conversation as resolved.
Show resolved Hide resolved
*/
public interface JobTarget {
/**
* Creates a job target for a specific node.
*
* @param node Node.
ptupitsyn marked this conversation as resolved.
Show resolved Hide resolved
* @return Job target.
*/
static JobTarget node(ClusterNode node) {
return new NodesJobTarget(Set.of(node));
}

/**
* Creates a job target for any node from the provided collection.
*
* @param nodes Collection of nodes.
ptupitsyn marked this conversation as resolved.
Show resolved Hide resolved
* @return Job target.
*/
static JobTarget anyNode(ClusterNode... nodes) {
return new NodesJobTarget(Set.of(nodes));
}

/**
* Creates a job target for any node from the provided collection.
*
* @param nodes Collection of nodes.
ptupitsyn marked this conversation as resolved.
Show resolved Hide resolved
* @return Job target.
*/
static JobTarget anyNode(Collection<ClusterNode> nodes) {
return new NodesJobTarget(new HashSet<>(nodes));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mention that the collection of nodes is treated as a Set (duplicates are ignored)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this matters with "any node" semantics because we don't explain how a node is picked from the provided collection. There is no guarantee of fairness or randomness on the API level.


/**
* Creates a job target for any node from the provided collection.
*
* @param nodes Collection of nodes.
ptupitsyn marked this conversation as resolved.
Show resolved Hide resolved
* @return Job target.
*/
static JobTarget anyNode(Set<ClusterNode> nodes) {
return new NodesJobTarget(nodes);
}

/**
* Creates a colocated job target for a specific table and key.
*
* @param tableName Table name.
ptupitsyn marked this conversation as resolved.
Show resolved Hide resolved
* @param key Key.
* @return Job target.
*/
static JobTarget colocated(String tableName, Tuple key) {
return new ColocatedExecutionTarget(tableName, key, null);
}

/**
* Creates a colocated job target for a specific table and key with mapper.
*
* @param tableName Table name.
ptupitsyn marked this conversation as resolved.
Show resolved Hide resolved
* @param key Key.
* @return Job target.
*/
static <K> JobTarget colocated(String tableName, K key, Mapper<K> keyMapper) {
return new ColocatedExecutionTarget(tableName, key, keyMapper);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.compute;

import java.util.Objects;
import java.util.Set;
import org.apache.ignite.network.ClusterNode;

/**
* Nodes-based job execution target.
*/
public class NodesJobTarget implements JobTarget {
korlov42 marked this conversation as resolved.
Show resolved Hide resolved
private final Set<ClusterNode> nodes;

NodesJobTarget(Set<ClusterNode> nodes) {
Objects.requireNonNull(nodes);

if (nodes.isEmpty()) {
throw new IllegalArgumentException("Nodes collection must not be empty.");
}

this.nodes = nodes;
}

public Set<ClusterNode> nodes() {
return nodes;
}
}
Loading