diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs new file mode 100644 index 00000000000..fbe37172fa0 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteClientGroupTests.cs @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Tests; + +using System; +using System.Threading.Tasks; +using NUnit.Framework; + +/// +/// Tests for . +/// +public class IgniteClientGroupTests +{ + private FakeServer _server; + + [SetUp] + public void StartServer() => _server = new FakeServer(); + + [TearDown] + public void StopServer() => _server.Dispose(); + + [Test] + public async Task TestGetClient() + { + using IgniteClientGroup group = CreateGroup(); + IIgnite client = await group.GetIgniteAsync(); + IIgnite client2 = await group.GetIgniteAsync(); + + Assert.IsNotNull(client); + Assert.AreSame(client, client2); + + await client.Tables.GetTablesAsync(); + } + + [Test] + public async Task TestRoundRobin() + { + using IgniteClientGroup group = CreateGroup(size: 3); + + var client1 = await group.GetIgniteAsync(); + var client2 = await group.GetIgniteAsync(); + var client3 = await group.GetIgniteAsync(); + + Assert.AreNotSame(client1, client2); + Assert.AreNotSame(client2, client3); + + Assert.AreSame(client1, await group.GetIgniteAsync()); + Assert.AreSame(client2, await group.GetIgniteAsync()); + Assert.AreSame(client3, await group.GetIgniteAsync()); + + Assert.AreSame(client1, await group.GetIgniteAsync()); + Assert.AreSame(client2, await group.GetIgniteAsync()); + Assert.AreSame(client3, await group.GetIgniteAsync()); + } + + [Test] + public async Task TestGroupReconnectsDisposedClient() + { + using IgniteClientGroup group = CreateGroup(); + IIgnite client = await group.GetIgniteAsync(); + + await client.Tables.GetTablesAsync(); + ((IDisposable)client).Dispose(); + + IIgnite client2 = await group.GetIgniteAsync(); + await client2.Tables.GetTablesAsync(); + + Assert.AreNotSame(client, client2); + } + + [Test] + public void TestConstructorValidatesArgs() + { + // ReSharper disable once ObjectCreationAsStatement + Assert.Throws(() => new IgniteClientGroup(null!)); + } + + [Test] + public async Task TestUseAfterDispose() + { + IgniteClientGroup group = CreateGroup(size: 2); + + var client1 = await group.GetIgniteAsync(); + var client2 = await group.GetIgniteAsync(); + + Assert.AreNotSame(client1, client2); + + group.Dispose(); + + // Group and clients are disposed, all operations should throw. + Assert.IsTrue(group.IsDisposed); + + Assert.ThrowsAsync(async () => await group.GetIgniteAsync()); + Assert.ThrowsAsync(async () => await client1.Tables.GetTablesAsync()); + Assert.ThrowsAsync(async () => await client2.Tables.GetTablesAsync()); + } + + [Test] + public async Task TestToString() + { + var group = CreateGroup(5); + + await group.GetIgniteAsync(); + await group.GetIgniteAsync(); + + Assert.AreEqual("IgniteClientGroup { Connected = 2, Size = 5 }", group.ToString()); + } + + [Test] + public void TestConfigurationCantBeChanged() + { + IgniteClientGroup group = CreateGroup(3); + + var configuration = group.Configuration; + configuration.Size = 100; + + Assert.AreEqual(3, group.Configuration.Size); + Assert.AreNotSame(configuration, group.Configuration); + } + + private IgniteClientGroup CreateGroup(int size = 1) => + new IgniteClientGroup( + new IgniteClientGroupConfiguration + { + Size = size, + ClientConfiguration = new IgniteClientConfiguration(_server.Endpoint) + }); +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteServerBase.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteServerBase.cs index fc82f0043e2..c57101e404b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteServerBase.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteServerBase.cs @@ -18,6 +18,7 @@ namespace Apache.Ignite.Tests; using System; +using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; using System.Net; using System.Net.Sockets; @@ -35,7 +36,7 @@ public abstract class IgniteServerBase : IDisposable private readonly object _disposeSyncRoot = new(); - private volatile Socket? _handler; + private readonly ConcurrentDictionary _handlers = new(); private bool _disposed; @@ -66,7 +67,13 @@ public bool DropNewConnections protected Socket Listener => _listener; - public void DropExistingConnection() => _handler?.Dispose(); + public void DropExistingConnection() + { + foreach (var handler in _handlers.Keys) + { + handler.Dispose(); + } + } public void Dispose() { @@ -124,7 +131,14 @@ protected virtual void Dispose(bool disposing) } _cts.Cancel(); - _handler?.Dispose(); + + foreach (var handler in _handlers.Keys) + { + handler.Dispose(); + } + + _handlers.Clear(); + _listener.Dispose(); _cts.Dispose(); @@ -159,21 +173,28 @@ private void ListenLoopInternal() { while (!_cts.IsCancellationRequested) { - using Socket handler = _listener.Accept(); + Socket handler = _listener.Accept(); if (DropNewConnections) { handler.Disconnect(true); - _handler = null; + handler.Dispose(); continue; } - _handler = handler; - handler.NoDelay = true; + _handlers[handler] = null; + + Task.Run(() => + { + using (handler) + { + handler.NoDelay = true; - Handle(handler, _cts.Token); - handler.Disconnect(true); - _handler = null; + Handle(handler, _cts.Token); + handler.Disconnect(true); + _handlers.TryRemove(handler, out _); + } + }); } } diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs b/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs new file mode 100644 index 00000000000..978352d3d61 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroup.cs @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite; + +using System; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Internal; +using Internal.Common; + +/// +/// Ignite client group. Thread safe. +/// +/// Creates and maintains up to Ignite clients and returns them in a round-robin fashion. +/// Ignite clients are thread safe, so there is no rent/return semantics. +/// +/// +/// Register as a singleton in DI container: +/// +/// builder.Services.AddSingleton(_ => new IgniteClientGroup( +/// new IgniteClientGroupConfiguration +/// { +/// Size = 3, +/// ClientConfiguration = new("localhost"), +/// })); +/// +/// Invoke from a controller: +/// +/// public async Task<IActionResult> Index([FromServices] IgniteClientGroup igniteGroup) +/// { +/// IIgnite ignite = await igniteGroup.GetIgniteAsync(); +/// var tables = await ignite.Tables.GetTablesAsync(); +/// return Ok(tables); +/// } +/// +/// +/// +public sealed class IgniteClientGroup : IDisposable +{ + private readonly IgniteClientGroupConfiguration _configuration; + + private readonly IgniteClientInternal?[] _clients; + + private readonly SemaphoreSlim _clientsLock = new(1); + + private int _disposed; + + private int _clientIndex; + + /// + /// Initializes a new instance of the class. + /// + /// Configuration. + public IgniteClientGroup(IgniteClientGroupConfiguration configuration) + { + IgniteArgumentCheck.NotNull(configuration); + IgniteArgumentCheck.NotNull(configuration.ClientConfiguration); + IgniteArgumentCheck.Ensure(configuration.Size > 0, nameof(configuration.Size), "Group size must be positive."); + + _configuration = Copy(configuration); + _clients = new IgniteClientInternal[configuration.Size]; + } + + /// + /// Gets the configuration. + /// + public IgniteClientGroupConfiguration Configuration => Copy(_configuration); + + /// + /// Gets a value indicating whether the group is disposed. + /// + public bool IsDisposed => Interlocked.CompareExchange(ref _disposed, 0, 0) == 1; + + /// + /// Gets an Ignite client from the group. Creates a new one if necessary. + /// Performs round-robin balancing across grouped instances. + /// + /// Ignite client. + [SuppressMessage("Reliability", "CA2000:Dispose objects before losing scope", Justification = "Managed by the group.")] + public async ValueTask GetIgniteAsync() + { + ObjectDisposedException.ThrowIf(IsDisposed, this); + + int index = Interlocked.Increment(ref _clientIndex) % _clients.Length; + + IgniteClientInternal? client = _clients[index]; + if (client is { IsDisposed: false }) + { + return client; + } + + await _clientsLock.WaitAsync().ConfigureAwait(false); + + try + { + client = _clients[index]; + if (client is { IsDisposed: false }) + { + return client; + } + + client = await CreateClientAsync().ConfigureAwait(false); + _clients[index] = client; + + return client; + } + finally + { + _clientsLock.Release(); + } + } + + /// + public void Dispose() + { + if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 1) + { + return; + } + + _clientsLock.Wait(); + + foreach (var client in _clients) + { + // Dispose is not supposed to throw, so we expect all clients to dispose correctly. + client?.Dispose(); + } + + _clientsLock.Dispose(); + } + + /// + public override string ToString() => + new IgniteToStringBuilder(typeof(IgniteClientGroup)) + .Append(_clients.Count(static c => c is { IsDisposed: false }), "Connected") + .Append(Configuration.Size, "Size") + .Build(); + + private static IgniteClientGroupConfiguration Copy(IgniteClientGroupConfiguration cfg) => + cfg with { ClientConfiguration = cfg.ClientConfiguration with { } }; + + private async Task CreateClientAsync() + { + var client = await IgniteClient.StartAsync(Configuration.ClientConfiguration).ConfigureAwait(false); + + return (IgniteClientInternal)client; + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroupConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroupConfiguration.cs new file mode 100644 index 00000000000..90b569df8ad --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite/IgniteClientGroupConfiguration.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite; + +/// +/// Ignite client group configuration. See for more details. +/// +public sealed record IgniteClientGroupConfiguration +{ + /// + /// Gets or sets the group size (maximum number of clients). + /// + public int Size { get; set; } + + /// + /// Gets or sets the client configuration. + /// + public required IgniteClientConfiguration ClientConfiguration { get; set; } +} diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs index b95172cf966..5678025dba9 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs @@ -118,6 +118,11 @@ private ClientFailoverSocket(IgniteClientConfiguration configuration, ILogger lo /// public Guid ClientId { get; } = Guid.NewGuid(); + /// + /// Gets a value indicating whether the socket is disposed. + /// + public bool IsDisposed => _disposed; + /// /// Connects the socket. /// diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs index 9fd3a8f6f7c..e04ca9cb1f8 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs @@ -68,6 +68,11 @@ public IgniteClientInternal(ClientFailoverSocket socket) /// public ISql Sql { get; } + /// + /// Gets a value indicating whether the client is disposed. + /// + public bool IsDisposed => Socket.IsDisposed; + /// /// Gets the underlying socket. ///