diff --git a/.github/workflows/build-canary.yml b/.github/workflows/build-canary.yml
index 3a071cc98..f94bc5c16 100644
--- a/.github/workflows/build-canary.yml
+++ b/.github/workflows/build-canary.yml
@@ -4,6 +4,7 @@ on:
push:
branches:
- main
+ - vNext
tags:
- "!*" # not a tag push
paths-ignore:
diff --git a/.github/workflows/build-debug.yml b/.github/workflows/build-debug.yml
index a296c2e63..1e66d10e6 100644
--- a/.github/workflows/build-debug.yml
+++ b/.github/workflows/build-debug.yml
@@ -4,6 +4,7 @@ on:
push:
branches:
- main
+ - vNext
tags:
- "!*" # not a tag push
paths-ignore:
@@ -13,6 +14,7 @@ on:
pull_request:
branches:
- main
+ - vNext
paths-ignore:
- '**.md'
- .github/**
diff --git a/Directory.Build.props b/Directory.Build.props
index 0f05ca832..26e638cd2 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -1,6 +1,6 @@
- 6.1.3
+ 7.0.0
diff --git a/Directory.Packages.props b/Directory.Packages.props
index bb5de49c0..cac34b822 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -9,8 +9,10 @@
2.5.129
4.3.1
3.9.0
+ 0.1.11
+
@@ -20,7 +22,11 @@
+
+
+
+
@@ -46,4 +52,4 @@
-
\ No newline at end of file
+
diff --git a/MagicOnion.sln b/MagicOnion.sln
index d03c2bfb0..41869b4d8 100644
--- a/MagicOnion.sln
+++ b/MagicOnion.sln
@@ -120,6 +120,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MagicOnion.Serialization.Me
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MagicOnion.Abstractions.Tests", "tests\MagicOnion.Abstractions.Tests\MagicOnion.Abstractions.Tests.csproj", "{D340EFB8-128A-4B49-A47A-F00A905D10AC}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ChatApp.Console", "samples\ChatApp\ChatApp.Console\ChatApp.Console.csproj", "{AF21B7BD-7399-41B7-B0D4-08ACDC952E50}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Microbenchmark", "Microbenchmark", "{F1FD52DD-E8A4-4CF0-A857-1A22443A0324}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microbenchmark.Client", "perf\Microbenchmark\Microbenchmark.Client\Microbenchmark.Client.csproj", "{CDBB141A-E0A9-4FD8-8260-1FB1E95C4E80}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -266,6 +272,14 @@ Global
{D340EFB8-128A-4B49-A47A-F00A905D10AC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D340EFB8-128A-4B49-A47A-F00A905D10AC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D340EFB8-128A-4B49-A47A-F00A905D10AC}.Release|Any CPU.Build.0 = Release|Any CPU
+ {AF21B7BD-7399-41B7-B0D4-08ACDC952E50}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {AF21B7BD-7399-41B7-B0D4-08ACDC952E50}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {AF21B7BD-7399-41B7-B0D4-08ACDC952E50}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {AF21B7BD-7399-41B7-B0D4-08ACDC952E50}.Release|Any CPU.Build.0 = Release|Any CPU
+ {CDBB141A-E0A9-4FD8-8260-1FB1E95C4E80}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {CDBB141A-E0A9-4FD8-8260-1FB1E95C4E80}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {CDBB141A-E0A9-4FD8-8260-1FB1E95C4E80}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {CDBB141A-E0A9-4FD8-8260-1FB1E95C4E80}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -312,6 +326,9 @@ Global
{2996029B-D329-499F-8525-69614A820135} = {1987061F-8970-4018-8D58-6932961C9EB4}
{701E193F-587D-4C20-8970-6E215B0634F8} = {7ACC27E8-8FBE-4807-B91F-B89AF3CFF7E0}
{D340EFB8-128A-4B49-A47A-F00A905D10AC} = {7ACC27E8-8FBE-4807-B91F-B89AF3CFF7E0}
+ {AF21B7BD-7399-41B7-B0D4-08ACDC952E50} = {FEE2B9AB-A1D0-41BA-A172-FC95935542DF}
+ {F1FD52DD-E8A4-4CF0-A857-1A22443A0324} = {A0CED9FB-5B18-4EE3-859F-CE3A6F90A82A}
+ {CDBB141A-E0A9-4FD8-8260-1FB1E95C4E80} = {F1FD52DD-E8A4-4CF0-A857-1A22443A0324}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D5B2E7E3-B727-40A1-BE68-7BAC9B9DE2FE}
diff --git a/perf/BenchmarkApp/PerformanceTest.Client/Program.cs b/perf/BenchmarkApp/PerformanceTest.Client/Program.cs
index 284ce3c6e..c946bb662 100644
--- a/perf/BenchmarkApp/PerformanceTest.Client/Program.cs
+++ b/perf/BenchmarkApp/PerformanceTest.Client/Program.cs
@@ -52,7 +52,7 @@ async Task Main(
// Create a control channel
using var channelControl = GrpcChannel.ForAddress(config.Url);
var controlServiceClient = MagicOnionClient.Create(channelControl);
- controlServiceClient.SetMemoryProfilerCollectAllocations(true);
+ await controlServiceClient.SetMemoryProfilerCollectAllocationsAsync(true);
ServerInformation serverInfo;
WriteLog("Gathering the server information...");
diff --git a/perf/BenchmarkApp/PerformanceTest.Server/PerfTestControlService.cs b/perf/BenchmarkApp/PerformanceTest.Server/PerfTestControlService.cs
index d8d62d59b..004b66d3c 100644
--- a/perf/BenchmarkApp/PerformanceTest.Server/PerfTestControlService.cs
+++ b/perf/BenchmarkApp/PerformanceTest.Server/PerfTestControlService.cs
@@ -25,7 +25,7 @@ public UnaryResult GetServerInformationAsync()
ApplicationInformation.Current.IsAttached));
}
- public UnaryResult SetMemoryProfilerCollectAllocations(bool enable)
+ public UnaryResult SetMemoryProfilerCollectAllocationsAsync(bool enable)
{
MemoryProfiler.CollectAllocations(enable);
return UnaryResult.CompletedResult;
diff --git a/perf/BenchmarkApp/PerformanceTest.Shared/IPerfTestControlService.cs b/perf/BenchmarkApp/PerformanceTest.Shared/IPerfTestControlService.cs
index e747c92e4..e4d621fed 100644
--- a/perf/BenchmarkApp/PerformanceTest.Shared/IPerfTestControlService.cs
+++ b/perf/BenchmarkApp/PerformanceTest.Shared/IPerfTestControlService.cs
@@ -9,7 +9,7 @@ public interface IPerfTestControlService : IService
{
UnaryResult GetServerInformationAsync();
- UnaryResult SetMemoryProfilerCollectAllocations(bool enable);
+ UnaryResult SetMemoryProfilerCollectAllocationsAsync(bool enable);
UnaryResult CreateMemoryProfilerSnapshotAsync(string name);
}
diff --git a/perf/Microbenchmark/Microbenchmark.Client/.gitignore b/perf/Microbenchmark/Microbenchmark.Client/.gitignore
new file mode 100644
index 000000000..1c2dac683
--- /dev/null
+++ b/perf/Microbenchmark/Microbenchmark.Client/.gitignore
@@ -0,0 +1 @@
+BenchmarkDotNet.Artifacts
\ No newline at end of file
diff --git a/perf/Microbenchmark/Microbenchmark.Client/ChannelAsyncStreamReader.cs b/perf/Microbenchmark/Microbenchmark.Client/ChannelAsyncStreamReader.cs
new file mode 100644
index 000000000..bfa5bf7bb
--- /dev/null
+++ b/perf/Microbenchmark/Microbenchmark.Client/ChannelAsyncStreamReader.cs
@@ -0,0 +1,37 @@
+using System.Runtime.CompilerServices;
+using System.Threading.Channels;
+using Grpc.Core;
+
+namespace Microbenchmark.Client;
+
+class ChannelAsyncStreamReader : IAsyncStreamReader
+{
+ readonly ChannelReader reader;
+
+ public T Current { get; private set; } = default!;
+
+ public ChannelAsyncStreamReader(Channel channel)
+ {
+ reader = channel.Reader;
+ }
+
+ public Task MoveNext(CancellationToken cancellationToken)
+ {
+ return MoveNextCore(cancellationToken).AsTask();
+ }
+
+ [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
+ async ValueTask MoveNextCore(CancellationToken cancellationToken)
+ {
+ if (await reader.WaitToReadAsync())
+ {
+ if (reader.TryRead(out var item))
+ {
+ Current = item;
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/perf/Microbenchmark/Microbenchmark.Client/ChannelClientStreamWriter.cs b/perf/Microbenchmark/Microbenchmark.Client/ChannelClientStreamWriter.cs
new file mode 100644
index 000000000..7136ed34b
--- /dev/null
+++ b/perf/Microbenchmark/Microbenchmark.Client/ChannelClientStreamWriter.cs
@@ -0,0 +1,28 @@
+using System.Threading.Channels;
+using Grpc.Core;
+
+namespace Microbenchmark.Client;
+
+class ChannelClientStreamWriter : IClientStreamWriter
+{
+ readonly ChannelWriter writer;
+
+ public WriteOptions? WriteOptions { get; set; }
+
+ public ChannelClientStreamWriter(ChannelWriter writer)
+ {
+ this.writer = writer;
+ }
+
+ public Task CompleteAsync()
+ {
+ writer.Complete();
+ return Task.CompletedTask;
+ }
+
+ public Task WriteAsync(T message)
+ {
+ writer.TryWrite(message);
+ return Task.CompletedTask;
+ }
+}
diff --git a/perf/Microbenchmark/Microbenchmark.Client/HubMethodBenchmarks.cs b/perf/Microbenchmark/Microbenchmark.Client/HubMethodBenchmarks.cs
new file mode 100644
index 000000000..f4eaeaf88
--- /dev/null
+++ b/perf/Microbenchmark/Microbenchmark.Client/HubMethodBenchmarks.cs
@@ -0,0 +1,82 @@
+using BenchmarkDotNet.Attributes;
+using MagicOnion.Client.DynamicClient;
+
+namespace Microbenchmark.Client;
+
+[MemoryDiagnoser, RankColumn]
+[ShortRunJob]
+public class HubMethodBenchmarks
+{
+ readonly StreamingHubClientTestHelper helper;
+ readonly Task responseTask;
+ readonly ITestHub client;
+
+ static ReadOnlySpan IntResponse => [0xcd, 0x30, 0x39 /* 12345 (int) */];
+ static ReadOnlySpan NilResponse => [0xc0 /* Nil */];
+ static ReadOnlySpan StringResponse => [0xa6, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x21 /* "Hello!" (string) */];
+
+ public HubMethodBenchmarks()
+ {
+ this.helper = new StreamingHubClientTestHelper(new TestHubReceiver(), DynamicStreamingHubClientFactoryProvider.Instance);
+ this.responseTask = Task.Run(async () =>
+ {
+ while (true)
+ {
+ var req = await helper.ReadRequestNoDeserializeAsync();
+ if (req.MethodId is -2087943100 or 1273874383)
+ {
+ // Parameter_Zero_Return_ValueType, Parameter_Many_Return_ValueType
+ helper.WriteResponse(req.MessageId, req.MethodId, IntResponse);
+ }
+ else if (req.MethodId is -1841486598)
+ {
+ // ValueTask_Parameter_Zero_NoReturn
+ helper.WriteResponse(req.MessageId, req.MethodId, NilResponse);
+ }
+ else if (req.MethodId is -440496944 or -1110031569)
+ {
+ // Parameter_Zero_Return_RefType, Parameter_Many_Return_RefType
+ helper.WriteResponse(req.MessageId, req.MethodId, StringResponse);
+ }
+ }
+ });
+ this.client = helper.ConnectAsync().GetAwaiter().GetResult();
+ }
+
+ [Benchmark]
+ public Task Void_Parameter_Zero_NoReturn()
+ {
+ client.Void_Parameter_Zero_NoReturn();
+ return Task.CompletedTask;
+ }
+
+ [Benchmark]
+ public async Task ValueTask_Parameter_Zero_NoReturn()
+ {
+ await client.ValueTask_Parameter_Zero_NoReturn();
+ }
+
+ [Benchmark]
+ public async Task Parameter_Zero_Return_ValueType()
+ {
+ var value = await client.Parameter_Zero_Return_ValueType();
+ }
+
+ [Benchmark]
+ public async Task Parameter_Many_Return_ValueType()
+ {
+ var value = await client.Parameter_Many_Return_ValueType("Hello", 12345, true);
+ }
+
+ [Benchmark]
+ public async Task Parameter_Zero_Return_RefType()
+ {
+ var value = await client.Parameter_Zero_Return_RefType();
+ }
+
+ [Benchmark]
+ public async Task Parameter_Many_Return_RefType()
+ {
+ var value = await client.Parameter_Many_Return_RefType("Hello", 12345, true);
+ }
+}
diff --git a/perf/Microbenchmark/Microbenchmark.Client/HubReceiverBroadcastBenchmarks.cs b/perf/Microbenchmark/Microbenchmark.Client/HubReceiverBroadcastBenchmarks.cs
new file mode 100644
index 000000000..b8c198142
--- /dev/null
+++ b/perf/Microbenchmark/Microbenchmark.Client/HubReceiverBroadcastBenchmarks.cs
@@ -0,0 +1,65 @@
+using BenchmarkDotNet.Attributes;
+using MagicOnion.Client.DynamicClient;
+
+namespace Microbenchmark.Client;
+
+[MemoryDiagnoser, RankColumn]
+[ShortRunJob]
+public class HubReceiverBroadcastBenchmarks
+{
+ StreamingHubClientTestHelper helper = default!;
+ ITestHub client = default!;
+ TestHubReceiver receiver = default!;
+
+ static ReadOnlySpan BroadcastMessage_Parameter_Zero => [0x92, 0xce, 0x76, 0xe4, 0x37, 0x1b /* 1994667803 */, 0xc0 /* Nil */]; // [MethodId(int), SerializedArgument]
+ static ReadOnlySpan BroadcastMessage_Parameter_Many => [0x92, 0xce, 0x4c, 0xb8, 0x83, 0xca /* 1287160778 */, 0x93, 0xa6, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x21, 0xcd, 0x30, 0x39, 0xc3 /* [ "Hello", 12345, true ] */]; // [MethodId(int), SerializedArgument]
+
+ void Setup()
+ {
+ this.receiver = new TestHubReceiver();
+ this.helper = new StreamingHubClientTestHelper(receiver, DynamicStreamingHubClientFactoryProvider.Instance);
+ this.client = helper.ConnectAsync().GetAwaiter().GetResult();
+ }
+
+ [GlobalSetup(Targets = [nameof(Parameter_Zero), nameof(Parameter_Many)])]
+ public void UnsetSynchronizationContext()
+ {
+ SynchronizationContext.SetSynchronizationContext(null);
+ Setup();
+ }
+
+ [GlobalSetup(Targets = [nameof(Parameter_Zero_With_SynchronizationContext), nameof(Parameter_Many_With_SynchronizationContext)])]
+ public void SetSynchronizationContext()
+ {
+ SynchronizationContext.SetSynchronizationContext(new MySynchronizationContext());
+ Setup();
+ }
+
+ [Benchmark]
+ public void Parameter_Zero()
+ {
+ helper.WriteResponseRaw(BroadcastMessage_Parameter_Zero);
+ receiver.Received.Wait();
+ }
+
+ [Benchmark]
+ public void Parameter_Many()
+ {
+ helper.WriteResponseRaw(BroadcastMessage_Parameter_Many);
+ receiver.Received.Wait();
+ }
+
+ [Benchmark]
+ public void Parameter_Zero_With_SynchronizationContext()
+ {
+ helper.WriteResponseRaw(BroadcastMessage_Parameter_Zero);
+ receiver.Received.Wait();
+ }
+
+ [Benchmark]
+ public void Parameter_Many_With_SynchronizationContext()
+ {
+ helper.WriteResponseRaw(BroadcastMessage_Parameter_Many);
+ receiver.Received.Wait();
+ }
+}
diff --git a/perf/Microbenchmark/Microbenchmark.Client/Microbenchmark.Client.csproj b/perf/Microbenchmark/Microbenchmark.Client/Microbenchmark.Client.csproj
new file mode 100644
index 000000000..64dab9f3c
--- /dev/null
+++ b/perf/Microbenchmark/Microbenchmark.Client/Microbenchmark.Client.csproj
@@ -0,0 +1,21 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+
+ true
+ ..\..\..\src\MagicOnion\opensource.snk
+
+
+
+
+
+
+
+
+
+
+
diff --git a/perf/Microbenchmark/Microbenchmark.Client/Program.cs b/perf/Microbenchmark/Microbenchmark.Client/Program.cs
new file mode 100644
index 000000000..60e603a1b
--- /dev/null
+++ b/perf/Microbenchmark/Microbenchmark.Client/Program.cs
@@ -0,0 +1,46 @@
+using BenchmarkDotNet.Running;
+using MagicOnion;
+using Microbenchmark.Client;
+
+//BenchmarkRunner.Run();
+BenchmarkRunner.Run();
+
+#if FALSE
+var b = new HubMethodBenchmarks();
+for (var i = 0; i < 1000000; i++)
+ await b.Parameter_Zero_Return_ValueType();
+#endif
+
+class MySynchronizationContext : SynchronizationContext;
+
+class TestHubReceiver : ITestHubReceiver
+{
+ public ManualResetEventSlim Received { get; } = new();
+
+ public void Parameter_Zero()
+ {
+ Received.Set();
+ }
+
+ public void Parameter_Many(string arg0, int arg1, bool arg2)
+ {
+ Received.Set();
+ }
+}
+
+public interface ITestHub : IStreamingHub
+{
+ void Void_Parameter_Zero_NoReturn();
+ ValueTask ValueTask_Parameter_Zero_NoReturn();
+
+ Task Parameter_Zero_Return_ValueType();
+ Task Parameter_Many_Return_ValueType(string arg0, int arg1, bool arg2);
+ Task Parameter_Zero_Return_RefType();
+ Task Parameter_Many_Return_RefType(string arg0, int arg1, bool arg2);
+}
+
+public interface ITestHubReceiver
+{
+ void Parameter_Zero();
+ void Parameter_Many(string arg0, int arg1, bool arg2);
+}
diff --git a/perf/Microbenchmark/Microbenchmark.Client/StreamingHubClientTestHelper.cs b/perf/Microbenchmark/Microbenchmark.Client/StreamingHubClientTestHelper.cs
new file mode 100644
index 000000000..c6490db6f
--- /dev/null
+++ b/perf/Microbenchmark/Microbenchmark.Client/StreamingHubClientTestHelper.cs
@@ -0,0 +1,204 @@
+using System;
+using System.Buffers;
+using System.Diagnostics;
+using System.Threading.Channels;
+using Grpc.Core;
+using MagicOnion;
+using MagicOnion.Client;
+using MagicOnion.Internal;
+using MagicOnion.Internal.Buffers;
+using MessagePack;
+
+namespace Microbenchmark.Client;
+
+class StreamingHubClientTestHelper
+ where TStreamingHub : IStreamingHub
+ where TReceiver : class
+{
+ readonly Channel requestChannel;
+ readonly ChannelClientStreamWriter requestStream;
+ readonly Channel responseChannel;
+ readonly ChannelAsyncStreamReader responseStream;
+
+ readonly CallInvoker callInvokerMock;
+ readonly TReceiver receiver;
+ readonly IStreamingHubClientFactoryProvider? factoryProvider;
+
+ public TReceiver Receiver => receiver;
+ public CallInvoker CallInvoker => callInvokerMock;
+
+ public StreamingHubClientTestHelper(TReceiver receiver, IStreamingHubClientFactoryProvider? factoryProvider = null)
+ {
+ this.receiver = receiver;
+ this.requestChannel = Channel.CreateUnbounded(new() { SingleReader = true, SingleWriter = true });
+ this.requestStream = new ChannelClientStreamWriter(requestChannel);
+ this.responseChannel = Channel.CreateUnbounded(new() { SingleReader = true, SingleWriter = true });
+ this.responseStream = new ChannelAsyncStreamReader(responseChannel);
+
+ this.factoryProvider = factoryProvider;
+
+ callInvokerMock = new MockCallInvoker(this);
+ }
+
+ class MockCallInvoker(StreamingHubClientTestHelper parent) : CallInvoker
+ {
+ public override TResponse BlockingUnaryCall(Method method, string? host, CallOptions options, TRequest request)
+ => throw new NotImplementedException();
+
+ public override AsyncUnaryCall AsyncUnaryCall(Method method, string? host, CallOptions options, TRequest request)
+ => throw new NotImplementedException();
+
+ public override AsyncServerStreamingCall AsyncServerStreamingCall(Method method, string? host, CallOptions options, TRequest request)
+ => throw new NotImplementedException();
+
+ public override AsyncClientStreamingCall AsyncClientStreamingCall(Method method, string? host, CallOptions options)
+ => throw new NotImplementedException();
+
+ public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(Method method, string? host, CallOptions options)
+ => new(
+ (IClientStreamWriter)parent.requestStream,
+ (IAsyncStreamReader)parent.responseStream,
+ _ => Task.FromResult(new Metadata { { "x-magiconion-streaminghub-version", "2" } }),
+ _ => Status.DefaultSuccess,
+ _ => Metadata.Empty,
+ _ => { },
+ new object());
+ }
+
+ public async Task ConnectAsync(CancellationToken cancellationToken = default)
+ {
+ return await StreamingHubClient.ConnectAsync(
+ callInvokerMock,
+ receiver,
+ cancellationToken: cancellationToken,
+ factoryProvider: factoryProvider
+ );
+ }
+
+ public async Task ConnectAsync(StreamingHubClientOptions options, CancellationToken cancellationToken = default)
+ {
+ return await StreamingHubClient.ConnectAsync(
+ callInvokerMock,
+ receiver,
+ options,
+ cancellationToken: cancellationToken,
+ factoryProvider: factoryProvider
+ );
+ }
+
+ public async ValueTask> ReadRequestRawAsync()
+ {
+ var requestPayload = await requestChannel.Reader.ReadAsync();
+ return requestPayload.Memory;
+ }
+
+ public async ValueTask<(int MessageId, int MethodId, T Request)> ReadRequestAsync()
+ {
+ var requestPayload = await requestChannel.Reader.ReadAsync();
+ try
+ {
+ return ReadRequestPayload(requestPayload.Memory);
+ }
+ finally
+ {
+ StreamingHubPayloadPool.Shared.Return(requestPayload);
+ }
+ }
+
+ public async ValueTask<(int MessageId, int MethodId, ReadOnlyMemory Request)> ReadRequestNoDeserializeAsync()
+ {
+ var requestPayload = await requestChannel.Reader.ReadAsync();
+ try
+ {
+ return ReadRequestPayload(requestPayload.Memory);
+ }
+ finally
+ {
+ StreamingHubPayloadPool.Shared.Return(requestPayload);
+ }
+ }
+
+ public async ValueTask<(int MethodId, T Request)> ReadFireAndForgetRequestAsync()
+ {
+ var requestPayload = await requestChannel.Reader.ReadAsync();
+ try
+ {
+ return ReadFireAndForgetRequestPayload(requestPayload.Memory);
+ }
+ finally
+ {
+ StreamingHubPayloadPool.Shared.Return(requestPayload);
+ }
+ }
+
+ public void WriteResponseRaw(ReadOnlySpan data)
+ {
+ responseChannel.Writer.TryWrite(StreamingHubPayloadPool.Shared.RentOrCreate(data));
+ }
+
+ public void WriteResponse(int messageId, int methodId, T response)
+ {
+ responseChannel.Writer.TryWrite(BuildResponsePayload(messageId, methodId, response));
+ }
+
+ public void WriteResponse(int messageId, int methodId, ReadOnlySpan response)
+ {
+ responseChannel.Writer.TryWrite(BuildResponsePayload(messageId, methodId, response));
+ }
+
+ static StreamingHubPayload BuildResponsePayload(int messageId, int methodId, T response)
+ {
+ using var bufferWriter = ArrayPoolBufferWriter.RentThreadStaticWriter();
+ var messagePackWriter = new MessagePackWriter(bufferWriter);
+ messagePackWriter.WriteArrayHeader(3);
+ messagePackWriter.Write(messageId);
+ messagePackWriter.Write(methodId);
+ MessagePackSerializer.Serialize(ref messagePackWriter, response);
+ messagePackWriter.Flush();
+ return StreamingHubPayloadPool.Shared.RentOrCreate(bufferWriter.WrittenSpan);
+ }
+
+ static StreamingHubPayload BuildResponsePayload(int messageId, int methodId, ReadOnlySpan response)
+ {
+ using var bufferWriter = ArrayPoolBufferWriter.RentThreadStaticWriter();
+ var messagePackWriter = new MessagePackWriter(bufferWriter);
+ messagePackWriter.WriteArrayHeader(3);
+ messagePackWriter.Write(messageId);
+ messagePackWriter.Write(methodId);
+ messagePackWriter.Flush();
+ bufferWriter.Write(response);
+ return StreamingHubPayloadPool.Shared.RentOrCreate(bufferWriter.WrittenSpan);
+ }
+
+ static (int MessageId, int MethodId, T Body) ReadRequestPayload(ReadOnlyMemory payload)
+ {
+ // Array[3][messageId (int), methodId (int), request body...]
+ var messagePackReader = new MessagePackReader(payload);
+ var arraySize = messagePackReader.ReadArrayHeader();
+ Debug.Assert(arraySize == 3);
+ var messageId = messagePackReader.ReadInt32();
+ var methodId = messagePackReader.ReadInt32();
+ return (messageId, methodId, MessagePackSerializer.Deserialize(ref messagePackReader));
+ }
+
+ static (int MessageId, int MethodId, ReadOnlyMemory Body) ReadRequestPayload(ReadOnlyMemory payload)
+ {
+ // Array[3][messageId (int), methodId (int), request body...]
+ var messagePackReader = new MessagePackReader(payload);
+ var arraySize = messagePackReader.ReadArrayHeader();
+ Debug.Assert(arraySize == 3);
+ var messageId = messagePackReader.ReadInt32();
+ var methodId = messagePackReader.ReadInt32();
+ return (messageId, methodId, payload.Slice((int)messagePackReader.Consumed));
+ }
+
+ static (int MethodId, T Body) ReadFireAndForgetRequestPayload(ReadOnlyMemory payload)
+ {
+ // Array[2][methodId (int), request body...]
+ var messagePackReader = new MessagePackReader(payload);
+ var arraySize = messagePackReader.ReadArrayHeader();
+ Debug.Assert(arraySize == 2);
+ var methodId = messagePackReader.ReadInt32();
+ return (methodId, MessagePackSerializer.Deserialize(ref messagePackReader));
+ }
+}
diff --git a/samples/ChatApp/ChatApp.Console/ChatApp.Console.csproj b/samples/ChatApp/ChatApp.Console/ChatApp.Console.csproj
new file mode 100644
index 000000000..aafa3e639
--- /dev/null
+++ b/samples/ChatApp/ChatApp.Console/ChatApp.Console.csproj
@@ -0,0 +1,16 @@
+
+
+
+ Exe
+ net8.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/samples/ChatApp/ChatApp.Console/Program.cs b/samples/ChatApp/ChatApp.Console/Program.cs
new file mode 100644
index 000000000..71c1c4429
--- /dev/null
+++ b/samples/ChatApp/ChatApp.Console/Program.cs
@@ -0,0 +1,57 @@
+// See https://aka.ms/new-console-template for more information
+
+using ChatApp.Shared.Hubs;
+using ChatApp.Shared.MessagePackObjects;
+using Grpc.Net.Client;
+using MagicOnion.Client;
+
+var channel = GrpcChannel.ForAddress("http://localhost:5000");
+
+var sessionId = Guid.NewGuid();
+Console.WriteLine("Connecting...");
+var hub = await StreamingHubClient.ConnectAsync(channel, new ChatHubReceiver(sessionId));
+Console.WriteLine($"Connected: {sessionId}");
+
+Console.Write("UserName: ");
+var userName = Console.ReadLine();
+Console.Write("RoomName: ");
+var roomName = Console.ReadLine();
+
+Console.WriteLine($"Join: RoomName={roomName}; UserName={userName}");
+await hub.JoinAsync(new JoinRequest() { RoomName = roomName, UserName = userName });
+Console.WriteLine($"Joined");
+
+while (true)
+{
+ var message = Console.ReadLine();
+ await hub.SendMessageAsync(message);
+}
+
+[MagicOnionClientGeneration(typeof(IChatHub))]
+partial class MagicOnionGeneratedClientInitializer;
+
+
+class ChatHubReceiver(Guid sessionId) : IChatHubReceiver
+{
+ public void OnJoin(string name)
+ {
+ Console.WriteLine($" Join: {name}");
+ }
+
+ public void OnLeave(string name)
+ {
+ Console.WriteLine($" Leave: {name}");
+ }
+
+ public void OnSendMessage(MessageResponse message)
+ {
+ Console.WriteLine($"{message.UserName}> {message.Message}");
+ }
+
+ public async Task HelloAsync(string name, int age)
+ {
+ Console.WriteLine("HelloAsync called");
+ await Task.Delay(100);
+ return $"Hello {name} ({age})!; {sessionId}";
+ }
+}
diff --git a/samples/ChatApp/ChatApp.Server/ChatHub.cs b/samples/ChatApp/ChatApp.Server/ChatHub.cs
index d6bf64d3a..31ee72ef0 100644
--- a/samples/ChatApp/ChatApp.Server/ChatHub.cs
+++ b/samples/ChatApp/ChatApp.Server/ChatHub.cs
@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
+using Cysharp.Runtime.Multicast;
namespace ChatApp.Server;
@@ -13,8 +14,14 @@ namespace ChatApp.Server;
///
public class ChatHub : StreamingHubBase, IChatHub
{
- private IGroup room;
+ private IGroup room;
private string myName;
+ private readonly IMulticastSyncGroup roomForAll;
+
+ public ChatHub(IMulticastGroupProvider groupProvider)
+ {
+ roomForAll = groupProvider.GetOrAddSynchronousGroup("All");
+ }
public async Task JoinAsync(JoinRequest request)
{
@@ -22,7 +29,7 @@ public async Task JoinAsync(JoinRequest request)
this.myName = request.UserName;
- this.Broadcast(this.room).OnJoin(request.UserName);
+ this.room.All.OnJoin(request.UserName);
}
@@ -31,7 +38,7 @@ public async Task LeaveAsync()
if (this.room is not null)
{
await this.room.RemoveAsync(this.Context);
- this.Broadcast(this.room).OnLeave(this.myName);
+ this.room.All.OnLeave(this.myName);
}
}
@@ -39,8 +46,16 @@ public async Task SendMessageAsync(string message)
{
if (this.room is not null)
{
- var response = new MessageResponse { UserName = this.myName, Message = message };
- this.Broadcast(this.room).OnSendMessage(response);
+ if (message.StartsWith("/global ", StringComparison.InvariantCultureIgnoreCase))
+ {
+ var response = new MessageResponse { UserName = this.myName, Message = message.Substring("/global ".Length) };
+ this.roomForAll.All.OnSendMessage(response);
+ }
+ else
+ {
+ var response = new MessageResponse { UserName = this.myName, Message = message };
+ this.room.All.OnSendMessage(response);
+ }
}
await Task.CompletedTask;
@@ -61,6 +76,7 @@ protected override ValueTask OnConnecting()
{
// handle connection if needed.
Console.WriteLine($"client connected {this.Context.ContextId}");
+ roomForAll.Add(ConnectionId, Client);
return CompletedTask;
}
@@ -68,6 +84,7 @@ protected override ValueTask OnDisconnected()
{
// handle disconnection if needed.
// on disconnecting, if automatically removed this connection from group.
+ roomForAll.Remove(ConnectionId);
return CompletedTask;
}
}
diff --git a/samples/ChatApp/ChatApp.Shared/Hubs/IChatHubReceiver.cs b/samples/ChatApp/ChatApp.Shared/Hubs/IChatHubReceiver.cs
index b8b21eeb0..0bca9910d 100644
--- a/samples/ChatApp/ChatApp.Shared/Hubs/IChatHubReceiver.cs
+++ b/samples/ChatApp/ChatApp.Shared/Hubs/IChatHubReceiver.cs
@@ -1,4 +1,5 @@
-using ChatApp.Shared.MessagePackObjects;
+using System.Threading.Tasks;
+using ChatApp.Shared.MessagePackObjects;
namespace ChatApp.Shared.Hubs
{
@@ -12,5 +13,8 @@ public interface IChatHubReceiver
void OnLeave(string name);
void OnSendMessage(MessageResponse message);
+
+
+ Task HelloAsync(string name, int age);
}
}
diff --git a/samples/JwtAuthentication/JwtAuthApp.Server/Hubs/TimerHub.cs b/samples/JwtAuthentication/JwtAuthApp.Server/Hubs/TimerHub.cs
index 384465cd7..7fb5a02d1 100644
--- a/samples/JwtAuthentication/JwtAuthApp.Server/Hubs/TimerHub.cs
+++ b/samples/JwtAuthentication/JwtAuthApp.Server/Hubs/TimerHub.cs
@@ -6,7 +6,6 @@
using System.Threading.Tasks;
using Grpc.Core;
using JwtAuthApp.Shared;
-using MagicOnion.Server;
using MagicOnion.Server.Hubs;
using Microsoft.AspNetCore.Authorization;
@@ -18,7 +17,7 @@ public class TimerHub : StreamingHubBase, ITimerHu
private Task _timerLoopTask;
private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private TimeSpan _interval = TimeSpan.FromSeconds(1);
- private IGroup _group;
+ private IGroup _group;
public async Task SetAsync(TimeSpan interval)
{
@@ -33,7 +32,7 @@ public async Task SetAsync(TimeSpan interval)
await Task.Delay(_interval, _cancellationTokenSource.Token);
var userPrincipal = Context.CallContext.GetHttpContext().User;
- BroadcastToSelf(_group).OnTick($"UserId={userPrincipal.Claims.First(x => x.Type == ClaimTypes.NameIdentifier).Value}; Name={userPrincipal.Identity?.Name}");
+ Client.OnTick($"UserId={userPrincipal.Claims.First(x => x.Type == ClaimTypes.NameIdentifier).Value}; Name={userPrincipal.Identity?.Name}");
}
});
}
diff --git a/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MagicOnionStreamingHubInfo.cs b/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MagicOnionStreamingHubInfo.cs
index a8a04f9f0..4e09ed00b 100644
--- a/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MagicOnionStreamingHubInfo.cs
+++ b/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MagicOnionStreamingHubInfo.cs
@@ -77,13 +77,25 @@ public MagicOnionHubMethodInfo(int hubId, string methodName, IReadOnlyList parameters, MagicOnionTypeInfo methodReturnType, MagicOnionTypeInfo requestType, MagicOnionTypeInfo responseType)
+ : base(hubId, methodName, parameters, methodReturnType, requestType, responseType)
+ {
+ IsClientResult = methodReturnType != MagicOnionTypeInfo.KnownTypes.System_Void;
+ }
+ }
+
[DebuggerDisplay("StreamingHubReceiver: {ReceiverType,nq}; Methods={Methods.Count,nq}")]
public class MagicOnionStreamingHubReceiverInfo
{
public MagicOnionTypeInfo ReceiverType { get; }
- public IReadOnlyList Methods { get; }
+ public IReadOnlyList Methods { get; }
- public MagicOnionStreamingHubReceiverInfo(MagicOnionTypeInfo receiverType, IReadOnlyList methods)
+ public MagicOnionStreamingHubReceiverInfo(MagicOnionTypeInfo receiverType, IReadOnlyList methods)
{
ReceiverType = receiverType;
Methods = methods;
diff --git a/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MagicOnionTypeInfo.cs b/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MagicOnionTypeInfo.cs
index 34da33848..e931feb60 100644
--- a/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MagicOnionTypeInfo.cs
+++ b/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MagicOnionTypeInfo.cs
@@ -13,6 +13,7 @@ public static class KnownTypes
public static MagicOnionTypeInfo System_String { get; } = new MagicOnionTypeInfo("System", "String");
public static MagicOnionTypeInfo System_Boolean { get; } = new MagicOnionTypeInfo("System", "Boolean", SubType.ValueType);
public static MagicOnionTypeInfo MessagePack_Nil { get; } = new MagicOnionTypeInfo("MessagePack", "Nil", SubType.ValueType);
+ public static MagicOnionTypeInfo System_Threading_CancellationToken { get; } = new MagicOnionTypeInfo("System.Threading", "CancellationToken", SubType.ValueType);
public static MagicOnionTypeInfo System_Threading_Tasks_Task { get; } = new MagicOnionTypeInfo("System.Threading.Tasks", "Task");
public static MagicOnionTypeInfo System_Threading_Tasks_ValueTask { get; } = new MagicOnionTypeInfo("System.Threading.Tasks", "ValueTask", SubType.ValueType);
public static MagicOnionTypeInfo MagicOnion_UnaryResult { get; } = new MagicOnionTypeInfo("MagicOnion", "UnaryResult", SubType.ValueType);
diff --git a/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MethodCollector.cs b/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MethodCollector.cs
index e80e4bb0c..55dac16ee 100644
--- a/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MethodCollector.cs
+++ b/src/MagicOnion.Client.SourceGenerator/CodeAnalysis/MethodCollector.cs
@@ -58,7 +58,7 @@ static IReadOnlyList GetStreamingHubs(MethodCollecto
var receiverInterfaceSymbol = x.AllInterfaces.First(y => y.ConstructedFrom.ApproximatelyEqual(ctx.ReferenceSymbols.IStreamingHub)).TypeArguments[1];
var receiverType = ctx.GetOrCreateTypeInfoFromSymbol(receiverInterfaceSymbol);
- var receiverMethods = new List();
+ var receiverMethods = new List();
foreach (var methodSymbol in GetAllMethods(receiverInterfaceSymbol, ctx.ReferenceSymbols))
{
if (TryCreateHubReceiverMethodInfoFromMethodSymbol(ctx, serviceType, receiverType, methodSymbol, out var methodInfo, out var diagnostic))
@@ -122,6 +122,7 @@ static bool TryCreateHubMethodInfoFromMethodSymbol(MethodCollectorContext ctx, M
{
case "global::System.Threading.Tasks.Task":
case "global::System.Threading.Tasks.ValueTask":
+ case "global::System.Void":
//responseType = MagicOnionTypeInfo.KnownTypes.MessagePack_Nil;
break;
case "global::System.Threading.Tasks.Task<>":
@@ -148,14 +149,19 @@ static bool TryCreateHubMethodInfoFromMethodSymbol(MethodCollectorContext ctx, M
diagnostic = null;
return true;
}
- static bool TryCreateHubReceiverMethodInfoFromMethodSymbol(MethodCollectorContext ctx, MagicOnionTypeInfo interfaceType, MagicOnionTypeInfo receiverType, IMethodSymbol methodSymbol, [NotNullWhen(true)] out MagicOnionStreamingHubInfo.MagicOnionHubMethodInfo? methodInfo, out Diagnostic? diagnostic)
+ static bool TryCreateHubReceiverMethodInfoFromMethodSymbol(MethodCollectorContext ctx, MagicOnionTypeInfo interfaceType, MagicOnionTypeInfo receiverType, IMethodSymbol methodSymbol, [NotNullWhen(true)] out MagicOnionStreamingHubInfo.MagicOnionHubReceiverMethodInfo? methodInfo, out Diagnostic? diagnostic)
{
var hubId = GetHubMethodIdFromMethodSymbol(methodSymbol);
var methodReturnType = ctx.GetOrCreateTypeInfoFromSymbol(methodSymbol.ReturnType);
var methodParameters = CreateParameterInfoListFromMethodSymbol(ctx, methodSymbol);
- var requestType = CreateRequestTypeFromMethodParameters(methodParameters);
+ var requestType = CreateRequestTypeFromMethodParameters(methodParameters.Where(x => x.Type != MagicOnionTypeInfo.KnownTypes.System_Threading_CancellationToken).ToArray());
var responseType = MagicOnionTypeInfo.KnownTypes.MessagePack_Nil;
- if (methodReturnType != MagicOnionTypeInfo.KnownTypes.System_Void)
+ if (methodReturnType != MagicOnionTypeInfo.KnownTypes.System_Void &&
+ methodReturnType != MagicOnionTypeInfo.KnownTypes.System_Threading_Tasks_Task &&
+ methodReturnType != MagicOnionTypeInfo.KnownTypes.System_Threading_Tasks_ValueTask &&
+ (!methodReturnType.HasGenericArguments ||
+ (methodReturnType.GetGenericTypeDefinition() != MagicOnionTypeInfo.KnownTypes.System_Threading_Tasks_Task &&
+ methodReturnType.GetGenericTypeDefinition() != MagicOnionTypeInfo.KnownTypes.System_Threading_Tasks_ValueTask)))
{
methodInfo = null;
diagnostic = Diagnostic.Create(
@@ -164,8 +170,12 @@ static bool TryCreateHubReceiverMethodInfoFromMethodSymbol(MethodCollectorContex
$"{receiverType.ToDisplayName(MagicOnionTypeInfo.DisplayNameFormat.Namespace)}.{methodSymbol.Name}", methodReturnType.ToDisplayName(MagicOnionTypeInfo.DisplayNameFormat.Namespace));
return false;
}
+ else if (methodReturnType.HasGenericArguments)
+ {
+ responseType = methodReturnType.GenericArguments[0];
+ }
- methodInfo = new MagicOnionStreamingHubInfo.MagicOnionHubMethodInfo(
+ methodInfo = new MagicOnionStreamingHubInfo.MagicOnionHubReceiverMethodInfo(
hubId,
methodSymbol.Name,
methodParameters,
diff --git a/src/MagicOnion.Client.SourceGenerator/CodeGen/MagicOnionInitializerGenerator.cs b/src/MagicOnion.Client.SourceGenerator/CodeGen/MagicOnionInitializerGenerator.cs
index 2bec960b0..de067ffb9 100644
--- a/src/MagicOnion.Client.SourceGenerator/CodeGen/MagicOnionInitializerGenerator.cs
+++ b/src/MagicOnion.Client.SourceGenerator/CodeGen/MagicOnionInitializerGenerator.cs
@@ -142,7 +142,7 @@ static StreamingHubClientFactoryCache()
writer.AppendLineWithFormat($$"""
if (typeof(TStreamingHub) == typeof({{hubInfo.ServiceType.FullName}}) && typeof(TReceiver) == typeof({{hubInfo.Receiver.ReceiverType.FullName}}))
{
- factory = ((global::MagicOnion.Client.StreamingHubClientFactoryDelegate<{{hubInfo.ServiceType.FullName}}, {{hubInfo.Receiver.ReceiverType.FullName}}>)((a, _, b, c, d, e) => new MagicOnionGeneratedClient.{{hubInfo.GetClientFullName()}}(a, b, c, d, e, StreamingHubDiagnosticHandler)));
+ factory = ((global::MagicOnion.Client.StreamingHubClientFactoryDelegate<{{hubInfo.ServiceType.FullName}}, {{hubInfo.Receiver.ReceiverType.FullName}}>)((a, b, c) => new MagicOnionGeneratedClient.{{hubInfo.GetClientFullName()}}(a, b, c, StreamingHubDiagnosticHandler)));
}
""");
}
@@ -151,7 +151,7 @@ static StreamingHubClientFactoryCache()
writer.AppendLineWithFormat($$"""
if (typeof(TStreamingHub) == typeof({{hubInfo.ServiceType.FullName}}) && typeof(TReceiver) == typeof({{hubInfo.Receiver.ReceiverType.FullName}}))
{
- factory = ((global::MagicOnion.Client.StreamingHubClientFactoryDelegate<{{hubInfo.ServiceType.FullName}}, {{hubInfo.Receiver.ReceiverType.FullName}}>)((a, _, b, c, d, e) => new MagicOnionGeneratedClient.{{hubInfo.GetClientFullName()}}(a, b, c, d, e)));
+ factory = ((global::MagicOnion.Client.StreamingHubClientFactoryDelegate<{{hubInfo.ServiceType.FullName}}, {{hubInfo.Receiver.ReceiverType.FullName}}>)((a, b, c) => new MagicOnionGeneratedClient.{{hubInfo.GetClientFullName()}}(a, b, c)));
}
""");
}
diff --git a/src/MagicOnion.Client.SourceGenerator/CodeGen/StaticStreamingHubClientGenerator.cs b/src/MagicOnion.Client.SourceGenerator/CodeGen/StaticStreamingHubClientGenerator.cs
index 410d07b07..6544756eb 100644
--- a/src/MagicOnion.Client.SourceGenerator/CodeGen/StaticStreamingHubClientGenerator.cs
+++ b/src/MagicOnion.Client.SourceGenerator/CodeGen/StaticStreamingHubClientGenerator.cs
@@ -46,6 +46,7 @@ static void EmitHeader(GenerationContext generationContext, StringBuilder writer
#pragma warning disable CS0414 // The private field 'field' is assigned but its value is never used
#pragma warning disable CS8019 // Unnecessary using directive.
#pragma warning disable CS1522 // Empty switch block
+ #pragma warning disable CS1998 // This async method lacks 'await' operators and will run synchronously.
""");
}
@@ -107,6 +108,7 @@ public class {{ctx.Hub.GetClientFullName()}} : global::MagicOnion.Client.Streami
EmitFireAndForget(ctx);
EmitOnBroadcastEvent(ctx);
EmitOnResponseEvent(ctx);
+ EmitOnClientResultEvent(ctx);
ctx.Writer.AppendLine("""
}
""");
@@ -147,8 +149,8 @@ static void EmitConstructor(StreamingHubClientBuildContext ctx)
if (ctx.EnableStreamingHubDiagnosticHandler)
{
ctx.Writer.AppendLineWithFormat($$"""
- public {{ctx.Hub.GetClientFullName()}}(global::Grpc.Core.CallInvoker callInvoker, global::System.String host, global::Grpc.Core.CallOptions options, global::MagicOnion.Serialization.IMagicOnionSerializerProvider serializerProvider, global::MagicOnion.Client.IMagicOnionClientLogger logger, global::MagicOnion.Client.IStreamingHubDiagnosticHandler diagnosticHandler)
- : base("{{ctx.Hub.ServiceType.Name}}", callInvoker, host, options, serializerProvider, logger)
+ public {{ctx.Hub.GetClientFullName()}}({{ctx.Hub.Receiver.ReceiverType.FullName}} receiver, global::Grpc.Core.CallInvoker callInvoker, global::MagicOnion.Client.StreamingHubClientOptions options, global::MagicOnion.Client.IStreamingHubDiagnosticHandler diagnosticHandler)
+ : base("{{ctx.Hub.ServiceType.Name}}", receiver, callInvoker, options)
{
this.diagnosticHandler = diagnosticHandler;
}
@@ -157,8 +159,8 @@ static void EmitConstructor(StreamingHubClientBuildContext ctx)
else
{
ctx.Writer.AppendLineWithFormat($$"""
- public {{ctx.Hub.GetClientFullName()}}(global::Grpc.Core.CallInvoker callInvoker, global::System.String host, global::Grpc.Core.CallOptions options, global::MagicOnion.Serialization.IMagicOnionSerializerProvider serializerProvider, global::MagicOnion.Client.IMagicOnionClientLogger logger)
- : base("{{ctx.Hub.ServiceType.Name}}", callInvoker, host, options, serializerProvider, logger)
+ public {{ctx.Hub.GetClientFullName()}}({{ctx.Hub.Receiver.ReceiverType.FullName}} receiver, global::Grpc.Core.CallInvoker callInvoker, global::MagicOnion.Client.StreamingHubClientOptions options)
+ : base("{{ctx.Hub.ServiceType.Name}}", receiver, callInvoker, options)
{
}
""");
@@ -171,7 +173,7 @@ static void EmitFireAndForget(StreamingHubClientBuildContext ctx)
ctx.Writer.AppendLineWithFormat($$"""
public {{ctx.Hub.ServiceType.FullName}} FireAndForget()
=> new FireAndForgetClient(this);
-
+
[global::MagicOnion.Ignore]
class FireAndForgetClient : {{ctx.Hub.ServiceType.FullName}}
{
@@ -213,17 +215,19 @@ static void EmitHubMethods(StreamingHubClientBuildContext ctx, bool isFireAndFor
// new DynamicArgumentTuple(arg1, arg2, ...)
_ => $", {method.Parameters.ToNewDynamicArgumentTuple()}",
};
+ var isReturnTypeVoid = method.MethodReturnType == MagicOnionTypeInfo.KnownTypes.System_Void;
+ var writeMessageTarget = isFireAndForget ? "parent" : "this";
var writeMessageAsync = ctx.EnableStreamingHubDiagnosticHandler
- ? isFireAndForget
- ? "parent.WriteMessageFireAndForgetDiagnosticAsync"
- : "this.WriteMessageWithResponseDiagnosticAsync"
- : isFireAndForget
- ? "parent.WriteMessageFireAndForgetAsync"
- : "base.WriteMessageWithResponseAsync";
+ ? isFireAndForget || isReturnTypeVoid
+ ? $"{writeMessageTarget}.WriteMessageFireAndForgetDiagnosticAsync"
+ : $"{writeMessageTarget}.WriteMessageWithResponseDiagnosticAsync"
+ : isFireAndForget || isReturnTypeVoid
+ ? $"{writeMessageTarget}.WriteMessageFireAndForgetAsync"
+ : $"{writeMessageTarget}.WriteMessageWithResponseAsync";
if (isFireAndForget) ctx.Writer.Append(" ");
ctx.Writer.AppendLineWithFormat($"""
- public {method.MethodReturnType.FullName} {method.MethodName}({method.Parameters.ToMethodSignaturize()})
+ public {(isReturnTypeVoid ? "void" : method.MethodReturnType.FullName)} {method.MethodName}({method.Parameters.ToMethodSignaturize()})
""");
if (isFireAndForget) ctx.Writer.Append(" ");
@@ -256,12 +260,12 @@ static void EmitHubMethods(StreamingHubClientBuildContext ctx, bool isFireAndFor
static void EmitOnBroadcastEvent(StreamingHubClientBuildContext ctx)
{
ctx.Writer.AppendLine("""
- protected override void OnBroadcastEvent(global::System.Int32 methodId, global::System.ArraySegment data)
+ protected override void OnBroadcastEvent(global::System.Int32 methodId, global::System.ReadOnlyMemory data)
{
switch (methodId)
{
""");
- foreach (var method in ctx.Hub.Receiver.Methods)
+ foreach (var method in ctx.Hub.Receiver.Methods.Where(x => x.MethodReturnType == MagicOnionTypeInfo.KnownTypes.System_Void))
{
var methodArgs = method.Parameters.Count switch
{
@@ -270,29 +274,35 @@ protected override void OnBroadcastEvent(global::System.Int32 methodId, global::
_ => string.Join(", ", Enumerable.Range(1, method.Parameters.Count).Select(x => $"value.Item{x}"))
};
+ ctx.Writer.AppendLineWithFormat($$"""
+ case {{method.HubId}}: // {{method.MethodReturnType.ToDisplayName()}} {{method.MethodName}}({{method.Parameters.ToMethodSignaturize()}})
+ {
+ """);
+
if (ctx.EnableStreamingHubDiagnosticHandler)
{
ctx.Writer.AppendLineWithFormat($$"""
- case {{method.HubId}}: // {{method.MethodReturnType.ToDisplayName()}} {{method.MethodName}}({{method.Parameters.ToMethodSignaturize()}})
- {
var value = base.Deserialize<{{method.RequestType.FullName}}>(data);
diagnosticHandler?.OnBroadcastEvent(this, "{{method.MethodName}}", value);
receiver.{{method.MethodName}}({{methodArgs}});
- }
- break;
""");
}
else
{
- ctx.Writer.AppendLineWithFormat($$"""
- case {{method.HubId}}: // {{method.MethodReturnType.ToDisplayName()}} {{method.MethodName}}({{method.Parameters.ToMethodSignaturize()}})
- {
+ if (method.Parameters.Count != 0)
+ {
+ ctx.Writer.AppendLineWithFormat($$"""
var value = base.Deserialize<{{method.RequestType.FullName}}>(data);
+ """);
+ }
+ ctx.Writer.AppendLineWithFormat($$"""
receiver.{{method.MethodName}}({{methodArgs}});
+ """);
+ }
+ ctx.Writer.AppendLine("""
}
break;
""");
- }
}
ctx.Writer.AppendLine("""
}
@@ -304,7 +314,7 @@ protected override void OnBroadcastEvent(global::System.Int32 methodId, global::
static void EmitOnResponseEvent(StreamingHubClientBuildContext ctx)
{
ctx.Writer.AppendLine("""
- protected override void OnResponseEvent(global::System.Int32 methodId, global::System.Object taskCompletionSource, global::System.ArraySegment data)
+ protected override void OnResponseEvent(global::System.Int32 methodId, global::System.Object taskCompletionSource, global::System.ReadOnlyMemory data)
{
switch (methodId)
{
@@ -323,4 +333,59 @@ protected override void OnResponseEvent(global::System.Int32 methodId, global::S
""");
}
+
+ static void EmitOnClientResultEvent(StreamingHubClientBuildContext ctx)
+ {
+ var clientResultMethods = ctx.Hub.Receiver.Methods.Where(x => x.IsClientResult).ToArray();
+ if (clientResultMethods.Length == 0)
+ {
+ ctx.Writer.AppendLine("""
+ protected override void OnClientResultEvent(global::System.Int32 methodId, global::System.Guid messageId, global::System.ReadOnlyMemory data)
+ {
+ }
+ """);
+ return;
+ }
+
+ ctx.Writer.AppendLine("""
+ protected override void OnClientResultEvent(global::System.Int32 methodId, global::System.Guid messageId, global::System.ReadOnlyMemory data)
+ {
+ try
+ {
+ switch (methodId)
+ {
+ """);
+ foreach (var method in clientResultMethods)
+ {
+ var methodParameters = method.Parameters
+ .Select((x, i) => (Index: i, IsCancellationToken: x.Type == MagicOnionTypeInfo.KnownTypes.System_Threading_CancellationToken, Type: x.Type))
+ .ToArray();
+
+ var methodArgs = methodParameters.Count(x => !x.IsCancellationToken) switch
+ {
+ 0 => string.Join(", ", methodParameters.Select(x => "default")),
+ 1 => string.Join(", ", methodParameters.Select(x => x.IsCancellationToken ? "default" : "value")),
+ _ => string.Join(", ", methodParameters.Select(x => x.IsCancellationToken ? "default" : $"value.Item{x.Index + 1}")),
+ };
+
+ ctx.Writer.AppendLineWithFormat($$"""
+ case {{method.HubId}}: // {{method.MethodReturnType.ToDisplayName()}} {{method.MethodName}}({{method.Parameters.ToMethodSignaturize()}})
+ {
+ var value = base.Deserialize<{{method.RequestType.FullName}}>(data);
+ base.AwaitAndWriteClientResultResponseMessage(methodId, messageId, receiver.{{method.MethodName}}({{methodArgs}}));
+ }
+ break;
+ """);
+ }
+ ctx.Writer.AppendLine("""
+ }
+ }
+ catch (global::System.Exception ex)
+ {
+ base.WriteClientResultResponseMessageForError(methodId, messageId, ex);
+ }
+ }
+
+ """);
+ }
}
diff --git a/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels.meta b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels.meta
new file mode 100644
index 000000000..5d3dc9bf0
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels.meta
@@ -0,0 +1,8 @@
+fileFormatVersion: 2
+guid: 6f8d1cd17d285aa40aba05cb8861efa2
+folderAsset: yes
+DefaultImporter:
+ externalObjects: {}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0.meta b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0.meta
new file mode 100644
index 000000000..fa72750dc
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0.meta
@@ -0,0 +1,8 @@
+fileFormatVersion: 2
+guid: 3fae5b6755585dc4c941b234a9e7e82b
+folderAsset: yes
+DefaultImporter:
+ externalObjects: {}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/LICENSE.TXT b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/LICENSE.TXT
new file mode 100644
index 000000000..984713a49
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/LICENSE.TXT
@@ -0,0 +1,23 @@
+The MIT License (MIT)
+
+Copyright (c) .NET Foundation and Contributors
+
+All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/LICENSE.TXT.meta b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/LICENSE.TXT.meta
new file mode 100644
index 000000000..9d58eba11
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/LICENSE.TXT.meta
@@ -0,0 +1,7 @@
+fileFormatVersion: 2
+guid: d91ceaa4a725879459c4908f7eb8a4f9
+TextScriptImporter:
+ externalObjects: {}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib.meta b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib.meta
new file mode 100644
index 000000000..b7c3424bb
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib.meta
@@ -0,0 +1,8 @@
+fileFormatVersion: 2
+guid: d49c9170d8551e346b86aafb360e2fd1
+folderAsset: yes
+DefaultImporter:
+ externalObjects: {}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1.meta b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1.meta
new file mode 100644
index 000000000..ccc02371c
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1.meta
@@ -0,0 +1,8 @@
+fileFormatVersion: 2
+guid: bc8731f0410c3b1458d9b44a697880d2
+folderAsset: yes
+DefaultImporter:
+ externalObjects: {}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.dll b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.dll
new file mode 100644
index 000000000..60b45fba1
Binary files /dev/null and b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.dll differ
diff --git a/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.dll.meta b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.dll.meta
new file mode 100644
index 000000000..60afb28db
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.dll.meta
@@ -0,0 +1,23 @@
+fileFormatVersion: 2
+guid: 178d59b1c5291b64d9a206e568e79b9b
+labels:
+- NuGetForUnity
+PluginImporter:
+ externalObjects: {}
+ serializedVersion: 2
+ iconMap: {}
+ executionOrder: {}
+ defineConstraints: []
+ isPreloaded: 0
+ isOverridable: 0
+ isExplicitlyReferenced: 0
+ validateReferences: 1
+ platformData:
+ - first:
+ Any:
+ second:
+ enabled: 1
+ settings: {}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.xml b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.xml
new file mode 100644
index 000000000..20275dc30
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.xml
@@ -0,0 +1,243 @@
+
+
+
+ System.Threading.Channels
+
+
+
+ Specifies the behavior to use when writing to a bounded channel that is already full.
+
+
+ Removes and ignores the newest item in the channel in order to make room for the item being written.
+
+
+ Removes and ignores the oldest item in the channel in order to make room for the item being written.
+
+
+ Drops the item being written.
+
+
+ Waits for space to be available in order to complete the write operation.
+
+
+ Provides options that control the behavior of bounded instances.
+
+
+ Initializes the options.
+ The maximum number of items the bounded channel may store.
+
+
+ Gets or sets the maximum number of items the bounded channel may store.
+
+
+ Gets or sets the behavior incurred by write operations when the channel is full.
+
+
+ Provides static methods for creating channels.
+
+
+ Creates a channel with the specified maximum capacity.
+ The maximum number of items the channel may store.
+ Specifies the type of data in the channel.
+ The created channel.
+
+
+ Creates a channel with the specified maximum capacity.
+ Options that guide the behavior of the channel.
+ Specifies the type of data in the channel.
+ The created channel.
+
+
+ Creates a channel subject to the provided options.
+ Options that guide the behavior of the channel.
+ Delegate that will be called when item is being dropped from channel. See .
+ Specifies the type of data in the channel.
+ The created channel.
+
+
+ Creates an unbounded channel usable by any number of readers and writers concurrently.
+ The type of data in the channel.
+ The created channel.
+
+
+ Creates an unbounded channel subject to the provided options.
+ Options that guide the behavior of the channel.
+ Specifies the type of data in the channel.
+ The created channel.
+
+
+ Provides a base class for channels that support reading and writing elements of type .
+ Specifies the type of data readable and writable in the channel.
+
+
+ Initializes an instance of the class.
+
+
+ Provides a base class for channels that support reading elements of type and writing elements of type .
+ Specifies the type of data that may be written to the channel.
+ Specifies the type of data that may be read from the channel.
+
+
+ Initializes an instance of the class.
+
+
+ Implicit cast from a to its readable half.
+ The being cast.
+ The readable half.
+
+
+ Implicit cast from a to its writable half.
+ The being cast.
+ The writable half.
+
+
+ Gets the readable half of this channel.
+
+
+ Gets the writable half of this channel.
+
+
+ Exception thrown when a channel is used after it's been closed.
+
+
+ Initializes a new instance of the class.
+
+
+ Initializes a new instance of the class.
+ The exception that is the cause of this exception.
+
+
+ Initializes a new instance of the class with serialized data.
+ The object that holds the serialized object data.
+ The contextual information about the source or destination.
+
+
+ Initializes a new instance of the class.
+ The message that describes the error.
+
+
+ Initializes a new instance of the class.
+ The message that describes the error.
+ The exception that is the cause of this exception.
+
+
+ Provides options that control the behavior of channel instances.
+
+
+ Initializes an instance of the class.
+
+
+
+ if operations performed on a channel may synchronously invoke continuations subscribed to
+ notifications of pending async operations; if all continuations should be invoked asynchronously.
+
+
+
+ readers from the channel guarantee that there will only ever be at most one read operation at a time;
+ if no such constraint is guaranteed.
+
+
+
+ if writers to the channel guarantee that there will only ever be at most one write operation
+ at a time; if no such constraint is guaranteed.
+
+
+ Provides a base class for reading from a channel.
+ Specifies the type of data that may be read from the channel.
+
+
+ Initializes an instance of the class.
+
+
+ Creates an that enables reading all of the data from the channel.
+ The cancellation token to use to cancel the enumeration. If data is immediately ready for reading, then that data may be yielded even after cancellation has been requested.
+ The created async enumerable.
+
+
+ Asynchronously reads an item from the channel.
+ A used to cancel the read operation.
+ A that represents the asynchronous read operation.
+
+
+ Attempts to peek at an item from the channel.
+ The peeked item, or a default value if no item could be peeked.
+
+ if an item was read; otherwise, .
+
+
+ Attempts to read an item from the channel.
+ The read item, or a default value if no item could be read.
+
+ if an item was read; otherwise, .
+
+
+ Returns a that will complete when data is available to read.
+ A used to cancel the wait operation.
+
+ A that will complete with a result when data is available to read
+ or with a result when no further data will ever be available to be read due to the channel completing successfully.
+ If the channel completes with an exception, the task will also complete with an exception.
+
+
+
+ Gets a value that indicates whether is available for use on this instance.
+
+
+ Gets a value that indicates whether is available for use on this instance.
+
+ if peeking is supported by this channel instance; otherwise.
+
+
+ Gets a that completes when no more data will ever
+ be available to be read from this channel.
+
+
+ Gets the current number of items available from this channel reader.
+ Counting is not supported on this instance.
+
+
+ Provides a base class for writing to a channel.
+ Specifies the type of data that may be written to the channel.
+
+
+ Initializes an instance of the class.
+
+
+ Mark the channel as being complete, meaning no more items will be written to it.
+ Optional Exception indicating a failure that's causing the channel to complete.
+ The channel has already been marked as complete.
+
+
+ Attempts to mark the channel as being completed, meaning no more data will be written to it.
+ An indicating the failure causing no more data to be written, or null for success.
+
+ if this operation successfully completes the channel; otherwise, if the channel could not be marked for completion,
+ for example due to having already been marked as such, or due to not supporting completion.
+ .
+
+
+ Attempts to write the specified item to the channel.
+ The item to write.
+
+ if the item was written; otherwise, .
+
+
+ Returns a that will complete when space is available to write an item.
+ A used to cancel the wait operation.
+ A that will complete with a result when space is available to write an item
+ or with a result when no further writing will be permitted.
+
+
+ Asynchronously writes an item to the channel.
+ The value to write to the channel.
+ A used to cancel the write operation.
+ A that represents the asynchronous write operation.
+
+
+ Provides options that control the behavior of unbounded instances.
+
+
+ Initializes a new instance of the class.
+
+
+
\ No newline at end of file
diff --git a/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.xml.meta b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.xml.meta
new file mode 100644
index 000000000..17d99b9e3
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Plugins/System.Threading.Channels/8.0.0/lib/netstandard2.1/System.Threading.Channels.xml.meta
@@ -0,0 +1,7 @@
+fileFormatVersion: 2
+guid: 3a999c3e0c15371408a73b75c174eeac
+TextScriptImporter:
+ externalObjects: {}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/DynamicClient/DynamicStreamingHubClientBuilder.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/DynamicClient/DynamicStreamingHubClientBuilder.cs
index ab907ade1..8db75bc3b 100644
--- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/DynamicClient/DynamicStreamingHubClientBuilder.cs
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/DynamicClient/DynamicStreamingHubClientBuilder.cs
@@ -49,6 +49,7 @@ public static AssemblyBuilder Save()
internal
#endif
static class DynamicStreamingHubClientBuilder
+ where TStreamingHub : IStreamingHub
{
public static readonly Type ClientType;
// static readonly Type ClientFireAndForgetType;
@@ -133,9 +134,10 @@ static void VerifyMethodDefinitions(MethodDefinition[] definitions)
if (returnTypeNonGenericOrOpenGeneric != typeof(ValueTask) &&
returnTypeNonGenericOrOpenGeneric != typeof(Task) &&
returnTypeNonGenericOrOpenGeneric != typeof(ValueTask<>) &&
- returnTypeNonGenericOrOpenGeneric != typeof(Task<>))
+ returnTypeNonGenericOrOpenGeneric != typeof(Task<>) &&
+ returnTypeNonGenericOrOpenGeneric != typeof(void))
{
- throw new Exception($"Invalid definition, TStreamingHub's return type must only be `Task`, `Task`, `ValueTask` or `ValueTask`. {item.MethodInfo.Name}.");
+ throw new Exception($"Invalid definition, TStreamingHub's return type must only be `void`, `Task`, `Task`, `ValueTask` or `ValueTask`. {item.MethodInfo.Name}.");
}
item.MethodId = methodId;
@@ -148,20 +150,18 @@ static void VerifyMethodDefinitions(MethodDefinition[] definitions)
static FieldInfo DefineConstructor(TypeBuilder typeBuilder, Type interfaceType, Type receiverType, ConstructorInfo fireAndForgetClientCtor)
{
- // .ctor(CallInvoker callInvoker, string host, CallOptions option, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger) :base(...)
+ // .ctor(TReceiver receiver, CallInvoker callInvoker, StreamingHubClientOptions options) : base("InterfaceName", receiver, callInvoker, options)
{
- var argTypes = new[] { typeof(CallInvoker), typeof(string), typeof(CallOptions), typeof(IMagicOnionSerializerProvider), typeof(IMagicOnionClientLogger) };
+ var argTypes = new[] { receiverType, typeof(CallInvoker), typeof(StreamingHubClientOptions) };
var ctor = typeBuilder.DefineConstructor(MethodAttributes.Public, CallingConventions.Standard, argTypes);
var il = ctor.GetILGenerator();
- // base(serviceName, callInvoker, host, option, serializerProvider, logger);
+ // base("InterfaceName", receiver, callInvoker, options);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldstr, interfaceType.Name);
- il.Emit(OpCodes.Ldarg_1);
- il.Emit(OpCodes.Ldarg_2);
- il.Emit(OpCodes.Ldarg_3);
- il.Emit(OpCodes.Ldarg_S, (byte)4);
- il.Emit(OpCodes.Ldarg_S, (byte)5);
+ il.Emit(OpCodes.Ldarg_1); // receiver
+ il.Emit(OpCodes.Ldarg_2); // callInvoker
+ il.Emit(OpCodes.Ldarg_3); // options
il.Emit(OpCodes.Call, typeBuilder.BaseType!
.GetConstructors(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance).First());
@@ -240,13 +240,14 @@ static void DefineMethods(TypeBuilder typeBuilder, Type interfaceType, Type rece
// receiver types borrow from DynamicBroadcastBuilder
{
- // protected abstract void OnResponseEvent(int methodId, object taskCompletionSource, ArraySegment data);
+ // protected abstract void OnResponseEvent(int methodId, object taskCompletionSource, ReadOnlyMemory data);
{
var method = typeBuilder.DefineMethod("OnResponseEvent", MethodAttributes.Public | MethodAttributes.Final | MethodAttributes.Virtual,
- null, new[] { typeof(int), typeof(object), typeof(ArraySegment) });
+ null, new[] { typeof(int), typeof(object), typeof(ReadOnlyMemory) });
var il = method.GetILGenerator();
var labels = definitions
+ .Where(x => x.MethodInfo.ReturnType != typeof(void)) // If the return type if `void`, we always need to treat as fire-and-forget.
.Select(x => new { def = x, label = il.DefineLabel() })
.ToArray();
@@ -264,6 +265,7 @@ static void DefineMethods(TypeBuilder typeBuilder, Type interfaceType, Type rece
{
// SetResultForResponse(taskCompletionSource, data);
Type responseType;
+
if (item.def.MethodInfo.ReturnType == typeof(Task) || item.def.MethodInfo.ReturnType == typeof(ValueTask))
{
// Task methods uses TaskCompletionSource
@@ -285,13 +287,13 @@ static void DefineMethods(TypeBuilder typeBuilder, Type interfaceType, Type rece
il.Emit(OpCodes.Ret);
}
}
- // protected abstract void OnBroadcastEvent(int methodId, ArraySegment data);
+ // protected abstract void OnBroadcastEvent(int methodId, ReadOnlyMemory data);
{
var methodDefinitions = BroadcasterHelper.SearchDefinitions(receiverType);
BroadcasterHelper.VerifyMethodDefinitions(methodDefinitions);
var method = typeBuilder.DefineMethod("OnBroadcastEvent", MethodAttributes.Public | MethodAttributes.Final | MethodAttributes.Virtual,
- typeof(void), new[] { typeof(int), typeof(ArraySegment) });
+ typeof(void), new[] { typeof(int), typeof(ReadOnlyMemory) });
var il = method.GetILGenerator();
var labels = methodDefinitions
@@ -314,7 +316,6 @@ static void DefineMethods(TypeBuilder typeBuilder, Type interfaceType, Type rece
// var value = Deserialize>(data);
// receiver.OnMessage(value.Item1, value.Item2);
- var deserializeMethod = baseType.GetMethod("Deserialize", BindingFlags.Instance | BindingFlags.NonPublic)!;
var parameters = item.def.MethodInfo.GetParameters();
if (parameters.Length == 0)
{
@@ -332,7 +333,7 @@ static void DefineMethods(TypeBuilder typeBuilder, Type interfaceType, Type rece
// this.Deserialize(data)
il.Emit(OpCodes.Ldarg_0); // this
il.Emit(OpCodes.Ldarg_2); // data
- il.Emit(OpCodes.Call, deserializeMethod.MakeGenericMethod(parameters[0].ParameterType));
+ il.Emit(OpCodes.Call, MethodInfoCache.StreamingHubClientBase_Deserialize.MakeGenericMethod(parameters[0].ParameterType));
}
il.Emit(OpCodes.Callvirt, item.def.MethodInfo);
}
@@ -346,7 +347,7 @@ static void DefineMethods(TypeBuilder typeBuilder, Type interfaceType, Type rece
{
il.Emit(OpCodes.Ldarg_0); // this
il.Emit(OpCodes.Ldarg_2); // data
- il.Emit(OpCodes.Call, deserializeMethod.MakeGenericMethod(deserializeType));
+ il.Emit(OpCodes.Call, MethodInfoCache.StreamingHubClientBase_Deserialize.MakeGenericMethod(deserializeType));
il.Emit(OpCodes.Stloc, lc);
}
@@ -365,6 +366,162 @@ static void DefineMethods(TypeBuilder typeBuilder, Type interfaceType, Type rece
}
}
}
+ // protected abstract void OnClientResultEvent(int methodId, Guid messageId, ReadOnlyMemory data);
+ {
+ var methodDefinitions = BroadcasterHelper.SearchDefinitions(receiverType);
+ BroadcasterHelper.VerifyMethodDefinitions(methodDefinitions);
+
+ var method = typeBuilder.DefineMethod("OnClientResultEvent", MethodAttributes.Public | MethodAttributes.Final | MethodAttributes.Virtual,
+ typeof(void), new[] { typeof(int), typeof(Guid), typeof(ReadOnlyMemory) });
+ {
+ var il = method.GetILGenerator();
+ var localEx = il.DeclareLocal(typeof(Exception));
+
+ var clientResultMethods = methodDefinitions.Where(x => x.IsClientResult)
+ .Select(x => (Label: il.DefineLabel(), Method: x)).ToArray();
+ var labelReturn = il.DefineLabel();
+
+ var localCtDefault = il.DeclareLocal(typeof(CancellationToken));
+ {
+ // var ctDefault = default(CancellationToken);
+ il.Emit(OpCodes.Ldloca_S, localCtDefault!);
+ il.Emit(OpCodes.Initobj, typeof(CancellationToken));
+ }
+
+
+ // try {
+ il.BeginExceptionBlock();
+ foreach (var (labelMethodBlock, methodClientResult) in clientResultMethods)
+ {
+ // if (methodId == ...) goto label;
+ il.Emit(OpCodes.Ldarg_1); // methodId
+ il.Emit(OpCodes.Ldc_I4, methodClientResult.MethodId);
+ il.Emit(OpCodes.Beq, labelMethodBlock);
+ }
+ // goto Return;
+ il.Emit(OpCodes.Leave, labelReturn);
+
+ foreach (var (labelMethodBlock, methodClientResult) in clientResultMethods)
+ {
+ var parameters = methodClientResult.MethodInfo.GetParameters().Where(x => x.ParameterType != typeof(CancellationToken)).ToArray();
+ var deserializeType = parameters.Length switch
+ {
+ 0 => typeof(MessagePack.Nil),
+ 1 => parameters[0].ParameterType,
+ _ => BroadcasterHelper.DynamicArgumentTupleTypes[parameters.Length - 2].MakeGenericType(parameters.Select(x => x.ParameterType).ToArray())
+ };
+
+ // Method:
+ // {
+ // var local_0 = base.Deserialize(data);
+ // var task = this.SomeMethod(local0.Item1, local0.Item2);
+ // base.AwaitAndWriteClientResultResponseMessage(methodId, messageId, localTask);
+ // return;
+ // }
+ // break;
+
+ // Method:
+ il.MarkLabel(labelMethodBlock);
+ // var local0 = base.Deserialize(data);
+ il.Emit(OpCodes.Ldarg_0); // base
+ il.Emit(OpCodes.Ldarg_3); // data
+ il.Emit(OpCodes.Call, MethodInfoCache.StreamingHubClientBase_Deserialize.MakeGenericMethod(deserializeType));
+ var local0 = il.DeclareLocal(deserializeType);
+ il.Emit(OpCodes.Stloc_S, local0);
+
+ // var task = receiver.SomeMethod(local0.Item1, local0.Item2);
+ il.Emit(OpCodes.Ldarg_0); // receiver
+ il.Emit(OpCodes.Ldfld, receiverField);
+
+ if (parameters.Length == 0)
+ {
+ foreach (var p in methodClientResult.MethodInfo.GetParameters())
+ {
+ // default(CancellationToken)
+ il.Emit(OpCodes.Ldloca_S, localCtDefault!);
+ il.Emit(OpCodes.Initobj, typeof(CancellationToken));
+ il.Emit(OpCodes.Ldloc_S, localCtDefault!);
+ }
+ }
+ else if (parameters.Length == 1)
+ {
+ foreach (var p in methodClientResult.MethodInfo.GetParameters())
+ {
+ if (p.ParameterType == typeof(CancellationToken))
+ {
+ // default(CancellationToken)
+ il.Emit(OpCodes.Ldloca_S, localCtDefault!);
+ il.Emit(OpCodes.Initobj, typeof(CancellationToken));
+ il.Emit(OpCodes.Ldloc_S, localCtDefault!);
+ }
+ else if (p == parameters[0])
+ {
+ // local0
+ il.Emit(OpCodes.Ldloc_S, local0);
+ }
+ else
+ {
+ throw new InvalidOperationException();
+ }
+ }
+ }
+ else
+ {
+ var itemIndex = 1;
+ foreach (var p in methodClientResult.MethodInfo.GetParameters())
+ {
+ if (p.ParameterType == typeof(CancellationToken))
+ {
+ // default(CancellationToken)
+ il.Emit(OpCodes.Ldloca_S, localCtDefault!);
+ il.Emit(OpCodes.Initobj, typeof(CancellationToken));
+ il.Emit(OpCodes.Ldloc_S, localCtDefault!);
+ }
+ else
+ {
+ // local0.ItemX
+ il.Emit(OpCodes.Ldloc_S, local0);
+ il.Emit(OpCodes.Ldfld, deserializeType.GetField($"Item{itemIndex}")!);
+ itemIndex++;
+ }
+ }
+ }
+ il.Emit(OpCodes.Callvirt, methodClientResult.MethodInfo);
+
+ // var localTask = task;
+ var localTask = il.DeclareLocal(methodClientResult.MethodInfo.ReturnType);
+ il.Emit(OpCodes.Stloc_S, localTask);
+
+ // base.AwaitAndWriteClientResultResponseMessage(methodId, messageId, localTask);
+ il.Emit(OpCodes.Ldarg_0); // base
+ il.Emit(OpCodes.Ldarg_1); // methodId
+ il.Emit(OpCodes.Ldarg_2); // messageId
+ il.Emit(OpCodes.Ldloc_S, localTask);
+ il.Emit(OpCodes.Call, MethodInfoCache.GetStreamingHubClientBase_AwaitAndWriteClientResultResponseMessage(methodClientResult.MethodInfo.ReturnType));
+
+ il.Emit(OpCodes.Leave, labelReturn);
+ }
+ // } catch (Exception ex) {
+ il.BeginCatchBlock(typeof(Exception));
+ il.Emit(OpCodes.Stloc_S, localEx);
+ {
+ // base.WriteClientResultResponseMessageForError(methodId, messageId, ex);
+ il.Emit(OpCodes.Ldarg_0); // base
+ il.Emit(OpCodes.Ldarg_1); // methodId
+ il.Emit(OpCodes.Ldarg_2); // messageId
+ il.Emit(OpCodes.Ldloc_S, localEx); // ex
+ il.Emit(OpCodes.Call, MethodInfoCache.StreamingHubClientBase_WriteClientResultResponseMessageForError);
+ il.Emit(OpCodes.Leave, labelReturn);
+ }
+ il.EndExceptionBlock();
+ // }
+
+ // Return:
+ // return;
+ il.MarkLabel(labelReturn);
+ il.Emit(OpCodes.Ret);
+ }
+ }
// Proxy Methods
for (int i = 0; i < definitions.Length; i++)
@@ -413,13 +570,15 @@ static void DefineMethods(TypeBuilder typeBuilder, Type interfaceType, Type rece
if (def.MethodInfo.ReturnType == typeof(Task) || def.MethodInfo.ReturnType == typeof(ValueTask))
{
- var mInfo = baseType.GetMethod("WriteMessageWithResponseAsync", BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)!;
- il.Emit(OpCodes.Callvirt, mInfo.MakeGenericMethod(callType, typeof(Nil)));
+ il.Emit(OpCodes.Callvirt, MethodInfoCache.StreamingHubClientBase_WriteMessageWithResponseAsync.MakeGenericMethod(callType, typeof(Nil)));
+ }
+ else if (def.MethodInfo.ReturnType == typeof(void))
+ {
+ il.Emit(OpCodes.Callvirt, MethodInfoCache.StreamingHubClientBase_WriteMessageFireAndForgetAsync.MakeGenericMethod(callType, typeof(Nil)));
}
else
{
- var mInfo = baseType.GetMethod("WriteMessageWithResponseAsync", BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)!;
- il.Emit(OpCodes.Callvirt, mInfo.MakeGenericMethod(callType, def.MethodInfo.ReturnType.GetGenericArguments()[0]));
+ il.Emit(OpCodes.Callvirt, MethodInfoCache.StreamingHubClientBase_WriteMessageWithResponseAsync.MakeGenericMethod(callType, def.MethodInfo.ReturnType.GetGenericArguments()[0]));
}
// If the return type is `ValueTask`, the task must be wrapped as ValueTask.
@@ -432,6 +591,10 @@ static void DefineMethods(TypeBuilder typeBuilder, Type interfaceType, Type rece
var returnTypeOfT = def.MethodInfo.ReturnType.GetGenericArguments()[0];
il.Emit(OpCodes.Newobj, typeof(ValueTask<>).MakeGenericType(returnTypeOfT).GetConstructor(new [] { typeof(Task<>).MakeGenericType(returnTypeOfT) })!);
}
+ else if (def.MethodInfo.ReturnType == typeof(void))
+ {
+ il.Emit(OpCodes.Pop);
+ }
il.Emit(OpCodes.Ret);
}
@@ -515,7 +678,7 @@ static void DefineMethodsFireAndForget(TypeBuilder typeBuilder, Type interfaceTy
}
Type responseType;
- if (def.MethodInfo.ReturnType == typeof(Task) || def.MethodInfo.ReturnType == typeof(ValueTask))
+ if (def.MethodInfo.ReturnType == typeof(Task) || def.MethodInfo.ReturnType == typeof(ValueTask) || def.MethodInfo.ReturnType == typeof(void))
{
responseType = typeof(Nil);
}
@@ -538,11 +701,58 @@ static void DefineMethodsFireAndForget(TypeBuilder typeBuilder, Type interfaceTy
var returnTypeOfT = def.MethodInfo.ReturnType.GetGenericArguments()[0];
il.Emit(OpCodes.Newobj, typeof(ValueTask<>).MakeGenericType(returnTypeOfT).GetConstructor(new [] { typeof(Task<>).MakeGenericType(returnTypeOfT) })!);
}
+ else if (def.MethodInfo.ReturnType == typeof(void))
+ {
+ il.Emit(OpCodes.Pop);
+ }
il.Emit(OpCodes.Ret);
}
}
+ static class MethodInfoCache
+ {
+ // ReSharper disable StaticMemberInGenericType
+ // ReSharper disable InconsistentNaming
+#pragma warning disable IDE1006 // Naming Styles
+ public static readonly MethodInfo StreamingHubClientBase_Deserialize
+ = typeof(StreamingHubClientBase).GetMethod("Deserialize", BindingFlags.Instance | BindingFlags.NonPublic)!;
+
+ static readonly MethodInfo StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_Task;
+ static readonly MethodInfo StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_TaskOfT;
+ static readonly MethodInfo StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_ValueTask;
+ static readonly MethodInfo StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_ValueTaskOfT;
+
+ public static MethodInfo GetStreamingHubClientBase_AwaitAndWriteClientResultResponseMessage(Type t)
+ => t == typeof(Task) ? StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_Task
+ : t == typeof(ValueTask) ? StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_ValueTask
+ : t.GetGenericTypeDefinition() == typeof(Task<>) ? StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_TaskOfT.MakeGenericMethod(t.GetGenericArguments())
+ : t.GetGenericTypeDefinition() == typeof(ValueTask<>) ? StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_ValueTaskOfT.MakeGenericMethod(t.GetGenericArguments())
+ : throw new InvalidOperationException();
+
+ public static readonly MethodInfo StreamingHubClientBase_WriteClientResultResponseMessageForError
+ = typeof(StreamingHubClientBase).GetMethod("WriteClientResultResponseMessageForError", BindingFlags.NonPublic | BindingFlags.Instance)!;
+ public static readonly MethodInfo StreamingHubClientBase_WriteMessageWithResponseAsync
+ = typeof(StreamingHubClientBase).GetMethod("WriteMessageWithResponseAsync", BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)!;
+ public static readonly MethodInfo StreamingHubClientBase_WriteMessageFireAndForgetAsync
+ = typeof(StreamingHubClientBase).GetMethod("WriteMessageFireAndForgetAsync", BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance)!;
+ // ReSharper restore StaticMemberInGenericType
+ // ReSharper restore InconsistentNaming
+#pragma warning restore IDE1006 // Naming Styles
+
+ static MethodInfoCache()
+ {
+ var methodsAwaitAndWriteClientResultResponseMessage = typeof(StreamingHubClientBase)
+ .GetMethods(BindingFlags.NonPublic | BindingFlags.Instance)
+ .Where(x => x.Name == "AwaitAndWriteClientResultResponseMessage")
+ .ToArray();
+
+ StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_Task = methodsAwaitAndWriteClientResultResponseMessage.Single(x => x.GetParameters()[2].ParameterType == typeof(Task));
+ StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_TaskOfT = methodsAwaitAndWriteClientResultResponseMessage.Single(x => x.GetParameters()[2].ParameterType is { IsGenericType: true } paramType && paramType.GetGenericTypeDefinition() == typeof(Task<>));
+ StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_ValueTask = methodsAwaitAndWriteClientResultResponseMessage.Single(x => x.GetParameters()[2].ParameterType == typeof(ValueTask));
+ StreamingHubClientBase_AwaitAndWriteClientResultResponseMessage_ValueTaskOfT = methodsAwaitAndWriteClientResultResponseMessage.Single(x => x.GetParameters()[2].ParameterType is { IsGenericType: true } paramType && paramType.GetGenericTypeDefinition() == typeof(ValueTask<>));
+ }
+ }
class MethodDefinition
{
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/DynamicClient/DynamicStreamingHubClientFactoryProvider.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/DynamicClient/DynamicStreamingHubClientFactoryProvider.cs
index 59256a5a0..6f82872fa 100644
--- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/DynamicClient/DynamicStreamingHubClientFactoryProvider.cs
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/DynamicClient/DynamicStreamingHubClientFactoryProvider.cs
@@ -31,8 +31,7 @@ public bool TryGetFactory([NotNullWhen(true)] out Stre
static class Cache where TStreamingHub : IStreamingHub
{
public static readonly StreamingHubClientFactoryDelegate Factory
- = (callInvoker, receiver, host, callOptions, serializerProvider, logger)
- => (TStreamingHub)Activator.CreateInstance(DynamicStreamingHubClientBuilder.ClientType, callInvoker, host, callOptions, serializerProvider, logger)!;
+ = (receiver, callInvoker, options) => (TStreamingHub)Activator.CreateInstance(DynamicStreamingHubClientBuilder.ClientType, receiver, callInvoker, options)!;
}
}
#endif
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/BroadcasterHelper.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/BroadcasterHelper.cs
index ceaa5b6f8..eab0c5c9d 100644
--- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/BroadcasterHelper.cs
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/BroadcasterHelper.cs
@@ -58,10 +58,10 @@ internal static void VerifyMethodDefinitions(MethodDefinition[] definitions)
}
map.Add(methodId, item);
- if (!(item.MethodInfo.ReturnType == typeof(void)))
- {
- throw new Exception($"Invalid definition, TReceiver's return type must only be `void`. {item.MethodInfo.Name}.");
- }
+ //if (!(item.MethodInfo.ReturnType == typeof(void)))
+ //{
+ // throw new Exception($"Invalid definition, TReceiver's return type must only be `void`. {item.MethodInfo.Name}.");
+ //}
item.MethodId = methodId;
}
@@ -74,12 +74,14 @@ internal class MethodDefinition
public Type ReceiverType { get; set; }
public MethodInfo MethodInfo { get; set; }
public int MethodId { get; set; }
+ public bool IsClientResult { get; set; }
public MethodDefinition(Type receiverType, MethodInfo methodInfo, int methodId)
{
ReceiverType = receiverType;
MethodInfo = methodInfo;
MethodId = methodId;
+ IsClientResult = methodInfo.ReturnType != typeof(void);
}
}
}
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/ArrayPoolBufferWriter.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/ArrayPoolBufferWriter.cs
index db49478bb..8c5806321 100644
--- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/ArrayPoolBufferWriter.cs
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/ArrayPoolBufferWriter.cs
@@ -1,5 +1,6 @@
using System;
using System.Buffers;
+using System.Diagnostics;
namespace MagicOnion.Internal.Buffers
{
@@ -15,7 +16,14 @@ public static ArrayPoolBufferWriter RentThreadStaticWriter()
staticInstance = new ArrayPoolBufferWriter();
}
staticInstance.Prepare();
+
+#if DEBUG
+ var currentInstance = staticInstance;
+ staticInstance = null;
+ return currentInstance;
+#else
return staticInstance;
+#endif
}
const int MinimumBufferSize = 32767; // use 32k buffer.
@@ -96,6 +104,11 @@ public void Dispose()
ArrayPool.Shared.Return(buffer);
buffer = null;
+
+#if DEBUG
+ Debug.Assert(staticInstance is null);
+ staticInstance = this;
+#endif
}
}
}
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/MemoryPoolBufferWriter.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/MemoryPoolBufferWriter.cs
new file mode 100644
index 000000000..2666b3aba
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/MemoryPoolBufferWriter.cs
@@ -0,0 +1,69 @@
+using System;
+using System.Buffers;
+
+namespace MagicOnion.Internal.Buffers
+{
+ public class MemoryPoolBufferWriter : IBufferWriter
+ {
+ readonly MemoryPool memoryPool;
+ IMemoryOwner? buffer;
+ int written;
+
+ [ThreadStatic]
+ static MemoryPoolBufferWriter? shared;
+ public static MemoryPoolBufferWriter RentThreadStaticWriter() => shared ??= new MemoryPoolBufferWriter(MemoryPool.Shared);
+
+ public MemoryPoolBufferWriter(MemoryPool memoryPool)
+ {
+ this.memoryPool = memoryPool;
+ this.buffer = null;
+ this.written = 0;
+ }
+
+ public void Advance(int count)
+ {
+ written += count;
+ }
+
+ public Memory GetMemory(int sizeHint = 0)
+ {
+ if (buffer != null && (buffer.Memory.Length - written) > sizeHint)
+ {
+ return buffer.Memory.Slice(written);
+ }
+ else
+ {
+ if (buffer == null)
+ {
+ // New
+ buffer = memoryPool.Rent(sizeHint > 0 ? sizeHint : 32767);
+ }
+ else
+ {
+ // Grow
+ var oldBuffer = buffer;
+ var newBuffer = memoryPool.Rent(buffer.Memory.Length * 2);
+
+ oldBuffer.Memory.Slice(0, written).CopyTo(newBuffer.Memory);
+ oldBuffer.Dispose();
+
+ buffer = newBuffer;
+ }
+ return buffer.Memory.Slice(written);
+ }
+ }
+
+ public Span GetSpan(int sizeHint = 0)
+ {
+ return GetMemory(sizeHint).Span;
+ }
+
+ public (IMemoryOwner Owner, int Written) ToMemoryOwnerAndReturn()
+ {
+ var result = (buffer ?? memoryPool.Rent(0), written);
+ written = 0;
+ buffer = null;
+ return result;
+ }
+ }
+}
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/MemoryPoolBufferWriter.cs.meta b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/MemoryPoolBufferWriter.cs.meta
new file mode 100644
index 000000000..fa5ec2709
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/Buffers/MemoryPoolBufferWriter.cs.meta
@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: c58e159a2bd5bed469aa95ac4ef89613
+MonoImporter:
+ externalObjects: {}
+ serializedVersion: 2
+ defaultReferences: []
+ executionOrder: 0
+ icon: {instanceID: 0}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/MagicOnionMarshallers.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/MagicOnionMarshallers.cs
index 6e93ee430..af9233361 100644
--- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/MagicOnionMarshallers.cs
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/MagicOnionMarshallers.cs
@@ -1,6 +1,7 @@
using Grpc.Core;
using MessagePack;
using System;
+using System.Buffers;
using System.Linq;
using System.Reflection;
@@ -15,7 +16,21 @@ internal static class MagicOnionMarshallers
.OrderBy(x => x.GetGenericArguments().Length)
.ToArray();
- public static readonly Marshaller ThroughMarshaller = new Marshaller(x => x, x => x);
+ internal static Marshaller StreamingHubMarshaller { get; } = new(
+ serializer: static (payload, context) =>
+ {
+ context.SetPayloadLength(payload.Length);
+ var bufferWriter = context.GetBufferWriter();
+ payload.Span.CopyTo(bufferWriter.GetSpan(payload.Length));
+ bufferWriter.Advance(payload.Length);
+ context.Complete();
+ StreamingHubPayloadPool.Shared.Return(payload);
+ },
+ deserializer: static context =>
+ {
+ return StreamingHubPayloadPool.Shared.RentOrCreate(context.PayloadAsReadOnlySequence());
+ }
+ );
internal static Type CreateRequestType(ParameterInfo[] parameters)
{
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubClientMessageReader.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubClientMessageReader.cs
new file mode 100644
index 000000000..247caec1f
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubClientMessageReader.cs
@@ -0,0 +1,86 @@
+using System;
+using MessagePack;
+
+namespace MagicOnion.Internal
+{
+ internal ref struct StreamingHubClientMessageReader
+ {
+ readonly ReadOnlyMemory data;
+ MessagePackReader reader;
+
+ public StreamingHubClientMessageReader(ReadOnlyMemory data)
+ {
+ this.data = data;
+ this.reader = new MessagePackReader(data);
+ }
+
+ public StreamingHubMessageType ReadMessageType()
+ {
+ var arrayLength = this.reader.ReadArrayHeader();
+ return arrayLength switch
+ {
+ 2 => StreamingHubMessageType.Broadcast,
+ 3 => StreamingHubMessageType.Response,
+ 4 => StreamingHubMessageType.ResponseWithError,
+ 5 => reader.ReadByte() switch
+ {
+ 0x00 /* 0:ClientResultRequest */ => StreamingHubMessageType.ClientResultRequest,
+ 0x7f /* 127:Heartbeat */ => StreamingHubMessageType.Heartbeat,
+ var x => throw new InvalidOperationException($"Unknown Type: {x}"),
+ },
+ _ => throw new InvalidOperationException($"Unknown message format: ArrayLength = {arrayLength}"),
+ };
+ }
+
+ public (int MethodId, int Cosumed) ReadBroadcastMessageMethodId()
+ {
+ return (reader.ReadInt32(), (int)reader.Consumed);
+ }
+
+ public (int MethodId, ReadOnlyMemory Body) ReadBroadcastMessage()
+ {
+ var methodId = reader.ReadInt32();
+ var offset = (int)reader.Consumed;
+ return (methodId, data.Slice(offset));
+ }
+
+ public (int MessageId, int MethodId, ReadOnlyMemory Body) ReadResponseMessage()
+ {
+ var messageId = reader.ReadInt32();
+ var methodId = reader.ReadInt32();
+ var offset = (int)reader.Consumed;
+ return (messageId, methodId, data.Slice(offset));
+ }
+
+ public (int MessageId, int StatusCode, string? Detail, string? Error) ReadResponseWithErrorMessage()
+ {
+ var messageId = reader.ReadInt32();
+ var statusCode = reader.ReadInt32();
+ var detail = reader.ReadString();
+ var error = reader.ReadString();
+
+ return (messageId, statusCode, detail, error);
+ }
+
+ public (Guid ClientResultRequestMessageId, int MethodId, ReadOnlyMemory Body) ReadClientResultRequestMessage()
+ {
+ //var type = reader.ReadByte(); // Type is already read by ReadMessageType
+ reader.Skip(); // Dummy
+ var clientRequestMessageId = MessagePackSerializer.Deserialize(ref reader);
+ var methodId = reader.ReadInt32();
+ var offset = (int)reader.Consumed;
+
+ return (clientRequestMessageId, methodId, data.Slice(offset));
+ }
+
+ public ReadOnlyMemory ReadHeartbeat()
+ {
+ //var type = reader.ReadByte(); // Type is already read by ReadMessageType
+ reader.Skip(); // Dummy (1)
+ reader.Skip(); // Dummy (2)
+ reader.Skip(); // Dummy (3)
+
+ return data.Slice((int)reader.Consumed);
+ }
+ }
+}
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubClientMessageReader.cs.meta b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubClientMessageReader.cs.meta
new file mode 100644
index 000000000..3f9896648
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubClientMessageReader.cs.meta
@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: 84187af9e9bbd9b47ad00a2e646a3862
+MonoImporter:
+ externalObjects: {}
+ serializedVersion: 2
+ defaultReferences: []
+ executionOrder: 0
+ icon: {instanceID: 0}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubMessageWriter.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubMessageWriter.cs
new file mode 100644
index 000000000..93832281e
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubMessageWriter.cs
@@ -0,0 +1,250 @@
+using System;
+using System.Buffers;
+using System.Runtime.CompilerServices;
+using MagicOnion.Serialization;
+using MessagePack;
+
+namespace MagicOnion.Internal
+{
+ ///
+ /// StreamingHub message formats (from Server to Client):
+ ///
+ /// -
+ /// Response: InvokeHubMethod (from server to client)
+ /// Array(3): [MessageId(int), MethodId(int), SerializedResponse]
+ ///
+ /// -
+ /// Response: InvokeHubMethod (from server to client; with Exception)
+ /// Array(4): [MessageId(int), StatusCode(int), Detail(string), Message(string)]
+ ///
+ /// -
+ /// Broadcast: from server to client
+ /// Array(2): [MethodId(int), SerializedArgument]
+ ///
+ /// -
+ /// ClientInvoke/Request: InvokeClientMethod (from server to client)
+ /// Array(5): [Type=0x00, Nil, ClientResultMessageId(Guid), MethodId(int), SerializedArguments]
+ ///
+ /// -
+ /// Heartbeat:
+ /// Array(5): [Type=0x7f, Nil, Nil, Nil, Extras]
+ ///
+ ///
+ /// StreamingHub message formats (from Client to Server):
+ ///
+ /// -
+ /// Request: InvokeHubMethod (from client; void; fire-and-forget)
+ /// Array(2): [MethodId(int), SerializedArguments]
+ ///
+ /// -
+ /// Request: InvokeHubMethod (from client; non-void)
+ /// Array(3): [MessageId(int), MethodId(int), SerializedArguments]
+ ///
+ /// -
+ /// ClientInvoke/Response: InvokeClientMethod (from client to server)
+ /// Array(4): [Type=0x00, ClientResultMessageId(Guid), MethodId(int), SerializedResponse]
+ ///
+ /// -
+ /// ClientInvoke/Response: InvokeClientMethod (from client to server; with Exception)
+ /// Array(4): [Type=0x01, ClientResultMessageId(Guid), MethodId(int), [StatusCode(int), Detail(string), Message(string)]]
+ ///
+ /// -
+ /// Heartbeat/Response:
+ /// Array(4): [Type=0x7f, Nil, Nil, Nil]
+ ///
+ ///
+ ///
+ internal static class StreamingHubMessageWriter
+ {
+ ///
+ /// Writes a broadcast message of Hub method.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void WriteBroadcastMessage(IBufferWriter bufferWriter, int methodId, T value, IMagicOnionSerializer messageSerializer)
+ {
+ var writer = new MessagePackWriter(bufferWriter);
+ writer.WriteArrayHeader(2);
+ writer.Write(methodId);
+ writer.Flush();
+ messageSerializer.Serialize(bufferWriter, value);
+ }
+
+ ///
+ /// Writes a request message of Hub method.
+ ///
+ public static void WriteRequestMessageVoid(IBufferWriter bufferWriter, int methodId, T value, IMagicOnionSerializer messageSerializer)
+ {
+ var writer = new MessagePackWriter(bufferWriter);
+ writer.WriteArrayHeader(2);
+ writer.Write(methodId);
+ writer.Flush();
+ messageSerializer.Serialize(bufferWriter, value);
+ }
+
+ ///
+ /// Writes a request message of Hub method.
+ ///
+ public static void WriteRequestMessage(IBufferWriter bufferWriter, int methodId, int messageId, T value, IMagicOnionSerializer messageSerializer)
+ {
+ var writer = new MessagePackWriter(bufferWriter);
+ writer.WriteArrayHeader(3);
+ writer.Write(messageId);
+ writer.Write(methodId);
+ writer.Flush();
+ messageSerializer.Serialize(bufferWriter, value);
+ }
+
+ ///
+ /// Writes an empty response message of Hub method.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void WriteResponseMessage(IBufferWriter bufferWriter, int methodId, int messageId)
+ {
+ var writer = new MessagePackWriter(bufferWriter);
+ writer.WriteArrayHeader(3);
+ writer.Write(messageId);
+ writer.Write(methodId);
+ writer.WriteNil();
+ writer.Flush();
+ }
+
+ ///
+ /// Writes a response message of Hub method.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void WriteResponseMessage(IBufferWriter bufferWriter, int methodId, int messageId, T v, IMagicOnionSerializer messageSerializer)
+ {
+ var writer = new MessagePackWriter(bufferWriter);
+ writer.WriteArrayHeader(3);
+ writer.Write(messageId);
+ writer.Write(methodId);
+ writer.Flush();
+ messageSerializer.Serialize(bufferWriter, v);
+ }
+
+ ///
+ /// Write an error response message of Hub method.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void WriteResponseMessageForError(IBufferWriter bufferWriter, int messageId, int statusCode, string detail, Exception? ex, bool isReturnExceptionStackTraceInErrorDetail)
+ {
+ var writer = new MessagePackWriter(bufferWriter);
+ writer.WriteArrayHeader(4);
+ writer.Write(messageId);
+ writer.Write(statusCode);
+ writer.Write(detail);
+
+ var msg = (isReturnExceptionStackTraceInErrorDetail && ex != null)
+ ? ex.ToString()
+ : null;
+
+ writer.Write(msg);
+ writer.Flush();
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void WriteClientResultRequestMessage(IBufferWriter bufferWriter, int methodId, Guid messageId, T request, IMagicOnionSerializer messageSerializer)
+ {
+ var writer = new MessagePackWriter(bufferWriter);
+ writer.WriteArrayHeader(5);
+ writer.Write(0); // Type = ClientResultRequest (0)
+ writer.WriteNil(); // Dummy
+ MessagePackSerializer.Serialize(ref writer, messageId);
+ writer.Write(methodId);
+ writer.Flush();
+ messageSerializer.Serialize(bufferWriter, request);
+ }
+
+ ///
+ /// Writes a response message for client result.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void WriteClientResultResponseMessage(IBufferWriter bufferWriter, int methodId, Guid messageId, T response, IMagicOnionSerializer messageSerializer)
+ {
+ var writer = new MessagePackWriter(bufferWriter);
+ writer.WriteArrayHeader(4);
+ writer.Write(0); // Result = 0 (success)
+ MessagePackSerializer.Serialize(ref writer, messageId);
+ writer.Write(methodId);
+ writer.Flush();
+ messageSerializer.Serialize(bufferWriter, response);
+ }
+
+ ///
+ /// Writes an error response message for client result.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void WriteClientResultResponseMessageForError(IBufferWriter bufferWriter, int methodId, Guid messageId, int statusCode, string detail, Exception? ex, IMagicOnionSerializer messageSerializer)
+ {
+ var writer = new MessagePackWriter(bufferWriter);
+ writer.WriteArrayHeader(4);
+ writer.Write(1); // Result = 1 (failed)
+ MessagePackSerializer.Serialize(ref writer, messageId);
+ writer.Write(methodId);
+
+ writer.WriteArrayHeader(3);
+ {
+ writer.Write(statusCode);
+ writer.Write(detail);
+ writer.Write(ex?.ToString());
+ }
+ writer.Flush();
+ }
+
+
+ // Array(5)[127, Nil, Nil, Nil, ]
+ static ReadOnlySpan HeartbeatMessageForServerToClientHeader => new byte[] { 0x95, 0x7f, 0xc0, 0xc0, 0xc0 };
+
+ ///
+ /// Writes a heartbeat message for sending from the server.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void WriteHeartbeatMessageForServerToClientHeader(IBufferWriter bufferWriter)
+ {
+ bufferWriter.Write(HeartbeatMessageForServerToClientHeader);
+ //var writer = new MessagePackWriter(bufferWriter);
+ //writer.WriteArrayHeader(5);
+ //writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat)
+ //writer.WriteNil(); // Dummy
+ //writer.WriteNil(); // Dummy
+ //writer.WriteNil(); // Dummy
+ //writer.Flush();
+ }
+
+ // Array(4)[127, Nil, Nil, Nil]
+ static ReadOnlySpan HeartbeatMessageForClientToServer => new byte[] { 0x94, 0x7f, 0xc0, 0xc0, 0xc0 };
+
+ ///
+ /// Writes a heartbeat message for sending from the client.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static void WriteHeartbeatMessageForClientToServer(IBufferWriter bufferWriter)
+ {
+ bufferWriter.Write(HeartbeatMessageForClientToServer);
+ //var writer = new MessagePackWriter(bufferWriter);
+ //writer.WriteArrayHeader(4);
+ //writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat)
+ //writer.WriteNil(); // Dummy
+ //writer.WriteNil(); // Dummy
+ //writer.WriteNil(); // Dummy
+ //writer.Flush();
+ }
+ }
+
+ internal enum StreamingHubMessageType
+ {
+ // Client to Server
+ Request,
+ RequestFireAndForget,
+ Response,
+ ResponseWithError,
+ HeartbeatResponse,
+
+ // Server to Client
+ Broadcast,
+ ClientResultRequest,
+ ClientResultResponse,
+ ClientResultResponseWithError,
+ Heartbeat,
+ }
+}
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubMessageWriter.cs.meta b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubMessageWriter.cs.meta
new file mode 100644
index 000000000..16a91124e
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubMessageWriter.cs.meta
@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: f412c8c0f99f39344a9188e75b97a219
+MonoImporter:
+ externalObjects: {}
+ serializedVersion: 2
+ defaultReferences: []
+ executionOrder: 0
+ icon: {instanceID: 0}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs
new file mode 100644
index 000000000..3bebc7d94
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs
@@ -0,0 +1,81 @@
+#nullable enable
+using System;
+using System.Buffers;
+using System.Diagnostics.CodeAnalysis;
+
+namespace MagicOnion.Internal
+{
+ internal class StreamingHubPayload : IStreamingHubPayload
+ {
+ byte[]? buffer;
+ ReadOnlyMemory? memory;
+
+ public int Length => memory!.Value.Length;
+ public ReadOnlySpan Span => memory!.Value.Span;
+ public ReadOnlyMemory Memory => memory!.Value;
+
+ void IStreamingHubPayload.Initialize(ReadOnlySpan data)
+ {
+ ThrowIfUsing();
+
+ buffer = ArrayPool.Shared.Rent(data.Length);
+ data.CopyTo(buffer);
+ memory = buffer.AsMemory(0, (int)data.Length);
+ }
+
+ void IStreamingHubPayload.Initialize(ReadOnlySequence data)
+ {
+ ThrowIfUsing();
+ if (data.Length > int.MaxValue) throw new InvalidOperationException("A body size of StreamingHubPayload must be less than int.MaxValue");
+
+ buffer = ArrayPool.Shared.Rent((int)data.Length);
+ data.CopyTo(buffer);
+ memory = buffer.AsMemory(0, (int)data.Length);
+ }
+
+ void IStreamingHubPayload.Initialize(ReadOnlyMemory data)
+ {
+ ThrowIfUsing();
+
+ buffer = null;
+ memory = data;
+ }
+
+ void IStreamingHubPayload.Uninitialize()
+ {
+ ThrowIfDisposed();
+
+ if (buffer != null)
+ {
+#if DEBUG && NET6_0_OR_GREATER
+ Array.Fill(buffer, 0xff);
+#endif
+ ArrayPool.Shared.Return(buffer);
+ }
+
+ memory = null;
+ buffer = null;
+ }
+
+#if NON_UNITY && !NETSTANDARD2_0 && !NETSTANDARD2_1
+ [MemberNotNull(nameof(memory))]
+#endif
+ void ThrowIfDisposed()
+ {
+ if (memory is null) throw new ObjectDisposedException(nameof(StreamingHubPayload));
+ }
+
+ void ThrowIfUsing()
+ {
+ if (memory is not null) throw new InvalidOperationException(nameof(StreamingHubPayload));
+ }
+ }
+
+ internal interface IStreamingHubPayload
+ {
+ void Initialize(ReadOnlySpan data);
+ void Initialize(ReadOnlySequence data);
+ void Initialize(ReadOnlyMemory data);
+ void Uninitialize();
+ }
+}
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs.meta b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs.meta
new file mode 100644
index 000000000..9e8fb86b6
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayload.cs.meta
@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: 29c0ae37b5a14d74f9ff3d1ba17d88b6
+MonoImporter:
+ externalObjects: {}
+ serializedVersion: 2
+ defaultReferences: []
+ executionOrder: 0
+ icon: {instanceID: 0}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs
new file mode 100644
index 000000000..68166f3c2
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs
@@ -0,0 +1,98 @@
+#if !USE_OBJECTPOOL_STREAMINGHUBPAYLOADPOOL
+#nullable enable
+using System;
+using System.Buffers;
+using System.Diagnostics.CodeAnalysis;
+using System.Runtime.CompilerServices;
+using System.Threading;
+
+namespace MagicOnion.Internal
+{
+ internal class StreamingHubPayloadPool
+ {
+ StreamingHubPayload? pool1;
+ StreamingHubPayload? pool2;
+ StreamingHubPayload? pool3;
+ StreamingHubPayload? pool4;
+
+ static StreamingHubPayloadPool pool = new();
+
+ public static StreamingHubPayloadPool Shared => pool;
+
+ public void Return(StreamingHubPayload payload)
+ {
+ ((IStreamingHubPayload)payload).Uninitialize();
+
+ var pooled = TryReturn(ref pool1, payload) ||
+ TryReturn(ref pool2, payload) ||
+ TryReturn(ref pool3, payload) ||
+ TryReturn(ref pool4, payload);
+ }
+
+ public StreamingHubPayload RentOrCreate(ReadOnlySequence data)
+ {
+ StreamingHubPayload? tmpPayload;
+ if (!(TryGet(ref pool1, out tmpPayload) ||
+ TryGet(ref pool2, out tmpPayload) ||
+ TryGet(ref pool3, out tmpPayload) ||
+ TryGet(ref pool4, out tmpPayload)))
+ {
+ tmpPayload = new StreamingHubPayload();
+ }
+
+ ((IStreamingHubPayload)tmpPayload).Initialize(data);
+
+ return tmpPayload;
+ }
+
+ public StreamingHubPayload RentOrCreate(ReadOnlySpan data)
+ {
+ StreamingHubPayload? tmpPayload;
+ if (!(TryGet(ref pool1, out tmpPayload) ||
+ TryGet(ref pool2, out tmpPayload) ||
+ TryGet(ref pool3, out tmpPayload) ||
+ TryGet(ref pool4, out tmpPayload)))
+ {
+ tmpPayload = new StreamingHubPayload();
+ }
+
+ ((IStreamingHubPayload)tmpPayload).Initialize(data);
+
+ return tmpPayload;
+ }
+
+ public StreamingHubPayload RentOrCreate(ReadOnlyMemory data)
+ {
+ StreamingHubPayload? tmpPayload;
+ if (!(TryGet(ref pool1, out tmpPayload) ||
+ TryGet(ref pool2, out tmpPayload) ||
+ TryGet(ref pool3, out tmpPayload) ||
+ TryGet(ref pool4, out tmpPayload)))
+ {
+ tmpPayload = new StreamingHubPayload();
+ }
+
+ ((IStreamingHubPayload)tmpPayload).Initialize(data);
+
+ return tmpPayload;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ bool TryReturn(ref StreamingHubPayload? field, StreamingHubPayload payload)
+ => Interlocked.CompareExchange(ref field, payload, null) == null;
+
+ bool TryGet(ref StreamingHubPayload? field, [NotNullWhen(true)] out StreamingHubPayload? payload)
+ {
+ var tmp = field;
+ if (tmp != null && Interlocked.CompareExchange(ref field, null, tmp) == tmp)
+ {
+ payload = tmp;
+ return true;
+ }
+
+ payload = null;
+ return false;
+ }
+ }
+}
+#endif
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs.meta b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs.meta
new file mode 100644
index 000000000..eb73abab4
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.BuiltIn.cs.meta
@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: b10ef269ace167b4dae0bdd13ef2c7ad
+MonoImporter:
+ externalObjects: {}
+ serializedVersion: 2
+ defaultReferences: []
+ executionOrder: 0
+ icon: {instanceID: 0}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs
new file mode 100644
index 000000000..ae7abf115
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs
@@ -0,0 +1,55 @@
+#if USE_OBJECTPOOL_STREAMINGHUBPAYLOADPOOL
+using Microsoft.Extensions.ObjectPool;
+using System.Buffers;
+
+namespace MagicOnion.Internal;
+
+internal class StreamingHubPayloadPool
+{
+ const int MaximumRetained = 2 << 7;
+
+ readonly ObjectPool pool = new DefaultObjectPool(new Policy(), MaximumRetained);
+
+ public static StreamingHubPayloadPool Shared { get; } = new StreamingHubPayloadPool();
+
+ public StreamingHubPayload RentOrCreate(ReadOnlySequence data)
+ {
+ var payload = pool.Get();
+ ((IStreamingHubPayload)payload).Initialize(data);
+ return payload;
+ }
+
+ public StreamingHubPayload RentOrCreate(ReadOnlySpan data)
+ {
+ var payload = pool.Get();
+ ((IStreamingHubPayload)payload).Initialize(data);
+ return payload;
+ }
+
+ public StreamingHubPayload RentOrCreate(ReadOnlyMemory data)
+ {
+ var payload = pool.Get();
+ ((IStreamingHubPayload)payload).Initialize(data);
+ return payload;
+ }
+
+ public void Return(StreamingHubPayload payload)
+ {
+ pool.Return(payload);
+ }
+
+ class Policy : IPooledObjectPolicy
+ {
+ public StreamingHubPayload Create()
+ {
+ return new StreamingHubPayload();
+ }
+
+ public bool Return(StreamingHubPayload obj)
+ {
+ ((IStreamingHubPayload)obj).Uninitialize();
+ return true;
+ }
+ }
+}
+#endif
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs.meta b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs.meta
new file mode 100644
index 000000000..f6ea9615e
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubPayloadPool.ObjectPool.cs.meta
@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: a0db1221924fc1a47a6966366a8028c1
+MonoImporter:
+ externalObjects: {}
+ serializedVersion: 2
+ defaultReferences: []
+ executionOrder: 0
+ icon: {instanceID: 0}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs
new file mode 100644
index 000000000..b1d8326cf
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs
@@ -0,0 +1,79 @@
+using System;
+using MessagePack;
+
+namespace MagicOnion.Internal
+{
+ internal ref struct StreamingHubServerMessageReader
+ {
+ readonly ReadOnlyMemory data;
+ MessagePackReader reader;
+
+ public StreamingHubServerMessageReader(ReadOnlyMemory data)
+ {
+ this.data = data;
+ this.reader = new MessagePackReader(data);
+ }
+
+ public StreamingHubMessageType ReadMessageType()
+ {
+ var arrayLength = this.reader.ReadArrayHeader();
+ return arrayLength switch
+ {
+ 2 => StreamingHubMessageType.RequestFireAndForget,
+ 3 => StreamingHubMessageType.Request,
+ 4 => reader.ReadByte() switch
+ {
+ 0x00 => StreamingHubMessageType.ClientResultResponse,
+ 0x01 => StreamingHubMessageType.ClientResultResponseWithError,
+ 0x7f => StreamingHubMessageType.HeartbeatResponse,
+ var subType => throw new InvalidOperationException($"Unknown client response message: {subType}"),
+ },
+ _ => throw new InvalidOperationException($"Unknown message format: ArrayLength = {arrayLength}"),
+ };
+ }
+
+ public (int MethodId, ReadOnlyMemory Body) ReadRequestFireAndForget()
+ {
+ // void: [methodId, [argument]]
+ var methodId = reader.ReadInt32();
+ var consumed = (int)reader.Consumed;
+
+ return (methodId, data.Slice(consumed));
+ }
+
+ public (int MessageId, int MethodId, ReadOnlyMemory Body) ReadRequest()
+ {
+ // T: [messageId, methodId, [argument]]
+ var messageId = reader.ReadInt32();
+ var methodId = reader.ReadInt32();
+ var consumed = (int)reader.Consumed;
+
+ return (messageId, methodId, data.Slice(consumed));
+ }
+
+ public (Guid ClientResultMessageId, int ClientMethodId, ReadOnlyMemory Body) ReadClientResultResponse()
+ {
+ // T: [0, clientResultMessageId, methodId, result]
+ var clientResultMessageId = MessagePackSerializer.Deserialize(ref reader);
+ var clientMethodId = reader.ReadInt32();
+ var consumed = (int)reader.Consumed;
+
+ return (clientResultMessageId, clientMethodId, data.Slice(consumed));
+ }
+
+ public (Guid ClientResultMessageId, int ClientMethodId, int StatusCode, string Detail, string Message) ReadClientResultResponseForError()
+ {
+ // T: [1, clientResultMessageId, methodId, [statusCode, detail, message]]
+ var clientResultMessageId = MessagePackSerializer.Deserialize(ref reader);
+ var clientMethodId = reader.ReadInt32();
+ var bodyArray = reader.ReadArrayHeader();
+ if (bodyArray != 3) throw new InvalidOperationException($"Invalid ClientResponse: The BodyArray length is {bodyArray}");
+
+ var statusCode = reader.ReadInt32();
+ var detail = reader.ReadString() ?? string.Empty;
+ var message = reader.ReadString() ?? string.Empty;
+
+ return (clientResultMessageId, clientMethodId, statusCode, detail, message);
+ }
+ }
+}
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs.meta b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs.meta
new file mode 100644
index 000000000..2e9d41b3c
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs.meta
@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: 6d4c013c0f3f500459b936e3cee4b01b
+MonoImporter:
+ externalObjects: {}
+ serializedVersion: 2
+ defaultReferences: []
+ executionOrder: 0
+ icon: {instanceID: 0}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal/DictionaryExtensions.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal/DictionaryExtensions.cs
new file mode 100644
index 000000000..ca34940c8
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal/DictionaryExtensions.cs
@@ -0,0 +1,26 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Text;
+
+// ReSharper disable once CheckNamespace
+namespace System.Collections.Generic
+{
+ internal static class DictionaryExtensions
+ {
+#if NETSTANDARD2_0
+ public static bool Remove(this IDictionary dict, TKey key, [NotNullWhen(true)] out TValue? value)
+ {
+ if (dict.TryGetValue(key, out var v))
+ {
+ dict.Remove(key);
+ value = v!;
+ return true;
+ }
+
+ value = default;
+ return false;
+ }
+#endif
+ }
+}
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal/DictionaryExtensions.cs.meta b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal/DictionaryExtensions.cs.meta
new file mode 100644
index 000000000..38b0a2bf0
--- /dev/null
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal/DictionaryExtensions.cs.meta
@@ -0,0 +1,11 @@
+fileFormatVersion: 2
+guid: 4e248d00dbcc30c40928a09bba388efc
+MonoImporter:
+ externalObjects: {}
+ serializedVersion: 2
+ defaultReferences: []
+ executionOrder: 0
+ icon: {instanceID: 0}
+ userData:
+ assetBundleName:
+ assetBundleVariant:
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnion.Client.SourceGenerator.Unity/MagicOnion.Client.SourceGenerator.Unity.dll b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnion.Client.SourceGenerator.Unity/MagicOnion.Client.SourceGenerator.Unity.dll
index 9695037d0..1ae0e388c 100644
Binary files a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnion.Client.SourceGenerator.Unity/MagicOnion.Client.SourceGenerator.Unity.dll and b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnion.Client.SourceGenerator.Unity/MagicOnion.Client.SourceGenerator.Unity.dll differ
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnionClientBase.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnionClientBase.cs
index 9a370287c..035841aca 100644
--- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnionClientBase.cs
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/MagicOnionClientBase.cs
@@ -6,14 +6,14 @@
namespace MagicOnion.Client
{
- public readonly struct MagicOnionClientOptions
+ public class MagicOnionClientOptions
{
public string? Host { get; }
public CallInvoker CallInvoker { get; }
public IReadOnlyList Filters { get; }
public CallOptions CallOptions { get; }
- public MagicOnionClientOptions(CallInvoker callInvoker, string? host, CallOptions callOptions, IReadOnlyList filters)
+ public MagicOnionClientOptions(CallInvoker callInvoker, string? host, CallOptions callOptions, IReadOnlyList? filters)
{
Host = host;
CallOptions = callOptions;
@@ -22,11 +22,11 @@ public MagicOnionClientOptions(CallInvoker callInvoker, string? host, CallOption
}
public MagicOnionClientOptions WithCallOptions(CallOptions callOptions)
- => new MagicOnionClientOptions(CallInvoker, Host, callOptions, Filters);
+ => new (CallInvoker, Host, callOptions, Filters);
public MagicOnionClientOptions WithHost(string? host)
- => new MagicOnionClientOptions(CallInvoker, host, CallOptions, Filters);
+ => new (CallInvoker, host, CallOptions, Filters);
public MagicOnionClientOptions WithFilters(IReadOnlyList filters)
- => new MagicOnionClientOptions(CallInvoker, Host, CallOptions, filters);
+ => new (CallInvoker, Host, CallOptions, filters);
}
public class MagicOnionClientBase
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClient.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClient.cs
index 879e46285..833b91939 100644
--- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClient.cs
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClient.cs
@@ -1,6 +1,5 @@
using Grpc.Core;
using MagicOnion.Serialization;
-using MessagePack;
using System;
using System.Threading;
using System.Threading.Tasks;
@@ -22,10 +21,17 @@ public static partial class StreamingHubClient
return hubClient;
}
- public static async Task ConnectAsync(ChannelBase channel, TReceiver receiver, string? host = null, CallOptions option = default(CallOptions), IMagicOnionSerializerProvider? serializerProvider = null, IStreamingHubClientFactoryProvider? factoryProvider = null, IMagicOnionClientLogger? logger = null, CancellationToken cancellationToken = default)
+ public static Task ConnectAsync(ChannelBase channel, TReceiver receiver, string? host = null, CallOptions option = default(CallOptions), IMagicOnionSerializerProvider? serializerProvider = null, IStreamingHubClientFactoryProvider? factoryProvider = null, IMagicOnionClientLogger? logger = null, CancellationToken cancellationToken = default)
where TStreamingHub : IStreamingHub
{
- var hubClient = await ConnectAsync(channel.CreateCallInvoker(), receiver, host, option, serializerProvider, factoryProvider, logger, cancellationToken);
+ var options = StreamingHubClientOptions.CreateWithDefault(host, option, serializerProvider, logger);
+ return ConnectAsync(channel, receiver, options, factoryProvider, cancellationToken);
+ }
+
+ public static async Task ConnectAsync(ChannelBase channel, TReceiver receiver, StreamingHubClientOptions options, IStreamingHubClientFactoryProvider? factoryProvider = null, CancellationToken cancellationToken = default)
+ where TStreamingHub : IStreamingHub
+ {
+ var hubClient = await ConnectAsync(channel.CreateCallInvoker(), receiver, options, factoryProvider, cancellationToken);
// ReSharper disable once SuspiciousTypeConversion.Global
if (channel is IMagicOnionAwareGrpcChannel magicOnionAwareGrpcChannel)
{
@@ -38,11 +44,12 @@ public static partial class StreamingHubClient
public static TStreamingHub Connect(CallInvoker callInvoker, TReceiver receiver, string? host = null, CallOptions option = default(CallOptions), IMagicOnionSerializerProvider? serializerProvider = null, IStreamingHubClientFactoryProvider? factoryProvider = null, IMagicOnionClientLogger? logger = null)
where TStreamingHub : IStreamingHub
{
- var client = CreateClient(callInvoker, receiver, host, option, serializerProvider, factoryProvider, logger);
+ var options = StreamingHubClientOptions.CreateWithDefault(host, option);
+ var client = CreateClient(receiver, callInvoker, options, factoryProvider);
async void ConnectAndForget()
{
- var task = client.__ConnectAndSubscribeAsync(receiver, CancellationToken.None);
+ var task = client.__ConnectAndSubscribeAsync(CancellationToken.None);
try
{
await task.ConfigureAwait(false);
@@ -58,27 +65,32 @@ async void ConnectAndForget()
return (TStreamingHub)(object)client;
}
- public static async Task ConnectAsync(CallInvoker callInvoker, TReceiver receiver, string? host = null, CallOptions option = default(CallOptions), IMagicOnionSerializerProvider? serializerProvider = null, IStreamingHubClientFactoryProvider? factoryProvider = null, IMagicOnionClientLogger? logger = null, CancellationToken cancellationToken = default)
+ public static Task ConnectAsync(CallInvoker callInvoker, TReceiver receiver, string? host = null, CallOptions option = default(CallOptions), IMagicOnionSerializerProvider? serializerProvider = null, IStreamingHubClientFactoryProvider? factoryProvider = null, IMagicOnionClientLogger? logger = null, CancellationToken cancellationToken = default)
+ where TStreamingHub : IStreamingHub
+ {
+ var options = StreamingHubClientOptions.CreateWithDefault(host, option, serializerProvider, logger);
+ return ConnectAsync(callInvoker, receiver, options, factoryProvider, cancellationToken);
+ }
+
+ public static async Task ConnectAsync(CallInvoker callInvoker, TReceiver receiver, StreamingHubClientOptions options, IStreamingHubClientFactoryProvider? factoryProvider = null, CancellationToken cancellationToken = default)
where TStreamingHub : IStreamingHub
{
- var client = CreateClient(callInvoker, receiver, host, option, serializerProvider, factoryProvider, logger);
- await client.__ConnectAndSubscribeAsync(receiver, cancellationToken).ConfigureAwait(false);
+ var client = CreateClient(receiver, callInvoker, options, factoryProvider);
+ await client.__ConnectAndSubscribeAsync(cancellationToken).ConfigureAwait(false);
return (TStreamingHub)(object)client;
}
- static StreamingHubClientBase CreateClient(CallInvoker callInvoker, TReceiver receiver, string? host, CallOptions option, IMagicOnionSerializerProvider? serializerProvider, IStreamingHubClientFactoryProvider? factoryProvider, IMagicOnionClientLogger? logger)
+ static StreamingHubClientBase CreateClient(TReceiver receiver, CallInvoker callInvoker, StreamingHubClientOptions options, IStreamingHubClientFactoryProvider? factoryProvider)
where TStreamingHub : IStreamingHub
{
- serializerProvider ??= MagicOnionSerializerProvider.Default;
factoryProvider ??= StreamingHubClientFactoryProvider.Default;
- logger ??= NullMagicOnionClientLogger.Instance;
if (!factoryProvider.TryGetFactory(out var factory))
{
throw new NotSupportedException($"Unable to get client factory for StreamingHub type '{typeof(TStreamingHub).FullName}'.");
}
- return (StreamingHubClientBase)(object)factory(callInvoker, receiver, host, option, serializerProvider, logger);
+ return (StreamingHubClientBase)(object)factory(receiver, callInvoker, options);
}
}
}
diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs
index a0d8da805..9518d098e 100644
--- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs
+++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs
@@ -1,20 +1,75 @@
using System;
-using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Buffers;
+using System.Collections.Concurrent;
using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading.Channels;
using Grpc.Core;
using MagicOnion.Client.Internal.Threading;
using MagicOnion.Client.Internal.Threading.Tasks;
using MagicOnion.Internal;
using MagicOnion.Serialization;
using MagicOnion.Internal.Buffers;
-using MessagePack;
namespace MagicOnion.Client
{
+ public class StreamingHubClientOptions
+ {
+ public string? Host { get; }
+ public CallOptions CallOptions { get; }
+ public IMagicOnionSerializerProvider SerializerProvider { get; }
+ public IMagicOnionClientLogger Logger { get; }
+
+ public TimeSpan? HeartbeatInterval { get; }
+ public Action>? HeartbeatReceivedFromServer { get; }
+
+ public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger)
+ : this(host, callOptions, serializerProvider, logger, default, default)
+ {
+ }
+
+ public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? heartbeatInterval, Action>? heartbeatReceivedFromServer)
+ {
+ Host = host;
+ CallOptions = callOptions;
+ SerializerProvider = serializerProvider ?? throw new ArgumentNullException(nameof(serializerProvider));
+ Logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ HeartbeatInterval = heartbeatInterval;
+ HeartbeatReceivedFromServer = heartbeatReceivedFromServer;
+ }
+
+ public static StreamingHubClientOptions CreateWithDefault(string? host = default, CallOptions callOptions = default, IMagicOnionSerializerProvider? serializerProvider = default, IMagicOnionClientLogger? logger = default)
+ => new(host, callOptions, serializerProvider ?? MagicOnionSerializerProvider.Default, logger ?? NullMagicOnionClientLogger.Instance);
+
+ public StreamingHubClientOptions WithHost(string? host)
+ => new(host, CallOptions, SerializerProvider, Logger, HeartbeatInterval, HeartbeatReceivedFromServer);
+ public StreamingHubClientOptions WithCallOptions(CallOptions callOptions)
+ => new(Host, callOptions, SerializerProvider, Logger, HeartbeatInterval, HeartbeatReceivedFromServer);
+ public StreamingHubClientOptions WithSerializerProvider(IMagicOnionSerializerProvider serializerProvider)
+ => new(Host, CallOptions, serializerProvider, Logger, HeartbeatInterval, HeartbeatReceivedFromServer);
+ public StreamingHubClientOptions WithLogger(IMagicOnionClientLogger logger)
+ => new(Host, CallOptions, SerializerProvider, logger, HeartbeatInterval, HeartbeatReceivedFromServer);
+
+ ///
+ /// Sets a heartbeat interval. If a value is , the heartbeat from the client is disabled.
+ ///
+ ///
+ ///
+ public StreamingHubClientOptions WithHeartbeatInterval(TimeSpan? interval)
+ => new(Host, CallOptions, SerializerProvider, Logger, interval, HeartbeatReceivedFromServer);
+
+ ///
+ /// Sets a heartbeat callback. If additional metadata is provided by the server in the heartbeat message, this metadata is provided as an argument.
+ ///
+ ///
+ ///
+ public StreamingHubClientOptions WithHeartbeatReceived(Action>? onHeartbeatReceived)
+ => new(Host, CallOptions, SerializerProvider, Logger, HeartbeatInterval, onHeartbeatReceived);
+ }
+
public abstract class StreamingHubClientBase
where TStreamingHub : IStreamingHub
{
@@ -23,47 +78,52 @@ public abstract class StreamingHubClientBase
const string StreamingHubVersionHeaderValue = "2";
#pragma warning restore IDE1006 // Naming Styles
- readonly string? host;
- readonly CallOptions option;
readonly CallInvoker callInvoker;
+ readonly StreamingHubClientOptions options;
readonly IMagicOnionClientLogger logger;
readonly IMagicOnionSerializer messageSerializer;
- readonly AsyncLock asyncLock = new AsyncLock();
- readonly Method duplexStreamingConnectMethod;
+ readonly Method duplexStreamingConnectMethod;
+ // {messageId, TaskCompletionSource}
+ readonly Dictionary