Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove activity definition cache #118

Merged
merged 2 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 6 additions & 24 deletions src/Temporalio/Activities/ActivityDefinition.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
Expand All @@ -13,8 +12,6 @@ namespace Temporalio.Activities
/// </summary>
public class ActivityDefinition
{
private static readonly ConcurrentDictionary<MethodInfo, ActivityDefinition> CachedDefinitions = new();

private readonly Func<object?[], object?> invoker;

private ActivityDefinition(
Expand Down Expand Up @@ -65,16 +62,14 @@ private ActivityDefinition(
/// must have <see cref="ActivityAttribute" /> set on it.
/// </summary>
/// <param name="del">Delegate to create definition from.</param>
/// <param name="cache">True if this should cache the result making successive invocations
/// for the same method quicker.</param>
/// <returns>Definition built from the delegate.</returns>
public static ActivityDefinition Create(Delegate del, bool cache = true)
public static ActivityDefinition Create(Delegate del)
{
if (del.Method == null)
{
throw new ArgumentException("Activities must have accessible methods");
}
return Create(del.Method, cache, del.DynamicInvoke);
return Create(del.Method, del.DynamicInvoke);
}

/// <summary>
Expand Down Expand Up @@ -147,10 +142,9 @@ public static ActivityDefinition Create(MethodInfo method, Func<object?[], objec
/// <typeparam name="T">Type with activity definitions.</typeparam>
/// <param name="instance">Instance to invoke the activity definitions on. Must be non-null
/// if any activities are non-static.</param>
/// <param name="cache">True if each definition should be cached.</param>
/// <returns>Collection of activity definitions on the type.</returns>
public static IReadOnlyCollection<ActivityDefinition> CreateAll<T>(
T? instance, bool cache = true) => CreateAll(typeof(T), instance, cache);
public static IReadOnlyCollection<ActivityDefinition> CreateAll<T>(T? instance) =>
CreateAll(typeof(T), instance);

/// <summary>
/// Create all applicable activity definitions for the given type. At least one activity
Expand All @@ -159,10 +153,8 @@ public static IReadOnlyCollection<ActivityDefinition> CreateAll<T>(
/// <param name="type">Type with activity definitions.</param>
/// <param name="instance">Instance to invoke the activity definitions on. Must be non-null
/// if any activities are non-static.</param>
/// <param name="cache">True if each definition should be cached.</param>
/// <returns>Collection of activity definitions on the type.</returns>
public static IReadOnlyCollection<ActivityDefinition> CreateAll(
Type type, object? instance, bool cache = true)
public static IReadOnlyCollection<ActivityDefinition> CreateAll(Type type, object? instance)
{
var ret = new List<ActivityDefinition>();
foreach (var method in type.GetMethods(
Expand All @@ -178,7 +170,7 @@ public static IReadOnlyCollection<ActivityDefinition> CreateAll(
throw new InvalidOperationException(
$"Instance not provided, but activity method {method} is non-static");
}
ret.Add(Create(method, cache, parameters => method.Invoke(instance, parameters)));
ret.Add(Create(method, parameters => method.Invoke(instance, parameters)));
}
if (ret.Count == 0)
{
Expand Down Expand Up @@ -242,16 +234,6 @@ internal static string NameFromMethodForCall(MethodInfo method)
$"{method} cannot be used directly since it is a dynamic activity");
}

private static ActivityDefinition Create(
MethodInfo method, bool cache, Func<object?[], object?> invoker)
{
if (cache)
{
return CachedDefinitions.GetOrAdd(method, method => Create(method, false, invoker));
}
return Create(method, invoker);
}

private static object?[] ParametersWithDefaults(
ParameterInfo[] paramInfos, object?[] parameters)
{
Expand Down
56 changes: 56 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3012,6 +3012,62 @@ await ExecuteWorkerAsync<TaskEventsWorkflow>(
workerOptions);
}

public class DuplicateActivities
{
private readonly string returnValue;

public DuplicateActivities(string returnValue) => this.returnValue = returnValue;

[Activity]
public string DoThing() => returnValue;
}

[Workflow]
public class DuplicateActivityWorkflow
{
[WorkflowRun]
public async Task<string[]> RunAsync(string taskQueue1, string taskQueue2)
{
return new[]
{
await Workflow.ExecuteActivityAsync(
(DuplicateActivities act) => act.DoThing(),
new()
{
TaskQueue = taskQueue1,
ScheduleToCloseTimeout = TimeSpan.FromHours(1),
}),
await Workflow.ExecuteActivityAsync(
(DuplicateActivities act) => act.DoThing(),
new()
{
TaskQueue = taskQueue2,
ScheduleToCloseTimeout = TimeSpan.FromHours(1),
}),
};
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this test use two workers with different task queues?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It only needs the activities on different workers not really the workflow (but I just reuse the execute worker helper). We use two workers to confirm that the same activity definition but with different instances for different workers returns different values.

So basically this workflow calls the same activity on different task queues and the test confirms that the activity called was the different instances on different task queues.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels to me that the assertion being made is about what happens when we have distinct activity method instances and so it's not obvious that testing it requires multiple workers and task queues.

Copy link
Member Author

@cretz cretz Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is specifically "what happens when a workflow calls the same activity method but on different task queues where the different activity instances may have different state". This requires multiple task queues (task queues are 1:1 with workers, so it requires multiple workers).


[Fact]
public async Task ExecuteWorkflowAsync_DuplicateActivity_DoesNotCacheInstance()
{
await ExecuteWorkerAsync<DuplicateActivityWorkflow>(
async worker1 =>
{
await ExecuteWorkerAsync<DuplicateActivityWorkflow>(
async worker2 =>
{
var ret = await Env.Client.ExecuteWorkflowAsync(
(DuplicateActivityWorkflow wf) =>
wf.RunAsync(worker1.Options.TaskQueue!, worker2.Options.TaskQueue!),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker1.Options.TaskQueue!));
Assert.Equal(new[] { "instance1", "instance2" }, ret);
},
new TemporalWorkerOptions().AddAllActivities(new DuplicateActivities("instance2")));
},
new TemporalWorkerOptions().AddAllActivities(new DuplicateActivities("instance1")));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way this test is written in a point-free style makes it a bit hard to read because the test assertion makes reference to "instance1" and "instance2" before the reader has encountered them.

Copy link
Member Author

@cretz cretz Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the consequence of lambdas. Parameters to methods accepting lambdas are invoked and often used before the lambda is invoked. We make the parameters after the lambda because they are optional and many don't use them. This is test code so it is an acceptable tradeoff IMO.

}

Copy link
Contributor

@dandavison dandavison Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explaining this test to myself:

  1. We define a workflow that returns a result containing the results of two activity calls.

  2. The two activities are calls to the same activity method but on distinct instances (and using distinct task queues)

  3. The utility ExecuteWorkerAsync allows us to start a worker and, while it's running, execute an arbitrary function that has access to the worker instance. We use it twice:

  4. The first usage starts the first worker and uses the function to make the second nested call to ExecuteWorkerAsync

  5. The second usage starts a second worker and uses the function to execute our workflow and check that the result shows that distinct activity methods were invoked.

Copy link
Contributor

@dandavison dandavison Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we're using the lambda passed to ExecuteWorkerAsync to essentially play the role of customer application (non-Worker) code, since it starts a workflow. Therefore it's a bit artificial that the function has access to an actual instance of the worker (this confused me initially). But it looks like the pass-a-function-that-receives-a-worker-instance is a deeper part of the non-test-code SDK design.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bingo. Before I removed the cache, this test would fail because the result of the workflow would be ["instance1","instance1"] because no matter which activity instance you gave to a worker, it used the first cached one based on method which was bad. Now it returns ["instance1","instance2"] as expected.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we're using the lambda passed to ExecuteWorkerAsync to essentially play the role of customer application (non-Worker) code, since it starts a workflow.

Yes

Therefore it's a bit artificial that the function has access to an actual instance of the worker (this confused me initially).

Mostly. It's just so I can get it to create that random task queue which I can use

But it looks like the pass-a-function-that-receives-a-worker-instance is a deeper part of the non-test-code SDK design.

Basically you have a worker that can run for some amount of time. In non-test code we support a worker running forever, until cancellation token is triggered, and/or until the completion of an async function. In test code, I made ExecuteWorkerAsync which is just a shortcut for the latter that also adds a workflow and makes a random task queue and does a couple of other things and gives the worker as the parameter just in case the test function wants to do something with it (most just use the task queue).

private async Task ExecuteWorkerAsync<TWf>(
Func<TemporalWorker, Task> action,
TemporalWorkerOptions? options = null,
Expand Down