Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22478 Use single argument in Compute API #3926

Merged
merged 110 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from 103 commits
Commits
Show all changes
110 commits
Select commit Hold shift + click to select a range
a76a6a4
wip
PakhomovAlexander Jun 14, 2024
98d1e50
Merge remote-tracking branch 'refs/remotes/upstream/main' into IGNITE…
PakhomovAlexander Jun 14, 2024
4aae832
wip
PakhomovAlexander Jun 14, 2024
94c9b2b
Merge remote-tracking branch 'refs/remotes/upstream/main' into IGNITE…
PakhomovAlexander Jun 18, 2024
c17affb
Add marshaller api
PakhomovAlexander Jun 18, 2024
2f1a5c4
Merge remote-tracking branch 'refs/remotes/upstream/main' into IGNITE…
PakhomovAlexander Jun 18, 2024
2985a92
Fix errors after merge main
PakhomovAlexander Jun 18, 2024
c7d3fad
wip
PakhomovAlexander Jun 19, 2024
558e9da
Fix compile errors
PakhomovAlexander Jun 19, 2024
a7b0747
Merge remote-tracking branch 'refs/remotes/upstream/main' into IGNITE…
PakhomovAlexander Jun 19, 2024
bf724cc
Fix testa
PakhomovAlexander Jun 19, 2024
e816570
Fix tests
PakhomovAlexander Jun 19, 2024
7e45f73
wip
PakhomovAlexander Jun 19, 2024
63a4bb6
integrating marshallers into compute api...
PakhomovAlexander Jun 20, 2024
adcc418
the test with pojo is working
PakhomovAlexander Jun 20, 2024
ea9bdef
Merge remote-tracking branch 'refs/remotes/upstream/main' into IGNITE…
PakhomovAlexander Jun 21, 2024
22c3111
WIP
PakhomovAlexander Jun 21, 2024
370a4b5
WIP
PakhomovAlexander Jun 23, 2024
7f311a4
WIP
PakhomovAlexander Jun 23, 2024
4e23b0f
Fix compilation in archtest
PakhomovAlexander Jun 24, 2024
46c8a71
Fix MapReduce tests
PakhomovAlexander Jun 24, 2024
a3145a1
Fix all checkstyle issues
PakhomovAlexander Jun 24, 2024
391b5b8
Fix checkstyle and add javadoc
PakhomovAlexander Jun 24, 2024
8e948e2
Add more java doc
PakhomovAlexander Jun 24, 2024
0abd3ca
Merge remote-tracking branch 'refs/remotes/upstream/main' into IGNITE…
PakhomovAlexander Jun 25, 2024
f59d45d
Merge upstream and fix npe
PakhomovAlexander Jun 25, 2024
a2a2430
Update modules/api/src/main/java/org/apache/ignite/compute/JobDescrip…
PakhomovAlexander Jun 25, 2024
29e1730
Update modules/api/src/main/java/org/apache/ignite/compute/JobDescrip…
PakhomovAlexander Jun 25, 2024
ea60216
Update modules/api/src/main/java/org/apache/ignite/compute/task/MapRe…
PakhomovAlexander Jun 25, 2024
8e3f451
Update modules/client/src/main/java/org/apache/ignite/internal/client…
PakhomovAlexander Jun 25, 2024
7a5ff93
Update modules/compute/src/main/java/org/apache/ignite/internal/compu…
PakhomovAlexander Jun 25, 2024
58156c2
Merge remote-tracking branch 'refs/remotes/upstream/main' into IGNITE…
PakhomovAlexander Jun 25, 2024
b2807c7
Merge upstream and fix review comments
PakhomovAlexander Jun 25, 2024
7a69949
Minor fixes
PakhomovAlexander Jun 25, 2024
f85e843
Fix arch tests
PakhomovAlexander Jun 25, 2024
a054549
Use factory method instead of anonymous object for default ByteArrayM…
PakhomovAlexander Jun 25, 2024
fd6f4bf
WIP
PakhomovAlexander Jun 26, 2024
d7b5ad8
Fix reciever API
PakhomovAlexander Jun 27, 2024
acc8133
Fix tests
PakhomovAlexander Jun 27, 2024
71dae15
Fix receiver deserialzation
PakhomovAlexander Jun 27, 2024
299bbde
Fix checkstyle
PakhomovAlexander Jun 27, 2024
6e0d47f
Merge remote-tracking branch 'refs/remotes/upstream/main' into IGNITE…
PakhomovAlexander Jun 27, 2024
afff2f4
Merge upstream
PakhomovAlexander Jun 27, 2024
2fdd452
WIP on generic types in Compute API
PakhomovAlexander Jun 27, 2024
0f5081a
WIP on generic types in Compute API
PakhomovAlexander Jun 27, 2024
f4d69fa
Fix receiver
PakhomovAlexander Jun 27, 2024
788d940
Fix compute test
PakhomovAlexander Jun 27, 2024
1284699
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 27, 2024
ce2676f
Add java doc to MapReduceTask
PakhomovAlexander Jun 27, 2024
21a072a
Fix javadoc
PakhomovAlexander Jun 27, 2024
0cea899
Fix build and javadoc
PakhomovAlexander Jun 27, 2024
97ae879
Fix error groups
PakhomovAlexander Jun 27, 2024
434ed8d
Fix error groups
PakhomovAlexander Jun 27, 2024
7858757
Fix PlatformTestNodeRunner
ptupitsyn Jun 28, 2024
5ab1c7c
Fixing .NET
ptupitsyn Jun 28, 2024
8fd810b
.NET: Require single arg
ptupitsyn Jun 28, 2024
d97ea55
.NET: Require single arg - fixing ColocationHashTests
ptupitsyn Jun 28, 2024
199c89f
Fix TestCustomColocationColumnOrder
ptupitsyn Jun 28, 2024
9778e58
Fix ColocationHashJob
ptupitsyn Jun 28, 2024
f386ce0
Fix ItThinClientComputeTest
ptupitsyn Jun 28, 2024
74e5a31
Fix ToStringJob
ptupitsyn Jun 28, 2024
c45a147
Fix TestAllSupportedArgTypes
ptupitsyn Jun 28, 2024
79b3c84
Fix testBigDecimalPropagation
ptupitsyn Jun 28, 2024
dbf5bf4
Fix TestBigDecimalPropagation
ptupitsyn Jun 28, 2024
4521d34
Fix TestExecuteOnSpecificNode
ptupitsyn Jun 28, 2024
758b2da
Remove TestExecuteWithArgs - redundant
ptupitsyn Jun 28, 2024
753281c
Fix IgniteExceptionJob
ptupitsyn Jun 28, 2024
4323063
Fix TestReceiver
ptupitsyn Jun 28, 2024
2d8d20f
Fix ReceiverRunnerJob array allocation
ptupitsyn Jun 28, 2024
131bca4
StreamerReceiverSerializer cleanup
ptupitsyn Jun 28, 2024
27ecd0c
Fix WriteReceiverPayload
ptupitsyn Jun 28, 2024
d3e184e
Fixing ClientStreamerWithReceiverBatchSendRequest
ptupitsyn Jun 28, 2024
d33a834
Fixing ItAbstractDataStreamerTest
ptupitsyn Jun 28, 2024
0dd7e11
Fix TestReceiverException
ptupitsyn Jun 28, 2024
76019dc
Fixing tests
ptupitsyn Jun 28, 2024
1fb55c4
Fix UpsertElementTypeNameReceiver
ptupitsyn Jun 28, 2024
78907df
Fix TestWithReceiverAllDataTypes
ptupitsyn Jun 28, 2024
f79c580
Fix FakeServer
ptupitsyn Jun 28, 2024
680dfdf
Update modules/client/src/main/java/org/apache/ignite/internal/client…
PakhomovAlexander Jun 28, 2024
d970195
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 28, 2024
fa267eb
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 28, 2024
2e03a73
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 28, 2024
922668f
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 28, 2024
9dce1f9
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 28, 2024
52e50bb
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 28, 2024
420b91a
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 28, 2024
9075e70
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 28, 2024
4ca89d9
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 28, 2024
d107dda
Update modules/api/src/main/java/org/apache/ignite/compute/IgniteComp…
PakhomovAlexander Jun 28, 2024
0229ef8
IGNITE-22602 .NET: Use single argument in Compute API
ptupitsyn Jun 28, 2024
3cb9a89
Fix compute impl
ptupitsyn Jun 28, 2024
6f04d61
Fix ComputeTests
ptupitsyn Jun 28, 2024
6d3d5dd
Fixing tests
ptupitsyn Jun 28, 2024
da3fdb6
Tests fixed
ptupitsyn Jun 28, 2024
e8f0576
Fixing data streamer
ptupitsyn Jun 28, 2024
61ad1c3
Fixing data streamer
ptupitsyn Jun 28, 2024
b649a51
Fix DataStreamerTests
ptupitsyn Jun 28, 2024
172ce56
Fix RecordView
ptupitsyn Jun 28, 2024
c2398ca
Fix KeyValueView
ptupitsyn Jun 28, 2024
f32672c
Merge remote-tracking branch 'origin/IGNITE-22478' into IGNITE-22478
PakhomovAlexander Jun 28, 2024
3d5ab7d
Add generic to JobExecutionInternal and fix compile error after sugge…
PakhomovAlexander Jun 28, 2024
709f178
Merge remote-tracking branch 'refs/remotes/upstream/main' into IGNITE…
PakhomovAlexander Jun 28, 2024
5bd255d
Rename and merge
PakhomovAlexander Jun 28, 2024
0c344f9
IGNITE-22478 Compute argument -> single
isapego Jun 28, 2024
dc0b12b
IGNITE-22478 Fix tests
isapego Jun 28, 2024
866ecbb
IGNITE-22478 Change result type
isapego Jun 28, 2024
c56d087
IGNITE-22478 Fix tests
isapego Jun 28, 2024
71c6b4a
IGNITE-22478 Fix test
isapego Jun 28, 2024
49844a3
IGNITE-22478 Fix test
isapego Jun 28, 2024
246ee8c
Merge remote-tracking branch 'refs/remotes/upstream/main' into IGNITE…
PakhomovAlexander Jun 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,46 @@
package org.apache.ignite.compute;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.marshaling.Marshaler;
import org.jetbrains.annotations.Nullable;

/**
* A Compute job that may be executed on a single Ignite node, on several nodes, or on the entire cluster.
* Core Ignite Compute Job interface. If you want to define your own job, you should implement this interface and
* deploy the job to the cluster with Deployment API. Then, you can execute this job on the cluster by calling
* {@link IgniteCompute} APIs.
*
* @param <R> Job result type.
* <p>If you want to pass/return custom data structures to/from the job, you should also implement {@link Marshaler}
* and return it from {@link #inputMarshaler()} and {@link #resultMarshaler()} methods.
*
* @param <T> Type of the job argument.
* @param <R> Type of the job result.
*/
@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface ComputeJob<R> {
public interface ComputeJob<T, R> {
/**
* Executes the job on an Ignite node.
*
* @param context The execution context.
* @param args Job arguments.
* @param arg Job arguments.
* @return Job future. Can be null if the job is synchronous and does not return any result.
*/
@Nullable CompletableFuture<R> executeAsync(JobExecutionContext context, Object... args);
@Nullable CompletableFuture<R> executeAsync(JobExecutionContext context, @Nullable T arg);

/**
* Marshaler for the input argument. Default is {@code null} meaning that only primitive types are supported.
*
* @return Input marshaler.
*/
default @Nullable Marshaler<T, byte[]> inputMarshaler() {
return null;
}

/**
* Marshaler for the job result. Default is {@code null} meaning that only primitive types are supported.
*
* @return Result marshaler.
*/
default @Nullable Marshaler<R, byte[]> resultMarshaler() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,94 +31,100 @@
import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;

/**
* Provides the ability to execute Compute jobs.
*
* @see ComputeJob
* @see ComputeJob#executeAsync(JobExecutionContext, Object...)
* @see ComputeJob#executeAsync(JobExecutionContext, Object)
*/
public interface IgniteCompute {
/**
* Submits a {@link ComputeJob} of the given class for an execution on a single node from a set of candidate nodes.
*
* @param <R> Job result type.
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
* @param target Execution target.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @param arg Argument of the job.
* @return Job execution object.
*/
<R> JobExecution<R> submit(
<T, R> JobExecution<R> submit(
JobTarget target,
JobDescriptor descriptor,
Object... args
JobDescriptor<T, R> descriptor,
@Nullable T arg
);

/**
* Submits a {@link ComputeJob} of the given class for an execution on a single node from a set of candidate nodes. A shortcut for
* {@code submit(...).resultAsync()}.
*
* @param <R> Job result type.
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
* @param target Execution target.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @param arg Argument of the job.
* @return Job result future.
*/
default <R> CompletableFuture<R> executeAsync(
default <T, R> CompletableFuture<R> executeAsync(
JobTarget target,
JobDescriptor descriptor,
Object... args
JobDescriptor<T, R> descriptor,
@Nullable T arg
) {
return this.<R>submit(target, descriptor, args).resultAsync();
return submit(target, descriptor, arg).resultAsync();
}

/**
* Executes a {@link ComputeJob} of the given class on a single node from a set of candidate nodes.
*
* @param <R> Job result type
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
* @param target Execution target.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @param arg Argument of the job.
* @return Job result.
* @throws ComputeException If there is any problem executing the job.
*/
<R> R execute(
<T, R> R execute(
JobTarget target,
JobDescriptor descriptor,
Object... args
JobDescriptor<T, R> descriptor,
@Nullable T arg
);

/**
* Submits a {@link ComputeJob} of the given class for an execution on all nodes in the given node set.
*
* @param <R> Job result type.
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
* @param nodes Nodes to execute the job on.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @param arg Argument of the job.
* @return Map from node to job execution object.
*/
<R> Map<ClusterNode, JobExecution<R>> submitBroadcast(
<T, R> Map<ClusterNode, JobExecution<R>> submitBroadcast(
Set<ClusterNode> nodes,
JobDescriptor descriptor,
Object... args
JobDescriptor<T, R> descriptor,
@Nullable T arg
);

/**
* Executes a {@link ComputeJob} of the given class on all nodes in the given node set.
*
* @param <R> Job result type.
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
* @param nodes Nodes to execute the job on.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @param arg Argument of the job.
* @return Map from node to job result.
*/
default <R> CompletableFuture<Map<ClusterNode, R>> executeBroadcastAsync(
default <T, R> CompletableFuture<Map<ClusterNode, R>> executeBroadcastAsync(
Set<ClusterNode> nodes,
JobDescriptor descriptor,
Object... args
JobDescriptor<T, R> descriptor,
@Nullable T arg
) {
Map<ClusterNode, CompletableFuture<R>> futures = nodes.stream()
.collect(toMap(identity(), node -> this.executeAsync(JobTarget.node(node), descriptor, args)));
.collect(toMap(identity(), node -> executeAsync(JobTarget.node(node), descriptor, arg)));

return allOf(futures.values().toArray(CompletableFuture[]::new))
.thenApply(ignored -> {
Expand All @@ -136,22 +142,23 @@ default <R> CompletableFuture<Map<ClusterNode, R>> executeBroadcastAsync(
/**
* Executes a {@link ComputeJob} of the given class on all nodes in the given node set.
*
* @param <R> Job result type.
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
* @param nodes Nodes to execute the job on.
* @param descriptor Job descriptor.
* @param args Arguments of the job.
* @param arg Argument of the job.
* @return Map from node to job result.
* @throws ComputeException If there is any problem executing the job.
*/
default <R> Map<ClusterNode, R> executeBroadcast(
default <T, R> Map<ClusterNode, R> executeBroadcast(
Set<ClusterNode> nodes,
JobDescriptor descriptor,
Object... args
JobDescriptor<T, R> descriptor,
@Nullable T arg
) {
Map<ClusterNode, R> map = new HashMap<>();

for (ClusterNode node : nodes) {
map.put(node, execute(JobTarget.node(node), descriptor, args));
map.put(node, execute(JobTarget.node(node), descriptor, arg));
}

return map;
Expand All @@ -160,36 +167,39 @@ default <R> Map<ClusterNode, R> executeBroadcast(
/**
* Submits a {@link MapReduceTask} of the given class for an execution.
*
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
* @param units Deployment units.
* @param taskClassName Map reduce task class name.
* @param args Task arguments.
* @param <R> Task result type.
* @param arg Task argument.
* @return Task execution interface.
*/
<R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args);
<T, R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, @Nullable T arg);

/**
* Submits a {@link MapReduceTask} of the given class for an execution. A shortcut for {@code submitMapReduce(...).resultAsync()}.
*
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
* @param units Deployment units.
* @param taskClassName Map reduce task class name.
* @param args Task arguments.
* @param <R> Task result type.
* @param arg Task argument.
* @return Task result future.
*/
default <R> CompletableFuture<R> executeMapReduceAsync(List<DeploymentUnit> units, String taskClassName, Object... args) {
return this.<R>submitMapReduce(units, taskClassName, args).resultAsync();
default <T, R> CompletableFuture<R> executeMapReduceAsync(List<DeploymentUnit> units, String taskClassName, @Nullable T arg) {
return this.<T, R>submitMapReduce(units, taskClassName, arg).resultAsync();
}

/**
* Executes a {@link MapReduceTask} of the given class.
*
* @param <T> Job argument (T)ype.
* @param <R> Job (R)esult type.
* @param units Deployment units.
* @param taskClassName Map reduce task class name.
* @param args Task arguments.
* @param <R> Task result type.
* @param arg Task argument.
* @return Task result.
* @throws ComputeException If there is any problem executing the task.
*/
<R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args);
<T, R> R executeMapReduce(List<DeploymentUnit> units, String taskClassName, @Nullable T arg);
}
Loading