Skip to content

Commit

Permalink
User metadata and workflow metadata support (#378)
Browse files Browse the repository at this point in the history
Fixes #359
  • Loading branch information
cretz authored Dec 11, 2024
1 parent 66436bf commit 2546f07
Show file tree
Hide file tree
Showing 24 changed files with 666 additions and 67 deletions.
4 changes: 2 additions & 2 deletions src/Temporalio/Client/Schedules/Schedule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ public record Schedule(
/// <param name="proto">Proto.</param>
/// <param name="dataConverter">Data converter.</param>
/// <returns>Converted value.</returns>
internal static Schedule FromProto(
internal static async Task<Schedule> FromProtoAsync(
Api.Schedule.V1.Schedule proto, DataConverter dataConverter) =>
new(
Action: ScheduleAction.FromProto(proto.Action, dataConverter),
Action: await ScheduleAction.FromProtoAsync(proto.Action, dataConverter).ConfigureAwait(false),
Spec: ScheduleSpec.FromProto(proto.Spec))
{
Policy = SchedulePolicy.FromProto(proto.Policies),
Expand Down
5 changes: 3 additions & 2 deletions src/Temporalio/Client/Schedules/ScheduleAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ public abstract record ScheduleAction
/// <param name="proto">Proto.</param>
/// <param name="dataConverter">Data converter.</param>
/// <returns>Converted value.</returns>
internal static ScheduleAction FromProto(
internal static async Task<ScheduleAction> FromProtoAsync(
Api.Schedule.V1.ScheduleAction proto, DataConverter dataConverter)
{
if (proto.StartWorkflow != null)
{
return ScheduleActionStartWorkflow.FromProto(proto.StartWorkflow, dataConverter);
return await ScheduleActionStartWorkflow.FromProtoAsync(
proto.StartWorkflow, dataConverter).ConfigureAwait(false);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static ScheduleActionStartWorkflow Create(
/// <param name="proto">Proto.</param>
/// <param name="dataConverter">Data converter.</param>
/// <returns>Converted value.</returns>
internal static ScheduleActionStartWorkflow FromProto(
internal static async Task<ScheduleActionStartWorkflow> FromProtoAsync(
Api.Workflow.V1.NewWorkflowExecutionInfo proto, DataConverter dataConverter)
{
IReadOnlyCollection<object?> args = proto.Input == null ?
Expand All @@ -93,6 +93,8 @@ internal static ScheduleActionStartWorkflow FromProto(
var headers = proto.Header?.Fields?.ToDictionary(
kvp => kvp.Key,
kvp => (IEncodedRawValue)new EncodedRawValue(dataConverter, kvp.Value));
var (staticSummary, staticDetails) =
await dataConverter.FromUserMetadataAsync(proto.UserMetadata).ConfigureAwait(false);
return new(
Workflow: proto.WorkflowType.Name,
Args: args,
Expand All @@ -109,6 +111,8 @@ internal static ScheduleActionStartWorkflow FromProto(
TypedSearchAttributes = proto.SearchAttributes == null ?
SearchAttributeCollection.Empty :
SearchAttributeCollection.FromProto(proto.SearchAttributes),
StaticSummary = staticSummary,
StaticDetails = staticDetails,
},
Headers: headers);
}
Expand Down Expand Up @@ -168,6 +172,8 @@ internal static ScheduleActionStartWorkflow FromProto(
WorkflowTaskTimeout = Options.TaskTimeout is TimeSpan taskTimeout ?
Duration.FromTimeSpan(taskTimeout) : null,
RetryPolicy = Options.RetryPolicy?.ToProto(),
UserMetadata = await dataConverter.ToUserMetadataAsync(
Options.StaticSummary, Options.StaticDetails).ConfigureAwait(false),
};
if (Options.Memo != null && Options.Memo.Count > 0)
{
Expand Down
28 changes: 19 additions & 9 deletions src/Temporalio/Client/Schedules/ScheduleDescription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Temporalio.Api.WorkflowService.V1;
using Temporalio.Common;
using Temporalio.Converters;
Expand All @@ -16,16 +17,11 @@ public class ScheduleDescription
private readonly Lazy<IReadOnlyDictionary<string, IEncodedRawValue>> memo;
private readonly Lazy<SearchAttributeCollection> searchAttributes;

/// <summary>
/// Initializes a new instance of the <see cref="ScheduleDescription"/> class.
/// </summary>
/// <param name="id">Workflow ID.</param>
/// <param name="rawDescription">Raw proto description.</param>
/// <param name="dataConverter">Data converter.</param>
internal ScheduleDescription(
string id, DescribeScheduleResponse rawDescription, DataConverter dataConverter)
private ScheduleDescription(
string id, Schedule schedule, DescribeScheduleResponse rawDescription, DataConverter dataConverter)
{
Id = id;
Schedule = schedule;
RawDescription = rawDescription;
// Search attribute conversion is cheap so it doesn't need to lock on publication. But
// memo conversion may use remote codec so it should only ever be created once lazily.
Expand All @@ -39,7 +35,6 @@ internal ScheduleDescription(
SearchAttributeCollection.Empty :
SearchAttributeCollection.FromProto(rawDescription.SearchAttributes),
LazyThreadSafetyMode.PublicationOnly);
Schedule = Schedule.FromProto(rawDescription.Schedule, dataConverter);
Info = ScheduleInfo.FromProto(rawDescription.Info);
}

Expand Down Expand Up @@ -77,5 +72,20 @@ internal ScheduleDescription(
/// Gets the raw proto description.
/// </summary>
internal DescribeScheduleResponse RawDescription { get; private init; }

/// <summary>
/// Convert from proto.
/// </summary>
/// <param name="id">ID.</param>
/// <param name="rawDescription">Proto.</param>
/// <param name="dataConverter">Converter.</param>
/// <returns>Converted value.</returns>
internal static async Task<ScheduleDescription> FromProtoAsync(
string id, DescribeScheduleResponse rawDescription, DataConverter dataConverter) =>
new(
id,
await Schedule.FromProtoAsync(rawDescription.Schedule, dataConverter).ConfigureAwait(false),
rawDescription,
dataConverter);
}
}
3 changes: 2 additions & 1 deletion src/Temporalio/Client/TemporalClient.Schedules.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ public override async Task<ScheduleDescription> DescribeScheduleAsync(
ScheduleId = input.Id,
},
DefaultRetryOptions(input.RpcOptions)).ConfigureAwait(false);
return new(input.Id, desc, Client.Options.DataConverter);
return await ScheduleDescription.FromProtoAsync(
input.Id, desc, Client.Options.DataConverter).ConfigureAwait(false);
}

/// <inheritdoc />
Expand Down
7 changes: 6 additions & 1 deletion src/Temporalio/Client/TemporalClient.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ public override async Task<WorkflowExecutionDescription> DescribeWorkflowAsync(
},
},
DefaultRetryOptions(input.Options?.Rpc)).ConfigureAwait(false);
return new(resp, Client.Options.DataConverter);
return await WorkflowExecutionDescription.FromProtoAsync(
resp, Client.Options.DataConverter).ConfigureAwait(false);
}

/// <inheritdoc />
Expand Down Expand Up @@ -513,6 +514,9 @@ private async Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowInternalAsyn
WorkflowIdConflictPolicy = input.Options.IdConflictPolicy,
RetryPolicy = input.Options.RetryPolicy?.ToProto(),
RequestEagerExecution = input.Options.RequestEagerStart,
UserMetadata = await Client.Options.DataConverter.ToUserMetadataAsync(
input.Options.StaticSummary, input.Options.StaticDetails).
ConfigureAwait(false),
};
if (input.Args.Count > 0)
{
Expand Down Expand Up @@ -614,6 +618,7 @@ private async Task<WorkflowHandle<TWorkflow, TResult>> StartWorkflowInternalAsyn
WorkflowStartDelay = req.WorkflowStartDelay,
SignalName = input.Options.StartSignal,
WorkflowIdConflictPolicy = input.Options.IdConflictPolicy,
UserMetadata = req.UserMetadata,
};
if (input.Options.StartSignalArgs != null && input.Options.StartSignalArgs.Count > 0)
{
Expand Down
44 changes: 38 additions & 6 deletions src/Temporalio/Client/WorkflowExecutionDescription.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Threading.Tasks;
using Temporalio.Api.WorkflowService.V1;
using Temporalio.Converters;

Expand All @@ -8,18 +9,49 @@ namespace Temporalio.Client
/// </summary>
public class WorkflowExecutionDescription : WorkflowExecution
{
private WorkflowExecutionDescription(
DescribeWorkflowExecutionResponse rawDescription,
string? staticSummary,
string? staticDetails,
DataConverter dataConverter)
: base(rawDescription.WorkflowExecutionInfo, dataConverter)
{
RawDescription = rawDescription;
StaticSummary = staticSummary;
StaticDetails = staticDetails;
}

/// <summary>
/// Initializes a new instance of the <see cref="WorkflowExecutionDescription"/> class.
/// Gets the single-line fixed summary for this workflow execution that may appear in
/// UI/CLI. This can be in single-line Temporal markdown format.
/// </summary>
/// <param name="rawDescription">Raw description response.</param>
/// <param name="dataConverter">Data converter for memos.</param>
internal WorkflowExecutionDescription(
DescribeWorkflowExecutionResponse rawDescription, DataConverter dataConverter)
: base(rawDescription.WorkflowExecutionInfo, dataConverter) => RawDescription = rawDescription;
/// <remarks>WARNING: This setting is experimental.</remarks>
public string? StaticSummary { get; private init; }

/// <summary>
/// Gets the general fixed details for this workflow execution that may appear in UI/CLI.
/// This can be in Temporal markdown format and can span multiple lines.
/// </summary>
/// <remarks>WARNING: This setting is experimental.</remarks>
public string? StaticDetails { get; private init; }

/// <summary>
/// Gets the raw proto info.
/// </summary>
internal DescribeWorkflowExecutionResponse RawDescription { get; private init; }

/// <summary>
/// Convert from proto.
/// </summary>
/// <param name="rawDescription">Raw description.</param>
/// <param name="dataConverter">Data converter.</param>
/// <returns>Converted value.</returns>
internal static async Task<WorkflowExecutionDescription> FromProtoAsync(
DescribeWorkflowExecutionResponse rawDescription, DataConverter dataConverter)
{
var (staticSummary, staticDetails) = await dataConverter.FromUserMetadataAsync(
rawDescription.ExecutionConfig?.UserMetadata).ConfigureAwait(false);
return new(rawDescription, staticSummary, staticDetails, dataConverter);
}
}
}
16 changes: 16 additions & 0 deletions src/Temporalio/Client/WorkflowOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ public WorkflowOptions(string id, string taskQueue)
/// </summary>
public string? TaskQueue { get; set; }

/// <summary>
/// Gets or sets a single-line fixed summary for this workflow execution that may appear in
/// UI/CLI. This can be in single-line Temporal markdown format.
/// </summary>
/// <remarks>WARNING: This setting is experimental.</remarks>
public string? StaticSummary { get; set; }

/// <summary>
/// Gets or sets general fixed details for this workflow execution that may appear in
/// UI/CLI. This can be in Temporal markdown format and can span multiple lines. This is a
/// fixed value on the workflow that cannot be updated. For details that can be updated, use
/// <see cref="Workflows.Workflow.CurrentDetails" /> within the workflow.
/// </summary>
/// <remarks>WARNING: This setting is experimental.</remarks>
public string? StaticDetails { get; set; }

/// <summary>
/// Gets or sets the total workflow execution timeout including retries and continue as new.
/// </summary>
Expand Down
40 changes: 40 additions & 0 deletions src/Temporalio/Converters/ConverterExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading.Tasks;
using Temporalio.Api.Common.V1;
using Temporalio.Api.Failure.V1;
using Temporalio.Api.Sdk.V1;

namespace Temporalio.Converters
{
Expand Down Expand Up @@ -195,5 +196,44 @@ public static T ToValue<T>(this IPayloadConverter converter, IRawValue rawValue)
/// <returns>The converted value.</returns>
public static RawValue ToRawValue(this IPayloadConverter converter, object? value) =>
new(converter.ToPayload(value));

/// <summary>
/// Create user metadata using this converter.
/// </summary>
/// <param name="converter">Converter.</param>
/// <param name="summary">Summary.</param>
/// <param name="details">Details.</param>
/// <returns>Created metadata if any.</returns>
internal static async Task<UserMetadata?> ToUserMetadataAsync(
this DataConverter converter, string? summary, string? details)
{
if (summary == null && details == null)
{
return null;
}
var metadata = new UserMetadata();
if (summary != null)
{
metadata.Summary = await converter.ToPayloadAsync(summary).ConfigureAwait(false);
}
if (details != null)
{
metadata.Details = await converter.ToPayloadAsync(details).ConfigureAwait(false);
}
return metadata;
}

/// <summary>
/// Extract summary and details from the given user metadata.
/// </summary>
/// <param name="converter">Converter.</param>
/// <param name="metadata">Metadata.</param>
/// <returns>Extracted summary and details if any.</returns>
internal static async Task<(string? Summary, string? Details)> FromUserMetadataAsync(
this DataConverter converter, UserMetadata? metadata) => (
Summary: metadata?.Summary is { } s ?
await converter.ToValueAsync<string>(s).ConfigureAwait(false) : null,
Details: metadata?.Details is { } d ?
await converter.ToValueAsync<string>(d).ConfigureAwait(false) : null);
}
}
15 changes: 14 additions & 1 deletion src/Temporalio/Worker/Interceptors/DelayAsyncInput.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading;
using Temporalio.Workflows;

namespace Temporalio.Worker.Interceptors
{
Expand All @@ -8,11 +9,23 @@ namespace Temporalio.Worker.Interceptors
/// </summary>
/// <param name="Delay">Delay duration.</param>
/// <param name="CancellationToken">Optional cancellation token.</param>
/// <param name="Summary">Summary for the delay.</param>
/// <remarks>
/// WARNING: This constructor may have required properties added. Do not rely on the exact
/// constructor, only use "with" clauses.
/// </remarks>
public record DelayAsyncInput(
TimeSpan Delay,
CancellationToken? CancellationToken);
CancellationToken? CancellationToken,
string? Summary)
{
/// <summary>
/// Initializes a new instance of the <see cref="DelayAsyncInput"/> class.
/// </summary>
/// <param name="options">Options.</param>
internal DelayAsyncInput(DelayOptions options)
: this(options.Delay, options.CancellationToken, options.Summary)
{
}
}
}
Loading

0 comments on commit 2546f07

Please sign in to comment.