Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Multicaster.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.10.34916.146
Expand All @@ -9,7 +8,7 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Multicaster.Tests", "test\M
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{7ED9EEF8-6BF3-4F69-BD7E-D1B25905BD30}"
ProjectSection(SolutionItems) = preProject
Directory.Build.props = Directory.Build.props
src\Directory.Build.props = src\Directory.Build.props
Directory.Packages.props = Directory.Packages.props
README.md = README.md
EndProjectSection
Expand Down
9 changes: 5 additions & 4 deletions Directory.Build.props → src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<LangVersion>12</LangVersion>

<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>$(MSBuildThisFileDirectory)\src\Multicaster\opensource.snk</AssemblyOriginatorKeyFile>
<AssemblyOriginatorKeyFile>$(MSBuildThisFileDirectory)\Multicaster\opensource.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<!-- Prevent local path leakage by artifacts -->
Expand All @@ -30,17 +30,18 @@
<PackageReadmeFile>README.md</PackageReadmeFile>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<ContinuousIntegrationBuild Condition="'$(CI)' == 'true'">true</ContinuousIntegrationBuild>
</PropertyGroup>

<ItemGroup>
<None Include="$(MSBuildThisFileDirectory)\README.md" Pack="true" PackagePath="/" />
<None Include="$(MSBuildThisFileDirectory)\src\Multicaster\Icon.png" Pack="true" PackagePath="/" />
<None Include="$(MSBuildThisFileDirectory)\..\README.md" Pack="true" PackagePath="/" />
<None Include="$(MSBuildThisFileDirectory)\Multicaster\Icon.png" Pack="true" PackagePath="/" />
</ItemGroup>

<!--
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" PrivateAssets="all"/>
</ItemGroup>
-->
</Project>
</Project>
16 changes: 15 additions & 1 deletion src/Multicaster.Distributed.Nats/NatsGroupProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

namespace Cysharp.Runtime.Multicast.Distributed.Nats;

/// <summary>
/// Provides functionality for managing multicast groups over a NATS (NATS.io) connection.
/// </summary>
public class NatsGroupProvider : IMulticastGroupProvider
{
private readonly ConcurrentDictionary<(string Name, Type KeyType, Type ReceiverType), object> _groups = new();
Expand All @@ -18,10 +21,16 @@ public class NatsGroupProvider : IMulticastGroupProvider
private readonly IRemoteSerializer _serializer;
private readonly MessagePackSerializerOptions _messagePackSerializerOptionsForKey;

/// <summary>
/// Initializes a new instance of the <see cref="NatsGroupProvider"/> class with the specified proxy factory, serializer, and configuration options.
/// </summary>
public NatsGroupProvider(IRemoteProxyFactory proxyFactory, IRemoteSerializer serializer, IOptions<NatsGroupOptions> options)
: this(proxyFactory, serializer, options.Value)
{}

/// <summary>
/// Initializes a new instance of the <see cref="NatsGroupProvider"/> class, which provides functionality for managing groups using NATS messaging.
/// </summary>
public NatsGroupProvider(IRemoteProxyFactory proxyFactory, IRemoteSerializer serializer, NatsGroupOptions options)
{
_connection = new NatsConnection(NatsOpts.Default with { Url = options.Url });
Expand All @@ -34,10 +43,12 @@ public NatsGroupProvider(IRemoteProxyFactory proxyFactory, IRemoteSerializer ser
);
}

/// <inheritdoc />
public IMulticastAsyncGroup<TKey, TReceiver> GetOrAddGroup<TKey, TReceiver>(string name)
where TKey : IEquatable<TKey>
=> (IMulticastAsyncGroup<TKey, TReceiver>)_groups.GetOrAdd((name, typeof(TKey), typeof(TReceiver)), _ => new NatsGroup<TKey, TReceiver>(name, _connection, _proxyFactory, _serializer, _messagePackSerializerOptionsForKey, Remove));

/// <inheritdoc />
public IMulticastSyncGroup<TKey, TReceiver> GetOrAddSynchronousGroup<TKey, TReceiver>(string name)
where TKey : IEquatable<TKey>
=> (IMulticastSyncGroup<TKey, TReceiver>)_groups.GetOrAdd((name, typeof(TKey), typeof(TReceiver)), _ => new NatsGroup<TKey, TReceiver>(name, _connection, _proxyFactory, _serializer, _messagePackSerializerOptionsForKey, Remove));
Expand All @@ -49,6 +60,9 @@ private void Remove<TKey, TReceiver>(NatsGroup<TKey, TReceiver> group)
}
}

/// <summary>
/// Represents configuration options for a NATS group, including server connection details and serialization settings.
/// </summary>
public class NatsGroupOptions
{
/// <summary>
Expand All @@ -60,4 +74,4 @@ public class NatsGroupOptions
/// Gets or sets a MessagePackSerializerOptions used for serializing the key.
/// </summary>
public MessagePackSerializerOptions MessagePackSerializerOptionsForKey { get; set; } = MessagePackSerializer.DefaultOptions;
}
}
3 changes: 3 additions & 0 deletions src/Multicaster/Distributed/IDistributedGroup.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
namespace Cysharp.Runtime.Multicast.Distributed;

/// <summary>
/// Represents a marker interface for a group that utilizes a distributed backplane.
/// </summary>
public interface IDistributedGroup;
57 changes: 56 additions & 1 deletion src/Multicaster/IMulticastGroup.cs
Original file line number Diff line number Diff line change
@@ -1,33 +1,88 @@
namespace Cysharp.Runtime.Multicast;

/// <summary>
/// Represents a multicast group that allows sending messages to multiple receivers based on specified inclusion or
/// exclusion criteria.
/// </summary>
/// <typeparam name="TKey">The type of the key used to identify receivers. Must implement <see cref="IEquatable{T}"/>.</typeparam>
/// <typeparam name="TReceiver">The type of the receiver that messages are sent to.</typeparam>
public interface IMulticastGroup<TKey, TReceiver>
where TKey : IEquatable<TKey>
{
/// <summary>
/// Gets a receiver that processes all messages without filtering.
/// </summary>
TReceiver All { get; }

/// <summary>
/// Returns a new instance of the receiver with elements excluded based on the specified keys.
/// </summary>
TReceiver Except(IEnumerable<TKey> excludes);

/// <summary>
/// Filters the current set of targets to include only the specified ones.
/// </summary>
TReceiver Only(IEnumerable<TKey> targets);

/// <summary>
/// Retrieves a single instance of <typeparamref name="TReceiver"/> associated with the specified <typeparamref name="TKey"/>.
/// </summary>
TReceiver Single(TKey target);
}

/// <summary>
/// Represents a multicast group that supports asynchronous operations for managing receivers.
/// </summary>
public interface IMulticastAsyncGroup<TKey, TReceiver> : IMulticastGroup<TKey, TReceiver>, IDisposable
where TKey : IEquatable<TKey>
{
/// <summary>
/// Adds a key and its associated receiver to the group asynchronously.
/// </summary>
ValueTask AddAsync(TKey key, TReceiver receiver, CancellationToken cancellationToken = default);

/// <summary>
/// Removes the item associated with the specified key from the group asynchronously.
/// </summary>
ValueTask RemoveAsync(TKey key, CancellationToken cancellationToken = default);

/// <summary>
/// Counts the total number of items in the group asynchronously.
/// </summary>
ValueTask<int> CountAsync(CancellationToken cancellationToken = default);
}

/// <summary>
/// Represents a multicast group that supports synchronous operations for managing receivers.
/// </summary>
public interface IMulticastSyncGroup<TKey, TReceiver> : IMulticastGroup<TKey, TReceiver>, IDisposable
where TKey : IEquatable<TKey>
{
/// <summary>
/// Adds a key and its associated receiver to the group.
/// </summary>
void Add(TKey key, TReceiver receiver);

/// <summary>
/// Removes the item associated with the specified key from the group.
/// </summary>
void Remove(TKey key);

/// <summary>
/// Counts the total number of items in the group.
/// </summary>
int Count();
}

/// <summary>
/// Provides extension methods for working with multicast groups.
/// </summary>
public static class MulticastGroupExtensions
{
/// <summary>
/// Returns a new instance of the receiver with elements excluded based on the specified key.
/// </summary>
public static TReceiver Except<TKey, TReceiver>(this IMulticastGroup<TKey, TReceiver> group, TKey exclude)
where TKey : IEquatable<TKey>
=> group.Except([exclude]);
}
}
10 changes: 10 additions & 0 deletions src/Multicaster/IMulticastGroupProvider.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
namespace Cysharp.Runtime.Multicast;

/// <summary>
/// Provides functionality to manage and retrieve multicast groups, supporting both asynchronous and synchronous communication patterns.
/// </summary>
public interface IMulticastGroupProvider
{
/// <summary>
/// Retrieves an existing multicast group by name or creates a new one if it does not exist.
/// </summary>
IMulticastAsyncGroup<TKey, TReceiver> GetOrAddGroup<TKey, TReceiver>(string name)
where TKey : IEquatable<TKey>;

/// <summary>
/// Retrieves an existing synchronous multicast group by name or creates a new one if it does not exist.
/// </summary>
IMulticastSyncGroup<TKey, TReceiver> GetOrAddSynchronousGroup<TKey, TReceiver>(string name)
where TKey : IEquatable<TKey>;
}
7 changes: 6 additions & 1 deletion src/Multicaster/InMemory/DynamicInMemoryProxyFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@
using System.Reflection;
using System.Reflection.Emit;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

namespace Cysharp.Runtime.Multicast.InMemory;

/// <summary>
/// Provides a factory for dynamically creating in-memory proxy instances for a specified interface type.
/// </summary>
public class DynamicInMemoryProxyFactory : IInMemoryProxyFactory
{
public static IInMemoryProxyFactory Instance { get; } = new DynamicInMemoryProxyFactory();
Expand Down Expand Up @@ -227,4 +232,4 @@ static Core()
public static T Create(IReceiverHolder<TKey, T> receivers, ImmutableArray<TKey> excludes, ImmutableArray<TKey>? targets)
=> _factory(receivers, excludes, targets);
}
}
}
6 changes: 5 additions & 1 deletion src/Multicaster/InMemory/InMemoryGroupProvider.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Collections.Concurrent;
using System.Collections.Immutable;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

namespace Cysharp.Runtime.Multicast.InMemory;

public class InMemoryGroupProvider : IMulticastGroupProvider
Expand All @@ -13,10 +15,12 @@ public InMemoryGroupProvider(IInMemoryProxyFactory proxyFactory)
_proxyFactory = proxyFactory;
}

/// <inheritdoc />
public IMulticastAsyncGroup<TKey, T> GetOrAddGroup<TKey, T>(string name)
where TKey : IEquatable<TKey>
=> (IMulticastAsyncGroup<TKey, T>)_groups.GetOrAdd((typeof(TKey), typeof(T), name), _ => new InMemoryGroup<TKey, T>(name, _proxyFactory, Remove));

/// <inheritdoc />
public IMulticastSyncGroup<TKey, T> GetOrAddSynchronousGroup<TKey, T>(string name)
where TKey : IEquatable<TKey>
=> (IMulticastSyncGroup<TKey, T>)_groups.GetOrAdd((typeof(TKey), typeof(T), name), _ => new InMemoryGroup<TKey, T>(name, _proxyFactory, Remove));
Expand Down Expand Up @@ -108,4 +112,4 @@ private void ThrowIfDisposed()
{
if (_disposed) throw new ObjectDisposedException(nameof(InMemoryGroup<TKey, T>));
}
}
}
4 changes: 3 additions & 1 deletion src/Multicaster/InMemory/InMemoryProxyBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Cysharp.Runtime.Multicast.InMemory;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

public abstract class InMemoryProxyBase<TKey, T>
where TKey : IEquatable<TKey>
{
Expand Down Expand Up @@ -566,4 +568,4 @@ private void ThrowIfNotSingle()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool CanInvoke(ReceiverRegistration<TKey, T> r)
=> !r.HasKey || r.Key is null || (!_excludes.Contains(r.Key) && (_targets is null || _targets.Value.Contains(r.Key)));
}
}
9 changes: 4 additions & 5 deletions src/Multicaster/InMemory/InMemoryProxyFactory.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Collections.Immutable;
using System.Runtime.InteropServices;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

namespace Cysharp.Runtime.Multicast.InMemory;

public interface IInMemoryProxyFactory
Expand Down Expand Up @@ -153,4 +152,4 @@ public void Dispose()
{
_onDispose(_state);
}
}
}
7 changes: 7 additions & 0 deletions src/Multicaster/Remoting/DynamicRemoteProxyFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

namespace Cysharp.Runtime.Multicast.Remoting;

/// <summary>
/// Provides a factory for dynamically creating remote proxy instances that implement a specified interface.
/// </summary>
public class DynamicRemoteProxyFactory : IRemoteProxyFactory
{
/// <summary>
/// Gets the singleton instance of the remote proxy factory.
/// </summary>
public static IRemoteProxyFactory Instance { get; } = new DynamicRemoteProxyFactory();

private static readonly AssemblyBuilder _assemblyBuilder;
Expand All @@ -17,6 +23,7 @@ static DynamicRemoteProxyFactory()
_moduleBuilder = _assemblyBuilder.DefineDynamicModule("Multicaster");
}

/// <inheritdoc />
public T Create<T>(IRemoteReceiverWriter writer, IRemoteSerializer serializer)
{
return Core<T>.Create(writer, serializer);
Expand Down
Loading