Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the AuthorizationInfo to resolve referenced authentication policies #399

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/core/Synapse.Core/SynapseDefaults.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Synapse.Resources;
using Neuroglia.Data.Infrastructure.ResourceOriented;
using Synapse.Resources;
using System.Diagnostics;
using System.Reflection;

Expand Down Expand Up @@ -766,7 +766,7 @@ public static class Secrets
/// <summary>
/// Gets the prefix for all secrets related environment variables
/// </summary>
public const string Prefix = EnvironmentVariables.Prefix + "SECRETS";
public const string Prefix = EnvironmentVariables.Prefix + "SECRETS_";
/// <summary>
/// Gets the name of the environment variable used to configure the path to the directory that contains secrets files
/// </summary>
Expand Down
9 changes: 7 additions & 2 deletions src/runner/Synapse.Runner/AuthorizationInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Docker.DotNet.Models;
using Neuroglia.Data.Infrastructure.ResourceOriented;
using ServerlessWorkflow.Sdk.Models.Authentication;
using System.Text;
Expand Down Expand Up @@ -42,16 +41,22 @@ public class AuthorizationInfo(string scheme, string parameter)
/// <summary>
/// Creates a new <see cref="AuthorizationInfo"/> based on the specified <see cref="AuthenticationPolicyDefinition"/>
/// </summary>
/// <param name="workflow">The <see cref="WorkflowDefinition"/> that defines the <see cref="AuthenticationPolicyDefinition"/> to create a new <see cref="AuthorizationInfo"/> for</param>
/// <param name="authentication">The <see cref="AuthenticationPolicyDefinition"/> to create a new <see cref="AuthorizationInfo"/> for</param>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new <see cref="AuthorizationInfo"/> based on the specified <see cref="AuthenticationPolicyDefinition"/></returns>
public static async Task<AuthorizationInfo> CreateAsync(AuthenticationPolicyDefinition authentication, IServiceProvider serviceProvider, CancellationToken cancellationToken = default)
public static async Task<AuthorizationInfo> CreateAsync(WorkflowDefinition workflow, AuthenticationPolicyDefinition authentication, IServiceProvider serviceProvider, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(nameof(authentication));
ArgumentNullException.ThrowIfNull(nameof(serviceProvider));
string scheme, parameter;
var logger = serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger("AuthenticationPolicyHandler");
if (!string.IsNullOrWhiteSpace(authentication.Use))
{
if (workflow.Use?.Authentications?.TryGetValue(authentication.Use, out AuthenticationPolicyDefinition? referencedAuthentication) != true || referencedAuthentication == null) throw new NullReferenceException($"Failed to find the specified authentication policy '{authentication.Use}'");
else authentication = referencedAuthentication;
}
var isSecretBased = authentication.TryGetBaseSecret(out var secretName);
object? authenticationProperties = null;
if (isSecretBased && !string.IsNullOrWhiteSpace(secretName))
Expand Down
6 changes: 4 additions & 2 deletions src/runner/Synapse.Runner/Extensions/HttpClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ public static class HttpClientExtensions
/// Configures the <see cref="HttpClient"/> to use the specified authentication mechanism
/// </summary>
/// <param name="httpClient">The <see cref="HttpClient"/> to configure</param>
/// <param name="workflow">The <see cref="WorkflowDefinition"/> that defines the authentication to configure</param>
/// <param name="authentication">An object that describes the authentication mechanism to use</param>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
public static async Task ConfigureAuthenticationAsync(this HttpClient httpClient, AuthenticationPolicyDefinition? authentication, IServiceProvider serviceProvider, CancellationToken cancellationToken = default)
public static async Task ConfigureAuthenticationAsync(this HttpClient httpClient, WorkflowDefinition workflow, AuthenticationPolicyDefinition? authentication, IServiceProvider serviceProvider, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(workflow);
if (authentication == null) return;
var authorization = await AuthorizationInfo.CreateAsync(authentication, serviceProvider, cancellationToken).ConfigureAwait(false);
var authorization = await AuthorizationInfo.CreateAsync(workflow, authentication, serviceProvider, cancellationToken).ConfigureAwait(false);
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(authorization.Scheme, authorization.Parameter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ protected virtual async Task OnIterationFaultAsync(ITaskExecutor executor, Cance
ArgumentNullException.ThrowIfNull(executor);
var error = executor.Task.Instance.Error ?? throw new NullReferenceException();
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false);
}

Expand All @@ -123,7 +122,6 @@ protected virtual async Task OnIterationCompletedAsync(ITaskExecutor executor, C
var output = executor.Task.Output!;
this.Executors.Remove(executor);
if (this.Task.ContextData != executor.Task.ContextData) await this.Task.SetContextDataAsync(executor.Task.ContextData, cancellationToken).ConfigureAwait(false);
await executor.DisposeAsync().ConfigureAwait(false);
var index = int.Parse(last.Reference.OriginalString.Split('/', StringSplitOptions.RemoveEmptyEntries)[^2]) + 1;
if (index == this.Collection.Count)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,9 @@ protected virtual async Task OnSubTaskFaultAsync(ITaskExecutor executor, Cancell
using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false);
var error = executor.Task.Instance.Error ?? throw new NullReferenceException();
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
foreach (var subExecutor in this.Executors)
{
await subExecutor.CancelAsync(cancellationToken).ConfigureAwait(false);
await subExecutor.DisposeAsync().ConfigureAwait(false);
this.Executors.Remove(executor);
}
await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false);
Expand All @@ -102,11 +100,7 @@ protected virtual async Task OnSubTaskCompletedAsync(ITaskExecutor executor, Can
using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false);
if (this.Task.Instance.Status != TaskInstanceStatus.Running)
{
if (this.Executors.Remove(executor))
{
await executor.CancelAsync(cancellationToken).ConfigureAwait(false);
await executor.DisposeAsync().ConfigureAwait(false);
}
if (this.Executors.Remove(executor)) await executor.CancelAsync(cancellationToken).ConfigureAwait(false);
}
if (this.Task.Definition.Fork.Compete == true)
{
Expand All @@ -115,7 +109,6 @@ protected virtual async Task OnSubTaskCompletedAsync(ITaskExecutor executor, Can
{
this.Executors.Remove(concurrentTaskExecutor);
await concurrentTaskExecutor.CancelAsync(cancellationToken).ConfigureAwait(false);
await concurrentTaskExecutor.DisposeAsync().ConfigureAwait(false);
}
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ protected virtual async Task OnSubTaskFaultAsync(ITaskExecutor executor, Cancell
ArgumentNullException.ThrowIfNull(executor);
var error = executor.Task.Instance.Error ?? throw new NullReferenceException();
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
await this.SetErrorAsync(error, cancellationToken).ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
{
this.Http = (HttpCallDefinition)this.JsonSerializer.Convert(this.Task.Definition.With, typeof(HttpCallDefinition))!;
var authentication = this.Http.Endpoint.Authentication == null ? null : await this.Task.Workflow.Expressions.EvaluateAsync<AuthenticationPolicyDefinition>(this.Http.Endpoint.Authentication, this.Task.Input, this.Task.Arguments, cancellationToken).ConfigureAwait(false);
await this.HttpClient.ConfigureAuthenticationAsync(authentication, this.ServiceProvider, cancellationToken).ConfigureAwait(false);
await this.HttpClient.ConfigureAuthenticationAsync(this.Task.Workflow.Definition, authentication, this.ServiceProvider, cancellationToken).ConfigureAwait(false);
}
catch(Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
{
this.OpenApi = (OpenApiCallDefinition)this.JsonSerializer.Convert(this.Task.Definition.With, typeof(OpenApiCallDefinition))!;
using var httpClient = this.HttpClientFactory.CreateClient();
await httpClient.ConfigureAuthenticationAsync(this.OpenApi.Document.Endpoint.Authentication, this.ServiceProvider, cancellationToken).ConfigureAwait(false);
await httpClient.ConfigureAuthenticationAsync(this.Task.Workflow.Definition, this.OpenApi.Document.Endpoint.Authentication, this.ServiceProvider, cancellationToken).ConfigureAwait(false);
using var request = new HttpRequestMessage(HttpMethod.Get, this.OpenApi.Document.EndpointUri);
using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
Expand Down Expand Up @@ -226,7 +226,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
}
}
using var httpClient = this.HttpClientFactory.CreateClient();
await httpClient.ConfigureAuthenticationAsync(this.OpenApi.Authentication, this.ServiceProvider, cancellationToken).ConfigureAwait(false);
await httpClient.ConfigureAuthenticationAsync(this.Task.Workflow.Definition, this.OpenApi.Authentication, this.ServiceProvider, cancellationToken).ConfigureAwait(false);
using var response = await httpClient.SendAsync(request, cancellationToken);
if (response.StatusCode == HttpStatusCode.ServiceUnavailable) continue;
var rawContent = await response.Content.ReadAsByteArrayAsync(cancellationToken)!;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ public virtual async Task<Stream> ReadAsync(WorkflowDefinition workflow, Externa
/// <returns>The specified <see cref="ExternalResourceDefinition"/>'s content <see cref="Stream"/></returns>
protected virtual async Task<Stream> ReadOverHttpAsync(WorkflowDefinition workflow, ExternalResourceDefinition resource, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(workflow);
ArgumentNullException.ThrowIfNull(resource);
using var httpClient = this.HttpClientFactory.CreateClient();
await httpClient.ConfigureAuthenticationAsync(resource.Endpoint.Authentication, this.ServiceProvider, cancellationToken).ConfigureAwait(false);
await httpClient.ConfigureAuthenticationAsync(workflow, resource.Endpoint.Authentication, this.ServiceProvider, cancellationToken).ConfigureAwait(false);
return await httpClient.GetStreamAsync(resource.EndpointUri, cancellationToken).ConfigureAwait(false);
}

Expand Down
54 changes: 50 additions & 4 deletions src/runner/Synapse.Runner/Services/TaskExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,34 @@ public abstract class TaskExecutor<TDefinition>(IServiceProvider serviceProvider
/// <inheritdoc/>
public virtual async Task InitializeAsync(CancellationToken cancellationToken = default)
{
await this.DoInitializeAsync(cancellationToken).ConfigureAwait(false);
await this.Task.InitializeAsync(cancellationToken).ConfigureAwait(false);
this.Subject.OnNext(new TaskLifeCycleEvent(TaskLifeCycleEventType.Initialized));
try
{
await this.DoInitializeAsync(cancellationToken).ConfigureAwait(false);
await this.Task.InitializeAsync(cancellationToken).ConfigureAwait(false);
this.Subject.OnNext(new TaskLifeCycleEvent(TaskLifeCycleEventType.Initialized));
}
catch(HttpRequestException ex)
{
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Communication,
Title = ErrorTitle.Communication,
Status = ex.StatusCode.HasValue ? (ushort)ex.StatusCode : (ushort)ErrorStatus.Communication,
Detail = ex.Message,
Instance = this.Task.Instance.Reference
}, cancellationToken).ConfigureAwait(false);
}
catch(Exception ex)
{
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Runtime,
Title = ErrorTitle.Runtime,
Status = ErrorStatus.Runtime,
Detail = ex.Message,
Instance = this.Task.Instance.Reference
}, cancellationToken).ConfigureAwait(false);
}
}

/// <summary>
Expand Down Expand Up @@ -171,6 +196,28 @@ await this.SetErrorAsync(new()
await this.TaskCompletionSource.Task.ConfigureAwait(false);
}
catch (OperationCanceledException) { }
catch (HttpRequestException ex)
{
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Communication,
Title = ErrorTitle.Communication,
Status = ex.StatusCode.HasValue ? (ushort)ex.StatusCode : (ushort)ErrorStatus.Communication,
Detail = ex.Message,
Instance = this.Task.Instance.Reference
}, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
await this.SetErrorAsync(new Error()
{
Type = ErrorType.Runtime,
Title = ErrorTitle.Runtime,
Status = ErrorStatus.Runtime,
Detail = ex.Message,
Instance = this.Task.Instance.Reference
}, cancellationToken).ConfigureAwait(false);
}
}

/// <summary>
Expand Down Expand Up @@ -198,7 +245,6 @@ protected virtual async Task BeforeExecuteAsync(CancellationToken cancellationTo
}
input = executor.Task.Output ?? new();
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
}
}

Expand Down
1 change: 0 additions & 1 deletion src/runner/Synapse.Runner/Services/WorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ protected virtual async Task OnTaskCompletedAsync(ITaskExecutor executor, Cancel
};
var completedTask = executor.Task;
this.Executors.Remove(executor);
await executor.DisposeAsync().ConfigureAwait(false);
if (nextDefinition == null)
{
await this.SetResultAsync(completedTask.Output, cancellationToken).ConfigureAwait(false);
Expand Down
Loading