Skip to content

Implement streaming features for the listen task and for the asyncapi call #478

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<ItemGroup>
<PackageReference Include="IdentityServer4" Version="4.1.2" NoWarn="NU1902" />
<PackageReference Include="IdentityServer4.Storage" Version="4.1.2" NoWarn="NU1902" />
<PackageReference Include="Polly" Version="8.5.0" />
<PackageReference Include="Polly" Version="8.5.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="9.0.0" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha6.2" />
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="9.0.1" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha6.3" />
<PackageReference Include="System.Reactive" Version="6.0.1" />
</ItemGroup>

Expand Down
6 changes: 3 additions & 3 deletions src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.OpenIdConnect" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="9.0.1" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.OpenIdConnect" Version="9.0.1" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="9.0.1" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.2.0" />
</ItemGroup>
Expand Down
6 changes: 3 additions & 3 deletions src/cli/Synapse.Cli/Synapse.Cli.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.1" />
<PackageReference Include="moment.net" Version="1.3.4" />
<PackageReference Include="NetEscapades.Configuration.Yaml" Version="3.1.0" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha6.2" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha6.3" />
<PackageReference Include="Spectre.Console" Version="0.49.1" />
<PackageReference Include="System.CommandLine.NamingConventionBinder" Version="2.0.0-beta4.22272.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<PackageReference Include="Neuroglia.Mediation" Version="4.18.1" />
<PackageReference Include="Neuroglia.Plugins" Version="4.18.1" />
<PackageReference Include="Neuroglia.Serialization.Xml" Version="4.18.1" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha6.2" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha6.3" />
</ItemGroup>

<ItemGroup>
Expand Down
10 changes: 8 additions & 2 deletions src/core/Synapse.Core/Resources/CorrelationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@ public record CorrelationContext
/// <summary>
/// Gets a key/value mapping of the context's correlation keys
/// </summary>
[DataMember(Name = "keys", Order = 2), JsonPropertyName("keys"), JsonPropertyOrder(2), YamlMember(Alias = "keys", Order = 2)]
[DataMember(Name = "keys", Order = 3), JsonPropertyName("keys"), JsonPropertyOrder(3), YamlMember(Alias = "keys", Order = 3)]
public virtual EquatableDictionary<string, string> Keys { get; set; } = [];

/// <summary>
/// Gets a key/value mapping of all correlated events, with the key being the index of the matched correlation filter
/// </summary>
[DataMember(Name = "events", Order = 3), JsonPropertyName("events"), JsonPropertyOrder(3), YamlMember(Alias = "events", Order = 3)]
[DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)]
public virtual EquatableDictionary<int, CloudEvent> Events { get; set; } = [];

/// <summary>
/// Gets the offset that serves as the index of the event being processed by the consumer, if streaming has been enabled for the correlation associated with the context.
/// </summary>
[DataMember(Name = "offset", Order = 5), JsonPropertyName("offset"), JsonPropertyOrder(5), YamlMember(Alias = "offset", Order = 5)]
public virtual uint? Offset { get; set; }

}
8 changes: 7 additions & 1 deletion src/core/Synapse.Core/Resources/CorrelationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,16 @@ public record CorrelationSpec
[DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)]
public virtual EventConsumptionStrategyDefinition Events { get; set; } = null!;

/// <summary>
/// Gets/sets a boolean indicating whether or not to stream events. When enabled, each correlated event is atomically published to the subscriber immediately rather than waiting for the entire correlation to complete
/// </summary>
[DataMember(Name = "stream", Order = 5), JsonPropertyName("stream"), JsonPropertyOrder(5), YamlMember(Alias = "stream", Order = 5)]
public virtual bool Stream { get; set; }

/// <summary>
/// Gets/sets an object used to configure the correlation's outcome
/// </summary>
[DataMember(Name = "outcome", Order = 5), JsonPropertyName("outcome"), JsonPropertyOrder(5), YamlMember(Alias = "outcome", Order = 5)]
[DataMember(Name = "outcome", Order = 6), JsonPropertyName("outcome"), JsonPropertyOrder(6), YamlMember(Alias = "outcome", Order = 6)]
public virtual CorrelationOutcomeDefinition Outcome { get; set; } = null!;

}
4 changes: 2 additions & 2 deletions src/core/Synapse.Core/Synapse.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@
<ItemGroup>
<PackageReference Include="Apache.Avro" Version="1.12.0" />
<PackageReference Include="Docker.DotNet" Version="3.125.15" />
<PackageReference Include="KubernetesClient" Version="15.0.1" />
<PackageReference Include="KubernetesClient" Version="16.0.1" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented" Version="4.18.1" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents" Version="4.18.1" />
<PackageReference Include="Semver" Version="3.0.0" />
<PackageReference Include="ServerlessWorkflow.Sdk" Version="1.0.0-alpha6.2" />
<PackageReference Include="ServerlessWorkflow.Sdk" Version="1.0.0-alpha6.3" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte
{
var index = updatedResource.Status.Contexts.IndexOf(existingContext);
updatedResource.Status.Contexts.Remove(existingContext);
if (!completed) updatedResource.Status.Contexts.Insert(index, context);
updatedResource.Status.Contexts.Insert(index, context);
}
if (completed)
{
Expand Down
4 changes: 2 additions & 2 deletions src/correlator/Synapse.Correlator/Synapse.Correlator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.KeyPerFile" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.KeyPerFile" Version="9.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.1" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
<PackageReference Include="Neuroglia.Data.Expressions.JavaScript" Version="4.18.1" />
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.18.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.1" />
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
6 changes: 3 additions & 3 deletions src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
<PackageReference Include="Blazor.Bootstrap" Version="3.2.0" />
<PackageReference Include="BlazorMonaco" Version="3.3.0" />
<PackageReference Include="IdentityModel" Version="7.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Authentication" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.DevServer" Version="9.0.0" PrivateAssets="all" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly" Version="9.0.1" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Authentication" Version="9.0.1" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.DevServer" Version="9.0.1" PrivateAssets="all" />
<PackageReference Include="moment.net" Version="1.3.4" />
<PackageReference Include="Neuroglia.Blazor.Dagre" Version="4.18.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected override async Task OnResourceCreatedAsync(WorkflowInstance workflowIn
}
catch(Exception ex)
{
this.Logger.LogError("An error occured while handling the creation of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
this.Logger.LogError("An error occurred while handling the creation of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/operator/Synapse.Operator/Synapse.Operator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@

<ItemGroup>
<PackageReference Include="Cronos" Version="0.9.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.KeyPerFile" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.KeyPerFile" Version="9.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.1" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
</ItemGroup>

Expand Down
39 changes: 39 additions & 0 deletions src/runner/Synapse.Runner/IStreamedCloudEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Synapse.Runner;

/// <summary>
/// Defines the fundamentals of an object used to wrap a streamed <see cref="CloudEvent"/>
/// </summary>
public interface IStreamedCloudEvent
{

/// <summary>
/// Gets the streamed <see cref="CloudEvent"/>
/// </summary>
CloudEvent Event { get; }

/// <summary>
/// Gets the position of the <see cref="CloudEvent"/> within its originating stream
/// </summary>
uint Offset { get; }

/// <summary>
/// Acknowledges that the <see cref="CloudEvent"/> has been successfully processed
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
Task AckAsync(CancellationToken cancellationToken = default);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
using Synapse.Events.Tasks;
using Synapse.Events.Workflows;
using System.Net.Mime;
using System.Reactive.Disposables;
using System.Reactive.Threading.Tasks;

namespace Synapse.Runner.Services;

Expand Down Expand Up @@ -304,11 +306,125 @@ public virtual async Task ResumeAsync(CancellationToken cancellationToken = defa
this.Logger.LogInformation("The workflow's execution has been resumed.");
}

/// <inheritdoc/>
public virtual async Task<IObservable<IStreamedCloudEvent>> StreamAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(task);
if (task.Definition is not ListenTaskDefinition listenTask) throw new ArgumentException("The specified task's definition must be a 'listen' task", nameof(task));
if (listenTask.Foreach == null) throw new ArgumentException($"Since the specified listen task doesn't use streaming, the {nameof(CorrelateAsync)} method must be used instead");
if (this.Instance.Status?.Correlation?.Contexts?.TryGetValue(task.Instance.Reference.OriginalString, out var context) == true && context != null) return Observable.Empty<IStreamedCloudEvent>();
var @namespace = task.Workflow.Instance.GetNamespace()!;
var name = $"{task.Workflow.Instance.GetName()}.{task.Instance.Id}";
Correlation? correlation = null;
try { correlation = await this.Api.Correlations.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false); }
catch { }
if (correlation == null)
{
correlation = await this.Api.Correlations.CreateAsync(new()
{
Metadata = new()
{
Namespace = @namespace,
Name = name,
Labels = new Dictionary<string, string>()
{
{ SynapseDefaults.Resources.Labels.WorkflowInstance, this.Instance.GetQualifiedName() }
}
},
Spec = new()
{
Source = new ResourceReference<WorkflowInstance>(task.Workflow.Instance.GetName(), task.Workflow.Instance.GetNamespace()),
Lifetime = CorrelationLifetime.Ephemeral,
Events = listenTask.Listen.To,
Stream = true,
Expressions = task.Workflow.Definition.Evaluate ?? new(),
Outcome = new()
{
Correlate = new()
{
Instance = task.Workflow.Instance.GetQualifiedName(),
Task = task.Instance.Reference.OriginalString
}
}
}
}, cancellationToken).ConfigureAwait(false);
}
var taskCompletionSource = new TaskCompletionSource<CorrelationContext>();
var cancellationTokenRegistration = cancellationToken.Register(() => taskCompletionSource.TrySetCanceled());
var correlationSubscription = this.Api.WorkflowInstances.MonitorAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken)
.ToObservable()
.Where(e => e.Type == ResourceWatchEventType.Updated)
.Select(e => e.Resource.Status?.Correlation?.Contexts)
.Scan((Previous: (EquatableDictionary<string, CorrelationContext>?)null, Current: (EquatableDictionary<string, CorrelationContext>?)null), (accumulator, current) => (accumulator.Current ?? [], current))
.Where(v => v.Current?.Count > v.Previous?.Count) //ensures we are not handling changes in a circular loop: if length of current is smaller than previous, it means a context has been processed
.Subscribe(value =>
{
var patch = JsonPatchUtility.CreateJsonPatchFromDiff(value.Previous, value.Current);
var patchOperation = patch.Operations.FirstOrDefault(o => o.Op == OperationType.Add && o.Path[0] == task.Instance.Reference.OriginalString);
if (patchOperation == null) return;
context = this.JsonSerializer.Deserialize<CorrelationContext>(patchOperation.Value!)!;
taskCompletionSource.SetResult(context);
});
var endOfStream = false;
var stopObservable = taskCompletionSource.Task.ToObservable();
var stopSubscription = stopObservable.Take(1).Subscribe(_ => endOfStream = true);
return Observable.Create<StreamedCloudEvent>(observer =>
{
var subscription = Observable.Using(
() => new CompositeDisposable
{
cancellationTokenRegistration,
correlationSubscription
},
disposable => this.Api.Correlations.MonitorAsync(correlation.GetName(), correlation.GetNamespace()!, cancellationToken)
.ToObservable()
.Where(e => e.Type == ResourceWatchEventType.Updated)
.Select(e => e.Resource.Status?.Contexts?.FirstOrDefault())
.Where(c => c != null)
.SelectMany(c =>
{
var acknowledgedOffset = c!.Offset.HasValue ? (int)c.Offset.Value : 0;
return c.Events.Values
.Skip(acknowledgedOffset)
.Select((evt, index) => new
{
ContextId = c.Id,
Event = evt,
Offset = (uint)(acknowledgedOffset + index + 1)
});
})
.Distinct(e => e.Offset)
.Select(e => new StreamedCloudEvent(e.Event, e.Offset, async (offset, token) =>
{
var original = await this.Api.Correlations.GetAsync(name, @namespace, token).ConfigureAwait(false);
var updated = original.Clone()!;
var context = updated.Status?.Contexts.FirstOrDefault(c => c.Id == e.ContextId);
if (context == null)
{
this.Logger.LogError("Failed to find a context with the specified id '{contextId}' in correlation '{name}.{@namespace}'", e.ContextId, name, @namespace);
throw new Exception($"Failed to find a context with the specified id '{e.ContextId}' in correlation '{name}.{@namespace}'");
}
context.Offset = offset;
var patch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated);
await this.Api.Correlations.PatchStatusAsync(name, @namespace, new Patch(PatchType.JsonPatch, patch), cancellationToken: token).ConfigureAwait(false);
})))
.Subscribe(e =>
{
observer.OnNext(e);
if (endOfStream) observer.OnCompleted();
},
ex => observer.OnError(ex),
() => observer.OnCompleted());
return new CompositeDisposable(subscription, stopSubscription);
});
}

/// <inheritdoc/>
public virtual async Task<CorrelationContext> CorrelateAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(task);
if (task.Definition is not ListenTaskDefinition listenTask) throw new ArgumentException("The specified task's definition must be a 'listen' task", nameof(task));
if (listenTask.Foreach == null) throw new ArgumentException($"Since the specified listen task uses streaming, the {nameof(StreamAsync)} method must be used instead");
if (this.Instance.Status?.Correlation?.Contexts?.TryGetValue(task.Instance.Reference.OriginalString, out var context) == true && context != null) return context;
var @namespace = task.Workflow.Instance.GetNamespace()!;
var name = $"{task.Workflow.Instance.GetName()}.{task.Instance.Id}";
Expand Down
Loading
Loading