Skip to content

Commit

Permalink
Add update ID and name in log scope (#355)
Browse files Browse the repository at this point in the history
Fixes #351
  • Loading branch information
cretz authored Oct 8, 2024
1 parent fb3c991 commit 6006744
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 156 deletions.
312 changes: 161 additions & 151 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -900,183 +900,193 @@ private void ApplyDoUpdate(DoUpdate update)
var ignored = Instance;

// Set the current update for the life of this task
CurrentUpdateInfoLocal.Value = new(Id: update.Id, Name: update.Name);
var updateInfo = new WorkflowUpdateInfo(Id: update.Id, Name: update.Name);
CurrentUpdateInfoLocal.Value = updateInfo;

// Find update definition or reject
var updates = mutableUpdates.IsValueCreated ? mutableUpdates.Value : Definition.Updates;
if (!updates.TryGetValue(update.Name, out var updateDefn))
// Put the entire update in the log scope
using (logger.BeginScope(updateInfo.CreateLoggerScope()))
{
updateDefn = DynamicUpdate;
if (updateDefn == null)
{
var knownUpdates = updates.Keys.OrderBy(k => k);
var failure = new InvalidOperationException(
$"Update handler for {update.Name} expected but not found, " +
$"known updates: [{string.Join(" ", knownUpdates)}]");
AddCommand(new()
{
UpdateResponse = new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Rejected = failureConverter.ToFailure(failure, PayloadConverter),
},
});
return Task.CompletedTask;
}
return ApplyDoUpdateAsync(update);
}
});
}

// May be loaded inside validate after validator, maybe not
object?[]? argsForUpdate = null;
// We define this up here because there are multiple places it's called below
object?[] DecodeUpdateArgs() => DecodeArgs(
method: updateDefn.Method ?? updateDefn.Delegate!.Method,
payloads: update.Input,
itemName: $"Update {update.Name}",
dynamic: updateDefn.Dynamic,
dynamicArgPrepend: update.Name);

// Do validation. Whether or not this runs a validator, this should accept/reject.
try
private Task ApplyDoUpdateAsync(DoUpdate update)
{
// Find update definition or reject
var updates = mutableUpdates.IsValueCreated ? mutableUpdates.Value : Definition.Updates;
if (!updates.TryGetValue(update.Name, out var updateDefn))
{
updateDefn = DynamicUpdate;
if (updateDefn == null)
{
if (update.RunValidator)
{
// We only call the validation interceptor if a validator is present. We are
// not allowed to share the arguments. We do not share the arguments (i.e.
// we re-convert) to prevent a user from mistakenly mutating an argument in
// the validator. We call the interceptor only if a validator is present to
// match other SDKs where doubly converting arguments here unnecessarily
// (because of the no-reuse-argument rule above) causes performance issues
// for them (even if they don't much for us).
if (updateDefn.ValidatorMethod != null || updateDefn.ValidatorDelegate != null)
{
// Capture command count so we can ensure it is unchanged after call
var origCmdCount = completion?.Successful?.Commands?.Count ?? 0;
inbound.Value.ValidateUpdate(new(
Id: update.Id,
Update: update.Name,
Definition: updateDefn,
Args: DecodeUpdateArgs(),
Headers: update.Headers));
// If the command count changed, we need to issue a task failure
var newCmdCount = completion?.Successful?.Commands?.Count ?? 0;
if (origCmdCount != newCmdCount)
{
currentActivationException = new InvalidOperationException(
$"Update validator for {update.Name} created workflow commands");
return Task.CompletedTask;
}
}

// We want to try to decode args here _inside_ the validator rejection
// try/catch so we can prevent acceptance on invalid args
argsForUpdate = DecodeUpdateArgs();
}

// Send accepted
var knownUpdates = updates.Keys.OrderBy(k => k);
var failure = new InvalidOperationException(
$"Update handler for {update.Name} expected but not found, " +
$"known updates: [{string.Join(" ", knownUpdates)}]");
AddCommand(new()
{
UpdateResponse = new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Accepted = new(),
Rejected = failureConverter.ToFailure(failure, PayloadConverter),
},
});
return Task.CompletedTask;
}
catch (Exception e)
}

// May be loaded inside validate after validator, maybe not
object?[]? argsForUpdate = null;
// We define this up here because there are multiple places it's called below
object?[] DecodeUpdateArgs() => DecodeArgs(
method: updateDefn.Method ?? updateDefn.Delegate!.Method,
payloads: update.Input,
itemName: $"Update {update.Name}",
dynamic: updateDefn.Dynamic,
dynamicArgPrepend: update.Name);

// Do validation. Whether or not this runs a validator, this should accept/reject.
try
{
if (update.RunValidator)
{
// Send rejected
AddCommand(new()
// We only call the validation interceptor if a validator is present. We are
// not allowed to share the arguments. We do not share the arguments (i.e.
// we re-convert) to prevent a user from mistakenly mutating an argument in
// the validator. We call the interceptor only if a validator is present to
// match other SDKs where doubly converting arguments here unnecessarily
// (because of the no-reuse-argument rule above) causes performance issues
// for them (even if they don't much for us).
if (updateDefn.ValidatorMethod != null || updateDefn.ValidatorDelegate != null)
{
UpdateResponse = new()
// Capture command count so we can ensure it is unchanged after call
var origCmdCount = completion?.Successful?.Commands?.Count ?? 0;
inbound.Value.ValidateUpdate(new(
Id: update.Id,
Update: update.Name,
Definition: updateDefn,
Args: DecodeUpdateArgs(),
Headers: update.Headers));
// If the command count changed, we need to issue a task failure
var newCmdCount = completion?.Successful?.Commands?.Count ?? 0;
if (origCmdCount != newCmdCount)
{
ProtocolInstanceId = update.ProtocolInstanceId,
Rejected = failureConverter.ToFailure(e, PayloadConverter),
},
});
return Task.CompletedTask;
currentActivationException = new InvalidOperationException(
$"Update validator for {update.Name} created workflow commands");
return Task.CompletedTask;
}
}

// We want to try to decode args here _inside_ the validator rejection
// try/catch so we can prevent acceptance on invalid args
argsForUpdate = DecodeUpdateArgs();
}

// Issue actual update. We are using ContinueWith instead of await here so that user
// code can run immediately. But the user code _or_ the task could fail so we need
// to reject in both cases.
try
// Send accepted
AddCommand(new()
{
// If the args were not already decoded because we didn't run the validation
// step, decode them here
argsForUpdate ??= DecodeUpdateArgs();

var task = inbound.Value.HandleUpdateAsync(new(
Id: update.Id,
Update: update.Name,
Definition: updateDefn,
Args: argsForUpdate,
Headers: update.Headers));
var inProgress = inProgressHandlers.AddLast(new Handlers.Handler(
update.Name, update.Id, updateDefn.UnfinishedPolicy));
return task.ContinueWith(
_ =>
UpdateResponse = new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Accepted = new(),
},
});
}
catch (Exception e)
{
// Send rejected
AddCommand(new()
{
UpdateResponse = new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Rejected = failureConverter.ToFailure(e, PayloadConverter),
},
});
return Task.CompletedTask;
}

// Issue actual update. We are using ContinueWith instead of await here so that user
// code can run immediately. But the user code _or_ the task could fail so we need
// to reject in both cases.
try
{
// If the args were not already decoded because we didn't run the validation
// step, decode them here
argsForUpdate ??= DecodeUpdateArgs();

var task = inbound.Value.HandleUpdateAsync(new(
Id: update.Id,
Update: update.Name,
Definition: updateDefn,
Args: argsForUpdate,
Headers: update.Headers));
var inProgress = inProgressHandlers.AddLast(new Handlers.Handler(
update.Name, update.Id, updateDefn.UnfinishedPolicy));
return task.ContinueWith(
_ =>
{
inProgressHandlers.Remove(inProgress);
// If workflow failure exception, it's an update failure. If it's some
// other exception, it's a task failure. Otherwise it's a success.
var exc = task.Exception?.InnerExceptions?.SingleOrDefault();
// There are .NET cases where cancellation occurs but is not considered
// an exception. We are going to make it an exception. Unfortunately
// there is no easy way to make it include the outer stack trace at this
// time.
if (exc == null && task.IsCanceled)
{
inProgressHandlers.Remove(inProgress);
// If workflow failure exception, it's an update failure. If it's some
// other exception, it's a task failure. Otherwise it's a success.
var exc = task.Exception?.InnerExceptions?.SingleOrDefault();
// There are .NET cases where cancellation occurs but is not considered
// an exception. We are going to make it an exception. Unfortunately
// there is no easy way to make it include the outer stack trace at this
// time.
if (exc == null && task.IsCanceled)
{
exc = new TaskCanceledException();
}
if (exc != null && IsWorkflowFailureException(exc))
exc = new TaskCanceledException();
}
if (exc != null && IsWorkflowFailureException(exc))
{
AddCommand(new()
{
AddCommand(new()
UpdateResponse = new()
{
UpdateResponse = new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Rejected = failureConverter.ToFailure(exc, PayloadConverter),
},
});
}
else if (task.Exception is { } taskExc)
{
// Fails the task
currentActivationException =
taskExc.InnerExceptions.SingleOrDefault() ?? taskExc;
}
else
ProtocolInstanceId = update.ProtocolInstanceId,
Rejected = failureConverter.ToFailure(exc, PayloadConverter),
},
});
}
else if (task.Exception is { } taskExc)
{
// Fails the task
currentActivationException =
taskExc.InnerExceptions.SingleOrDefault() ?? taskExc;
}
else
{
// Success, have to use reflection to extract value if it's a Task<>
var taskType = task.GetType();
var result = taskType.IsGenericType ?
taskType.GetProperty("Result")!.GetValue(task) : ValueTuple.Create();
AddCommand(new()
{
// Success, have to use reflection to extract value if it's a Task<>
var taskType = task.GetType();
var result = taskType.IsGenericType ?
taskType.GetProperty("Result")!.GetValue(task) : ValueTuple.Create();
AddCommand(new()
UpdateResponse = new()
{
UpdateResponse = new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Completed = PayloadConverter.ToPayload(result),
},
});
}
return Task.CompletedTask;
},
this).Unwrap();
}
catch (FailureException e)
ProtocolInstanceId = update.ProtocolInstanceId,
Completed = PayloadConverter.ToPayload(result),
},
});
}
return Task.CompletedTask;
},
this).Unwrap();
}
catch (FailureException e)
{
AddCommand(new()
{
AddCommand(new()
UpdateResponse = new()
{
UpdateResponse = new()
{
ProtocolInstanceId = update.ProtocolInstanceId,
Rejected = failureConverter.ToFailure(e, PayloadConverter),
},
});
return Task.CompletedTask;
}
});
ProtocolInstanceId = update.ProtocolInstanceId,
Rejected = failureConverter.ToFailure(e, PayloadConverter),
},
});
return Task.CompletedTask;
}
}

private void ApplyFireTimer(FireTimer fireTimer)
Expand Down
12 changes: 12 additions & 0 deletions src/Temporalio/Workflows/WorkflowUpdateInfo.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Collections.Generic;

namespace Temporalio.Workflows
{
/// <summary>
Expand All @@ -12,5 +14,15 @@ public record WorkflowUpdateInfo(
string Id,
string Name)
{
/// <summary>
/// Creates the value that is set on
/// <see cref="Microsoft.Extensions.Logging.ILogger.BeginScope" /> for the update handler.
/// </summary>
/// <returns>Scope.</returns>
internal Dictionary<string, object> CreateLoggerScope() => new()
{
["UpdateId"] = Id,
["UpdateName"] = Name,
};
}
}
Loading

0 comments on commit 6006744

Please sign in to comment.