Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22539 .NET: Add JobTarget #3993

Merged
merged 24 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Apache.Ignite.Tests;

using System;
using System.Threading.Tasks;
using Ignite.Compute;
using NUnit.Framework;
using Security.Exception;

Expand Down Expand Up @@ -93,11 +94,11 @@ private async Task EnableAuthn(bool enable)
}

using var client = await IgniteClient.StartAsync(GetConfig(_authnEnabled));
var nodes = await client.GetClusterNodesAsync();
var nodes = JobTarget.AnyNode(await client.GetClusterNodesAsync());

try
{
await client.Compute.SubmitAsync<object>(nodes, new(EnableAuthnJob), enable ? 1 : 0);
await client.Compute.SubmitAsync(nodes, new JobDescriptor<object>(EnableAuthnJob), enable ? 1 : 0);
}
catch (IgniteClientConnectionException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ public async Task TestClientSendsComputeJobToTargetNodeWhenDirectConnectionExist
using var client = await IgniteClient.StartAsync(clientCfg);
client.WaitForConnections(3);

IJobExecution<string> exec2 = await client.Compute.SubmitAsync<string>(
new[] { server2.Node }, new(string.Empty));
var job = new JobDescriptor<string>(string.Empty);

IJobExecution<string> exec3 = await client.Compute.SubmitAsync<string>(
new[] { server3.Node }, new(string.Empty));
IJobExecution<string> exec2 = await client.Compute.SubmitAsync(JobTarget.Node(server2.Node), job);
IJobExecution<string> exec3 = await client.Compute.SubmitAsync(JobTarget.Node(server3.Node), job);

Assert.AreEqual("s2", await exec2.GetResultAsync());
Assert.AreEqual("s3", await exec3.GetResultAsync());
Expand All @@ -67,12 +66,10 @@ public async Task TestClientSendsComputeJobToDefaultNodeWhenDirectConnectionToTa
using var server3 = new FakeServer(nodeName: "s3");

using var client = await server1.ConnectClientAsync();
var job = new JobDescriptor<string>(string.Empty);

IJobExecution<string> exec2 = await client.Compute.SubmitAsync<string>(
new[] { server2.Node }, new(string.Empty));

IJobExecution<string> exec3 = await client.Compute.SubmitAsync<string>(
new[] { server3.Node }, new(string.Empty));
IJobExecution<string> exec2 = await client.Compute.SubmitAsync(JobTarget.Node(server2.Node), job);
IJobExecution<string> exec3 = await client.Compute.SubmitAsync(JobTarget.Node(server3.Node), job);

Assert.AreEqual("s1", await exec2.GetResultAsync());
Assert.AreEqual("s1", await exec3.GetResultAsync());
Expand Down Expand Up @@ -100,13 +97,13 @@ public async Task TestClientRetriesComputeJobOnPrimaryAndDefaultNodes()
client.WaitForConnections(2);

var nodeNames = new HashSet<string>();
var job = new JobDescriptor<string>(string.Empty);

for (int i = 0; i < 100; i++)
{
var node = i % 2 == 0 ? server1.Node : server2.Node;

IJobExecution<string> jobExecution = await client.Compute.SubmitAsync<string>(
new[] { node }, new(string.Empty));
IJobExecution<string> jobExecution = await client.Compute.SubmitAsync(JobTarget.Node(node), job);

string res = await jobExecution.GetResultAsync();

Expand Down
206 changes: 125 additions & 81 deletions modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace Apache.Ignite.Tests;
using System;
using System.Threading.Tasks;
using Compute;
using Ignite.Compute;
using Ignite.Table;
using Internal.Proto;
using NUnit.Framework;
Expand Down Expand Up @@ -50,10 +51,9 @@ public async Task TestPutRoutesRequestToPrimaryNode([Values(true, false)] bool w
{
var keyTuple = new IgniteTuple { ["KEY"] = key };

var primaryNodeNameExec = await client.Compute.SubmitColocatedAsync<string>(
TableName,
keyTuple,
new(ComputeTests.NodeNameJob));
var primaryNodeNameExec = await client.Compute.SubmitAsync(
JobTarget.Colocated(TableName, keyTuple),
ComputeTests.NodeNameJob);

var primaryNodeName = await primaryNodeNameExec.GetResultAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,13 @@ public async Task TestExecuteColocatedTupleKeyRoutesRequestToPrimaryNode(int key
var key = new IgniteTuple { ["ID"] = keyId };

// Warm up.
await client.Compute.SubmitColocatedAsync<object?>(FakeServer.ExistingTableName, key, new("job"));
var jobTarget = JobTarget.Colocated(FakeServer.ExistingTableName, key);
var jobDescriptor = new JobDescriptor<object?>("job");

await client.Compute.SubmitAsync(jobTarget, jobDescriptor);

await AssertOpOnNode(
_ => client.Compute.SubmitColocatedAsync<object?>(FakeServer.ExistingTableName, key, new("job")),
_ => client.Compute.SubmitAsync(jobTarget, jobDescriptor),
ClientOp.ComputeExecuteColocated,
expectedNode);
}
Expand All @@ -376,13 +379,14 @@ public async Task TestExecuteColocatedObjectKeyRoutesRequestToPrimaryNode(int ke
var expectedNode = node == 1 ? _server1 : _server2;
var key = new SimpleKey(keyId);

var jobTarget = JobTarget.Colocated(FakeServer.ExistingTableName, key);
var jobDescriptor = new JobDescriptor<object?>("job");

// Warm up.
await client.Compute.SubmitColocatedAsync<object?, SimpleKey>(
FakeServer.ExistingTableName, key, new("job"));
await client.Compute.SubmitAsync(jobTarget, jobDescriptor);

await AssertOpOnNode(
_ => client.Compute.SubmitColocatedAsync<object?, SimpleKey>(
FakeServer.ExistingTableName, key, new("job")),
_ => client.Compute.SubmitAsync(jobTarget, jobDescriptor),
ClientOp.ComputeExecuteColocated,
expectedNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ public async Task TestCustomColocationColumnOrder([Values(true, false)] bool rev
var schemas = table.GetFieldValue<IDictionary<int, Task<Schema>>>("_schemas");
var schema = schemas[1].GetAwaiter().GetResult();
var clusterNodes = await Client.GetClusterNodesAsync();
var jobTarget = JobTarget.AnyNode(clusterNodes);
var job = new JobDescriptor<int>(TableRowColocationHashJob);

for (int i = 0; i < 100; i++)
{
Expand All @@ -194,12 +196,7 @@ public async Task TestCustomColocationColumnOrder([Values(true, false)] bool rev
using var writer = ProtoCommon.GetMessageWriter();
var (clientColocationHash, _) = ser.Write(writer, null, schema, key);

var serverColocationHashExec = await Client.Compute.SubmitAsync<int>(
clusterNodes,
new(TableRowColocationHashJob),
tableName,
i);

var serverColocationHashExec = await Client.Compute.SubmitAsync(jobTarget, job, tableName, i);
var serverColocationHash = await serverColocationHashExec.GetResultAsync();

Assert.AreEqual(serverColocationHash, clientColocationHash, key.ToString());
Expand Down Expand Up @@ -334,11 +331,11 @@ private async Task AssertClientAndServerHashesAreEqual(int timePrecision = 9, in

private async Task<int> GetServerHash(byte[] bytes, int count, int timePrecision, int timestampPrecision)
{
var nodes = await Client.GetClusterNodesAsync();
var target = JobTarget.AnyNode(await Client.GetClusterNodesAsync());

IJobExecution<int> jobExecution = await Client.Compute.SubmitAsync<int>(
nodes,
new(ColocationHashJob),
IJobExecution<int> jobExecution = await Client.Compute.SubmitAsync(
target,
new JobDescriptor<int>(ColocationHashJob),
count,
bytes,
timePrecision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace Apache.Ignite.Tests.Table;
using System.Linq;
using System.Threading.Tasks;
using Compute;
using Ignite.Compute;
using Ignite.Table;
using NUnit.Framework;

Expand Down Expand Up @@ -95,8 +96,9 @@ public async Task TestClientUsesLatestSchemaOnWriteDropColumn([ValueSource(nameo
break;

case TestMode.Compute:
await Client.Compute.SubmitColocatedAsync<string>(
table.Name, rec2, new(ComputeTests.NodeNameJob));
await Client.Compute.SubmitAsync(
JobTarget.Colocated(table.Name, rec2),
ComputeTests.NodeNameJob);
break;

default:
Expand Down Expand Up @@ -152,8 +154,9 @@ public async Task TestClientUsesLatestSchemaOnWriteAddColumn([ValueSource(nameof

case TestMode.Compute:
// ExecuteColocated requires key part only.
await Client.Compute.SubmitColocatedAsync<string>(
table.Name, rec, new(ComputeTests.NodeNameJob));
await Client.Compute.SubmitAsync(
JobTarget.Colocated(table.Name, rec),
ComputeTests.NodeNameJob);
break;

default:
Expand Down Expand Up @@ -292,8 +295,9 @@ public async Task TestClientUsesLatestSchemaOnWritePoco([ValueSource(nameof(Test
break;

case TestMode.Compute:
var jobExecution = await Client.Compute.SubmitColocatedAsync<string, Poco>(
table.Name, new Poco(1, "foo"), new(ComputeTests.NodeNameJob));
var jobExecution = await Client.Compute.SubmitAsync(
JobTarget.Colocated(table.Name, new Poco(1, "foo")),
ComputeTests.NodeNameJob);

await jobExecution.GetResultAsync();

Expand Down
3 changes: 3 additions & 0 deletions modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public static async Task WaitForConditionAsync(
public static void SetFieldValue(this object obj, string fieldName, object? value) =>
GetNonPublicField(obj, fieldName).SetValue(obj, value);

public static bool IsRecordClass(this Type type) =>
type.GetMethods().Any(m => m.Name == "<Clone>$" && m.ReturnType == type);

public static ILoggerFactory GetConsoleLoggerFactory(LogLevel minLevel) => new ConsoleLogger(minLevel);

public static void CheckByteArrayPoolLeak(int timeoutMs = 1000)
Expand Down
5 changes: 2 additions & 3 deletions modules/platforms/dotnet/Apache.Ignite.Tests/ToStringTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public void TestAllPublicFacingTypesHaveConsistentToString()
continue;
}

if (code.Contains("record " + GetCleanTypeName(type)) ||
code.Contains("record struct " + GetCleanTypeName(type)))
if (code.Contains("record struct " + GetCleanTypeName(type)))
{
// records provide property-based ToString() in the same format we use.
continue;
Expand Down Expand Up @@ -120,7 +119,7 @@ private static IEnumerable<Type> GetPublicFacingTypes()
continue;
}

if (type.IsInterface || type.IsAbstract || type.IsEnum)
if (type.IsInterface || type.IsAbstract || type.IsEnum || type.IsRecordClass())
{
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public enum ClientOperationType
TupleContainsKey,

/// <summary>
/// Compute (<see cref="ICompute.SubmitAsync{T}"/>, <see cref="ICompute.SubmitBroadcast{T}"/>).
/// Compute (<see cref="ICompute.SubmitAsync{TTarget,TResult}"/>, <see cref="ICompute.SubmitBroadcast{T}"/>).
/// </summary>
ComputeExecute,

Expand Down
51 changes: 10 additions & 41 deletions modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ namespace Apache.Ignite.Compute;
using System.Collections.Generic;
using System.Threading.Tasks;
using Network;
using Table;

/// <summary>
/// Ignite Compute API provides distributed job execution functionality.
Expand All @@ -30,58 +29,28 @@ public interface ICompute
/// <summary>
/// Submits a compute job represented by the given class for an execution on one of the specified nodes.
/// </summary>
/// <param name="nodes">Nodes to use for the job execution.</param>
/// <param name="jobDescriptor">Job descriptor.</param>
/// <param name="args">Job arguments.</param>
/// <typeparam name="T">Job result type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task<IJobExecution<T>> SubmitAsync<T>(
IEnumerable<IClusterNode> nodes,
JobDescriptor jobDescriptor,
params object?[]? args);

/// <summary>
/// Submits a compute job represented by the given class for an execution on one of the nodes where the given key is located.
/// </summary>
/// <param name="tableName">Name of the table to be used with <paramref name="key"/> to determine target node.</param>
/// <param name="key">Table key to be used to determine the target node for job execution.</param>
/// <param name="jobDescriptor">Job descriptor.</param>
/// <param name="args">Job arguments.</param>
/// <typeparam name="T">Job result type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task<IJobExecution<T>> SubmitColocatedAsync<T>(
string tableName,
IIgniteTuple key,
JobDescriptor jobDescriptor,
params object?[]? args);

/// <summary>
/// Submits a compute job represented by the given class for an execution on one of the nodes where the given key is located.
/// </summary>
/// <param name="tableName">Name of the table to be used with <paramref name="key"/> to determine target node.</param>
/// <param name="key">Table key to be used to determine the target node for job execution.</param>
/// <param name="target">Job execution target.</param>
/// <param name="jobDescriptor">Job descriptor.</param>
/// <param name="args">Job arguments.</param>
/// <typeparam name="T">Job result type.</typeparam>
/// <typeparam name="TKey">Key type.</typeparam>
/// <typeparam name="TTarget">Job target type.</typeparam>
/// <typeparam name="TResult">Job result type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task<IJobExecution<T>> SubmitColocatedAsync<T, TKey>(
string tableName,
TKey key,
JobDescriptor jobDescriptor,
Task<IJobExecution<TResult>> SubmitAsync<TTarget, TResult>(
IJobTarget<TTarget> target,
JobDescriptor<TResult> jobDescriptor,
params object?[]? args)
where TKey : notnull;
where TTarget : notnull;

/// <summary>
/// Submits a compute job represented by the given class for an execution on all of the specified nodes.
/// </summary>
/// <param name="nodes">Nodes to use for the job execution.</param>
/// <param name="jobDescriptor">Job descriptor.</param>
/// <param name="args">Job arguments.</param>
/// <typeparam name="T">Job result type.</typeparam>
/// <typeparam name="TResult">Job result type.</typeparam>
/// <returns>A map of <see cref="Task"/> representing the asynchronous operation for every node.</returns>
IDictionary<IClusterNode, Task<IJobExecution<T>>> SubmitBroadcast<T>(
IDictionary<IClusterNode, Task<IJobExecution<TResult>>> SubmitBroadcast<TResult>(
IEnumerable<IClusterNode> nodes,
JobDescriptor jobDescriptor,
JobDescriptor<TResult> jobDescriptor,
params object?[]? args);
}
31 changes: 31 additions & 0 deletions modules/platforms/dotnet/Apache.Ignite/Compute/IJobTarget.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Apache.Ignite.Compute;

/// <summary>
/// Compute job target.
/// </summary>
/// <typeparam name="T">Underlying data type.</typeparam>
public interface IJobTarget<out T>
where T : notnull
{
/// <summary>
/// Gets the target data.
/// </summary>
T Data { get; }
}
12 changes: 10 additions & 2 deletions modules/platforms/dotnet/Apache.Ignite/Compute/JobDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Apache.Ignite.Compute;

using System;
using System.Collections.Generic;

/// <summary>
Expand All @@ -25,7 +26,14 @@ namespace Apache.Ignite.Compute;
/// <param name="JobClassName">Java class name of the job to execute.</param>
/// <param name="DeploymentUnits">Deployment units.</param>
/// <param name="Options">Options.</param>
public sealed record JobDescriptor(
/// <typeparam name="TResult">Result type.</typeparam>
public sealed record JobDescriptor<TResult>(
string JobClassName,
IEnumerable<DeploymentUnit>? DeploymentUnits = null,
JobExecutionOptions? Options = null);
JobExecutionOptions? Options = null)
{
/// <summary>
/// Gets the result type of the job.
/// </summary>
public Type ResultType => typeof(TResult);
}
Loading