Skip to content

Commit

Permalink
Remove activity definition cache (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jul 25, 2023
1 parent 54e61f1 commit 0cded9b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 24 deletions.
30 changes: 6 additions & 24 deletions src/Temporalio/Activities/ActivityDefinition.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
Expand All @@ -13,8 +12,6 @@ namespace Temporalio.Activities
/// </summary>
public class ActivityDefinition
{
private static readonly ConcurrentDictionary<MethodInfo, ActivityDefinition> CachedDefinitions = new();

private readonly Func<object?[], object?> invoker;

private ActivityDefinition(
Expand Down Expand Up @@ -65,16 +62,14 @@ private ActivityDefinition(
/// must have <see cref="ActivityAttribute" /> set on it.
/// </summary>
/// <param name="del">Delegate to create definition from.</param>
/// <param name="cache">True if this should cache the result making successive invocations
/// for the same method quicker.</param>
/// <returns>Definition built from the delegate.</returns>
public static ActivityDefinition Create(Delegate del, bool cache = true)
public static ActivityDefinition Create(Delegate del)
{
if (del.Method == null)
{
throw new ArgumentException("Activities must have accessible methods");
}
return Create(del.Method, cache, del.DynamicInvoke);
return Create(del.Method, del.DynamicInvoke);
}

/// <summary>
Expand Down Expand Up @@ -147,10 +142,9 @@ public static ActivityDefinition Create(MethodInfo method, Func<object?[], objec
/// <typeparam name="T">Type with activity definitions.</typeparam>
/// <param name="instance">Instance to invoke the activity definitions on. Must be non-null
/// if any activities are non-static.</param>
/// <param name="cache">True if each definition should be cached.</param>
/// <returns>Collection of activity definitions on the type.</returns>
public static IReadOnlyCollection<ActivityDefinition> CreateAll<T>(
T? instance, bool cache = true) => CreateAll(typeof(T), instance, cache);
public static IReadOnlyCollection<ActivityDefinition> CreateAll<T>(T? instance) =>
CreateAll(typeof(T), instance);

/// <summary>
/// Create all applicable activity definitions for the given type. At least one activity
Expand All @@ -159,10 +153,8 @@ public static IReadOnlyCollection<ActivityDefinition> CreateAll<T>(
/// <param name="type">Type with activity definitions.</param>
/// <param name="instance">Instance to invoke the activity definitions on. Must be non-null
/// if any activities are non-static.</param>
/// <param name="cache">True if each definition should be cached.</param>
/// <returns>Collection of activity definitions on the type.</returns>
public static IReadOnlyCollection<ActivityDefinition> CreateAll(
Type type, object? instance, bool cache = true)
public static IReadOnlyCollection<ActivityDefinition> CreateAll(Type type, object? instance)
{
var ret = new List<ActivityDefinition>();
foreach (var method in type.GetMethods(
Expand All @@ -178,7 +170,7 @@ public static IReadOnlyCollection<ActivityDefinition> CreateAll(
throw new InvalidOperationException(
$"Instance not provided, but activity method {method} is non-static");
}
ret.Add(Create(method, cache, parameters => method.Invoke(instance, parameters)));
ret.Add(Create(method, parameters => method.Invoke(instance, parameters)));
}
if (ret.Count == 0)
{
Expand Down Expand Up @@ -242,16 +234,6 @@ internal static string NameFromMethodForCall(MethodInfo method)
$"{method} cannot be used directly since it is a dynamic activity");
}

private static ActivityDefinition Create(
MethodInfo method, bool cache, Func<object?[], object?> invoker)
{
if (cache)
{
return CachedDefinitions.GetOrAdd(method, method => Create(method, false, invoker));
}
return Create(method, invoker);
}

private static object?[] ParametersWithDefaults(
ParameterInfo[] paramInfos, object?[] parameters)
{
Expand Down
56 changes: 56 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3012,6 +3012,62 @@ await ExecuteWorkerAsync<TaskEventsWorkflow>(
workerOptions);
}

public class DuplicateActivities
{
private readonly string returnValue;

public DuplicateActivities(string returnValue) => this.returnValue = returnValue;

[Activity]
public string DoThing() => returnValue;
}

[Workflow]
public class DuplicateActivityWorkflow
{
[WorkflowRun]
public async Task<string[]> RunAsync(string taskQueue1, string taskQueue2)
{
return new[]
{
await Workflow.ExecuteActivityAsync(
(DuplicateActivities act) => act.DoThing(),
new()
{
TaskQueue = taskQueue1,
ScheduleToCloseTimeout = TimeSpan.FromHours(1),
}),
await Workflow.ExecuteActivityAsync(
(DuplicateActivities act) => act.DoThing(),
new()
{
TaskQueue = taskQueue2,
ScheduleToCloseTimeout = TimeSpan.FromHours(1),
}),
};
}
}

[Fact]
public async Task ExecuteWorkflowAsync_DuplicateActivity_DoesNotCacheInstance()
{
await ExecuteWorkerAsync<DuplicateActivityWorkflow>(
async worker1 =>
{
await ExecuteWorkerAsync<DuplicateActivityWorkflow>(
async worker2 =>
{
var ret = await Env.Client.ExecuteWorkflowAsync(
(DuplicateActivityWorkflow wf) =>
wf.RunAsync(worker1.Options.TaskQueue!, worker2.Options.TaskQueue!),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker1.Options.TaskQueue!));
Assert.Equal(new[] { "instance1", "instance2" }, ret);
},
new TemporalWorkerOptions().AddAllActivities(new DuplicateActivities("instance2")));
},
new TemporalWorkerOptions().AddAllActivities(new DuplicateActivities("instance1")));
}

private async Task ExecuteWorkerAsync<TWf>(
Func<TemporalWorker, Task> action,
TemporalWorkerOptions? options = null,
Expand Down

0 comments on commit 0cded9b

Please sign in to comment.