diff --git a/Directory.Packages.props b/Directory.Packages.props
index 3c1459b5d..99ea0f2cc 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -27,6 +27,7 @@
+
diff --git a/all.sln b/all.sln
index 1dd0ab3c5..02d9afcc4 100644
--- a/all.sln
+++ b/all.sln
@@ -119,6 +119,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common", "src\Dapr.Com
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common.Test", "test\Dapr.Common.Test\Dapr.Common.Test.csproj", "{CDB47863-BEBD-4841-A807-46D868962521}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging.Test", "test\Dapr.Messaging.Test\Dapr.Messaging.Test.csproj", "{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging", "src\Dapr.Messaging\Dapr.Messaging.csproj", "{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingSubscriptionExample", "examples\Client\PublishSubscribe\StreamingSubscriptionExample\StreamingSubscriptionExample.csproj", "{290D1278-F613-4DF3-9DF5-F37E38CDC363}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -303,6 +309,18 @@ Global
{CDB47863-BEBD-4841-A807-46D868962521}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.Build.0 = Release|Any CPU
+ {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.Build.0 = Release|Any CPU
+ {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.Build.0 = Release|Any CPU
+ {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -359,6 +377,9 @@ Global
{DFBABB04-50E9-42F6-B470-310E1B545638} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{B445B19C-A925-4873-8CB7-8317898B6970} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{CDB47863-BEBD-4841-A807-46D868962521} = {DD020B34-460F-455F-8D17-CF4A949F100B}
+ {4E04EB35-7FD2-4FDB-B09A-F75CE24053B9} = {DD020B34-460F-455F-8D17-CF4A949F100B}
+ {0EAE36A1-B578-4F13-A113-7A477ECA1BDA} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
+ {290D1278-F613-4DF3-9DF5-F37E38CDC363} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {65220BF2-EAE1-4CB2-AA58-EBE80768CB40}
diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs
new file mode 100644
index 000000000..2eac0c919
--- /dev/null
+++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/Program.cs
@@ -0,0 +1,31 @@
+using System.Text;
+using Dapr.Messaging.PublishSubscribe;
+
+var daprMessagingClientBuilder = new DaprPublishSubscribeClientBuilder(null);
+var daprMessagingClient = daprMessagingClientBuilder.Build();
+
+//Process each message returned from the subscription
+Task HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
+{
+ try
+ {
+ //Do something with the message
+ Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
+ return Task.FromResult(TopicResponseAction.Success);
+ }
+ catch
+ {
+ return Task.FromResult(TopicResponseAction.Retry);
+ }
+}
+
+//Create a dynamic streaming subscription and subscribe with a timeout of 20 seconds and 10 seconds for message handling
+var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(20));
+var subscription = await daprMessagingClient.SubscribeAsync("pubsub", "myTopic",
+ new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)),
+ HandleMessageAsync, cancellationTokenSource.Token);
+
+await Task.Delay(TimeSpan.FromMinutes(1));
+
+//When you're done with the subscription, simply dispose of it
+await subscription.DisposeAsync();
diff --git a/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj
new file mode 100644
index 000000000..c73345b40
--- /dev/null
+++ b/examples/Client/PublishSubscribe/StreamingSubscriptionExample/StreamingSubscriptionExample.csproj
@@ -0,0 +1,14 @@
+
+
+
+ Exe
+ net6.0
+ enable
+ enable
+
+
+
+
+
+
+
diff --git a/src/Dapr.Common/DaprGenericClientBuilder.cs b/src/Dapr.Common/DaprGenericClientBuilder.cs
new file mode 100644
index 000000000..f88b810bc
--- /dev/null
+++ b/src/Dapr.Common/DaprGenericClientBuilder.cs
@@ -0,0 +1,214 @@
+using System.Text.Json;
+using Grpc.Net.Client;
+using Microsoft.Extensions.Configuration;
+
+namespace Dapr.Common;
+
+///
+/// Builder for building a generic Dapr client.
+///
+public abstract class DaprGenericClientBuilder where TClientBuilder : class
+{
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ protected DaprGenericClientBuilder(IConfiguration? configuration = null)
+ {
+ this.GrpcEndpoint = DaprDefaults.GetDefaultGrpcEndpoint();
+ this.HttpEndpoint = DaprDefaults.GetDefaultHttpEndpoint();
+
+ this.GrpcChannelOptions = new GrpcChannelOptions()
+ {
+ // The gRPC client doesn't throw the right exception for cancellation
+ // by default, this switches that behavior on.
+ ThrowOperationCanceledOnCancellation = true,
+ };
+
+ this.JsonSerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);
+ this.DaprApiToken = DaprDefaults.GetDefaultDaprApiToken(configuration);
+ }
+
+ ///
+ /// Property exposed for testing purposes.
+ ///
+ internal string GrpcEndpoint { get; private set; }
+
+ ///
+ /// Property exposed for testing purposes.
+ ///
+ internal string HttpEndpoint { get; private set; }
+
+ ///
+ /// Property exposed for testing purposes.
+ ///
+ internal Func? HttpClientFactory { get; set; }
+
+ ///
+ /// Property exposed for testing purposes.
+ ///
+ public JsonSerializerOptions JsonSerializerOptions { get; private set; }
+
+ ///
+ /// Property exposed for testing purposes.
+ ///
+ internal GrpcChannelOptions GrpcChannelOptions { get; private set; }
+
+ ///
+ /// Property exposed for testing purposes.
+ ///
+ public string DaprApiToken { get; private set; }
+
+ ///
+ /// Property exposed for testing purposes.
+ ///
+ internal TimeSpan Timeout { get; private set; }
+
+ ///
+ /// Overrides the HTTP endpoint used by the Dapr client for communicating with the Dapr runtime.
+ ///
+ ///
+ /// The URI endpoint to use for HTTP calls to the Dapr runtime. The default value will be
+ /// DAPR_HTTP_ENDPOINT first, or http://127.0.0.1:DAPR_HTTP_PORT as fallback
+ /// where DAPR_HTTP_ENDPOINT and DAPR_HTTP_PORT represents the value of the
+ /// corresponding environment variables.
+ ///
+ /// The instance.
+ public DaprGenericClientBuilder UseHttpEndpoint(string httpEndpoint)
+ {
+ ArgumentVerifier.ThrowIfNullOrEmpty(httpEndpoint, nameof(httpEndpoint));
+ this.HttpEndpoint = httpEndpoint;
+ return this;
+ }
+
+ ///
+ /// Exposed internally for testing purposes.
+ ///
+ internal DaprGenericClientBuilder UseHttpClientFactory(Func factory)
+ {
+ this.HttpClientFactory = factory;
+ return this;
+ }
+
+ ///
+ /// Overrides the legacy mechanism for building an HttpClient and uses the new
+ /// introduced in .NET Core 2.1.
+ ///
+ /// The factory used to create instances.
+ ///
+ public DaprGenericClientBuilder UseHttpClientFactory(IHttpClientFactory httpClientFactory)
+ {
+ this.HttpClientFactory = httpClientFactory.CreateClient;
+ return this;
+ }
+
+ ///
+ /// Overrides the gRPC endpoint used by the Dapr client for communicating with the Dapr runtime.
+ ///
+ ///
+ /// The URI endpoint to use for gRPC calls to the Dapr runtime. The default value will be
+ /// http://127.0.0.1:DAPR_GRPC_PORT where DAPR_GRPC_PORT represents the value of the
+ /// DAPR_GRPC_PORT environment variable.
+ ///
+ /// The instance.
+ public DaprGenericClientBuilder UseGrpcEndpoint(string grpcEndpoint)
+ {
+ ArgumentVerifier.ThrowIfNullOrEmpty(grpcEndpoint, nameof(grpcEndpoint));
+ this.GrpcEndpoint = grpcEndpoint;
+ return this;
+ }
+
+ ///
+ ///
+ /// Uses the specified when serializing or deserializing using .
+ ///
+ ///
+ /// The default value is created using .
+ ///
+ ///
+ /// Json serialization options.
+ /// The instance.
+ public DaprGenericClientBuilder UseJsonSerializationOptions(JsonSerializerOptions options)
+ {
+ this.JsonSerializerOptions = options;
+ return this;
+ }
+
+ ///
+ /// Uses the provided for creating the .
+ ///
+ /// The to use for creating the .
+ /// The instance.
+ public DaprGenericClientBuilder UseGrpcChannelOptions(GrpcChannelOptions grpcChannelOptions)
+ {
+ this.GrpcChannelOptions = grpcChannelOptions;
+ return this;
+ }
+
+ ///
+ /// Adds the provided on every request to the Dapr runtime.
+ ///
+ /// The token to be added to the request headers/>.
+ /// The instance.
+ public DaprGenericClientBuilder UseDaprApiToken(string apiToken)
+ {
+ this.DaprApiToken = apiToken;
+ return this;
+ }
+
+ ///
+ /// Sets the timeout for the HTTP client used by the Dapr client.
+ ///
+ ///
+ ///
+ public DaprGenericClientBuilder UseTimeout(TimeSpan timeout)
+ {
+ this.Timeout = timeout;
+ return this;
+ }
+
+ ///
+ /// Builds out the inner DaprClient that provides the core shape of the
+ /// runtime gRPC client used by the consuming package.
+ ///
+ ///
+ protected (GrpcChannel channel, HttpClient httpClient, Uri httpEndpoint) BuildDaprClientDependencies()
+ {
+ var grpcEndpoint = new Uri(this.GrpcEndpoint);
+ if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https")
+ {
+ throw new InvalidOperationException("The gRPC endpoint must use http or https.");
+ }
+
+ if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp))
+ {
+ // Set correct switch to make secure gRPC service calls. This switch must be set before creating the GrpcChannel.
+ AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
+ }
+
+ var httpEndpoint = new Uri(this.HttpEndpoint);
+ if (httpEndpoint.Scheme != "http" && httpEndpoint.Scheme != "https")
+ {
+ throw new InvalidOperationException("The HTTP endpoint must use http or https.");
+ }
+
+ var channel = GrpcChannel.ForAddress(this.GrpcEndpoint, this.GrpcChannelOptions);
+
+ var httpClient = HttpClientFactory is not null ? HttpClientFactory() : new HttpClient();
+ if (this.Timeout > TimeSpan.Zero)
+ {
+ httpClient.Timeout = this.Timeout;
+ }
+
+ return (channel, httpClient, httpEndpoint);
+ }
+
+ ///
+ /// Builds the client instance from the properties of the builder.
+ ///
+ /// The Dapr client instance.
+ ///
+ /// Builds the client instance from the properties of the builder.
+ ///
+ public abstract TClientBuilder Build();
+}
+
diff --git a/src/Dapr.Messaging/Dapr.Messaging.csproj b/src/Dapr.Messaging/Dapr.Messaging.csproj
new file mode 100644
index 000000000..0c7f159a4
--- /dev/null
+++ b/src/Dapr.Messaging/Dapr.Messaging.csproj
@@ -0,0 +1,22 @@
+
+
+
+ This package contains the reference assemblies for developing messaging services using Dapr.
+ enable
+ enable
+ Dapr.Messaging
+ Dapr Messaging SDK
+ Dapr Messaging SDK for building applications that utilize messaging components.
+ alpha
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs
new file mode 100644
index 000000000..5d933a860
--- /dev/null
+++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs
@@ -0,0 +1,31 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+namespace Dapr.Messaging.PublishSubscribe;
+
+///
+/// The base implementation of a Dapr pub/sub client.
+///
+public abstract class DaprPublishSubscribeClient
+{
+ ///
+ /// Dynamically subscribes to a Publish/Subscribe component and topic.
+ ///
+ /// The name of the Publish/Subscribe component.
+ /// The name of the topic to subscribe to.
+ /// Configuration options.
+ /// The delegate reflecting the action to take upon messages received by the subscription.
+ /// Cancellation token.
+ ///
+ public abstract Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken);
+}
diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs
new file mode 100644
index 000000000..b94bc5cdf
--- /dev/null
+++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClientBuilder.cs
@@ -0,0 +1,47 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+using Dapr.Common;
+using Microsoft.Extensions.Configuration;
+using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
+
+namespace Dapr.Messaging.PublishSubscribe;
+
+///
+/// Builds a .
+///
+public sealed class DaprPublishSubscribeClientBuilder : DaprGenericClientBuilder
+{
+ ///
+ /// Used to initialize a new instance of the .
+ ///
+ /// An optional instance of .
+ public DaprPublishSubscribeClientBuilder(IConfiguration? configuration = null) : base(configuration)
+ {
+ }
+
+ ///
+ /// Builds the client instance from the properties of the builder.
+ ///
+ /// The Dapr client instance.
+ ///
+ /// Builds the client instance from the properties of the builder.
+ ///
+ public override DaprPublishSubscribeClient Build()
+ {
+ var daprClientDependencies = BuildDaprClientDependencies();
+ var client = new Autogenerated.Dapr.DaprClient(daprClientDependencies.channel);
+
+ return new DaprPublishSubscribeGrpcClient(client);
+ }
+}
diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs
new file mode 100644
index 000000000..d935d250b
--- /dev/null
+++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs
@@ -0,0 +1,49 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+using P = Dapr.Client.Autogen.Grpc.v1.Dapr;
+
+namespace Dapr.Messaging.PublishSubscribe;
+
+///
+/// A client for interacting with the Dapr endpoints.
+///
+internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient
+{
+ private readonly P.DaprClient daprClient;
+
+ ///
+ /// Creates a new instance of a
+ ///
+ public DaprPublishSubscribeGrpcClient(P.DaprClient client)
+ {
+ daprClient = client;
+ }
+
+ ///
+ /// Dynamically subscribes to a Publish/Subscribe component and topic.
+ ///
+ /// The name of the Publish/Subscribe component.
+ /// The name of the topic to subscribe to.
+ /// Configuration options.
+ /// The delegate reflecting the action to take upon messages received by the subscription.
+ /// Cancellation token.
+ ///
+ public override async Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken)
+ {
+ var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient);
+ await receiver.SubscribeAsync(cancellationToken);
+ return receiver;
+ }
+}
+
diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs b/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs
new file mode 100644
index 000000000..73838b605
--- /dev/null
+++ b/src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs
@@ -0,0 +1,44 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+namespace Dapr.Messaging.PublishSubscribe;
+
+///
+/// Options used to configure the dynamic Dapr subscription.
+///
+/// Describes the policy to take on messages that have not been acknowledged within the timeout period.
+public sealed record DaprSubscriptionOptions(MessageHandlingPolicy MessageHandlingPolicy)
+{
+ ///
+ /// Subscription metadata.
+ ///
+ public IReadOnlyDictionary Metadata { get; init; } = new Dictionary();
+
+ ///
+ /// The optional name of the dead-letter topic to send unprocessed messages to.
+ ///
+ public string? DeadLetterTopic { get; init; }
+
+ ///
+ /// If populated, this reflects the maximum number of messages that can be queued for processing on the replica. By default,
+ /// no maximum boundary is enforced.
+ ///
+ public int? MaximumQueuedMessages { get; init; }
+
+ ///
+ /// The maximum amount of time to take to dispose of acknowledgement messages after the cancellation token has
+ /// been signaled.
+ ///
+ public TimeSpan MaximumCleanupTimeout { get; init; } = TimeSpan.FromSeconds(30);
+}
+
diff --git a/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs b/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs
new file mode 100644
index 000000000..ae09ddb48
--- /dev/null
+++ b/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs
@@ -0,0 +1,55 @@
+// using Microsoft.Extensions.DependencyInjection;
+//
+// namespace System.Runtime.CompilerServices.PublishSubscribe.Extensions;
+//
+// ///
+// /// Contains extension methods for using Dapr Publish/Subscribe with dependency injection.
+// ///
+// public static class PublishSubscribeServiceCollectionExtensions
+// {
+// ///
+// /// Adds Dapr Publish/Subscribe support to the service collection.
+// ///
+// /// The .
+// ///
+// public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services) =>
+// AddDaprPubSubClient(services, (_, _) => { });
+//
+// ///
+// /// Adds Dapr Publish/Subscribe support to the service collection.
+// ///
+// /// The .
+// /// Optionally allows greater configuration of the .
+// ///
+// public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services,
+// Action? configure) =>
+// services.AddDaprPubSubClient((_, builder) => configure?.Invoke(builder));
+//
+// ///
+// /// Adds Dapr Publish/Subscribe support to the service collection.
+// ///
+// /// The .
+// /// Optionally allows greater configuration of the using injected services.
+// ///
+// public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services, Action? configure)
+// {
+// ArgumentNullException.ThrowIfNull(services, nameof(services));
+//
+// //Register the IHttpClientFactory implementation
+// services.AddHttpClient();
+//
+// services.TryAddSingleton(serviceProvider =>
+// {
+// var httpClientFactory = serviceProvider.GetRequiredService();
+//
+// var builder = new DaprPublishSubscribeClientBuilder();
+// builder.UseHttpClientFactory(httpClientFactory);
+//
+// configure?.Invoke(serviceProvider, builder);
+//
+// return builder.Build();
+// });
+//
+// return services;
+// }
+// }
diff --git a/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs b/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs
new file mode 100644
index 000000000..de6882095
--- /dev/null
+++ b/src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs
@@ -0,0 +1,23 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+
+namespace Dapr.Messaging.PublishSubscribe;
+
+///
+/// Defines the policy for handling streaming message subscriptions, including retry logic and timeout settings.
+///
+/// The duration to wait before timing out a message handling operation.
+/// The default action to take when a message handling operation times out.
+public sealed record MessageHandlingPolicy(TimeSpan TimeoutDuration, TopicResponseAction DefaultResponseAction);
+
diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
new file mode 100644
index 000000000..be80809f8
--- /dev/null
+++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
@@ -0,0 +1,313 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+using System.Threading.Channels;
+using Dapr.AppCallback.Autogen.Grpc.v1;
+using Grpc.Core;
+using P = Dapr.Client.Autogen.Grpc.v1;
+
+namespace Dapr.Messaging.PublishSubscribe;
+
+///
+/// A thread-safe implementation of a receiver for messages from a specified Dapr publish/subscribe component and
+/// topic.
+///
+internal sealed class PublishSubscribeReceiver : IAsyncDisposable
+{
+ ///
+ /// Provides options for the unbounded channel.
+ ///
+ private readonly static UnboundedChannelOptions UnboundedChannelOptions = new()
+ {
+ SingleWriter = true, SingleReader = true
+ };
+
+ ///
+ /// The name of the Dapr publish/subscribe component.
+ ///
+ private readonly string pubSubName;
+ ///
+ /// The name of the topic to subscribe to.
+ ///
+ private readonly string topicName;
+ ///
+ /// Options allowing the behavior of the receiver to be configured.
+ ///
+ private readonly DaprSubscriptionOptions options;
+ ///
+ /// A channel used to decouple the messages received from the sidecar to their consumption.
+ ///
+ private readonly Channel topicMessagesChannel;
+ ///
+ /// Maintains the various acknowledgements for each message.
+ ///
+ private readonly Channel acknowledgementsChannel = Channel.CreateUnbounded(UnboundedChannelOptions);
+ ///
+ /// The stream connection between this instance and the Dapr sidecar.
+ ///
+ private AsyncDuplexStreamingCall? clientStream;
+ ///
+ /// Used to ensure thread-safe operations against the stream.
+ ///
+ private readonly SemaphoreSlim semaphore = new(1, 1);
+ ///
+ /// The handler delegate responsible for processing the topic messages.
+ ///
+ private readonly TopicMessageHandler messageHandler;
+ ///
+ /// A reference to the DaprClient instance.
+ ///
+ private readonly P.Dapr.DaprClient client;
+ ///
+ /// Flag that prevents the developer from accidentally initializing the subscription more than once from the same receiver.
+ ///
+ private int hasInitialized;
+ ///
+ /// Flag that ensures the instance is only disposed a single time.
+ ///
+ private bool isDisposed;
+
+ ///
+ /// Constructs a new instance of a instance.
+ ///
+ /// The name of the Dapr Publish/Subscribe component.
+ /// The name of the topic to subscribe to.
+ /// Options allowing the behavior of the receiver to be configured.
+ /// The delegate reflecting the action to take upon messages received by the subscription.
+ /// A reference to the DaprClient instance.
+ internal PublishSubscribeReceiver(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler handler, P.Dapr.DaprClient client)
+ {
+ this.client = client;
+ this.pubSubName = pubSubName;
+ this.topicName = topicName;
+ this.options = options;
+ this.messageHandler = handler;
+ topicMessagesChannel = options.MaximumQueuedMessages is > 0
+ ? Channel.CreateBounded(new BoundedChannelOptions((int)options.MaximumQueuedMessages)
+ {
+ SingleReader = true, SingleWriter = true, FullMode = BoundedChannelFullMode.Wait
+ })
+ : Channel.CreateUnbounded(UnboundedChannelOptions);
+ }
+
+ ///
+ /// Dynamically subscribes to messages on a PubSub topic provided by the Dapr sidecar.
+ ///
+ /// Cancellation token.
+ /// An containing messages provided by the sidecar.
+ internal async Task SubscribeAsync(CancellationToken cancellationToken = default)
+ {
+ //Prevents the receiver from performing the subscribe operation more than once (as the multiple initialization messages would cancel the stream).
+ if (Interlocked.Exchange(ref hasInitialized, 1) == 1)
+ {
+ return;
+ }
+
+ var stream = await GetStreamAsync(cancellationToken);
+
+ //Retrieve the messages from the sidecar and write to the messages channel
+ var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken);
+
+ //Process the messages as they're written to either channel
+ var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(cancellationToken);
+ var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken);
+
+ try
+ {
+ await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask);
+ }
+ catch (OperationCanceledException)
+ {
+ // Will be cleaned up during DisposeAsync
+ }
+ }
+
+ ///
+ /// Retrieves or creates the bidirectional stream to the DaprClient for transacting pub/sub subscriptions.
+ ///
+ /// Cancellation token.
+ /// The stream connection.
+ private async Task> GetStreamAsync(CancellationToken cancellationToken)
+ {
+ await semaphore.WaitAsync(cancellationToken);
+
+ try
+ {
+ return clientStream ??= client.SubscribeTopicEventsAlpha1(cancellationToken: cancellationToken);
+ }
+ finally
+ {
+ semaphore.Release();
+ }
+ }
+
+ ///
+ /// Acknowledges the indicated message back to the Dapr sidecar with an indicated behavior to take on the message.
+ ///
+ /// The identifier of the message the behavior is in reference to.
+ /// The behavior to take on the message as indicated by either the message handler or timeout message handling configuration.
+ /// Cancellation token.
+ ///
+ private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction behavior, CancellationToken cancellationToken)
+ {
+ var action = behavior switch
+ {
+ TopicResponseAction.Success => TopicEventResponse.Types.TopicEventResponseStatus.Success,
+ TopicResponseAction.Retry => TopicEventResponse.Types.TopicEventResponseStatus.Retry,
+ TopicResponseAction.Drop => TopicEventResponse.Types.TopicEventResponseStatus.Drop,
+ _ => throw new InvalidOperationException(
+ $"Unrecognized topic acknowledgement action: {behavior}")
+ };
+
+ var acknowledgement = new TopicAcknowledgement(messageId, action);
+ await acknowledgementsChannel.Writer.WriteAsync(acknowledgement, cancellationToken);
+ }
+
+ ///
+ /// Processes each acknowledgement from the acknowledgement channel reader as it's populated.
+ ///
+ /// Cancellation token.
+ private async Task ProcessAcknowledgementChannelMessagesAsync(CancellationToken cancellationToken)
+ {
+ var messageStream = await GetStreamAsync(cancellationToken);
+ await foreach (var acknowledgement in acknowledgementsChannel.Reader.ReadAllAsync(cancellationToken))
+ {
+ await messageStream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1
+ {
+ EventProcessed = new P.SubscribeTopicEventsRequestProcessedAlpha1
+ {
+ Id = acknowledgement.MessageId,
+ Status = new TopicEventResponse { Status = acknowledgement.Action }
+ }
+ }, cancellationToken);
+ }
+ }
+
+ ///
+ /// Processes each topic messages from the channel as it's populated.
+ ///
+ /// Cancellation token.
+ private async Task ProcessTopicChannelMessagesAsync(CancellationToken cancellationToken)
+ {
+ await foreach (var message in topicMessagesChannel.Reader.ReadAllAsync(cancellationToken))
+ {
+ using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ cts.CancelAfter(options.MessageHandlingPolicy.TimeoutDuration);
+
+ //Evaluate the message and return an acknowledgement result
+ var messageAction = await messageHandler(message, cts.Token);
+
+ try
+ {
+ //Share the result with the sidecar
+ await AcknowledgeMessageAsync(message.Id, messageAction, cancellationToken);
+ }
+ catch (OperationCanceledException)
+ {
+ //Acknowledge the message using the configured default response action
+ await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction, cancellationToken);
+ }
+ }
+ }
+
+ ///
+ /// Retrieves the subscription stream data from the Dapr sidecar.
+ ///
+ /// The stream connection to and from the Dapr sidecar instance.
+ /// The channel writer instance.
+ /// Cancellation token.
+ private async Task FetchDataFromSidecarAsync(
+ AsyncDuplexStreamingCall stream,
+ ChannelWriter channelWriter, CancellationToken cancellationToken)
+ {
+ //Build out the initial topic events request
+ var initialRequest = new P.SubscribeTopicEventsRequestInitialAlpha1()
+ {
+ PubsubName = pubSubName, DeadLetterTopic = options.DeadLetterTopic ?? string.Empty, Topic = topicName
+ };
+
+ if (options.Metadata.Count > 0)
+ {
+ foreach (var (key, value) in options.Metadata)
+ {
+ initialRequest.Metadata.Add(key, value);
+ }
+ }
+
+ //Send this request to the Dapr sidecar
+ await stream.RequestStream.WriteAsync(
+ new P.SubscribeTopicEventsRequestAlpha1 { InitialRequest = initialRequest }, cancellationToken);
+
+ //Each time a message is received from the stream, push it into the topic messages channel
+ await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken))
+ {
+ var message =
+ new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type,
+ response.EventMessage.SpecVersion, response.EventMessage.DataContentType,
+ response.EventMessage.Topic, response.EventMessage.PubsubName)
+ {
+ Path = response.EventMessage.Path,
+ Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value)
+ };
+
+ try
+ {
+ await channelWriter.WaitToWriteAsync(cancellationToken);
+ await channelWriter.WriteAsync(message, cancellationToken);
+ }
+ catch (Exception)
+ {
+ // Handle being unable to write because the writer is completed due to an active DisposeAsync operation
+ }
+ }
+ }
+
+ ///
+ /// Disposes the various resources associated with the instance.
+ ///
+ ///
+ public async ValueTask DisposeAsync()
+ {
+ if (isDisposed)
+ {
+ return;
+ }
+
+ isDisposed = true;
+
+ //Stop processing new events - we'll leave any messages yet unseen as unprocessed and
+ //Dapr will handle as necessary when they're not acknowledged
+ topicMessagesChannel.Writer.Complete();
+
+ //Flush the remaining acknowledgements, but start by marking the writer as complete so it doesn't receive any other messages either
+ acknowledgementsChannel.Writer.Complete();
+
+ try
+ {
+ //Process any remaining acknowledgements on the channel without exceeding the configured maximum clean up time
+ await acknowledgementsChannel.Reader.Completion.WaitAsync(options.MaximumCleanupTimeout);
+ }
+ catch (OperationCanceledException)
+ {
+ //Handled
+ }
+ }
+
+ ///
+ /// Reflects the action to take on a given message identifier.
+ ///
+ /// The identifier of the message.
+ /// The action to take on the message in the acknowledgement request.
+ private sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
+}
+
diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs b/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs
new file mode 100644
index 000000000..402a89e9f
--- /dev/null
+++ b/src/Dapr.Messaging/PublishSubscribe/TopicMessage.cs
@@ -0,0 +1,46 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+using Google.Protobuf.WellKnownTypes;
+
+namespace Dapr.Messaging.PublishSubscribe;
+
+///
+/// A message retrieved from a Dapr publish/subscribe topic.
+///
+/// The unique identifier of the topic message.
+/// Identifies the context in which an event happened, such as the organization publishing the
+/// event or the process that produced the event. The exact syntax and semantics behind the data
+/// encoded in the URI is defined by the event producer.
+/// The type of event related to the originating occurrence.
+/// The spec version of the CloudEvents specification.
+/// The content type of the data.
+/// The name of the topic.
+/// The name of the Dapr publish/subscribe component.
+public sealed record TopicMessage(string Id, string Source, string Type, string SpecVersion, string DataContentType, string Topic, string PubSubName)
+{
+ ///
+ /// The content of the event.
+ ///
+ public ReadOnlyMemory Data { get; init; }
+
+ ///
+ /// The matching path from the topic subscription/routes (if specified) for this event.
+ ///
+ public string? Path { get; init; }
+
+ ///
+ /// A map of additional custom properties sent to the app. These are considered to be CloudEvent extensions.
+ ///
+ public IReadOnlyDictionary Extensions { get; init; } = new Dictionary();
+}
diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs b/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs
new file mode 100644
index 000000000..65b7abf01
--- /dev/null
+++ b/src/Dapr.Messaging/PublishSubscribe/TopicMessageHandler.cs
@@ -0,0 +1,23 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+namespace Dapr.Messaging.PublishSubscribe;
+
+///
+/// The handler delegate responsible for processing the topic message.
+///
+/// The message request to process.
+/// Cancellation token.
+/// The acknowledgement behavior to report back to the pub/sub endpoint about the message.
+public delegate Task TopicMessageHandler(TopicMessage request,
+ CancellationToken cancellationToken = default);
diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs b/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs
new file mode 100644
index 000000000..5a34f4cc2
--- /dev/null
+++ b/src/Dapr.Messaging/PublishSubscribe/TopicResponseAction.cs
@@ -0,0 +1,34 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+namespace Dapr.Messaging.PublishSubscribe;
+
+///
+/// Describes the various actions that can be taken on a topic message.
+///
+public enum TopicResponseAction
+{
+ ///
+ /// Indicates the message was processed successfully and should be deleted from the pub/sub topic.
+ ///
+ Success,
+ ///
+ /// Indicates a failure while processing the message and that the message should be resent for a retry.
+ ///
+ Retry,
+ ///
+ /// Indicates a failure while processing the message and that the message should be dropped or sent to the
+ /// dead-letter topic if specified.
+ ///
+ Drop
+}
diff --git a/test/Dapr.Common.Test/DaprGenericClientBuilderTest.cs b/test/Dapr.Common.Test/DaprGenericClientBuilderTest.cs
new file mode 100644
index 000000000..d28b40058
--- /dev/null
+++ b/test/Dapr.Common.Test/DaprGenericClientBuilderTest.cs
@@ -0,0 +1,94 @@
+using System;
+using System.Text.Json;
+using Xunit;
+
+namespace Dapr.Common.Test;
+
+public class DaprGenericClientBuilderTest
+{
+ [Fact]
+ public void DaprGenericClientBuilder_ShouldUpdateHttpEndpoint_WhenHttpEndpointIsProvided()
+ {
+ // Arrange
+ var builder = new SampleDaprGenericClientBuilder();
+ const string endpointValue = "http://sample-endpoint";
+
+ // Act
+ builder.UseHttpEndpoint(endpointValue);
+
+ // Assert
+ Assert.Equal(endpointValue, builder.HttpEndpoint);
+ }
+
+ [Fact]
+ public void DaprGenericClientBuilder_ShouldUpdateHttpEndpoint_WhenGrpcEndpointIsProvided()
+ {
+ // Arrange
+ var builder = new SampleDaprGenericClientBuilder();
+ const string endpointValue = "http://sample-endpoint";
+
+ // Act
+ builder.UseGrpcEndpoint(endpointValue);
+
+ // Assert
+ Assert.Equal(endpointValue, builder.GrpcEndpoint);
+ }
+
+ [Fact]
+ public void DaprGenericClientBuilder_ShouldUpdateJsonSerializerOptions()
+ {
+ // Arrange
+ const int maxDepth = 8;
+ const bool writeIndented = true;
+ var builder = new SampleDaprGenericClientBuilder();
+ var options = new JsonSerializerOptions
+ {
+ WriteIndented = writeIndented,
+ MaxDepth = maxDepth
+ };
+
+ // Act
+ builder.UseJsonSerializationOptions(options);
+
+ // Assert
+ Assert.Equal(writeIndented, builder.JsonSerializerOptions.WriteIndented);
+ Assert.Equal(maxDepth, builder.JsonSerializerOptions.MaxDepth);
+ }
+
+ [Fact]
+ public void DaprGenericClientBuilder_ShouldUpdateDaprApiToken()
+ {
+ // Arrange
+ const string apiToken = "abc123";
+ var builder = new SampleDaprGenericClientBuilder();
+
+ // Act
+ builder.UseDaprApiToken(apiToken);
+
+ // Assert
+ Assert.Equal(apiToken, builder.DaprApiToken);
+ }
+
+ [Fact]
+ public void DaprGenericClientBuilder_ShouldUpdateTimeout()
+ {
+ // Arrange
+ var timeout = new TimeSpan(4, 2, 1, 2);
+ var builder = new SampleDaprGenericClientBuilder();
+
+ // Act
+ builder.UseTimeout(timeout);
+
+ // Assert
+ Assert.Equal(timeout, builder.Timeout);
+ }
+
+ private class SampleDaprGenericClientBuilder : DaprGenericClientBuilder
+ {
+ public override SampleDaprGenericClientBuilder Build()
+ {
+ // Implementation
+ throw new NotImplementedException();
+ }
+ }
+}
diff --git a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj
new file mode 100644
index 000000000..8f39e1713
--- /dev/null
+++ b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj
@@ -0,0 +1,42 @@
+
+
+
+ enable
+ enable
+ false
+ true
+
+
+
+
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+ all
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs
new file mode 100644
index 000000000..0efb5e879
--- /dev/null
+++ b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs
@@ -0,0 +1,55 @@
+using Dapr.Messaging.PublishSubscribe;
+
+namespace Dapr.Messaging.Test.PublishSubscribe
+{
+ public class MessageHandlingPolicyTest
+ {
+ [Fact]
+ public void Test_MessageHandlingPolicy_Constructor()
+ {
+ var timeoutDuration = TimeSpan.FromMilliseconds(2000);
+ const TopicResponseAction defaultResponseAction = TopicResponseAction.Drop;
+
+ var policy = new MessageHandlingPolicy(timeoutDuration, defaultResponseAction);
+
+ Assert.Equal(timeoutDuration, policy.TimeoutDuration);
+ Assert.Equal(defaultResponseAction, policy.DefaultResponseAction);
+ }
+
+ [Fact]
+ public void Test_MessageHandlingPolicy_Equality()
+ {
+ var timeSpan1 = TimeSpan.FromMilliseconds(1000);
+ var timeSpan2 = TimeSpan.FromMilliseconds(2000);
+
+ var policy1 = new MessageHandlingPolicy(timeSpan1, TopicResponseAction.Success);
+ var policy2 = new MessageHandlingPolicy(timeSpan1, TopicResponseAction.Success);
+ var policy3 = new MessageHandlingPolicy(timeSpan2, TopicResponseAction.Retry);
+
+ Assert.Equal(policy1, policy2); // Value Equality
+ Assert.NotEqual(policy1, policy3); // Different values
+ }
+
+ [Fact]
+ public void Test_MessageHandlingPolicy_Immutability()
+ {
+ var timeoutDuration = TimeSpan.FromMilliseconds(2000);
+ const TopicResponseAction defaultResponseAction = TopicResponseAction.Drop;
+
+ var policy1 = new MessageHandlingPolicy(timeoutDuration, defaultResponseAction);
+
+ var newTimeoutDuration = TimeSpan.FromMilliseconds(3000);
+ const TopicResponseAction newDefaultResponseAction = TopicResponseAction.Retry;
+
+ // Creating a new policy with different values.
+ var policy2 = policy1 with
+ {
+ TimeoutDuration = newTimeoutDuration, DefaultResponseAction = newDefaultResponseAction
+ };
+
+ // Asserting that original policy is unaffected by changes made to new policy.
+ Assert.Equal(timeoutDuration, policy1.TimeoutDuration);
+ Assert.Equal(defaultResponseAction, policy1.DefaultResponseAction);
+ }
+ }
+}
diff --git a/test/Dapr.Messaging.Test/protos/test.proto b/test/Dapr.Messaging.Test/protos/test.proto
new file mode 100644
index 000000000..b1c1ad8a9
--- /dev/null
+++ b/test/Dapr.Messaging.Test/protos/test.proto
@@ -0,0 +1,32 @@
+// ------------------------------------------------------------------------
+// Copyright 2021 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+syntax = "proto3";
+
+option csharp_namespace = "Dapr.Client.Autogen.Test.Grpc.v1";
+
+message TestRun {
+ repeated TestCase tests = 1;
+}
+
+message TestCase {
+ string name = 1;
+}
+
+message Request {
+ string RequestParameter = 1;
+}
+
+message Response {
+ string Name = 1;
+}
\ No newline at end of file