From fe781b4864b503f03d5d07e0af8cc31539ea13da Mon Sep 17 00:00:00 2001 From: addjuarez Date: Wed, 21 Sep 2022 21:37:31 +0000 Subject: [PATCH 01/17] migrate actors to grpc Signed-off-by: addjuarez --- .github/workflows/itests.yml | 2 +- src/Dapr.Actors/Client/ActorProxyFactory.cs | 29 +- src/Dapr.Actors/Client/ActorProxyOptions.cs | 21 + src/Dapr.Actors/DaprGrpcInteractor.cs | 382 ++++++++++++++++++ src/Dapr.Actors/DaprHttpInteractor.cs | 6 + src/Dapr.Actors/DaprInteractorBuilder.cs | 154 +++++++ src/Dapr.Actors/IDaprInteractor.cs | 12 + src/Dapr.Actors/Runtime/ActorRuntime.cs | 12 +- .../Runtime/ActorRuntimeOptions.cs | 22 + src/Dapr.Actors/Runtime/DaprStateProvider.cs | 57 ++- src/Dapr.Actors/Runtime/TimerInfo.cs | 1 - .../Protos/dapr/proto/dapr/v1/dapr.proto | 1 + 12 files changed, 691 insertions(+), 8 deletions(-) create mode 100644 src/Dapr.Actors/DaprGrpcInteractor.cs create mode 100644 src/Dapr.Actors/DaprInteractorBuilder.cs diff --git a/.github/workflows/itests.yml b/.github/workflows/itests.yml index 7b8ac5b58..b439c1faf 100644 --- a/.github/workflows/itests.yml +++ b/.github/workflows/itests.yml @@ -49,7 +49,7 @@ jobs: DAPR_RUNTIME_VER: 1.8.0 DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh DAPR_CLI_REF: '' - DAPR_REF: '' + DAPR_REF: '25213183ab8fd20f85f74591ee6ec3a00822adf7' steps: - name: Set up Dapr CLI run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }} diff --git a/src/Dapr.Actors/Client/ActorProxyFactory.cs b/src/Dapr.Actors/Client/ActorProxyFactory.cs index 9fd5edddb..1edfc6be6 100644 --- a/src/Dapr.Actors/Client/ActorProxyFactory.cs +++ b/src/Dapr.Actors/Client/ActorProxyFactory.cs @@ -66,7 +66,19 @@ public ActorProxy Create(ActorId actorId, string actorType, ActorProxyOptions op options ??= this.DefaultOptions; var actorProxy = new ActorProxy(); - var daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout); + IDaprInteractor daprInteractor; + if (options.useGrpc) { + daprInteractor = new DaprInteractorBuilder() + .UseGrpcEndpoint(options.GrpcEndpoint) + .UseHttpEndpoint(options.HttpEndpoint) + .UseDaprApiToken(options.DaprApiToken) + .UseGrpcChannelOptions(options.GrpcChannelOptions) + .UseHandler(this.handler) + .UseRequestTimeout(options.RequestTimeout) + .Build(); + } else { + daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout); + } var nonRemotingClient = new ActorNonRemotingClient(daprInteractor); actorProxy.Initialize(nonRemotingClient, actorId, actorType, options); @@ -77,8 +89,19 @@ public ActorProxy Create(ActorId actorId, string actorType, ActorProxyOptions op public object CreateActorProxy(ActorId actorId, Type actorInterfaceType, string actorType, ActorProxyOptions options = null) { options ??= this.DefaultOptions; - - var daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout); + IDaprInteractor daprInteractor; + if (options.useGrpc) { + daprInteractor = new DaprInteractorBuilder() + .UseGrpcEndpoint(options.GrpcEndpoint) + .UseHttpEndpoint(options.HttpEndpoint) + .UseDaprApiToken(options.DaprApiToken) + .UseGrpcChannelOptions(options.GrpcChannelOptions) + .UseHandler(this.handler) + .UseRequestTimeout(options.RequestTimeout) + .Build(); + } else { + daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout); + } var remotingClient = new ActorRemotingClient(daprInteractor); var proxyGenerator = ActorCodeBuilder.GetOrCreateProxyGenerator(actorInterfaceType); var actorProxy = proxyGenerator.CreateActorProxy(); diff --git a/src/Dapr.Actors/Client/ActorProxyOptions.cs b/src/Dapr.Actors/Client/ActorProxyOptions.cs index 808605c70..ab8758a43 100644 --- a/src/Dapr.Actors/Client/ActorProxyOptions.cs +++ b/src/Dapr.Actors/Client/ActorProxyOptions.cs @@ -15,6 +15,7 @@ namespace Dapr.Actors.Client { using System; using System.Text.Json; + using Grpc.Net.Client; /// /// The class containing customizable options for how the Actor Proxy is initialized. @@ -58,9 +59,29 @@ public JsonSerializerOptions JsonSerializerOptions /// public string HttpEndpoint { get; set; } = DaprDefaults.GetDefaultHttpEndpoint(); + /// + /// Gets or sets the Grpc endpoint URI used to communicate with the Dapr sidecar. + /// + /// + /// The URI endpoint to use for Grpc calls to the Dapr runtime. The default value will be + /// http://127.0.0.1:DAPR_GRPC_PORT where DAPR_GRPC_PORT represents the value of the + /// DAPR_GRPC_PORT environment variable. + /// + /// + public string GrpcEndpoint { get; set; } = DaprDefaults.GetDefaultGrpcEndpoint(); + /// /// The timeout allowed for an actor request. Can be set to System.Threading.Timeout.InfiniteTimeSpan to disable any timeouts. /// public TimeSpan? RequestTimeout { get; set; } = null; + /// + /// Option to use GRPC or HTTP + /// + public bool useGrpc { get; set; } = true; + + /// + /// Options for grpc channel + /// + public GrpcChannelOptions GrpcChannelOptions { get; set; } = new GrpcChannelOptions(){ThrowOperationCanceledOnCancellation = true,}; } } diff --git a/src/Dapr.Actors/DaprGrpcInteractor.cs b/src/Dapr.Actors/DaprGrpcInteractor.cs new file mode 100644 index 000000000..b1b0d1f58 --- /dev/null +++ b/src/Dapr.Actors/DaprGrpcInteractor.cs @@ -0,0 +1,382 @@ +// ------------------------------------------------------------------------ +// Copyright 2022 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Actors +{ + using System; + using System.Collections.Generic; + using System.Globalization; + using System.IO; + using System.Linq; + using System.Net.Http; + using System.Text; + using System.Text.Json; + using System.Threading; + using System.Threading.Tasks; + using Dapr.Actors.Communication; + using Dapr.Actors.Resources; + using System.Xml; + using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + using Grpc.Core; + using Google.Protobuf; + using Dapr.Actors.Runtime; + using Grpc.Net.Client; + + /// + /// Class to interact with Dapr runtime over grpc. + /// + internal class DaprGrpcInteractor : IDaprInteractor + { + private readonly JsonSerializerOptions jsonSerializerOptions = JsonSerializerDefaults.Web; + private readonly string httpEndpoint; + private readonly static HttpMessageHandler defaultHandler = new HttpClientHandler(); + private readonly HttpMessageHandler handler; + private HttpClient httpClient; + private bool disposed; + private string daprApiToken; + private readonly Autogenerated.Dapr.DaprClient client; + internal Autogenerated.Dapr.DaprClient Client => client; + private readonly GrpcChannel channel; + + private const string EXCEPTION_HEADER_TAG = "b:KeyValueOfstringbase64Binary"; + + public DaprGrpcInteractor( + GrpcChannel channel, + Autogenerated.Dapr.DaprClient inner, + HttpMessageHandler clientHandler, + string httpEndpoint, + string apiToken, + TimeSpan? requestTimeout) + { + this.channel = channel; + this.client = inner; + this.handler = clientHandler ?? defaultHandler; + this.httpEndpoint = httpEndpoint; + this.daprApiToken = apiToken; + this.httpClient = this.CreateHttpClient(); + this.httpClient.Timeout = requestTimeout ?? this.httpClient.Timeout; + } + + public async Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) + { + var request = new Autogenerated.GetActorStateRequest() + { + ActorId = actorId, + ActorType = actorType, + Key = keyName, + }; + var options = CreateCallOptions(cancellationToken); + + Autogenerated.GetActorStateResponse response = new Autogenerated.GetActorStateResponse(); + try + { + response = await client.GetActorStateAsync(request, options); + } + catch (RpcException ex) + { + throw new DaprException("GetActorState operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + return response.Data.ToStringUtf8(); + } + + public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default) + { + return null; + } + + public async Task SaveStateTransactionallyAsyncGrpc(string actorType, string actorId, List data, CancellationToken cancellationToken = default) + { + var request = new Autogenerated.ExecuteActorStateTransactionRequest() + { + ActorId = actorId, + ActorType = actorType, + }; + request.Operations.AddRange(data); + var options = CreateCallOptions(cancellationToken); + + try + { + await client.ExecuteActorStateTransactionAsync(request, options); + } + catch (RpcException ex) + { + throw new DaprException("SaveStateTransactionallyAsync operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } + + public async Task InvokeActorMethodWithRemotingAsync(ActorMessageSerializersManager serializersManager, IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default) + { + var requestMessageHeader = remotingRequestRequestMessage.GetHeader(); + + var actorId = requestMessageHeader.ActorId.ToString(); + var methodName = requestMessageHeader.MethodName; + var actorType = requestMessageHeader.ActorType; + var interfaceId = requestMessageHeader.InterfaceId; + + var serializedHeader = serializersManager.GetHeaderSerializer() + .SerializeRequestHeader(remotingRequestRequestMessage.GetHeader()); + + var msgBodySeriaizer = serializersManager.GetRequestMessageBodySerializer(interfaceId); + var serializedMsgBody = msgBodySeriaizer.Serialize(remotingRequestRequestMessage.GetBody()); + + var request = new Autogenerated.InvokeActorRequest() + { + ActorId = actorId, + ActorType = actorType, + Method = methodName, + }; + + if (serializedMsgBody != null) + { + request.Data = ByteString.CopyFrom(serializedMsgBody); + } + + var options = CreateCallOptions(cancellationToken); + + request.Metadata.Add(Constants.RequestHeaderName, Encoding.UTF8.GetString(serializedHeader, 0, serializedHeader.Length)); + + var reentrancyId = ActorReentrancyContextAccessor.ReentrancyContext; + if (reentrancyId != null) + { + request.Metadata.Add(Constants.ReentrancyRequestHeaderName, reentrancyId); + } + + Autogenerated.InvokeActorResponse response = new Autogenerated.InvokeActorResponse(); + try + { + response = await client.InvokeActorAsync(request, options); + + } + catch (RpcException ex) + { + throw new DaprException("InvokeActorAsync operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + + IActorResponseMessageHeader actorResponseMessageHeader = null; + IActorResponseMessageBody actorResponseMessageBody = null; + if (response != null) + { + var responseMessageBody = new MemoryStream(response.Data.ToArray()); + + var responseBodySerializer = serializersManager.GetResponseMessageBodySerializer(interfaceId); + try { + actorResponseMessageBody = responseBodySerializer.Deserialize(responseMessageBody); + } + catch + { + var isDeserialzied = + ActorInvokeException.ToException( + responseMessageBody, + out var remoteMethodException); + if (isDeserialzied) + { + throw new ActorMethodInvocationException( + "Remote Actor Method Exception, DETAILS: " + remoteMethodException.Message, + remoteMethodException, + false /* non transient */); + } + else + { + throw new ActorInvokeException(remoteMethodException.GetType().FullName, string.Format( + CultureInfo.InvariantCulture, + SR.ErrorDeserializationFailure, + remoteMethodException.ToString())); + } + } + + } + + return new ActorResponseMessage(actorResponseMessageHeader, actorResponseMessageBody); + } + + private string GetExceptionDetails(string header) { + XmlDocument xmlHeader = new XmlDocument(); + xmlHeader.LoadXml(header); + XmlNodeList exceptionValueXML = xmlHeader.GetElementsByTagName(EXCEPTION_HEADER_TAG); + string exceptionDetails = ""; + if (exceptionValueXML != null && exceptionValueXML.Item(1) != null) + { + exceptionDetails = exceptionValueXML.Item(1).LastChild.InnerText; + } + var base64EncodedBytes = System.Convert.FromBase64String(exceptionDetails); + return Encoding.UTF8.GetString(base64EncodedBytes); + } + + public async Task InvokeActorMethodWithoutRemotingAsync(string actorType, string actorId, string methodName, string jsonPayload, CancellationToken cancellationToken = default) + { + var request = new Autogenerated.InvokeActorRequest() + { + ActorId = actorId, + ActorType = actorType, + Method = methodName, + }; + + if (jsonPayload != null) + { + request.Data = ByteString.CopyFromUtf8(jsonPayload); + } + + var options = CreateCallOptions(cancellationToken); + + var reentrancyId = ActorReentrancyContextAccessor.ReentrancyContext; + if (reentrancyId != null) + { + options.Headers.Add(Constants.ReentrancyRequestHeaderName, reentrancyId); + } + + Autogenerated.InvokeActorResponse response = new Autogenerated.InvokeActorResponse(); + try + { + response = await client.InvokeActorAsync(request, options); + } + catch (RpcException ex) + { + throw new DaprException("InvokeActor operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + return new MemoryStream(response.Data.ToArray()); + } + + public async Task RegisterReminderAsync(string actorType, string actorId, string reminderName, string data, CancellationToken cancellationToken = default) + { + + var reminderdata = await ReminderInfo.DeserializeAsync(new MemoryStream(Encoding.UTF8.GetBytes(data))); + + var request = new Autogenerated.RegisterActorReminderRequest() + { + ActorId = actorId, + ActorType = actorType, + Name = reminderName, + DueTime = reminderdata.DueTime != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(reminderdata.DueTime) : "", + Ttl = reminderdata.Ttl != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(reminderdata.Ttl) : "", + Period = reminderdata.Period != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(reminderdata.Period) : "", + Data = ByteString.CopyFrom(reminderdata.Data), + }; + var options = CreateCallOptions(cancellationToken); + + try + { + await client.RegisterActorReminderAsync(request, options); + } + catch (RpcException ex) + { + throw new DaprException("RegisterReminde operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } + + public async Task UnregisterReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default) + { + + var request = new Autogenerated.UnregisterActorReminderRequest() + { + ActorId = actorId, + ActorType = actorType, + Name = reminderName, + }; + var options = CreateCallOptions(cancellationToken); + + try + { + await client.UnregisterActorReminderAsync(request, options); + } + catch (RpcException ex) + { + throw new DaprException("UnregisterReminder operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } + + public async Task RegisterTimerAsync(string actorType, string actorId, string timerName, string data, CancellationToken cancellationToken = default) + { + var timerdata = JsonSerializer.Deserialize(data, jsonSerializerOptions); + + var request = new Autogenerated.RegisterActorTimerRequest() + { + ActorId = actorId, + ActorType = actorType, + Name = timerName, + DueTime = timerdata.DueTime != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(timerdata.DueTime) : "", + Ttl = timerdata.Ttl != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(timerdata.Ttl) : "", + Period = timerdata.Period != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(timerdata.Period) : "", + Data = ByteString.CopyFrom(timerdata.Data), + Callback = timerdata.Callback + }; + var options = CreateCallOptions(cancellationToken); + + try + { + await client.RegisterActorTimerAsync(request, options); + } + catch (RpcException ex) + { + throw new DaprException("RegisterActorTimer operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } + + public async Task UnregisterTimerAsync(string actorType, string actorId, string timerName, CancellationToken cancellationToken = default) + { + var request = new Autogenerated.UnregisterActorTimerRequest() + { + ActorId = actorId, + ActorType = actorType, + Name = timerName, + }; + var options = CreateCallOptions(cancellationToken); + + try + { + await client.UnregisterActorTimerAsync(request, options); + } + catch (RpcException ex) + { + throw new DaprException("UnregisterActorTimer operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } + + /// + /// Disposes resources. + /// + /// False values indicates the method is being called by the runtime, true value indicates the method is called by the user code. + protected virtual void Dispose(bool disposing) + { + if (!this.disposed) + { + if (disposing) + { + this.httpClient.Dispose(); + this.httpClient = null; + this.channel.Dispose(); + } + + this.disposed = true; + } + } + + private CallOptions CreateCallOptions(CancellationToken cancellationToken) + { + var options = new CallOptions(headers: new Metadata(), cancellationToken: cancellationToken); + + // add token for dapr api token based authentication + if (this.daprApiToken is not null) + { + options.Headers.Add("dapr-api-token", this.daprApiToken); + } + + return options; + } + + private HttpClient CreateHttpClient() + { + return new HttpClient(this.handler, false); + } + + } +} diff --git a/src/Dapr.Actors/DaprHttpInteractor.cs b/src/Dapr.Actors/DaprHttpInteractor.cs index df5207f4e..a89b8da91 100644 --- a/src/Dapr.Actors/DaprHttpInteractor.cs +++ b/src/Dapr.Actors/DaprHttpInteractor.cs @@ -28,6 +28,7 @@ namespace Dapr.Actors using Dapr.Actors.Communication; using Dapr.Actors.Resources; using System.Xml; + using Autogenerated = Dapr.Client.Autogen.Grpc.v1; /// /// Class to interact with Dapr runtime over http. @@ -75,6 +76,11 @@ HttpRequestMessage RequestFunc() return stringResponse; } + public Task SaveStateTransactionallyAsyncGrpc(string actorType, string actorId, List data, CancellationToken cancellationToken = default) + { + return null; + } + public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default) { var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorStateRelativeUrlFormat, actorType, actorId); diff --git a/src/Dapr.Actors/DaprInteractorBuilder.cs b/src/Dapr.Actors/DaprInteractorBuilder.cs new file mode 100644 index 000000000..d7494f429 --- /dev/null +++ b/src/Dapr.Actors/DaprInteractorBuilder.cs @@ -0,0 +1,154 @@ +// ------------------------------------------------------------------------ +// Copyright 2022 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Actors +{ + using System; + using System.Net.Http; + using Grpc.Net.Client; + using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + + /// + /// Builder for building + /// + public sealed class DaprInteractorBuilder + { + /// + /// Initializes a new instance of the class. + /// + public DaprInteractorBuilder() + { + this.GrpcEndpoint = DaprDefaults.GetDefaultGrpcEndpoint(); + this.HttpEndpoint = DaprDefaults.GetDefaultHttpEndpoint(); + + this.GrpcChannelOptions = new GrpcChannelOptions() + { + ThrowOperationCanceledOnCancellation = true, + }; + this.DaprApiToken = DaprDefaults.GetDefaultDaprApiToken(); + } + + internal string GrpcEndpoint { get; private set; } + + internal string HttpEndpoint { get; private set; } + + internal GrpcChannelOptions GrpcChannelOptions { get; private set; } + internal string DaprApiToken { get; private set; } + + internal TimeSpan? RequestTimeout { get; set; } = null; + + internal HttpMessageHandler Handler { get; set; } = null; + /// + /// Overrides the HTTP endpoint used by for communicating with the Dapr runtime. + /// + /// + /// The URI endpoint to use for HTTP calls to the Dapr runtime. The default value will be + /// http://127.0.0.1:DAPR_HTTP_PORT where DAPR_HTTP_PORT represents the value of the + /// DAPR_HTTP_PORT environment variable. + /// + /// The instance. + public DaprInteractorBuilder UseHttpEndpoint(string httpEndpoint) + { + ArgumentVerifier.ThrowIfNullOrEmpty(httpEndpoint, nameof(httpEndpoint)); + this.HttpEndpoint = httpEndpoint; + return this; + } + + + /// + /// Overrides the gRPC endpoint used by for communicating with the Dapr runtime. + /// + /// + /// The URI endpoint to use for gRPC calls to the Dapr runtime. The default value will be + /// http://127.0.0.1:DAPR_GRPC_PORT where DAPR_GRPC_PORT represents the value of the + /// DAPR_GRPC_PORT environment variable. + /// + /// The instance. + public DaprInteractorBuilder UseGrpcEndpoint(string grpcEndpoint) + { + ArgumentVerifier.ThrowIfNullOrEmpty(grpcEndpoint, nameof(grpcEndpoint)); + this.GrpcEndpoint = grpcEndpoint; + return this; + } + + + /// + /// Uses the provided for creating the . + /// + /// The to use for creating the . + /// The instance. + public DaprInteractorBuilder UseGrpcChannelOptions(GrpcChannelOptions grpcChannelOptions) + { + this.GrpcChannelOptions = grpcChannelOptions; + return this; + } + + /// + /// Adds the provided on every request to the Dapr runtime. + /// + /// The token to be added to the request headers/>. + /// The instance. + public DaprInteractorBuilder UseDaprApiToken(string apiToken) + { + this.DaprApiToken = apiToken; + return this; + } + + /// + /// Adds the provided on every request to the Dapr runtime. + /// + /// The token to be added to the request headers/>. + /// The instance. + public DaprInteractorBuilder UseHandler(HttpMessageHandler handler) + { + this.Handler = handler; + return this; + } + + /// + /// Adds the provided on every request to the Dapr runtime. + /// + /// The token to be added to the request headers/>. + /// The instance. + public DaprInteractorBuilder UseRequestTimeout(TimeSpan? requestTimeout) + { + this.RequestTimeout = requestTimeout; + return this; + } + + /// + /// Builds a instance from the properties of the builder. + /// + /// The . + internal DaprGrpcInteractor Build() + { + var grpcEndpoint = new Uri(this.GrpcEndpoint); + if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https") + { + throw new InvalidOperationException("The gRPC endpoint must use http or https."); + } + + if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp)) + { + // Set correct switch to maksecure gRPC service calls. This switch must be set before creating the GrpcChannel. + AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + } + + + var channel = GrpcChannel.ForAddress(this.GrpcEndpoint, this.GrpcChannelOptions); + var client = new Autogenerated.Dapr.DaprClient(channel); + + return new DaprGrpcInteractor(channel, client, this.Handler, this.HttpEndpoint, this.DaprApiToken, this.RequestTimeout); + } + } +} diff --git a/src/Dapr.Actors/IDaprInteractor.cs b/src/Dapr.Actors/IDaprInteractor.cs index 04eb66de9..d4c7b940e 100644 --- a/src/Dapr.Actors/IDaprInteractor.cs +++ b/src/Dapr.Actors/IDaprInteractor.cs @@ -17,6 +17,8 @@ namespace Dapr.Actors using System.Threading; using System.Threading.Tasks; using Dapr.Actors.Communication; + using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + using System.Collections.Generic; /// /// Interface for interacting with Dapr runtime. @@ -44,6 +46,16 @@ internal interface IDaprInteractor /// A task that represents the asynchronous operation. Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default); + /// + /// Saves state batch to Dapr. + /// + /// Type of actor. + /// ActorId. + /// JSON data with state changes as per the Dapr spec for transaction state update. + /// Cancels the operation. + /// A task that represents the asynchronous operation. + Task SaveStateTransactionallyAsyncGrpc(string actorType, string actorId, List data, CancellationToken cancellationToken = default); + /// /// Saves a state to Dapr. /// diff --git a/src/Dapr.Actors/Runtime/ActorRuntime.cs b/src/Dapr.Actors/Runtime/ActorRuntime.cs index 8d2ae0cab..e51e0ce11 100644 --- a/src/Dapr.Actors/Runtime/ActorRuntime.cs +++ b/src/Dapr.Actors/Runtime/ActorRuntime.cs @@ -50,7 +50,17 @@ internal ActorRuntime(ActorRuntimeOptions options, ILoggerFactory loggerFactory, // Revisit this if actor initialization becomes a significant source of delay for large projects. foreach (var actor in options.Actors) { - var daprInteractor = new DaprHttpInteractor(clientHandler: null, httpEndpoint: options.HttpEndpoint, apiToken: options.DaprApiToken, requestTimeout: null); + IDaprInteractor daprInteractor; + if (options.useGrpc) { + daprInteractor = new DaprInteractorBuilder() + .UseGrpcEndpoint(options.GrpcEndpoint) + .UseHttpEndpoint(options.HttpEndpoint) + .UseDaprApiToken(options.DaprApiToken) + .UseGrpcChannelOptions(options.GrpcChannelOptions) + .Build(); + } else { + daprInteractor = new DaprHttpInteractor(null, options.HttpEndpoint, options.DaprApiToken, null); + } this.actorManagers[actor.Type.ActorTypeName] = new ActorManager( actor, actor.Activator ?? this.activatorFactory.CreateActivator(actor.Type), diff --git a/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs b/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs index 3f4a6df88..e364f940b 100644 --- a/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs +++ b/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs @@ -13,6 +13,7 @@ using System; using System.Text.Json; +using Grpc.Net.Client; namespace Dapr.Actors.Runtime { @@ -225,5 +226,26 @@ public int? RemindersStoragePartitions /// /// public string HttpEndpoint { get; set; } = DaprDefaults.GetDefaultHttpEndpoint(); + + /// + /// Gets or sets the Grpc endpoint URI used to communicate with the Dapr sidecar. + /// + /// + /// The URI endpoint to use for Grpc calls to the Dapr runtime. The default value will be + /// http://127.0.0.1:DAPR_GRPC_PORT where DAPR_GRPC_PORT represents the value of the + /// DAPR_GRPC_PORT environment variable. + /// + /// + public string GrpcEndpoint { get; set; } = DaprDefaults.GetDefaultGrpcEndpoint(); + + /// + /// Option to use GRPC or HTTP + /// + public bool useGrpc { get; set; } = true; + + /// + /// Options for grpc channel + /// + public GrpcChannelOptions GrpcChannelOptions { get; set; } = new GrpcChannelOptions(){ThrowOperationCanceledOnCancellation = true,}; } } diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs index ae86fb28b..67f1ec6db 100644 --- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs +++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs @@ -20,6 +20,9 @@ namespace Dapr.Actors.Runtime using System.Text.Json; using System.Threading; using System.Threading.Tasks; + using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + using Google.Protobuf; + using Google.Protobuf.WellKnownTypes; /// /// State Provider to interact with Dapr runtime. @@ -77,10 +80,60 @@ public async Task ContainsStateAsync(string actorType, string actorId, str public async Task SaveStateAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) { - await this.DoStateChangesTransactionallyAsync(actorType, actorId, stateChanges, cancellationToken); + if (this.daprInteractor.GetType().Name.Equals("DaprGrpcInteractor")){ + await this.DoStateChangesTransactionallyAsyncGrpc(actorType, actorId, stateChanges, cancellationToken); + } else { + await this.DoStateChangesTransactionallyAsyncHttp(actorType, actorId, stateChanges, cancellationToken); + } + } + + private async Task DoStateChangesTransactionallyAsyncGrpc(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) + { + List grpcOps = new List(); + + foreach (var stateChange in stateChanges) + { + var operation = this.GetDaprStateOperation(stateChange.ChangeKind); + var op = new Autogenerated.TransactionalActorStateOperation() + { + OperationType = operation, + }; + + switch (stateChange.ChangeKind) + { + case StateChangeKind.Remove: + break; + case StateChangeKind.Add: + case StateChangeKind.Update: + op.Key = stateChange.StateName; + if (this.actorStateSerializer != null) + { + + var buffer = ByteString.CopyFrom(this.actorStateSerializer.Serialize(stateChange.Type, stateChange.Value)); + op.Value = new Any() + { + Value = buffer, + }; + } + else + { + var buffer = ByteString.CopyFrom(JsonSerializer.SerializeToUtf8Bytes(stateChange.Value, stateChange.Type, jsonSerializerOptions)); + op.Value = new Any() + { + Value = buffer, + }; + + } + break; + default: + break; + } + grpcOps.Add(op); + } + await this.daprInteractor.SaveStateTransactionallyAsyncGrpc(actorType, actorId, grpcOps, cancellationToken); } - private async Task DoStateChangesTransactionallyAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) + private async Task DoStateChangesTransactionallyAsyncHttp(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) { // Transactional state update request body: /* diff --git a/src/Dapr.Actors/Runtime/TimerInfo.cs b/src/Dapr.Actors/Runtime/TimerInfo.cs index bc206915a..d5a208735 100644 --- a/src/Dapr.Actors/Runtime/TimerInfo.cs +++ b/src/Dapr.Actors/Runtime/TimerInfo.cs @@ -24,7 +24,6 @@ namespace Dapr.Actors.Runtime /// /// Represents the details of the timer set on an Actor. /// - [Obsolete("This class is an implementation detail of the framework and will be made internal in a future release.")] [JsonConverter(typeof(TimerInfoConverter))] public class TimerInfo { diff --git a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto index b83256286..666bee362 100644 --- a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto +++ b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto @@ -455,6 +455,7 @@ message InvokeActorRequest { string actor_id = 2; string method = 3; bytes data = 4; + map metadata = 5; } // InvokeActorResponse is the method that returns an actor invocation response. From a0dc723b911fef03f4a0f929bb0007bfba3df6b3 Mon Sep 17 00:00:00 2001 From: Addison Juarez Date: Wed, 21 Sep 2022 19:52:21 -0500 Subject: [PATCH 02/17] Fix token tests Signed-off-by: addjuarez --- test/Dapr.Actors.Test/ApiTokenTests.cs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/test/Dapr.Actors.Test/ApiTokenTests.cs b/test/Dapr.Actors.Test/ApiTokenTests.cs index 29ee955c7..57e12b4d3 100644 --- a/test/Dapr.Actors.Test/ApiTokenTests.cs +++ b/test/Dapr.Actors.Test/ApiTokenTests.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ public async Task CreateProxyWithRemoting_WithApiToken() var options = new ActorProxyOptions { DaprApiToken = "test_token", + useGrpc = false, }; var request = await client.CaptureHttpRequestAsync(async handler => @@ -77,6 +78,7 @@ public async Task CreateProxyWithNoRemoting_WithApiToken() var options = new ActorProxyOptions { DaprApiToken = "test_token", + useGrpc = false, }; var request = await client.CaptureHttpRequestAsync(async handler => @@ -99,9 +101,14 @@ public async Task CreateProxyWithNoRemoting_WithNoApiToken() var actorId = new ActorId("abc"); + var options = new ActorProxyOptions + { + useGrpc = false, + }; + var request = await client.CaptureHttpRequestAsync(async handler => { - var factory = new ActorProxyFactory(null, handler); + var factory = new ActorProxyFactory(options, handler); var proxy = factory.Create(actorId, "TestActor"); await proxy.InvokeMethodAsync("SetCountAsync", 1, new CancellationToken()); }); From e9b18d681f98444351a43a292eff97d398108609 Mon Sep 17 00:00:00 2001 From: addjuarez Date: Thu, 22 Sep 2022 00:58:37 +0000 Subject: [PATCH 03/17] update go version Signed-off-by: addjuarez --- .github/workflows/itests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/itests.yml b/.github/workflows/itests.yml index b439c1faf..f5645acd9 100644 --- a/.github/workflows/itests.yml +++ b/.github/workflows/itests.yml @@ -41,7 +41,7 @@ jobs: prefix: 'net6' env: NUPKG_OUTDIR: bin/Release/nugets - GOVER: 1.17 + GOVER: 1.19 GOOS: linux GOARCH: amd64 GOPROXY: https://proxy.golang.org From 9afcb33cfc2cea777943d9a952af3c941e5c53ca Mon Sep 17 00:00:00 2001 From: addjuarez Date: Thu, 22 Sep 2022 20:14:14 +0000 Subject: [PATCH 04/17] Fix e2e tests Signed-off-by: addjuarez --- src/Dapr.Actors/DaprGrpcInteractor.cs | 17 +++++++++-------- .../Actors/E2ETests.ExceptionTests.cs | 8 ++++++-- .../Actors/E2ETests.ReentrantTests.cs | 2 +- test/Dapr.E2E.Test/E2ETests.cs | 3 ++- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/Dapr.Actors/DaprGrpcInteractor.cs b/src/Dapr.Actors/DaprGrpcInteractor.cs index b1b0d1f58..48563e55f 100644 --- a/src/Dapr.Actors/DaprGrpcInteractor.cs +++ b/src/Dapr.Actors/DaprGrpcInteractor.cs @@ -84,7 +84,7 @@ public async Task GetStateAsync(string actorType, string actorId, string } catch (RpcException ex) { - throw new DaprException("GetActorState operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprApiException("GetActorState operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } return response.Data.ToStringUtf8(); } @@ -110,7 +110,7 @@ public async Task SaveStateTransactionallyAsyncGrpc(string actorType, string act } catch (RpcException ex) { - throw new DaprException("SaveStateTransactionallyAsync operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprApiException("SaveStateTransactionallyAsync operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } @@ -159,7 +159,7 @@ public async Task InvokeActorMethodWithRemotingAsync(Acto } catch (RpcException ex) { - throw new DaprException("InvokeActorAsync operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprApiException("InvokeActorAsync operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } IActorResponseMessageHeader actorResponseMessageHeader = null; @@ -180,6 +180,7 @@ public async Task InvokeActorMethodWithRemotingAsync(Acto out var remoteMethodException); if (isDeserialzied) { + throw new ActorMethodInvocationException( "Remote Actor Method Exception, DETAILS: " + remoteMethodException.Message, remoteMethodException, @@ -241,7 +242,7 @@ public async Task InvokeActorMethodWithoutRemotingAsync(string actorType } catch (RpcException ex) { - throw new DaprException("InvokeActor operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprApiException("InvokeActor operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } return new MemoryStream(response.Data.ToArray()); } @@ -269,7 +270,7 @@ public async Task RegisterReminderAsync(string actorType, string actorId, string } catch (RpcException ex) { - throw new DaprException("RegisterReminde operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprApiException("RegisterReminde operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } @@ -290,7 +291,7 @@ public async Task UnregisterReminderAsync(string actorType, string actorId, stri } catch (RpcException ex) { - throw new DaprException("UnregisterReminder operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprApiException("UnregisterReminder operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } @@ -317,7 +318,7 @@ public async Task RegisterTimerAsync(string actorType, string actorId, string ti } catch (RpcException ex) { - throw new DaprException("RegisterActorTimer operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprApiException("RegisterActorTimer operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } @@ -337,7 +338,7 @@ public async Task UnregisterTimerAsync(string actorType, string actorId, string } catch (RpcException ex) { - throw new DaprException("UnregisterActorTimer operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprApiException("UnregisterActorTimer operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs index 986a2c4f0..ab3a76655 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs @@ -19,6 +19,7 @@ namespace Dapr.E2E.Test using Dapr.Actors; using Dapr.E2E.Test.Actors.ExceptionTesting; using Xunit; + using Dapr.Actors.Client; public partial class E2ETests : IAsyncLifetime { [Fact] @@ -26,8 +27,11 @@ public async Task ActorCanProvideExceptionDetails() { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); var actorIds = new ActorId(Guid.NewGuid().ToString()); - - var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "ExceptionActor"); + var options = new ActorProxyOptions + { + useGrpc = false, + }; + var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "ExceptionActor", options); await WaitForActorRuntimeAsync(proxy, cts.Token); ActorMethodInvocationException ex = await Assert.ThrowsAsync(async () => await proxy.ExceptionExample()); Assert.Contains("Remote Actor Method Exception", ex.Message); diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs index 30b19a450..74a722b73 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs @@ -41,7 +41,7 @@ public ReentrantTests(ITestOutputHelper output, DaprTestAppFixture fixture) : ba this.proxyFactory = new Lazy(() => { Debug.Assert(this.HttpEndpoint != null); - return new ActorProxyFactory(new ActorProxyOptions() { HttpEndpoint = this.HttpEndpoint, }); + return new ActorProxyFactory(new ActorProxyOptions() { HttpEndpoint = this.HttpEndpoint, GrpcEndpoint = this.GrpcEndpoint}); }); } diff --git a/test/Dapr.E2E.Test/E2ETests.cs b/test/Dapr.E2E.Test/E2ETests.cs index 94ebbf3df..5e1718539 100644 --- a/test/Dapr.E2E.Test/E2ETests.cs +++ b/test/Dapr.E2E.Test/E2ETests.cs @@ -40,7 +40,8 @@ public E2ETests(ITestOutputHelper output, DaprTestAppFixture fixture) this.proxyFactory = new Lazy(() => { Debug.Assert(this.HttpEndpoint != null); - return new ActorProxyFactory(new ActorProxyOptions(){ HttpEndpoint = this.HttpEndpoint, }); + Debug.Assert(this.GrpcEndpoint != null); + return new ActorProxyFactory(new ActorProxyOptions(){ HttpEndpoint = this.HttpEndpoint, GrpcEndpoint = this.GrpcEndpoint }); }); } From f90d2ec253edfa265183e812cca8734497c69b99 Mon Sep 17 00:00:00 2001 From: addjuarez Date: Thu, 22 Sep 2022 20:55:35 +0000 Subject: [PATCH 05/17] Fix exception test Signed-off-by: addjuarez --- test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs index ab3a76655..54995acfd 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs @@ -30,6 +30,8 @@ public async Task ActorCanProvideExceptionDetails() var options = new ActorProxyOptions { useGrpc = false, + HttpEndpoint = this.HttpEndpoint, + GrpcEndpoint = this.GrpcEndpoint }; var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "ExceptionActor", options); await WaitForActorRuntimeAsync(proxy, cts.Token); From d19061c58813513515abfdd1a9945e78f62e2a5d Mon Sep 17 00:00:00 2001 From: addjuarez Date: Thu, 22 Sep 2022 21:49:24 +0000 Subject: [PATCH 06/17] remove unused http vars Signed-off-by: addjuarez --- src/Dapr.Actors/Client/ActorProxyFactory.cs | 6 --- src/Dapr.Actors/DaprGrpcInteractor.cs | 21 +-------- src/Dapr.Actors/DaprInteractorBuilder.cs | 47 +-------------------- src/Dapr.Actors/Runtime/ActorRuntime.cs | 1 - 4 files changed, 2 insertions(+), 73 deletions(-) diff --git a/src/Dapr.Actors/Client/ActorProxyFactory.cs b/src/Dapr.Actors/Client/ActorProxyFactory.cs index 1edfc6be6..ed217f99c 100644 --- a/src/Dapr.Actors/Client/ActorProxyFactory.cs +++ b/src/Dapr.Actors/Client/ActorProxyFactory.cs @@ -70,11 +70,8 @@ public ActorProxy Create(ActorId actorId, string actorType, ActorProxyOptions op if (options.useGrpc) { daprInteractor = new DaprInteractorBuilder() .UseGrpcEndpoint(options.GrpcEndpoint) - .UseHttpEndpoint(options.HttpEndpoint) .UseDaprApiToken(options.DaprApiToken) .UseGrpcChannelOptions(options.GrpcChannelOptions) - .UseHandler(this.handler) - .UseRequestTimeout(options.RequestTimeout) .Build(); } else { daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout); @@ -93,11 +90,8 @@ public object CreateActorProxy(ActorId actorId, Type actorInterfaceType, string if (options.useGrpc) { daprInteractor = new DaprInteractorBuilder() .UseGrpcEndpoint(options.GrpcEndpoint) - .UseHttpEndpoint(options.HttpEndpoint) .UseDaprApiToken(options.DaprApiToken) .UseGrpcChannelOptions(options.GrpcChannelOptions) - .UseHandler(this.handler) - .UseRequestTimeout(options.RequestTimeout) .Build(); } else { daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout); diff --git a/src/Dapr.Actors/DaprGrpcInteractor.cs b/src/Dapr.Actors/DaprGrpcInteractor.cs index 48563e55f..569a3c2b3 100644 --- a/src/Dapr.Actors/DaprGrpcInteractor.cs +++ b/src/Dapr.Actors/DaprGrpcInteractor.cs @@ -38,10 +38,6 @@ namespace Dapr.Actors internal class DaprGrpcInteractor : IDaprInteractor { private readonly JsonSerializerOptions jsonSerializerOptions = JsonSerializerDefaults.Web; - private readonly string httpEndpoint; - private readonly static HttpMessageHandler defaultHandler = new HttpClientHandler(); - private readonly HttpMessageHandler handler; - private HttpClient httpClient; private bool disposed; private string daprApiToken; private readonly Autogenerated.Dapr.DaprClient client; @@ -53,18 +49,11 @@ internal class DaprGrpcInteractor : IDaprInteractor public DaprGrpcInteractor( GrpcChannel channel, Autogenerated.Dapr.DaprClient inner, - HttpMessageHandler clientHandler, - string httpEndpoint, - string apiToken, - TimeSpan? requestTimeout) + string apiToken) { this.channel = channel; this.client = inner; - this.handler = clientHandler ?? defaultHandler; - this.httpEndpoint = httpEndpoint; this.daprApiToken = apiToken; - this.httpClient = this.CreateHttpClient(); - this.httpClient.Timeout = requestTimeout ?? this.httpClient.Timeout; } public async Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default) @@ -180,7 +169,6 @@ public async Task InvokeActorMethodWithRemotingAsync(Acto out var remoteMethodException); if (isDeserialzied) { - throw new ActorMethodInvocationException( "Remote Actor Method Exception, DETAILS: " + remoteMethodException.Message, remoteMethodException, @@ -352,8 +340,6 @@ protected virtual void Dispose(bool disposing) { if (disposing) { - this.httpClient.Dispose(); - this.httpClient = null; this.channel.Dispose(); } @@ -374,10 +360,5 @@ private CallOptions CreateCallOptions(CancellationToken cancellationToken) return options; } - private HttpClient CreateHttpClient() - { - return new HttpClient(this.handler, false); - } - } } diff --git a/src/Dapr.Actors/DaprInteractorBuilder.cs b/src/Dapr.Actors/DaprInteractorBuilder.cs index d7494f429..963df887b 100644 --- a/src/Dapr.Actors/DaprInteractorBuilder.cs +++ b/src/Dapr.Actors/DaprInteractorBuilder.cs @@ -29,7 +29,6 @@ public sealed class DaprInteractorBuilder public DaprInteractorBuilder() { this.GrpcEndpoint = DaprDefaults.GetDefaultGrpcEndpoint(); - this.HttpEndpoint = DaprDefaults.GetDefaultHttpEndpoint(); this.GrpcChannelOptions = new GrpcChannelOptions() { @@ -40,31 +39,9 @@ public DaprInteractorBuilder() internal string GrpcEndpoint { get; private set; } - internal string HttpEndpoint { get; private set; } - internal GrpcChannelOptions GrpcChannelOptions { get; private set; } internal string DaprApiToken { get; private set; } - internal TimeSpan? RequestTimeout { get; set; } = null; - - internal HttpMessageHandler Handler { get; set; } = null; - /// - /// Overrides the HTTP endpoint used by for communicating with the Dapr runtime. - /// - /// - /// The URI endpoint to use for HTTP calls to the Dapr runtime. The default value will be - /// http://127.0.0.1:DAPR_HTTP_PORT where DAPR_HTTP_PORT represents the value of the - /// DAPR_HTTP_PORT environment variable. - /// - /// The instance. - public DaprInteractorBuilder UseHttpEndpoint(string httpEndpoint) - { - ArgumentVerifier.ThrowIfNullOrEmpty(httpEndpoint, nameof(httpEndpoint)); - this.HttpEndpoint = httpEndpoint; - return this; - } - - /// /// Overrides the gRPC endpoint used by for communicating with the Dapr runtime. /// @@ -104,28 +81,6 @@ public DaprInteractorBuilder UseDaprApiToken(string apiToken) return this; } - /// - /// Adds the provided on every request to the Dapr runtime. - /// - /// The token to be added to the request headers/>. - /// The instance. - public DaprInteractorBuilder UseHandler(HttpMessageHandler handler) - { - this.Handler = handler; - return this; - } - - /// - /// Adds the provided on every request to the Dapr runtime. - /// - /// The token to be added to the request headers/>. - /// The instance. - public DaprInteractorBuilder UseRequestTimeout(TimeSpan? requestTimeout) - { - this.RequestTimeout = requestTimeout; - return this; - } - /// /// Builds a instance from the properties of the builder. /// @@ -148,7 +103,7 @@ internal DaprGrpcInteractor Build() var channel = GrpcChannel.ForAddress(this.GrpcEndpoint, this.GrpcChannelOptions); var client = new Autogenerated.Dapr.DaprClient(channel); - return new DaprGrpcInteractor(channel, client, this.Handler, this.HttpEndpoint, this.DaprApiToken, this.RequestTimeout); + return new DaprGrpcInteractor(channel, client, this.DaprApiToken); } } } diff --git a/src/Dapr.Actors/Runtime/ActorRuntime.cs b/src/Dapr.Actors/Runtime/ActorRuntime.cs index e51e0ce11..3643f29ea 100644 --- a/src/Dapr.Actors/Runtime/ActorRuntime.cs +++ b/src/Dapr.Actors/Runtime/ActorRuntime.cs @@ -54,7 +54,6 @@ internal ActorRuntime(ActorRuntimeOptions options, ILoggerFactory loggerFactory, if (options.useGrpc) { daprInteractor = new DaprInteractorBuilder() .UseGrpcEndpoint(options.GrpcEndpoint) - .UseHttpEndpoint(options.HttpEndpoint) .UseDaprApiToken(options.DaprApiToken) .UseGrpcChannelOptions(options.GrpcChannelOptions) .Build(); From 593786f9665f1cca5e7c9d51c0827253adb4ae3b Mon Sep 17 00:00:00 2001 From: saber-wang <45062099+saber-wang@users.noreply.github.com> Date: Sat, 24 Sep 2022 03:06:01 +0800 Subject: [PATCH 07/17] add metadata api (#947) * feat: add metadata api Signed-off-by: saberwang * Using HTTP API to get and set dapr metadata Signed-off-by: saberwang * style Signed-off-by: saberwang * using grpc to add metadata api Signed-off-by: saberwang * style Signed-off-by: saberwang Co-authored-by: halspang <70976921+halspang@users.noreply.github.com> Signed-off-by: addjuarez --- src/Dapr.Client/DaprClient.cs | 32 ++++- src/Dapr.Client/DaprClientGrpc.cs | 40 ++++++ src/Dapr.Client/DaprMetadata.cs | 126 ++++++++++++++++ .../DaprClientTest.InvokeMethodAsync.cs | 4 +- .../DaprClientTest.InvokeMethodGrpcAsync.cs | 136 +++++++++++++++--- 5 files changed, 318 insertions(+), 20 deletions(-) create mode 100644 src/Dapr.Client/DaprMetadata.cs diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index 88949c2c0..26bf1d27f 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -338,6 +338,36 @@ public HttpRequestMessage CreateInvokeMethodRequest(string appId, stri /// A that will return when the operation has completed. public abstract Task ShutdownSidecarAsync(CancellationToken cancellationToken = default); + /// + /// Calls the sidecar's metadata endpoint which returns information including: + /// + /// + /// The sidecar's ID. + /// + /// + /// The registered/active actors if any. + /// + /// + /// Registered components including name, type, version, and information on capabilities if present. + /// + /// + /// Any extended metadata that has been set via + /// + /// + /// + /// A that can be used to cancel the operation. + /// A that will return the value when the operation has completed. + public abstract Task GetMetadataAsync(CancellationToken cancellationToken = default); + + /// + /// Perform service add extended metadata to the Dapr sidecar. + /// + /// Custom attribute name + /// Custom attribute value + /// A that can be used to cancel the operation. + /// A that will return the value when the operation has completed. + public abstract Task SetMetadataAsync(string attributeName, string attributeValue, CancellationToken cancellationToken = default); + /// /// Perform service invocation using the request provided by . The response will /// be returned without performing any validation on the status code. diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index a21b30caf..cb3111462 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -1367,6 +1367,46 @@ public async override Task ShutdownSidecarAsync(CancellationToken cancellationTo await client.ShutdownAsync(new Empty(), CreateCallOptions(null, cancellationToken)); } + /// + public override async Task GetMetadataAsync(CancellationToken cancellationToken = default) + { + var options = CreateCallOptions(headers: null, cancellationToken); + try + { + var response = await client.GetMetadataAsync(new Empty(), options); + return new DaprMetadata(response.Id, + response.ActiveActorsCount.Select(c => new DaprActorMetadata(c.Type, c.Count)).ToList(), + response.ExtendedMetadata.ToDictionary(c => c.Key, c => c.Value), + response.RegisteredComponents.Select(c => new DaprComponentsMetadata(c.Name, c.Type, c.Version, c.Capabilities.ToArray())).ToList()); + } + catch (RpcException ex) + { + throw new DaprException("Get metadata operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } + + /// + public override async Task SetMetadataAsync(string attributeName, string attributeValue, CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(attributeName, nameof(attributeName)); + + var envelope = new Autogenerated.SetMetadataRequest() + { + Key = attributeName, + Value = attributeValue + }; + + var options = CreateCallOptions(headers: null, cancellationToken); + + try + { + _ = await this.Client.SetMetadataAsync(envelope, options); + } + catch (RpcException ex) + { + throw new DaprException("Set metadata operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } #endregion protected override void Dispose(bool disposing) diff --git a/src/Dapr.Client/DaprMetadata.cs b/src/Dapr.Client/DaprMetadata.cs new file mode 100644 index 000000000..4cd812e04 --- /dev/null +++ b/src/Dapr.Client/DaprMetadata.cs @@ -0,0 +1,126 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Collections.Generic; + +namespace Dapr.Client +{ + /// + /// Represents a metadata object returned from dapr sidecar. + /// + public sealed class DaprMetadata + { + /// + /// Initializes a new instance of the class. + /// + /// The application id. + /// The registered actors metadata. + /// The list of custom attributes as key-value pairs, where key is the attribute name. + /// The loaded components metadata. + public DaprMetadata(string id, IReadOnlyList actors, IReadOnlyDictionary extended, IReadOnlyList components) + { + Id = id; + Actors = actors; + Extended = extended; + Components = components; + } + + /// + /// Gets the application id. + /// + public string Id { get; } + + /// + /// Gets the registered actors metadata. + /// + public IReadOnlyList Actors { get; } + + /// + /// Gets the list of custom attributes as key-value pairs, where key is the attribute name. + /// + public IReadOnlyDictionary Extended { get; } + + /// + /// Gets the loaded components metadata. + /// + public IReadOnlyList Components { get; } + } + + /// + /// Represents a actor metadata object returned from dapr sidecar. + /// + public sealed class DaprActorMetadata + { + /// + /// Initializes a new instance of the class. + /// + /// This registered actor type. + /// The number of actors running. + public DaprActorMetadata(string type, int count) + { + Type = type; + Count = count; + } + + /// + /// Gets the registered actor type. + /// + public string Type { get; } + + /// + /// Gets the number of actors running. + /// + public int Count { get; } + } + + /// + /// Represents a components metadata object returned from dapr sidecar. + /// + public sealed class DaprComponentsMetadata + { + /// + /// Initializes a new instance of the class. + /// + /// The name of the component. + /// The component type. + /// The component version. + /// The supported capabilities for this component type and version. + public DaprComponentsMetadata(string name, string type, string version, string[] capabilities) + { + Name = name; + Type = type; + Version = version; + Capabilities = capabilities; + } + + /// + /// Gets the name of the component. + /// + public string Name { get; } + + /// + /// Gets the component type. + /// + public string Type { get; } + + /// + /// Gets the component version. + /// + public string Version { get; } + + /// + /// Gets the supported capabilities for this component type and version. + /// + public string[] Capabilities { get; } + } +} diff --git a/test/Dapr.Client.Test/DaprClientTest.InvokeMethodAsync.cs b/test/Dapr.Client.Test/DaprClientTest.InvokeMethodAsync.cs index 6c5488674..5d46000a1 100644 --- a/test/Dapr.Client.Test/DaprClientTest.InvokeMethodAsync.cs +++ b/test/Dapr.Client.Test/DaprClientTest.InvokeMethodAsync.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ namespace Dapr.Client.Test { using System; + using System.Collections.Generic; using System.Net; using System.Net.Http; using System.Net.Http.Json; @@ -22,6 +23,7 @@ namespace Dapr.Client.Test using System.Threading; using System.Threading.Tasks; using Dapr.Client; + using FluentAssertions; using Xunit; // Most of the InvokeMethodAsync functionality on DaprClient is non-abstract methods that diff --git a/test/Dapr.Client.Test/DaprClientTest.InvokeMethodGrpcAsync.cs b/test/Dapr.Client.Test/DaprClientTest.InvokeMethodGrpcAsync.cs index 93462415a..5412c4063 100644 --- a/test/Dapr.Client.Test/DaprClientTest.InvokeMethodGrpcAsync.cs +++ b/test/Dapr.Client.Test/DaprClientTest.InvokeMethodGrpcAsync.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ public partial class DaprClientTest [Fact] public async Task InvokeMethodGrpcAsync_WithCancelledToken() { - await using var client = TestClient.CreateForDaprClient(c => + await using var client = TestClient.CreateForDaprClient(c => { c.UseJsonSerializationOptions(this.jsonSerializerOptions); }); @@ -50,7 +50,7 @@ await Assert.ThrowsAsync(async () => [Fact] public async Task InvokeMethodGrpcAsync_CanInvokeMethodWithReturnTypeAndData() { - await using var client = TestClient.CreateForDaprClient(c => + await using var client = TestClient.CreateForDaprClient(c => { c.UseJsonSerializationOptions(this.jsonSerializerOptions); }); @@ -88,7 +88,7 @@ public async Task InvokeMethodGrpcAsync_CanInvokeMethodWithReturnTypeAndData_Thr Data = Any.Pack(data), }; - var response = + var response = client.Call() .SetResponse(invokeResponse) .Build(); @@ -105,7 +105,7 @@ public async Task InvokeMethodGrpcAsync_CanInvokeMethodWithReturnTypeAndData_Thr .Setup(m => m.InvokeServiceAsync(It.IsAny(), It.IsAny())) .Throws(rpcException); - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(async () => { await client.DaprClient.InvokeMethodGrpcAsync("test", "test", new Request() { RequestParameter = "Hello " }); }); @@ -115,7 +115,7 @@ public async Task InvokeMethodGrpcAsync_CanInvokeMethodWithReturnTypeAndData_Thr [Fact] public async Task InvokeMethodGrpcAsync_CanInvokeMethodWithReturnTypeNoData() { - await using var client = TestClient.CreateForDaprClient(c => + await using var client = TestClient.CreateForDaprClient(c => { c.UseJsonSerializationOptions(this.jsonSerializerOptions); }); @@ -153,7 +153,7 @@ public async Task InvokeMethodGrpcAsync_CanInvokeMethodWithReturnTypeNoData_Thro Data = Any.Pack(data), }; - var response = + var response = client.Call() .SetResponse(invokeResponse) .Build(); @@ -170,7 +170,7 @@ public async Task InvokeMethodGrpcAsync_CanInvokeMethodWithReturnTypeNoData_Thro .Setup(m => m.InvokeServiceAsync(It.IsAny(), It.IsAny())) .Throws(rpcException); - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(async () => { await client.DaprClient.InvokeMethodGrpcAsync("test", "test"); }); @@ -188,11 +188,11 @@ public void InvokeMethodGrpcAsync_CanInvokeMethodWithNoReturnTypeAndData() Data = Any.Pack(data), }; - var response = + var response = client.Call() .SetResponse(invokeResponse) .Build(); - + client.Mock .Setup(m => m.InvokeServiceAsync(It.IsAny(), It.IsAny())) .Returns(response); @@ -210,7 +210,7 @@ public async Task InvokeMethodGrpcAsync_CanInvokeMethodWithNoReturnTypeAndData_T Data = Any.Pack(data), }; - var response = + var response = client.Call() .SetResponse(invokeResponse) .Build(); @@ -228,7 +228,7 @@ public async Task InvokeMethodGrpcAsync_CanInvokeMethodWithNoReturnTypeAndData_T .Setup(m => m.InvokeServiceAsync(It.IsAny(), It.IsAny())) .Throws(rpcException); - var ex = await Assert.ThrowsAsync(async () => + var ex = await Assert.ThrowsAsync(async () => { await client.DaprClient.InvokeMethodGrpcAsync("test", "test", new Request() { RequestParameter = "Hello " }); }); @@ -238,7 +238,7 @@ public async Task InvokeMethodGrpcAsync_CanInvokeMethodWithNoReturnTypeAndData_T [Fact] public async Task InvokeMethodGrpcAsync_WithNoReturnTypeAndData() { - await using var client = TestClient.CreateForDaprClient(c => + await using var client = TestClient.CreateForDaprClient(c => { c.UseJsonSerializationOptions(this.jsonSerializerOptions); }); @@ -256,7 +256,7 @@ public async Task InvokeMethodGrpcAsync_WithNoReturnTypeAndData() envelope.Id.Should().Be("test"); envelope.Message.Method.Should().Be("test"); envelope.Message.ContentType.Should().Be(Constants.ContentTypeApplicationGrpc); - + var actual = envelope.Message.Data.Unpack(); Assert.Equal(invokeRequest.RequestParameter, actual.RequestParameter); } @@ -264,7 +264,7 @@ public async Task InvokeMethodGrpcAsync_WithNoReturnTypeAndData() [Fact] public async Task InvokeMethodGrpcAsync_WithReturnTypeAndData() { - await using var client = TestClient.CreateForDaprClient(c => + await using var client = TestClient.CreateForDaprClient(c => { c.UseJsonSerializationOptions(this.jsonSerializerOptions); }); @@ -303,7 +303,7 @@ public async Task InvokeMethodGrpcAsync_AppCallback_SayHello() // Configure Client var httpClient = new AppCallbackClient(new DaprAppCallbackService()); var daprClient = new DaprClientBuilder() - .UseGrpcChannelOptions(new GrpcChannelOptions(){ HttpClient = httpClient, }) + .UseGrpcChannelOptions(new GrpcChannelOptions() { HttpClient = httpClient, }) .UseJsonSerializationOptions(this.jsonSerializerOptions) .Build(); @@ -320,7 +320,7 @@ public async Task InvokeMethodGrpcAsync_AppCallback_RepeatedField() // Configure Client var httpClient = new AppCallbackClient(new DaprAppCallbackService()); var daprClient = new DaprClientBuilder() - .UseGrpcChannelOptions(new GrpcChannelOptions(){ HttpClient = httpClient, }) + .UseGrpcChannelOptions(new GrpcChannelOptions() { HttpClient = httpClient, }) .UseJsonSerializationOptions(this.jsonSerializerOptions) .Build(); @@ -343,7 +343,7 @@ public async Task InvokeMethodGrpcAsync_AppCallback_UnexpectedMethod() // Configure Client var httpClient = new AppCallbackClient(new DaprAppCallbackService()); var daprClient = new DaprClientBuilder() - .UseGrpcChannelOptions(new GrpcChannelOptions(){ HttpClient = httpClient, }) + .UseGrpcChannelOptions(new GrpcChannelOptions() { HttpClient = httpClient, }) .UseJsonSerializationOptions(this.jsonSerializerOptions) .Build(); @@ -354,6 +354,106 @@ public async Task InvokeMethodGrpcAsync_AppCallback_UnexpectedMethod() response.Name.Should().Be("unexpected"); } + + [Fact] + public async Task GetMetadataAsync_WrapsRpcException() + { + var client = new MockClient(); + + const string rpcExceptionMessage = "RPC exception"; + const StatusCode rpcStatusCode = StatusCode.Unavailable; + const string rpcStatusDetail = "Non success"; + + var rpcStatus = new Status(rpcStatusCode, rpcStatusDetail); + var rpcException = new RpcException(rpcStatus, new Metadata(), rpcExceptionMessage); + + client.Mock + .Setup(m => m.GetMetadataAsync(It.IsAny(), It.IsAny())) + .Throws(rpcException); + + var ex = await Assert.ThrowsAsync(async () => + { + await client.DaprClient.GetMetadataAsync(default); + }); + Assert.Same(rpcException, ex.InnerException); + } + + [Fact] + public async Task GetMetadataAsync_WithReturnTypeAndData() + { + await using var client = TestClient.CreateForDaprClient(c => + { + c.UseJsonSerializationOptions(this.jsonSerializerOptions); + }); + + var request = await client.CaptureGrpcRequestAsync(async daprClient => + { + return await daprClient.GetMetadataAsync(default); + }); + + + // Create Response & Respond + var response = new Autogen.Grpc.v1.GetMetadataResponse() + { + Id = "testId", + }; + response.ActiveActorsCount.Add(new ActiveActorsCount { Type = "testType", Count = 1 }); + response.RegisteredComponents.Add(new RegisteredComponents { Name = "testName", Type = "testType", Version = "V1" }); + response.ExtendedMetadata.Add("e1", "v1"); + + // Validate Response + var metadata = await request.CompleteWithMessageAsync(response); + metadata.Id.Should().Be("testId"); + metadata.Extended.Should().Contain(new System.Collections.Generic.KeyValuePair("e1", "v1")); + metadata.Actors.Should().Contain(actors => actors.Count == 1 && actors.Type == "testType"); + metadata.Components.Should().Contain(components => components.Name == "testName" && components.Type == "testType" && components.Version == "V1" && components.Capabilities.Length == 0); + } + + [Fact] + public async Task SetMetadataAsync_WrapsRpcException() + { + var client = new MockClient(); + + const string rpcExceptionMessage = "RPC exception"; + const StatusCode rpcStatusCode = StatusCode.Unavailable; + const string rpcStatusDetail = "Non success"; + + var rpcStatus = new Status(rpcStatusCode, rpcStatusDetail); + var rpcException = new RpcException(rpcStatus, new Metadata(), rpcExceptionMessage); + + client.Mock + .Setup(m => m.SetMetadataAsync(It.IsAny(), It.IsAny())) + .Throws(rpcException); + + var ex = await Assert.ThrowsAsync(async () => + { + await client.DaprClient.SetMetadataAsync("testName", "", default); + }); + Assert.Same(rpcException, ex.InnerException); + } + + [Fact] + public async Task SetMetadataAsync_WithReturnTypeAndData() + { + await using var client = TestClient.CreateForDaprClient(c => + { + c.UseJsonSerializationOptions(this.jsonSerializerOptions); + }); + + var request = await client.CaptureGrpcRequestAsync(daprClient => + { + return daprClient.SetMetadataAsync("test", "testv", default); + }); + + // Get Request and validate + var envelope = await request.GetRequestEnvelopeAsync(); + envelope.Key.Should().Be("test"); + envelope.Value.Should().Be("testv"); + + await request.CompleteWithMessageAsync(new Empty()); + + } + // Test implementation of the AppCallback.AppCallbackBase service private class DaprAppCallbackService : AppCallback.Autogen.Grpc.v1.AppCallback.AppCallbackBase { From db26e1d8d3dd8bc94f3f768e1826071783554eb8 Mon Sep 17 00:00:00 2001 From: addjuarez Date: Wed, 5 Oct 2022 20:35:57 +0000 Subject: [PATCH 08/17] refactor Signed-off-by: addjuarez --- src/Dapr.Actors/Client/ActorProxyFactory.cs | 45 +++++--- src/Dapr.Actors/Client/ActorProxyOptions.cs | 2 +- src/Dapr.Actors/DaprGrpcInteractor.cs | 16 ++- src/Dapr.Actors/DaprHttpInteractor.cs | 13 ++- src/Dapr.Actors/DaprInteractorBuilder.cs | 109 ------------------ src/Dapr.Actors/IDaprInteractor.cs | 25 ++-- src/Dapr.Actors/Runtime/ActorRuntime.cs | 23 +++- .../Runtime/ActorRuntimeOptions.cs | 2 +- src/Dapr.Actors/Runtime/DaprStateProvider.cs | 14 +-- test/Dapr.Actors.Test/ApiTokenTests.cs | 6 +- .../Actors/E2ETests.ExceptionTests.cs | 2 +- 11 files changed, 88 insertions(+), 169 deletions(-) delete mode 100644 src/Dapr.Actors/DaprInteractorBuilder.cs diff --git a/src/Dapr.Actors/Client/ActorProxyFactory.cs b/src/Dapr.Actors/Client/ActorProxyFactory.cs index ed217f99c..2893d76fb 100644 --- a/src/Dapr.Actors/Client/ActorProxyFactory.cs +++ b/src/Dapr.Actors/Client/ActorProxyFactory.cs @@ -17,7 +17,8 @@ namespace Dapr.Actors.Client using System.Net.Http; using Dapr.Actors.Builder; using Dapr.Actors.Communication.Client; - + using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + using Grpc.Net.Client; /// /// Represents a factory class to create a proxy to the remote actor objects. /// @@ -67,12 +68,21 @@ public ActorProxy Create(ActorId actorId, string actorType, ActorProxyOptions op var actorProxy = new ActorProxy(); IDaprInteractor daprInteractor; - if (options.useGrpc) { - daprInteractor = new DaprInteractorBuilder() - .UseGrpcEndpoint(options.GrpcEndpoint) - .UseDaprApiToken(options.DaprApiToken) - .UseGrpcChannelOptions(options.GrpcChannelOptions) - .Build(); + if (options.UseGrpc) { + var grpcEndpoint = new Uri(options.GrpcEndpoint); + if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https") + { + throw new InvalidOperationException("The gRPC endpoint must use http or https."); + } + + if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp)) + { + // Set correct switch to maksecure gRPC service calls. This switch must be set before creating the GrpcChannel. + AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + } + var channel = GrpcChannel.ForAddress(options.GrpcEndpoint, options.GrpcChannelOptions); + var client = new Autogenerated.Dapr.DaprClient(channel); + daprInteractor = new DaprGrpcInteractor(channel, client, options.DaprApiToken); } else { daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout); } @@ -87,12 +97,21 @@ public object CreateActorProxy(ActorId actorId, Type actorInterfaceType, string { options ??= this.DefaultOptions; IDaprInteractor daprInteractor; - if (options.useGrpc) { - daprInteractor = new DaprInteractorBuilder() - .UseGrpcEndpoint(options.GrpcEndpoint) - .UseDaprApiToken(options.DaprApiToken) - .UseGrpcChannelOptions(options.GrpcChannelOptions) - .Build(); + if (options.UseGrpc) { + var grpcEndpoint = new Uri(options.GrpcEndpoint); + if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https") + { + throw new InvalidOperationException("The gRPC endpoint must use http or https."); + } + + if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp)) + { + // Set correct switch to maksecure gRPC service calls. This switch must be set before creating the GrpcChannel. + AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + } + var channel = GrpcChannel.ForAddress(options.GrpcEndpoint, options.GrpcChannelOptions); + var client = new Autogenerated.Dapr.DaprClient(channel); + daprInteractor = new DaprGrpcInteractor(channel, client, options.DaprApiToken); } else { daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout); } diff --git a/src/Dapr.Actors/Client/ActorProxyOptions.cs b/src/Dapr.Actors/Client/ActorProxyOptions.cs index ab8758a43..958be1e17 100644 --- a/src/Dapr.Actors/Client/ActorProxyOptions.cs +++ b/src/Dapr.Actors/Client/ActorProxyOptions.cs @@ -77,7 +77,7 @@ public JsonSerializerOptions JsonSerializerOptions /// /// Option to use GRPC or HTTP /// - public bool useGrpc { get; set; } = true; + public bool UseGrpc { get; set; } = true; /// /// Options for grpc channel diff --git a/src/Dapr.Actors/DaprGrpcInteractor.cs b/src/Dapr.Actors/DaprGrpcInteractor.cs index 569a3c2b3..669244562 100644 --- a/src/Dapr.Actors/DaprGrpcInteractor.cs +++ b/src/Dapr.Actors/DaprGrpcInteractor.cs @@ -13,12 +13,10 @@ namespace Dapr.Actors { - using System; using System.Collections.Generic; using System.Globalization; using System.IO; using System.Linq; - using System.Net.Http; using System.Text; using System.Text.Json; using System.Threading; @@ -78,19 +76,14 @@ public async Task GetStateAsync(string actorType, string actorId, string return response.Data.ToStringUtf8(); } - public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default) - { - return null; - } - - public async Task SaveStateTransactionallyAsyncGrpc(string actorType, string actorId, List data, CancellationToken cancellationToken = default) + public async Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default, List grpcData = default) { var request = new Autogenerated.ExecuteActorStateTransactionRequest() { ActorId = actorId, ActorType = actorType, }; - request.Operations.AddRange(data); + request.Operations.AddRange(grpcData); var options = CreateCallOptions(cancellationToken); try @@ -103,6 +96,11 @@ public async Task SaveStateTransactionallyAsyncGrpc(string actorType, string act } } + public Task DoStateChangesTransactionallyAsync(DaprStateProvider provider, string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) + { + return provider.DoStateChangesTransactionallyAsyncGrpc(actorType, actorId, stateChanges, cancellationToken); + } + public async Task InvokeActorMethodWithRemotingAsync(ActorMessageSerializersManager serializersManager, IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default) { var requestMessageHeader = remotingRequestRequestMessage.GetHeader(); diff --git a/src/Dapr.Actors/DaprHttpInteractor.cs b/src/Dapr.Actors/DaprHttpInteractor.cs index a89b8da91..128ca80ca 100644 --- a/src/Dapr.Actors/DaprHttpInteractor.cs +++ b/src/Dapr.Actors/DaprHttpInteractor.cs @@ -29,6 +29,7 @@ namespace Dapr.Actors using Dapr.Actors.Resources; using System.Xml; using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + using Dapr.Actors.Runtime; /// /// Class to interact with Dapr runtime over http. @@ -76,12 +77,7 @@ HttpRequestMessage RequestFunc() return stringResponse; } - public Task SaveStateTransactionallyAsyncGrpc(string actorType, string actorId, List data, CancellationToken cancellationToken = default) - { - return null; - } - - public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default) + public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default, List grpcData = default) { var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorStateRelativeUrlFormat, actorType, actorId); @@ -99,6 +95,11 @@ HttpRequestMessage RequestFunc() return this.SendAsync(RequestFunc, relativeUrl, cancellationToken); } + public Task DoStateChangesTransactionallyAsync(DaprStateProvider provider, string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) + { + return provider.DoStateChangesTransactionallyAsyncHttp(actorType, actorId, stateChanges, cancellationToken); + } + public async Task InvokeActorMethodWithRemotingAsync(ActorMessageSerializersManager serializersManager, IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default) { var requestMessageHeader = remotingRequestRequestMessage.GetHeader(); diff --git a/src/Dapr.Actors/DaprInteractorBuilder.cs b/src/Dapr.Actors/DaprInteractorBuilder.cs deleted file mode 100644 index 963df887b..000000000 --- a/src/Dapr.Actors/DaprInteractorBuilder.cs +++ /dev/null @@ -1,109 +0,0 @@ -// ------------------------------------------------------------------------ -// Copyright 2022 The Dapr Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// ------------------------------------------------------------------------ - -namespace Dapr.Actors -{ - using System; - using System.Net.Http; - using Grpc.Net.Client; - using Autogenerated = Dapr.Client.Autogen.Grpc.v1; - - /// - /// Builder for building - /// - public sealed class DaprInteractorBuilder - { - /// - /// Initializes a new instance of the class. - /// - public DaprInteractorBuilder() - { - this.GrpcEndpoint = DaprDefaults.GetDefaultGrpcEndpoint(); - - this.GrpcChannelOptions = new GrpcChannelOptions() - { - ThrowOperationCanceledOnCancellation = true, - }; - this.DaprApiToken = DaprDefaults.GetDefaultDaprApiToken(); - } - - internal string GrpcEndpoint { get; private set; } - - internal GrpcChannelOptions GrpcChannelOptions { get; private set; } - internal string DaprApiToken { get; private set; } - - /// - /// Overrides the gRPC endpoint used by for communicating with the Dapr runtime. - /// - /// - /// The URI endpoint to use for gRPC calls to the Dapr runtime. The default value will be - /// http://127.0.0.1:DAPR_GRPC_PORT where DAPR_GRPC_PORT represents the value of the - /// DAPR_GRPC_PORT environment variable. - /// - /// The instance. - public DaprInteractorBuilder UseGrpcEndpoint(string grpcEndpoint) - { - ArgumentVerifier.ThrowIfNullOrEmpty(grpcEndpoint, nameof(grpcEndpoint)); - this.GrpcEndpoint = grpcEndpoint; - return this; - } - - - /// - /// Uses the provided for creating the . - /// - /// The to use for creating the . - /// The instance. - public DaprInteractorBuilder UseGrpcChannelOptions(GrpcChannelOptions grpcChannelOptions) - { - this.GrpcChannelOptions = grpcChannelOptions; - return this; - } - - /// - /// Adds the provided on every request to the Dapr runtime. - /// - /// The token to be added to the request headers/>. - /// The instance. - public DaprInteractorBuilder UseDaprApiToken(string apiToken) - { - this.DaprApiToken = apiToken; - return this; - } - - /// - /// Builds a instance from the properties of the builder. - /// - /// The . - internal DaprGrpcInteractor Build() - { - var grpcEndpoint = new Uri(this.GrpcEndpoint); - if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https") - { - throw new InvalidOperationException("The gRPC endpoint must use http or https."); - } - - if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp)) - { - // Set correct switch to maksecure gRPC service calls. This switch must be set before creating the GrpcChannel. - AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); - } - - - var channel = GrpcChannel.ForAddress(this.GrpcEndpoint, this.GrpcChannelOptions); - var client = new Autogenerated.Dapr.DaprClient(channel); - - return new DaprGrpcInteractor(channel, client, this.DaprApiToken); - } - } -} diff --git a/src/Dapr.Actors/IDaprInteractor.cs b/src/Dapr.Actors/IDaprInteractor.cs index d4c7b940e..f88309dad 100644 --- a/src/Dapr.Actors/IDaprInteractor.cs +++ b/src/Dapr.Actors/IDaprInteractor.cs @@ -19,6 +19,7 @@ namespace Dapr.Actors using Dapr.Actors.Communication; using Autogenerated = Dapr.Client.Autogen.Grpc.v1; using System.Collections.Generic; + using Dapr.Actors.Runtime; /// /// Interface for interacting with Dapr runtime. @@ -42,19 +43,10 @@ internal interface IDaprInteractor /// Type of actor. /// ActorId. /// JSON data with state changes as per the Dapr spec for transaction state update. + /// GRPC data with state changes as per the Dapr spec for transaction state update. /// Cancels the operation. /// A task that represents the asynchronous operation. - Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default); - - /// - /// Saves state batch to Dapr. - /// - /// Type of actor. - /// ActorId. - /// JSON data with state changes as per the Dapr spec for transaction state update. - /// Cancels the operation. - /// A task that represents the asynchronous operation. - Task SaveStateTransactionallyAsyncGrpc(string actorType, string actorId, List data, CancellationToken cancellationToken = default); + Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default, List grpcData = default); /// /// Saves a state to Dapr. @@ -107,6 +99,17 @@ internal interface IDaprInteractor /// A representing the result of the asynchronous operation. Task RegisterTimerAsync(string actorType, string actorId, string timerName, string data, CancellationToken cancellationToken = default); + /// + /// Parses state changes. + /// + /// Type of actor. + /// Type of actor. + /// ActorId. + /// StateChanges. + /// Cancels the operation. + /// A representing the result of the asynchronous operation. + Task DoStateChangesTransactionallyAsync(DaprStateProvider provider, string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default); + /// /// Unegisters a timer. /// diff --git a/src/Dapr.Actors/Runtime/ActorRuntime.cs b/src/Dapr.Actors/Runtime/ActorRuntime.cs index 3643f29ea..95a68a35f 100644 --- a/src/Dapr.Actors/Runtime/ActorRuntime.cs +++ b/src/Dapr.Actors/Runtime/ActorRuntime.cs @@ -21,6 +21,8 @@ using System.Threading.Tasks; using Dapr.Actors.Client; using Microsoft.Extensions.Logging; +using Autogenerated = Dapr.Client.Autogen.Grpc.v1; +using Grpc.Net.Client; namespace Dapr.Actors.Runtime { @@ -51,12 +53,21 @@ internal ActorRuntime(ActorRuntimeOptions options, ILoggerFactory loggerFactory, foreach (var actor in options.Actors) { IDaprInteractor daprInteractor; - if (options.useGrpc) { - daprInteractor = new DaprInteractorBuilder() - .UseGrpcEndpoint(options.GrpcEndpoint) - .UseDaprApiToken(options.DaprApiToken) - .UseGrpcChannelOptions(options.GrpcChannelOptions) - .Build(); + if (options.UseGrpc) { + var grpcEndpoint = new Uri(options.GrpcEndpoint); + if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https") + { + throw new InvalidOperationException("The gRPC endpoint must use http or https."); + } + + if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp)) + { + // Set correct switch to maksecure gRPC service calls. This switch must be set before creating the GrpcChannel. + AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + } + var channel = GrpcChannel.ForAddress(options.GrpcEndpoint, options.GrpcChannelOptions); + var client = new Autogenerated.Dapr.DaprClient(channel); + daprInteractor = new DaprGrpcInteractor(channel, client, options.DaprApiToken); } else { daprInteractor = new DaprHttpInteractor(null, options.HttpEndpoint, options.DaprApiToken, null); } diff --git a/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs b/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs index e364f940b..43b02b5be 100644 --- a/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs +++ b/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs @@ -241,7 +241,7 @@ public int? RemindersStoragePartitions /// /// Option to use GRPC or HTTP /// - public bool useGrpc { get; set; } = true; + public bool UseGrpc { get; set; } = true; /// /// Options for grpc channel diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs index 67f1ec6db..0f93c71a4 100644 --- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs +++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs @@ -80,14 +80,10 @@ public async Task ContainsStateAsync(string actorType, string actorId, str public async Task SaveStateAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) { - if (this.daprInteractor.GetType().Name.Equals("DaprGrpcInteractor")){ - await this.DoStateChangesTransactionallyAsyncGrpc(actorType, actorId, stateChanges, cancellationToken); - } else { - await this.DoStateChangesTransactionallyAsyncHttp(actorType, actorId, stateChanges, cancellationToken); - } + await this.daprInteractor.DoStateChangesTransactionallyAsync(this, actorType, actorId, stateChanges, cancellationToken); } - private async Task DoStateChangesTransactionallyAsyncGrpc(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) + public async Task DoStateChangesTransactionallyAsyncGrpc(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) { List grpcOps = new List(); @@ -130,10 +126,10 @@ private async Task DoStateChangesTransactionallyAsyncGrpc(string actorType, stri } grpcOps.Add(op); } - await this.daprInteractor.SaveStateTransactionallyAsyncGrpc(actorType, actorId, grpcOps, cancellationToken); + await this.daprInteractor.SaveStateTransactionallyAsync(actorType, actorId, null, cancellationToken, grpcOps); } - private async Task DoStateChangesTransactionallyAsyncHttp(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) + public async Task DoStateChangesTransactionallyAsyncHttp(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) { // Transactional state update request body: /* @@ -198,7 +194,7 @@ private async Task DoStateChangesTransactionallyAsyncHttp(string actorType, stri await writer.FlushAsync(); var content = Encoding.UTF8.GetString(stream.ToArray()); - await this.daprInteractor.SaveStateTransactionallyAsync(actorType, actorId, content, cancellationToken); + await this.daprInteractor.SaveStateTransactionallyAsync(actorType, actorId, content, cancellationToken, null); } private string GetDaprStateOperation(StateChangeKind changeKind) diff --git a/test/Dapr.Actors.Test/ApiTokenTests.cs b/test/Dapr.Actors.Test/ApiTokenTests.cs index 57e12b4d3..bc9f1dccc 100644 --- a/test/Dapr.Actors.Test/ApiTokenTests.cs +++ b/test/Dapr.Actors.Test/ApiTokenTests.cs @@ -31,7 +31,7 @@ public async Task CreateProxyWithRemoting_WithApiToken() var options = new ActorProxyOptions { DaprApiToken = "test_token", - useGrpc = false, + UseGrpc = false, }; var request = await client.CaptureHttpRequestAsync(async handler => @@ -78,7 +78,7 @@ public async Task CreateProxyWithNoRemoting_WithApiToken() var options = new ActorProxyOptions { DaprApiToken = "test_token", - useGrpc = false, + UseGrpc = false, }; var request = await client.CaptureHttpRequestAsync(async handler => @@ -103,7 +103,7 @@ public async Task CreateProxyWithNoRemoting_WithNoApiToken() var options = new ActorProxyOptions { - useGrpc = false, + UseGrpc = false, }; var request = await client.CaptureHttpRequestAsync(async handler => diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs index 54995acfd..c3aa24974 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs @@ -29,7 +29,7 @@ public async Task ActorCanProvideExceptionDetails() var actorIds = new ActorId(Guid.NewGuid().ToString()); var options = new ActorProxyOptions { - useGrpc = false, + UseGrpc = false, HttpEndpoint = this.HttpEndpoint, GrpcEndpoint = this.GrpcEndpoint }; From a36c327c3fa2a91d6821fa94d3991a6190b714bd Mon Sep 17 00:00:00 2001 From: addjuarez Date: Thu, 6 Oct 2022 15:25:38 +0000 Subject: [PATCH 09/17] suppress obsolete error Signed-off-by: addjuarez --- src/Dapr.Actors/DaprGrpcInteractor.cs | 2 ++ src/Dapr.Actors/Runtime/TimerInfo.cs | 1 + 2 files changed, 3 insertions(+) diff --git a/src/Dapr.Actors/DaprGrpcInteractor.cs b/src/Dapr.Actors/DaprGrpcInteractor.cs index 669244562..78b81db37 100644 --- a/src/Dapr.Actors/DaprGrpcInteractor.cs +++ b/src/Dapr.Actors/DaprGrpcInteractor.cs @@ -281,6 +281,7 @@ public async Task UnregisterReminderAsync(string actorType, string actorId, stri } } + #pragma warning disable 0618 public async Task RegisterTimerAsync(string actorType, string actorId, string timerName, string data, CancellationToken cancellationToken = default) { var timerdata = JsonSerializer.Deserialize(data, jsonSerializerOptions); @@ -307,6 +308,7 @@ public async Task RegisterTimerAsync(string actorType, string actorId, string ti throw new DaprApiException("RegisterActorTimer operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } + #pragma warning restore 0618 public async Task UnregisterTimerAsync(string actorType, string actorId, string timerName, CancellationToken cancellationToken = default) { diff --git a/src/Dapr.Actors/Runtime/TimerInfo.cs b/src/Dapr.Actors/Runtime/TimerInfo.cs index d5a208735..bc206915a 100644 --- a/src/Dapr.Actors/Runtime/TimerInfo.cs +++ b/src/Dapr.Actors/Runtime/TimerInfo.cs @@ -24,6 +24,7 @@ namespace Dapr.Actors.Runtime /// /// Represents the details of the timer set on an Actor. /// + [Obsolete("This class is an implementation detail of the framework and will be made internal in a future release.")] [JsonConverter(typeof(TimerInfoConverter))] public class TimerInfo { From a45d4289275bcbe54710fb6278e0ab23d30cafbd Mon Sep 17 00:00:00 2001 From: addjuarez <6789375+addjuarez@users.noreply.github.com> Date: Fri, 16 Dec 2022 20:44:45 +0000 Subject: [PATCH 10/17] change bools Signed-off-by: addjuarez <6789375+addjuarez@users.noreply.github.com> --- src/Dapr.Actors/Client/ActorProxyOptions.cs | 2 +- src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs | 2 +- test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Dapr.Actors/Client/ActorProxyOptions.cs b/src/Dapr.Actors/Client/ActorProxyOptions.cs index 958be1e17..dc7213358 100644 --- a/src/Dapr.Actors/Client/ActorProxyOptions.cs +++ b/src/Dapr.Actors/Client/ActorProxyOptions.cs @@ -77,7 +77,7 @@ public JsonSerializerOptions JsonSerializerOptions /// /// Option to use GRPC or HTTP /// - public bool UseGrpc { get; set; } = true; + public bool UseGrpc { get; set; } = false; /// /// Options for grpc channel diff --git a/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs b/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs index 43b02b5be..91a9fbbbb 100644 --- a/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs +++ b/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs @@ -241,7 +241,7 @@ public int? RemindersStoragePartitions /// /// Option to use GRPC or HTTP /// - public bool UseGrpc { get; set; } = true; + public bool UseGrpc { get; set; } = false; /// /// Options for grpc channel diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs index c3aa24974..0462444d5 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs @@ -29,8 +29,7 @@ public async Task ActorCanProvideExceptionDetails() var actorIds = new ActorId(Guid.NewGuid().ToString()); var options = new ActorProxyOptions { - UseGrpc = false, - HttpEndpoint = this.HttpEndpoint, + UseGrpc = true, GrpcEndpoint = this.GrpcEndpoint }; var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "ExceptionActor", options); From 8fcf953a22cb2a8f6489bc86e4d3ae7a1e94c86a Mon Sep 17 00:00:00 2001 From: addjuarez <6789375+addjuarez@users.noreply.github.com> Date: Fri, 16 Dec 2022 21:13:55 +0000 Subject: [PATCH 11/17] excpetion test Signed-off-by: addjuarez <6789375+addjuarez@users.noreply.github.com> --- test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs index 0462444d5..986a2c4f0 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs @@ -19,7 +19,6 @@ namespace Dapr.E2E.Test using Dapr.Actors; using Dapr.E2E.Test.Actors.ExceptionTesting; using Xunit; - using Dapr.Actors.Client; public partial class E2ETests : IAsyncLifetime { [Fact] @@ -27,12 +26,8 @@ public async Task ActorCanProvideExceptionDetails() { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); var actorIds = new ActorId(Guid.NewGuid().ToString()); - var options = new ActorProxyOptions - { - UseGrpc = true, - GrpcEndpoint = this.GrpcEndpoint - }; - var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "ExceptionActor", options); + + var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "ExceptionActor"); await WaitForActorRuntimeAsync(proxy, cts.Token); ActorMethodInvocationException ex = await Assert.ThrowsAsync(async () => await proxy.ExceptionExample()); Assert.Contains("Remote Actor Method Exception", ex.Message); From 57e7579f73665d25be41d285016e18f7d0e43f68 Mon Sep 17 00:00:00 2001 From: addjuarez <6789375+addjuarez@users.noreply.github.com> Date: Fri, 16 Dec 2022 21:26:35 +0000 Subject: [PATCH 12/17] update test Signed-off-by: addjuarez <6789375+addjuarez@users.noreply.github.com> --- test/Dapr.E2E.Test/E2ETests.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/Dapr.E2E.Test/E2ETests.cs b/test/Dapr.E2E.Test/E2ETests.cs index 5e1718539..bf2d1e335 100644 --- a/test/Dapr.E2E.Test/E2ETests.cs +++ b/test/Dapr.E2E.Test/E2ETests.cs @@ -40,8 +40,7 @@ public E2ETests(ITestOutputHelper output, DaprTestAppFixture fixture) this.proxyFactory = new Lazy(() => { Debug.Assert(this.HttpEndpoint != null); - Debug.Assert(this.GrpcEndpoint != null); - return new ActorProxyFactory(new ActorProxyOptions(){ HttpEndpoint = this.HttpEndpoint, GrpcEndpoint = this.GrpcEndpoint }); + return new ActorProxyFactory(new ActorProxyOptions(){ HttpEndpoint = this.HttpEndpoint, }); }); } @@ -95,4 +94,4 @@ protected async Task WaitForActorRuntimeAsync(IPingActor proxy, CancellationToke await ActorRuntimeChecker.WaitForActorRuntimeAsync(this.AppId, this.Output, proxy, cancellationToken); } } -} +} \ No newline at end of file From 484b4704a511640bf7d0e3a79bef1fd71cc9765c Mon Sep 17 00:00:00 2001 From: addjuarez <6789375+addjuarez@users.noreply.github.com> Date: Mon, 2 Jan 2023 22:54:07 +0000 Subject: [PATCH 13/17] add e2e test Signed-off-by: addjuarez <6789375+addjuarez@users.noreply.github.com> --- .../Actors/E2ETests.ExceptionTests.cs | 15 +++ .../Actors/E2ETests.ReentrantTests.cs | 42 +++++++++ .../Actors/E2ETests.Regression762Tests.cs | 92 +++++++++++++++++++ .../Actors/E2ETests.ReminderTests.cs | 51 ++++++++++ .../Actors/E2ETests.TimerTests.cs | 50 ++++++++++ test/Dapr.E2E.Test/E2ETests.cs | 9 ++ 6 files changed, 259 insertions(+) diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs index 986a2c4f0..f1bb71a67 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs @@ -34,5 +34,20 @@ public async Task ActorCanProvideExceptionDetails() Assert.Contains("ExceptionExample", ex.Message); Assert.Contains("32", ex.Message); } + + [Fact] + public async Task ActorCanProvideExceptionDetailsGrpc() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var actorIds = new ActorId(Guid.NewGuid().ToString()); + + var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "ExceptionActor"); + await WaitForActorRuntimeAsync(proxy, cts.Token); + ActorMethodInvocationException ex = await Assert.ThrowsAsync(async () => await proxy.ExceptionExample()); + Assert.Contains("Remote Actor Method Exception", ex.Message); + Assert.Contains("ExceptionExample", ex.Message); + Assert.Contains("32", ex.Message); + } + } } \ No newline at end of file diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs index 74a722b73..1534a6b2e 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs @@ -27,7 +27,10 @@ public class ReentrantTests : DaprTestAppLifecycle { private static readonly int NumCalls = 10; private readonly Lazy proxyFactory; + private readonly Lazy proxyFactoryGrpc; private IActorProxyFactory ProxyFactory => this.HttpEndpoint == null ? null : this.proxyFactory.Value; + private IActorProxyFactory ProxyFactoryGrpc => this.GrpcEndpoint == null ? null : this.proxyFactoryGrpc.Value; + public ReentrantTests(ITestOutputHelper output, DaprTestAppFixture fixture) : base(output, fixture) { @@ -43,6 +46,13 @@ public ReentrantTests(ITestOutputHelper output, DaprTestAppFixture fixture) : ba Debug.Assert(this.HttpEndpoint != null); return new ActorProxyFactory(new ActorProxyOptions() { HttpEndpoint = this.HttpEndpoint, GrpcEndpoint = this.GrpcEndpoint}); }); + + this.proxyFactoryGrpc = new Lazy(() => + { + Debug.Assert(this.GrpcEndpoint != null); + return new ActorProxyFactory(new ActorProxyOptions() { HttpEndpoint = this.HttpEndpoint, GrpcEndpoint = this.GrpcEndpoint, UseGrpc = true,}); + }); + } [Fact] @@ -75,5 +85,37 @@ public async Task ActorCanPerformReentrantCalls() } } } + + [Fact] + public async Task ActorCanPerformReentrantCallsGrpc() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "ReentrantActor"); + + await ActorRuntimeChecker.WaitForActorRuntimeAsync(this.AppId, this.Output, proxy, cts.Token); + + await proxy.ReentrantCall(new ReentrantCallOptions(){ CallsRemaining = NumCalls, }); + var records = new List(); + for (int i = 0; i < NumCalls; i++) + { + var state = await proxy.GetState(i); + records.AddRange(state.Records); + } + + var enterRecords = records.FindAll(record => record.IsEnter); + var exitRecords = records.FindAll(record => !record.IsEnter); + + this.Output.WriteLine($"Got {records.Count} records."); + Assert.True(records.Count == NumCalls * 2); + for (int i = 0; i < NumCalls; i++) + { + for (int j = 0; j < NumCalls; j++) + { + // Assert all the enters happen before the exits. + Assert.True(enterRecords[i].Timestamp < exitRecords[j].Timestamp); + } + } + } + } } diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.Regression762Tests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.Regression762Tests.cs index 12d6c7365..676f43b26 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.Regression762Tests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.Regression762Tests.cs @@ -112,5 +112,97 @@ public async Task ActorSuccessfullyClearsStateAfterErrorWithoutRemoting() var resp = await proxy.InvokeMethodAsync("GetState", key); Assert.Equal("Real value", resp); } + + [Fact] + public async Task ActorSuccessfullyClearsStateAfterErrorWithRemotingGrpc() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "Regression762Actor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + var key = Guid.NewGuid().ToString(); + var throwingCall = new StateCall + { + Key = key, + Value = "Throw value", + Operation = "ThrowException" + }; + + var setCall = new StateCall() + { + Key = key, + Value = "Real value", + Operation = "SetState" + }; + + var savingCall = new StateCall() + { + Operation = "SaveState" + }; + + // We attempt to delete it on the unlikely chance it's already there. + await proxy.RemoveState(throwingCall.Key); + + // Initiate a call that will set the state, then throw. + await Assert.ThrowsAsync(async () => await proxy.SaveState(throwingCall)); + + // Save the state and assert that the old value was not persisted. + await proxy.SaveState(savingCall); + var errorResp = await proxy.GetState(key); + Assert.Equal(string.Empty, errorResp); + + // Persist normally and ensure it works. + await proxy.SaveState(setCall); + var resp = await proxy.GetState(key); + Assert.Equal("Real value", resp); + } + + [Fact] + public async Task ActorSuccessfullyClearsStateAfterErrorWithoutRemotingGrpc() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var pingProxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "Regression762Actor"); + var proxy = this.ProxyFactoryGrpc.Create(ActorId.CreateRandom(), "Regression762Actor"); + + await WaitForActorRuntimeAsync(pingProxy, cts.Token); + + var key = Guid.NewGuid().ToString(); + var throwingCall = new StateCall + { + Key = key, + Value = "Throw value", + Operation = "ThrowException" + }; + + var setCall = new StateCall() + { + Key = key, + Value = "Real value", + Operation = "SetState" + }; + + var savingCall = new StateCall() + { + Operation = "SaveState" + }; + + // We attempt to delete it on the unlikely chance it's already there. + await proxy.InvokeMethodAsync("RemoveState", throwingCall.Key); + + // Initiate a call that will set the state, then throw. + await Assert.ThrowsAsync(async () => await proxy.InvokeMethodAsync("SaveState", throwingCall)); + + // Save the state and assert that the old value was not persisted. + await proxy.InvokeMethodAsync("SaveState", savingCall); + var errorResp = await proxy.InvokeMethodAsync("GetState", key); + Assert.Equal(string.Empty, errorResp); + + // Persist normally and ensure it works. + await proxy.InvokeMethodAsync("SaveState", setCall); + var resp = await proxy.InvokeMethodAsync("GetState", key); + Assert.Equal("Real value", resp); + } + } } diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs index 626de8c9f..ba4c5c841 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs @@ -70,5 +70,56 @@ public async Task ActorCanStartReminderWithTtl() Assert.True(state.Timestamp.Subtract(start) > TimeSpan.Zero, "Reminder may not have triggered."); Assert.True(DateTime.Now.Subtract(state.Timestamp) > TimeSpan.FromSeconds(1), $"Reminder triggered too recently. {DateTime.Now} - {state.Timestamp}"); } + + [Fact] + public async Task ActorCanStartAndStopReminderGrpc() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "ReminderActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + // Start reminder, to count up to 10 + await proxy.StartReminder(new StartReminderOptions(){ Total = 10, }); + + State state; + while (true) + { + cts.Token.ThrowIfCancellationRequested(); + + state = await proxy.GetState(); + this.Output.WriteLine($"Got Count: {state.Count} IsReminderRunning: {state.IsReminderRunning}"); + if (!state.IsReminderRunning) + { + break; + } + } + + // Should count up to exactly 10 + Assert.Equal(10, state.Count); + } + + [Fact] + public async Task ActorCanStartReminderWithTtlGrpc() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "ReminderActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + // Reminder that should fire 3 times (at 0, 1, and 2 seconds) + await proxy.StartReminderWithTtl(TimeSpan.FromSeconds(2)); + + // Record the start time and wait for longer than the reminder should exist for. + var start = DateTime.Now; + await Task.Delay(TimeSpan.FromSeconds(5)); + + var state = await proxy.GetState(); + + // Make sure the reminder has fired and that it didn't fire within the past second since it should have expired. + Assert.True(state.Timestamp.Subtract(start) > TimeSpan.Zero, "Reminder may not have triggered."); + Assert.True(DateTime.Now.Subtract(state.Timestamp) > TimeSpan.FromSeconds(1), $"Reminder triggered too recently. {DateTime.Now} - {state.Timestamp}"); + } + } } diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.TimerTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.TimerTests.cs index d9aba3863..fafc79ced 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.TimerTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.TimerTests.cs @@ -70,5 +70,55 @@ public async Task ActorCanStartTimerWithTtl() Assert.True(state.Timestamp.Subtract(start) > TimeSpan.Zero, "Timer may not have fired."); Assert.True(DateTime.Now.Subtract(state.Timestamp) > TimeSpan.FromSeconds(1), $"Timer fired too recently. {DateTime.Now} - {state.Timestamp}"); } + + [Fact] + public async Task ActorCanStartAndStopTimerGrpc() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "TimerActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + // Start timer, to count up to 10 + await proxy.StartTimer(new StartTimerOptions(){ Total = 10, }); + + State state; + while (true) + { + cts.Token.ThrowIfCancellationRequested(); + + state = await proxy.GetState(); + this.Output.WriteLine($"Got Count: {state.Count} IsTimerRunning: {state.IsTimerRunning}"); + if (!state.IsTimerRunning) + { + break; + } + } + + // Should count up to exactly 10 + Assert.Equal(10, state.Count); + } + + [Fact] + public async Task ActorCanStartTimerWithTtlGrpc() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "TimerActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + // Reminder that should fire 3 times (at 0, 1, and 2 seconds) + await proxy.StartTimerWithTtl(TimeSpan.FromSeconds(2)); + + // Record the start time and wait for longer than the reminder should exist for. + var start = DateTime.Now; + await Task.Delay(TimeSpan.FromSeconds(5)); + + var state = await proxy.GetState(); + + // Make sure the reminder has fired and that it didn't fire within the past second since it should have expired. + Assert.True(state.Timestamp.Subtract(start) > TimeSpan.Zero, "Timer may not have fired."); + Assert.True(DateTime.Now.Subtract(state.Timestamp) > TimeSpan.FromSeconds(1), $"Timer fired too recently. {DateTime.Now} - {state.Timestamp}"); + } } } diff --git a/test/Dapr.E2E.Test/E2ETests.cs b/test/Dapr.E2E.Test/E2ETests.cs index bf2d1e335..69d016c05 100644 --- a/test/Dapr.E2E.Test/E2ETests.cs +++ b/test/Dapr.E2E.Test/E2ETests.cs @@ -29,6 +29,7 @@ namespace Dapr.E2E.Test public partial class E2ETests : IClassFixture, IAsyncLifetime { private readonly Lazy proxyFactory; + private readonly Lazy proxyFactoryGrpc; private readonly DaprTestAppFixture fixture; private DaprTestAppFixture.State state; @@ -42,6 +43,12 @@ public E2ETests(ITestOutputHelper output, DaprTestAppFixture fixture) Debug.Assert(this.HttpEndpoint != null); return new ActorProxyFactory(new ActorProxyOptions(){ HttpEndpoint = this.HttpEndpoint, }); }); + + this.proxyFactoryGrpc = new Lazy(() => + { + Debug.Assert(this.GrpcEndpoint != null); + return new ActorProxyFactory(new ActorProxyOptions(){ GrpcEndpoint = this.GrpcEndpoint, UseGrpc = true, }); + }); } protected ITestOutputHelper Output { get; } @@ -61,6 +68,8 @@ public E2ETests(ITestOutputHelper output, DaprTestAppFixture fixture) public IActorProxyFactory ProxyFactory => this.HttpEndpoint == null ? null : this.proxyFactory.Value; + public IActorProxyFactory ProxyFactoryGrpc => this.GrpcEndpoint == null ? null : this.proxyFactoryGrpc.Value; + public async Task InitializeAsync() { this.state = await this.fixture.StartAsync(this.Output, this.Configuration); From 5dff37a193822d2b6583ed9eda55598d6c8eea8e Mon Sep 17 00:00:00 2001 From: addjuarez <6789375+addjuarez@users.noreply.github.com> Date: Mon, 2 Jan 2023 23:09:10 +0000 Subject: [PATCH 14/17] Fix exception test Signed-off-by: addjuarez <6789375+addjuarez@users.noreply.github.com> --- test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs index f1bb71a67..69071b326 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs @@ -45,7 +45,6 @@ public async Task ActorCanProvideExceptionDetailsGrpc() await WaitForActorRuntimeAsync(proxy, cts.Token); ActorMethodInvocationException ex = await Assert.ThrowsAsync(async () => await proxy.ExceptionExample()); Assert.Contains("Remote Actor Method Exception", ex.Message); - Assert.Contains("ExceptionExample", ex.Message); Assert.Contains("32", ex.Message); } From 611c4e4cc6ef72f66c249ed15a829f54e062881d Mon Sep 17 00:00:00 2001 From: addjuarez <6789375+addjuarez@users.noreply.github.com> Date: Mon, 2 Jan 2023 23:44:05 +0000 Subject: [PATCH 15/17] Fix exception test Signed-off-by: addjuarez <6789375+addjuarez@users.noreply.github.com> --- test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs index 69071b326..ee58800f5 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs @@ -45,7 +45,6 @@ public async Task ActorCanProvideExceptionDetailsGrpc() await WaitForActorRuntimeAsync(proxy, cts.Token); ActorMethodInvocationException ex = await Assert.ThrowsAsync(async () => await proxy.ExceptionExample()); Assert.Contains("Remote Actor Method Exception", ex.Message); - Assert.Contains("32", ex.Message); } } From 62fcf04e819dffa591c9d17629251367c4f47d6a Mon Sep 17 00:00:00 2001 From: addjuarez Date: Mon, 9 Jan 2023 12:51:46 -0600 Subject: [PATCH 16/17] refactor Signed-off-by: addjuarez --- src/Dapr.Actors/DaprGrpcInteractor.cs | 67 ++++++++- src/Dapr.Actors/DaprHttpInteractor.cs | 86 +++++++++++- src/Dapr.Actors/IDaprInteractor.cs | 6 +- src/Dapr.Actors/Runtime/DaprStateProvider.cs | 135 +------------------ 4 files changed, 154 insertions(+), 140 deletions(-) diff --git a/src/Dapr.Actors/DaprGrpcInteractor.cs b/src/Dapr.Actors/DaprGrpcInteractor.cs index 78b81db37..02857e915 100644 --- a/src/Dapr.Actors/DaprGrpcInteractor.cs +++ b/src/Dapr.Actors/DaprGrpcInteractor.cs @@ -29,6 +29,7 @@ namespace Dapr.Actors using Google.Protobuf; using Dapr.Actors.Runtime; using Grpc.Net.Client; + using Google.Protobuf.WellKnownTypes; /// /// Class to interact with Dapr runtime over grpc. @@ -96,9 +97,51 @@ public async Task SaveStateTransactionallyAsync(string actorType, string actorId } } - public Task DoStateChangesTransactionallyAsync(DaprStateProvider provider, string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) + public async Task DoStateChangesTransactionallyAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, IActorStateSerializer actorStateSerializer, JsonSerializerOptions jsonSerializerOptions, CancellationToken cancellationToken = default) { - return provider.DoStateChangesTransactionallyAsyncGrpc(actorType, actorId, stateChanges, cancellationToken); + List grpcOps = new List(); + + foreach (var stateChange in stateChanges) + { + var operation = this.GetDaprStateOperation(stateChange.ChangeKind); + var op = new Autogenerated.TransactionalActorStateOperation() + { + OperationType = operation, + }; + + switch (stateChange.ChangeKind) + { + case StateChangeKind.Remove: + break; + case StateChangeKind.Add: + case StateChangeKind.Update: + op.Key = stateChange.StateName; + if (actorStateSerializer != null) + { + + var buffer = ByteString.CopyFrom(actorStateSerializer.Serialize(stateChange.Type, stateChange.Value)); + op.Value = new Any() + { + Value = buffer, + }; + } + else + { + var buffer = ByteString.CopyFrom(JsonSerializer.SerializeToUtf8Bytes(stateChange.Value, stateChange.Type, jsonSerializerOptions)); + op.Value = new Any() + { + Value = buffer, + }; + + } + break; + default: + break; + } + grpcOps.Add(op); + } + + await this.SaveStateTransactionallyAsync(actorType, actorId, null, cancellationToken, grpcOps); } public async Task InvokeActorMethodWithRemotingAsync(ActorMessageSerializersManager serializersManager, IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default) @@ -360,5 +403,25 @@ private CallOptions CreateCallOptions(CancellationToken cancellationToken) return options; } + private string GetDaprStateOperation(StateChangeKind changeKind) + { + var operation = string.Empty; + + switch (changeKind) + { + case StateChangeKind.Remove: + operation = "delete"; + break; + case StateChangeKind.Add: + case StateChangeKind.Update: + operation = "upsert"; + break; + default: + break; + } + + return operation; + } + } } diff --git a/src/Dapr.Actors/DaprHttpInteractor.cs b/src/Dapr.Actors/DaprHttpInteractor.cs index 128ca80ca..9530e2ab1 100644 --- a/src/Dapr.Actors/DaprHttpInteractor.cs +++ b/src/Dapr.Actors/DaprHttpInteractor.cs @@ -95,9 +95,72 @@ HttpRequestMessage RequestFunc() return this.SendAsync(RequestFunc, relativeUrl, cancellationToken); } - public Task DoStateChangesTransactionallyAsync(DaprStateProvider provider, string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) + public async Task DoStateChangesTransactionallyAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, IActorStateSerializer actorStateSerializer, JsonSerializerOptions jsonSerializerOptions, CancellationToken cancellationToken = default) { - return provider.DoStateChangesTransactionallyAsyncHttp(actorType, actorId, stateChanges, cancellationToken); + // Transactional state update request body: + /* + [ + { + "operation": "upsert", + "request": { + "key": "key1", + "value": "myData" + } + }, + { + "operation": "delete", + "request": { + "key": "key2" + } + } + ] + */ + using var stream = new MemoryStream(); + using var writer = new Utf8JsonWriter(stream); + writer.WriteStartArray(); + foreach (var stateChange in stateChanges) + { + writer.WriteStartObject(); + var operation = GetDaprStateOperation(stateChange.ChangeKind); + writer.WriteString("operation", operation); + + // write the requestProperty + writer.WritePropertyName("request"); + writer.WriteStartObject(); // start object for request property + switch (stateChange.ChangeKind) + { + case StateChangeKind.Remove: + writer.WriteString("key", stateChange.StateName); + break; + case StateChangeKind.Add: + case StateChangeKind.Update: + writer.WriteString("key", stateChange.StateName); + + // perform default json serialization if custom serializer was not provided. + if (actorStateSerializer != null) + { + var buffer = actorStateSerializer.Serialize(stateChange.Type, stateChange.Value); + writer.WriteBase64String("value", buffer); + } + else + { + writer.WritePropertyName("value"); + JsonSerializer.Serialize(writer, stateChange.Value, stateChange.Type, jsonSerializerOptions); + } + break; + default: + break; + } + + writer.WriteEndObject(); // end object for request property + writer.WriteEndObject(); + } + + writer.WriteEndArray(); + + await writer.FlushAsync(); + var content = Encoding.UTF8.GetString(stream.ToArray()); + await this.SaveStateTransactionallyAsync(actorType, actorId, content, cancellationToken); } public async Task InvokeActorMethodWithRemotingAsync(ActorMessageSerializersManager serializersManager, IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default) @@ -479,6 +542,25 @@ private HttpClient CreateHttpClient() { return new HttpClient(this.handler, false); } + private string GetDaprStateOperation(StateChangeKind changeKind) + { + var operation = string.Empty; + + switch (changeKind) + { + case StateChangeKind.Remove: + operation = "delete"; + break; + case StateChangeKind.Add: + case StateChangeKind.Update: + operation = "upsert"; + break; + default: + break; + } + + return operation; + } private void AddDaprApiTokenHeader(HttpRequestMessage request) { diff --git a/src/Dapr.Actors/IDaprInteractor.cs b/src/Dapr.Actors/IDaprInteractor.cs index f88309dad..9b09efa72 100644 --- a/src/Dapr.Actors/IDaprInteractor.cs +++ b/src/Dapr.Actors/IDaprInteractor.cs @@ -20,6 +20,7 @@ namespace Dapr.Actors using Autogenerated = Dapr.Client.Autogen.Grpc.v1; using System.Collections.Generic; using Dapr.Actors.Runtime; + using System.Text.Json; /// /// Interface for interacting with Dapr runtime. @@ -102,13 +103,14 @@ internal interface IDaprInteractor /// /// Parses state changes. /// - /// Type of actor. /// Type of actor. /// ActorId. /// StateChanges. + /// StateChanges. + /// StateChanges. /// Cancels the operation. /// A representing the result of the asynchronous operation. - Task DoStateChangesTransactionallyAsync(DaprStateProvider provider, string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default); + Task DoStateChangesTransactionallyAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, IActorStateSerializer actorStateSerializer, JsonSerializerOptions jsonSerializerOptions, CancellationToken cancellationToken = default); /// /// Unegisters a timer. diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs index 0f93c71a4..3f62f604f 100644 --- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs +++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs @@ -80,141 +80,8 @@ public async Task ContainsStateAsync(string actorType, string actorId, str public async Task SaveStateAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) { - await this.daprInteractor.DoStateChangesTransactionallyAsync(this, actorType, actorId, stateChanges, cancellationToken); + await this.daprInteractor.DoStateChangesTransactionallyAsync(actorType, actorId, stateChanges, this.actorStateSerializer, this.jsonSerializerOptions, cancellationToken); } - public async Task DoStateChangesTransactionallyAsyncGrpc(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) - { - List grpcOps = new List(); - - foreach (var stateChange in stateChanges) - { - var operation = this.GetDaprStateOperation(stateChange.ChangeKind); - var op = new Autogenerated.TransactionalActorStateOperation() - { - OperationType = operation, - }; - - switch (stateChange.ChangeKind) - { - case StateChangeKind.Remove: - break; - case StateChangeKind.Add: - case StateChangeKind.Update: - op.Key = stateChange.StateName; - if (this.actorStateSerializer != null) - { - - var buffer = ByteString.CopyFrom(this.actorStateSerializer.Serialize(stateChange.Type, stateChange.Value)); - op.Value = new Any() - { - Value = buffer, - }; - } - else - { - var buffer = ByteString.CopyFrom(JsonSerializer.SerializeToUtf8Bytes(stateChange.Value, stateChange.Type, jsonSerializerOptions)); - op.Value = new Any() - { - Value = buffer, - }; - - } - break; - default: - break; - } - grpcOps.Add(op); - } - await this.daprInteractor.SaveStateTransactionallyAsync(actorType, actorId, null, cancellationToken, grpcOps); - } - - public async Task DoStateChangesTransactionallyAsyncHttp(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default) - { - // Transactional state update request body: - /* - [ - { - "operation": "upsert", - "request": { - "key": "key1", - "value": "myData" - } - }, - { - "operation": "delete", - "request": { - "key": "key2" - } - } - ] - */ - using var stream = new MemoryStream(); - using var writer = new Utf8JsonWriter(stream); - writer.WriteStartArray(); - foreach (var stateChange in stateChanges) - { - writer.WriteStartObject(); - var operation = this.GetDaprStateOperation(stateChange.ChangeKind); - writer.WriteString("operation", operation); - - // write the requestProperty - writer.WritePropertyName("request"); - writer.WriteStartObject(); // start object for request property - switch (stateChange.ChangeKind) - { - case StateChangeKind.Remove: - writer.WriteString("key", stateChange.StateName); - break; - case StateChangeKind.Add: - case StateChangeKind.Update: - writer.WriteString("key", stateChange.StateName); - - // perform default json serialization if custom serializer was not provided. - if (this.actorStateSerializer != null) - { - var buffer = this.actorStateSerializer.Serialize(stateChange.Type, stateChange.Value); - writer.WriteBase64String("value", buffer); - } - else - { - writer.WritePropertyName("value"); - JsonSerializer.Serialize(writer, stateChange.Value, stateChange.Type, jsonSerializerOptions); - } - break; - default: - break; - } - - writer.WriteEndObject(); // end object for request property - writer.WriteEndObject(); - } - - writer.WriteEndArray(); - - await writer.FlushAsync(); - var content = Encoding.UTF8.GetString(stream.ToArray()); - await this.daprInteractor.SaveStateTransactionallyAsync(actorType, actorId, content, cancellationToken, null); - } - - private string GetDaprStateOperation(StateChangeKind changeKind) - { - var operation = string.Empty; - - switch (changeKind) - { - case StateChangeKind.Remove: - operation = "delete"; - break; - case StateChangeKind.Add: - case StateChangeKind.Update: - operation = "upsert"; - break; - default: - break; - } - - return operation; - } } } From 09f4fd0eef81c495b31e4e9a4264992d361edd51 Mon Sep 17 00:00:00 2001 From: addjuarez Date: Mon, 9 Jan 2023 13:03:32 -0600 Subject: [PATCH 17/17] rerun Signed-off-by: addjuarez --- src/Dapr.Actors/DaprGrpcInteractor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Dapr.Actors/DaprGrpcInteractor.cs b/src/Dapr.Actors/DaprGrpcInteractor.cs index 02857e915..bd3799722 100644 --- a/src/Dapr.Actors/DaprGrpcInteractor.cs +++ b/src/Dapr.Actors/DaprGrpcInteractor.cs @@ -1,5 +1,5 @@ // ------------------------------------------------------------------------ -// Copyright 2022 The Dapr Authors +// Copyright 2023 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at