Skip to content

Commit

Permalink
Merge branch 'master' into workflow-ex-monitor
Browse files Browse the repository at this point in the history
Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
  • Loading branch information
WhitWaldo authored Nov 12, 2024
2 parents 6598180 + 6e2841a commit 072ab13
Show file tree
Hide file tree
Showing 20 changed files with 966 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/itests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ jobs:
name: run integration tests
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
dotnet-version: ['6.0', '7.0', '8.0']
include:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/sdk_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ jobs:
name: Test .NET ${{ matrix.dotnet-version }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
dotnet-version: ['6.0', '7.0', '8.0']
include:
Expand Down
22 changes: 22 additions & 0 deletions all.sln
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common.Test", "test\Dapr.Common.Test\Dapr.Common.Test.csproj", "{CDB47863-BEBD-4841-A807-46D868962521}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowMonitor", "examples\Workflow\WorkflowMonitor\WorkflowMonitor.csproj", "{7F73A3D8-FFC2-4E31-AA3D-A4840316A8C6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging.Test", "test\Dapr.Messaging.Test\Dapr.Messaging.Test.csproj", "{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging", "src\Dapr.Messaging\Dapr.Messaging.csproj", "{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingSubscriptionExample", "examples\Client\PublishSubscribe\StreamingSubscriptionExample\StreamingSubscriptionExample.csproj", "{290D1278-F613-4DF3-9DF5-F37E38CDC363}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Jobs", "src\Dapr.Jobs\Dapr.Jobs.csproj", "{C8BB6A85-A7EA-40C0-893D-F36F317829B3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Jobs.Test", "test\Dapr.Jobs.Test\Dapr.Jobs.Test.csproj", "{BF9828E9-5597-4D42-AA6E-6E6C12214204}"
Expand Down Expand Up @@ -316,6 +323,18 @@ Global
{7F73A3D8-FFC2-4E31-AA3D-A4840316A8C6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7F73A3D8-FFC2-4E31-AA3D-A4840316A8C6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7F73A3D8-FFC2-4E31-AA3D-A4840316A8C6}.Release|Any CPU.Build.0 = Release|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.Build.0 = Release|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.Build.0 = Release|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.Build.0 = Debug|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.ActiveCfg = Release|Any CPU
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.Build.0 = Release|Any CPU
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -385,6 +404,9 @@ Global
{B445B19C-A925-4873-8CB7-8317898B6970} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{CDB47863-BEBD-4841-A807-46D868962521} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{7F73A3D8-FFC2-4E31-AA3D-A4840316A8C6} = {BF3ED6BF-ADF3-4D25-8E89-02FB8D945CA9}
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{290D1278-F613-4DF3-9DF5-F37E38CDC363} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}
{C8BB6A85-A7EA-40C0-893D-F36F317829B3} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{BF9828E9-5597-4D42-AA6E-6E6C12214204} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{D9697361-232F-465D-A136-4561E0E88488} = {D687DDC4-66C5-4667-9E3A-FD8B78ECAA78}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Text;
using Dapr.Messaging.PublishSubscribe;
using Dapr.Messaging.PublishSubscribe.Extensions;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprPubSubClient();
var app = builder.Build();

//Process each message returned from the subscription
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
try
{
//Do something with the message
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
return Task.FromResult(TopicResponseAction.Success);
}
catch
{
return Task.FromResult(TopicResponseAction.Retry);
}
}

var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();

//Create a dynamic streaming subscription and subscribe with a timeout of 30 seconds and 10 seconds for message handling
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var subscription = await messagingClient.SubscribeAsync("pubsub", "myTopic",
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)),
HandleMessageAsync, cancellationTokenSource.Token);

await Task.Delay(TimeSpan.FromMinutes(1));

//When you're done with the subscription, simply dispose of it
await subscription.DisposeAsync();
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\src\Dapr.Messaging\Dapr.Messaging.csproj" />
</ItemGroup>

</Project>
22 changes: 22 additions & 0 deletions src/Dapr.Messaging/Dapr.Messaging.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<Description>This package contains the reference assemblies for developing messaging services using Dapr.</Description>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PackageId>Dapr.Messaging</PackageId>
<Title>Dapr Messaging SDK</Title>
<Description>Dapr Messaging SDK for building applications that utilize messaging components.</Description>
<VersionSuffix>alpha</VersionSuffix>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Dapr.Common\Dapr.Common.csproj" />
<ProjectReference Include="..\Dapr.Protos\Dapr.Protos.csproj" />
</ItemGroup>

</Project>
31 changes: 31 additions & 0 deletions src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Messaging.PublishSubscribe;

/// <summary>
/// The base implementation of a Dapr pub/sub client.
/// </summary>
public abstract class DaprPublishSubscribeClient
{
/// <summary>
/// Dynamically subscribes to a Publish/Subscribe component and topic.
/// </summary>
/// <param name="pubSubName">The name of the Publish/Subscribe component.</param>
/// <param name="topicName">The name of the topic to subscribe to.</param>
/// <param name="options">Configuration options.</param>
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns></returns>
public abstract Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Common;
using Microsoft.Extensions.Configuration;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;

namespace Dapr.Messaging.PublishSubscribe;

/// <summary>
/// Builds a <see cref="DaprPublishSubscribeClient"/>.
/// </summary>
public sealed class DaprPublishSubscribeClientBuilder : DaprGenericClientBuilder<DaprPublishSubscribeClient>
{
/// <summary>
/// Used to initialize a new instance of the <see cref="DaprPublishSubscribeClientBuilder"/>.
/// </summary>
/// <param name="configuration">An optional instance of <see cref="IConfiguration"/>.</param>
public DaprPublishSubscribeClientBuilder(IConfiguration? configuration = null) : base(configuration)
{
}

/// <summary>
/// Builds the client instance from the properties of the builder.
/// </summary>
/// <returns>The Dapr client instance.</returns>
/// <summary>
/// Builds the client instance from the properties of the builder.
/// </summary>
public override DaprPublishSubscribeClient Build()
{
var daprClientDependencies = BuildDaprClientDependencies();
var client = new Autogenerated.Dapr.DaprClient(daprClientDependencies.channel);

return new DaprPublishSubscribeGrpcClient(client);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using P = Dapr.Client.Autogen.Grpc.v1.Dapr;

namespace Dapr.Messaging.PublishSubscribe;

/// <summary>
/// A client for interacting with the Dapr endpoints.
/// </summary>
internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient
{
private readonly P.DaprClient daprClient;

/// <summary>
/// Creates a new instance of a <see cref="DaprPublishSubscribeGrpcClient"/>
/// </summary>
public DaprPublishSubscribeGrpcClient(P.DaprClient client)
{
daprClient = client;
}

/// <summary>
/// Dynamically subscribes to a Publish/Subscribe component and topic.
/// </summary>
/// <param name="pubSubName">The name of the Publish/Subscribe component.</param>
/// <param name="topicName">The name of the topic to subscribe to.</param>
/// <param name="options">Configuration options.</param>
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns></returns>
public override async Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default)
{
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient);
await receiver.SubscribeAsync(cancellationToken);
return receiver;
}
}

44 changes: 44 additions & 0 deletions src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Messaging.PublishSubscribe;

/// <summary>
/// Options used to configure the dynamic Dapr subscription.
/// </summary>
/// <param name="MessageHandlingPolicy">Describes the policy to take on messages that have not been acknowledged within the timeout period.</param>
public sealed record DaprSubscriptionOptions(MessageHandlingPolicy MessageHandlingPolicy)
{
/// <summary>
/// Subscription metadata.
/// </summary>
public IReadOnlyDictionary<string, string> Metadata { get; init; } = new Dictionary<string, string>();

/// <summary>
/// The optional name of the dead-letter topic to send unprocessed messages to.
/// </summary>
public string? DeadLetterTopic { get; init; }

/// <summary>
/// If populated, this reflects the maximum number of messages that can be queued for processing on the replica. By default,
/// no maximum boundary is enforced.
/// </summary>
public int? MaximumQueuedMessages { get; init; }

/// <summary>
/// The maximum amount of time to take to dispose of acknowledgement messages after the cancellation token has
/// been signaled.
/// </summary>
public TimeSpan MaximumCleanupTimeout { get; init; } = TimeSpan.FromSeconds(30);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Dapr.Messaging.PublishSubscribe.Extensions;

/// <summary>
/// Contains extension methods for using Dapr Publish/Subscribe with dependency injection.
/// </summary>
public static class PublishSubscribeServiceCollectionExtensions
{
/// <summary>
/// Adds Dapr Publish/Subscribe support to the service collection.
/// </summary>
/// <param name="services">The <see cref="IServiceCollection"/>.</param>
/// <param name="configure">Optionally allows greater configuration of the <see cref="DaprPublishSubscribeClient"/> using injected services.</param>
/// <returns></returns>
public static IServiceCollection AddDaprPubSubClient(this IServiceCollection services, Action<IServiceProvider, DaprPublishSubscribeClientBuilder>? configure = null)
{
ArgumentNullException.ThrowIfNull(services, nameof(services));

//Register the IHttpClientFactory implementation
services.AddHttpClient();

services.TryAddSingleton(serviceProvider =>
{
var httpClientFactory = serviceProvider.GetRequiredService<IHttpClientFactory>();

var builder = new DaprPublishSubscribeClientBuilder();
builder.UseHttpClientFactory(httpClientFactory);

configure?.Invoke(serviceProvider, builder);

return builder.Build();
});

return services;
}
}
23 changes: 23 additions & 0 deletions src/Dapr.Messaging/PublishSubscribe/MessageHandlingPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------


namespace Dapr.Messaging.PublishSubscribe;

/// <summary>
/// Defines the policy for handling streaming message subscriptions, including retry logic and timeout settings.
/// </summary>
/// <param name="TimeoutDuration">The duration to wait before timing out a message handling operation.</param>
/// <param name="DefaultResponseAction">The default action to take when a message handling operation times out.</param>
public sealed record MessageHandlingPolicy(TimeSpan TimeoutDuration, TopicResponseAction DefaultResponseAction);

Loading

0 comments on commit 072ab13

Please sign in to comment.