Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.3.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.4" />
<PackageVersion Include="Microsoft.Extensions.Http" Version="6.0.0" />
Expand Down
21 changes: 21 additions & 0 deletions all.sln
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common", "src\Dapr.Com
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common.Test", "test\Dapr.Common.Test\Dapr.Common.Test.csproj", "{CDB47863-BEBD-4841-A807-46D868962521}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging.Test", "test\Dapr.Messaging.Test\Dapr.Messaging.Test.csproj", "{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging", "src\Dapr.Messaging\Dapr.Messaging.csproj", "{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingSubscriptionExample", "examples\Client\PublishSubscribe\StreamingSubscriptionExample\StreamingSubscriptionExample.csproj", "{290D1278-F613-4DF3-9DF5-F37E38CDC363}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -303,6 +309,18 @@ Global
{CDB47863-BEBD-4841-A807-46D868962521}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.Build.0 = Release|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.Build.0 = Release|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.Build.0 = Release|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.Build.0 = Debug|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.ActiveCfg = Release|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -359,6 +377,9 @@ Global
{DFBABB04-50E9-42F6-B470-310E1B545638} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{B445B19C-A925-4873-8CB7-8317898B6970} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{CDB47863-BEBD-4841-A807-46D868962521} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{290D1278-F613-4DF3-9DF5-F37E38CDC363} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {65220BF2-EAE1-4CB2-AA58-EBE80768CB40}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System.Text;
using Dapr.Messaging.PublishSubscribe;

var daprMessagingClientBuilder = new DaprPublishSubscribeClientBuilder(null);
var daprMessagingClient = daprMessagingClientBuilder.Build();

//Process each message returned from the subscription
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
try
{
//Do something with the message
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
return Task.FromResult(TopicResponseAction.Success);
}
catch
{
return Task.FromResult(TopicResponseAction.Retry);
}
}

//Create a dynamic streaming subscription and subscribe with a timeout of 20 seconds and 10 seconds for message handling
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(20));
var subscription = await daprMessagingClient.SubscribeAsync("pubsub", "myTopic",
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)),
HandleMessageAsync, cancellationTokenSource.Token);

await Task.Delay(TimeSpan.FromMinutes(1));

//When you're done with the subscription, simply dispose of it
await subscription.DisposeAsync();
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\src\Dapr.Messaging\Dapr.Messaging.csproj" />
</ItemGroup>

</Project>
214 changes: 214 additions & 0 deletions src/Dapr.Common/DaprGenericClientBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
using System.Text.Json;
using Grpc.Net.Client;
using Microsoft.Extensions.Configuration;

namespace Dapr.Common;

/// <summary>
/// Builder for building a generic Dapr client.
/// </summary>
public abstract class DaprGenericClientBuilder<TClientBuilder> where TClientBuilder : class
{
/// <summary>
/// Initializes a new instance of the <see cref="DaprGenericClientBuilder{TClientBuilder}"/> class.
/// </summary>
protected DaprGenericClientBuilder(IConfiguration? configuration = null)
{
this.GrpcEndpoint = DaprDefaults.GetDefaultGrpcEndpoint();
this.HttpEndpoint = DaprDefaults.GetDefaultHttpEndpoint();

this.GrpcChannelOptions = new GrpcChannelOptions()
{
// The gRPC client doesn't throw the right exception for cancellation
// by default, this switches that behavior on.
ThrowOperationCanceledOnCancellation = true,
};

this.JsonSerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);
this.DaprApiToken = DaprDefaults.GetDefaultDaprApiToken(configuration);
}

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
internal string GrpcEndpoint { get; private set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
internal string HttpEndpoint { get; private set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
internal Func<HttpClient>? HttpClientFactory { get; set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
public JsonSerializerOptions JsonSerializerOptions { get; private set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
internal GrpcChannelOptions GrpcChannelOptions { get; private set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
public string DaprApiToken { get; private set; }

/// <summary>
/// Property exposed for testing purposes.
/// </summary>
internal TimeSpan Timeout { get; private set; }

/// <summary>
/// Overrides the HTTP endpoint used by the Dapr client for communicating with the Dapr runtime.
/// </summary>
/// <param name="httpEndpoint">
/// The URI endpoint to use for HTTP calls to the Dapr runtime. The default value will be
/// <c>DAPR_HTTP_ENDPOINT</c> first, or <c>http://127.0.0.1:DAPR_HTTP_PORT</c> as fallback
/// where <c>DAPR_HTTP_ENDPOINT</c> and <c>DAPR_HTTP_PORT</c> represents the value of the
/// corresponding environment variables.
/// </param>
/// <returns>The <see cref="DaprGenericClientBuilder{TClientBuilder}" /> instance.</returns>
public DaprGenericClientBuilder<TClientBuilder> UseHttpEndpoint(string httpEndpoint)
{
ArgumentVerifier.ThrowIfNullOrEmpty(httpEndpoint, nameof(httpEndpoint));
this.HttpEndpoint = httpEndpoint;
return this;
}

/// <summary>
/// Exposed internally for testing purposes.
/// </summary>
internal DaprGenericClientBuilder<TClientBuilder> UseHttpClientFactory(Func<HttpClient> factory)
{
this.HttpClientFactory = factory;
return this;
}

/// <summary>
/// Overrides the legacy mechanism for building an HttpClient and uses the new <see cref="IHttpClientFactory"/>
/// introduced in .NET Core 2.1.
/// </summary>
/// <param name="httpClientFactory">The factory used to create <see cref="HttpClient"/> instances.</param>
/// <returns></returns>
public DaprGenericClientBuilder<TClientBuilder> UseHttpClientFactory(IHttpClientFactory httpClientFactory)
{
this.HttpClientFactory = httpClientFactory.CreateClient;
return this;
}

/// <summary>
/// Overrides the gRPC endpoint used by the Dapr client for communicating with the Dapr runtime.
/// </summary>
/// <param name="grpcEndpoint">
/// The URI endpoint to use for gRPC calls to the Dapr runtime. The default value will be
/// <c>http://127.0.0.1:DAPR_GRPC_PORT</c> where <c>DAPR_GRPC_PORT</c> represents the value of the
/// <c>DAPR_GRPC_PORT</c> environment variable.
/// </param>
/// <returns>The <see cref="DaprGenericClientBuilder{TClientBuilder}" /> instance.</returns>
public DaprGenericClientBuilder<TClientBuilder> UseGrpcEndpoint(string grpcEndpoint)
{
ArgumentVerifier.ThrowIfNullOrEmpty(grpcEndpoint, nameof(grpcEndpoint));
this.GrpcEndpoint = grpcEndpoint;
return this;
}

/// <summary>
/// <para>
/// Uses the specified <see cref="JsonSerializerOptions"/> when serializing or deserializing using <see cref="System.Text.Json"/>.
/// </para>
/// <para>
/// The default value is created using <see cref="JsonSerializerDefaults.Web" />.
/// </para>
/// </summary>
/// <param name="options">Json serialization options.</param>
/// <returns>The <see cref="DaprGenericClientBuilder{TClientBuilder}" /> instance.</returns>
public DaprGenericClientBuilder<TClientBuilder> UseJsonSerializationOptions(JsonSerializerOptions options)
{
this.JsonSerializerOptions = options;
return this;
}

/// <summary>
/// Uses the provided <paramref name="grpcChannelOptions" /> for creating the <see cref="GrpcChannel" />.
/// </summary>
/// <param name="grpcChannelOptions">The <see cref="GrpcChannelOptions" /> to use for creating the <see cref="GrpcChannel" />.</param>
/// <returns>The <see cref="DaprGenericClientBuilder{TClientBuilder}" /> instance.</returns>
public DaprGenericClientBuilder<TClientBuilder> UseGrpcChannelOptions(GrpcChannelOptions grpcChannelOptions)
{
this.GrpcChannelOptions = grpcChannelOptions;
return this;
}

/// <summary>
/// Adds the provided <paramref name="apiToken" /> on every request to the Dapr runtime.
/// </summary>
/// <param name="apiToken">The token to be added to the request headers/>.</param>
/// <returns>The <see cref="DaprGenericClientBuilder{TClientBuilder}" /> instance.</returns>
public DaprGenericClientBuilder<TClientBuilder> UseDaprApiToken(string apiToken)
{
this.DaprApiToken = apiToken;
return this;
}

/// <summary>
/// Sets the timeout for the HTTP client used by the Dapr client.
/// </summary>
/// <param name="timeout"></param>
/// <returns></returns>
public DaprGenericClientBuilder<TClientBuilder> UseTimeout(TimeSpan timeout)
{
this.Timeout = timeout;
return this;
}

/// <summary>
/// Builds out the inner DaprClient that provides the core shape of the
/// runtime gRPC client used by the consuming package.
/// </summary>
/// <exception cref="InvalidOperationException"></exception>
protected (GrpcChannel channel, HttpClient httpClient, Uri httpEndpoint) BuildDaprClientDependencies()
{
var grpcEndpoint = new Uri(this.GrpcEndpoint);
if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https")
{
throw new InvalidOperationException("The gRPC endpoint must use http or https.");
}

if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp))
{
// Set correct switch to make secure gRPC service calls. This switch must be set before creating the GrpcChannel.
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
}

var httpEndpoint = new Uri(this.HttpEndpoint);
if (httpEndpoint.Scheme != "http" && httpEndpoint.Scheme != "https")
{
throw new InvalidOperationException("The HTTP endpoint must use http or https.");
}

var channel = GrpcChannel.ForAddress(this.GrpcEndpoint, this.GrpcChannelOptions);

var httpClient = HttpClientFactory is not null ? HttpClientFactory() : new HttpClient();
if (this.Timeout > TimeSpan.Zero)
{
httpClient.Timeout = this.Timeout;
}

return (channel, httpClient, httpEndpoint);
}

/// <summary>
/// Builds the client instance from the properties of the builder.
/// </summary>
/// <returns>The Dapr client instance.</returns>
/// <summary>
/// Builds the client instance from the properties of the builder.
/// </summary>
public abstract TClientBuilder Build();
}

22 changes: 22 additions & 0 deletions src/Dapr.Messaging/Dapr.Messaging.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Description>This package contains the reference assemblies for developing messaging services using Dapr.</Description>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PackageId>Dapr.Messaging</PackageId>
<Title>Dapr Messaging SDK</Title>
<Description>Dapr Messaging SDK for building applications that utilize messaging components.</Description>
<VersionSuffix>alpha</VersionSuffix>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Dapr.Common\Dapr.Common.csproj" />
<ProjectReference Include="..\Dapr.Protos\Dapr.Protos.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Messaging.PublishSubscribe;

/// <summary>
/// The base implementation of a Dapr pub/sub client.
/// </summary>
public abstract class DaprPublishSubscribeClient
{
/// <summary>
/// Dynamically subscribes to a Publish/Subscribe component and topic.
/// </summary>
/// <param name="pubSubName">The name of the Publish/Subscribe component.</param>
/// <param name="topicName">The name of the topic to subscribe to.</param>
/// <param name="options">Configuration options.</param>
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns></returns>
public abstract Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken);
}
Loading
Loading