Baggage.Current gets shared/overwritten when running multiple parallel tasks? #3257
@CodeBlanch tagging you as you might know about this area per #2298
@gaurav137 How it is working above is how I would expect it to work, for better or worse 😄 Baggage behaves exactly like HttpContext. Why? Imagine some code like...
Lots of stuff going on there! The main thing is that 3 wants to see the change from 2. That's what the holder allows us to do. 4 wants to clear the baggage captured on all threads; the holder also makes that possible. For the common case, it should work well without surprises. But in your case, it is causing problems. Could you suppress the flow?
// Receiver/backend logic
async Task WorkflowSchedulerAsync()
{
while (!stopRequested)
{
List<Message> messages = await getFromAzureQueue();
if (messages.Any())
{
MyAppContext currentContext = MyAppContext.Current;
Task batch;
// Don't automatically flow the current context to the parallel tasks.
// The AsyncFlowControl must be undone on the thread that created it, so undo it before awaiting.
using (ExecutionContext.SuppressFlow())
{
// Start some work in parallel (Task.Run rather than an async lambda in Parallel.ForEach, which would be async void)
batch = Task.WhenAll(messages.Select(m => Task.Run(async () =>
{
MyAppContext.Current = currentContext; // Manually flow our app context
await ProcessMessage(m);
})));
}
await batch;
}
}
}
Not ideal, but maybe it will help. We can probably add a mechanism to make this easier; we just need to come up with something, discuss it in a PR, ship it, etc. That could take some time.
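For reference, here is a minimal BCL-only sketch of what ExecutionContext.SuppressFlow() changes. It assumes .NET 6 or later and uses a stand-in AsyncLocal<string> named Ambient in place of MyAppContext.Current or Baggage.Current; it is not OpenTelemetry code. It also shows why the flow control is undone before awaiting: the AsyncFlowControl returned by SuppressFlow() has to be disposed on the thread that created it.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class SuppressFlowDemo
{
    // Stand-in for an AsyncLocal-backed ambient context such as MyAppContext.Current.
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    public static async Task<(string Flowed, string Suppressed)> RunAsync()
    {
        Ambient.Value = "outer";

        // Default: Task.Run captures the ExecutionContext, so the child sees "outer".
        string flowed = await Task.Run(() => Ambient.Value);

        Task<string> child;
        // With flow suppressed, Task.Run captures no ExecutionContext and the child
        // starts from an empty one. The AsyncFlowControl must be undone on the thread
        // that created it, so the using block ends before the await below.
        using (ExecutionContext.SuppressFlow())
        {
            child = Task.Run(() => Ambient.Value);
        }
        string suppressed = await child;

        return (flowed, suppressed);
    }
}
```

With default flow the child reads "outer"; with flow suppressed it reads null, which is why the snippet above re-assigns MyAppContext.Current inside each task.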
Thanks @CodeBlanch for your response. It has clarified some of the assumptions I was making. In the HttpContext implementation one is able to reset the holder:
if (value != null)
{
// Use an object indirection to hold the HttpContext in the AsyncLocal,
// so it can be cleared in all ExecutionContexts when its cleared.
_httpContextCurrent.Value = new HttpContextHolder { Context = value };
}
But the setter of Baggage.Current does not create a new holder instance. If "Baggage.Current = defaultBaggage;" in my pseudo code above had allocated a new BaggageHolder instance, this issue would not have arisen (the AsyncLocal's slot value would have changed), and I think one would not need ExecutionContext.SuppressFlow() then. So was sticking with a single BaggageHolder instance via EnsureBaggageHolder() a conscious choice? The behavior looks different from HttpContext.Current, which creates new instances of HttpContextHolder. I'll try out the ExecutionContext.SuppressFlow() suggestion anyway and report back.
For any flow that follows the pattern EstablishRootActivity() => DoWork() => EstablishRootActivity() => DoWork() => ..., should ExecutionContext.SuppressFlow() be the recommendation? I'd think anyone writing custom context propagation logic based on the RabbitMQ example should be aware that Baggage.Current behaves this way, and if they do not want each logical unit of work interfering with the others' Baggage.Current, they should use ExecutionContext.SuppressFlow(). So ReceiveMessage in the example could also showcase how to use ExecutionContext.SuppressFlow() so that the Baggage.Current set in that method is scoped to each message's processing.
Apart from the problem described above, I was also planning to use Baggage.Current to enrich my logs by passing some transient data via Baggage that is then used by my custom LoggingProvider. That is also described as one of the use cases of baggage: enriching logs, traces, and metrics. But doesn't this behavior of Baggage.Current make it unsuitable for enriching logs with transient data, like below?
async Task ProcessMessage(Message m)
{
var parentContext = propagator.Extract(default, m.CarrierPayload, SomeMethodToExtractContextFromCarrier);
Baggage.Current = parentContext.Baggage;
// Expect the above baggage to be available without interference.
await DoProcessing(m);
}
async Task DoProcessing(Message m)
{
Task t1 = Task.Run(() =>
{
// The static Baggage.SetBaggage/RemoveBaggage update Baggage.Current; the instance methods only return a modified copy.
Baggage.SetBaggage("k1", "v1");
logger.LogInformation("Foo"); // LoggingProvider implementation accesses k1 and adds it to the log
Baggage.RemoveBaggage("k1");
});
Task t2 = Task.Run(() =>
{
Baggage.SetBaggage("k1", "v2");
logger.LogInformation("Foo"); // LoggingProvider implementation accesses k1 and adds it to the log
Baggage.RemoveBaggage("k1");
});
await Task.WhenAll(t1, t2);
}
So the sequence can go something like:
So, any suggestions on how to pass ambient baggage data like the above to enrich logs?
The short answer is yes, this was intentional. HttpContext and Baggage have slightly different use cases. HttpContext should really only be set at the start of a request (or maybe at the end). If some middleware replaced the HttpContext in the middle of the execution pipeline, there be dragons! Baggage, however, is something where I wanted to support that. Some middleware might want to populate and then set the current baggage, expecting it to be transmitted downstream. That is tricky with AsyncLocal semantics. Here is an example:
[Fact]
public async Task BaggageFlowTest()
{
Baggage.Current = new Baggage(new Dictionary<string, string>() { ["key1"] = "value1" });
await Task.Run(async () => // Pretend this is some middleware
{
var baggage = Baggage.Current; // 1 key
await Task.Yield(); // Any async await here. Could be we look something up from DB. Maybe we call some other service to get it.
baggage = baggage.SetBaggage("key2", "value2"); // We want to update the root baggage with whatever we just resolved
Baggage.Current = baggage; // current now has 2 keys
}).ConfigureAwait(false);
await Task.Run(async () => // Pretend this is some service call
{
var baggage = Baggage.Current; // 2 keys here ready to send
await Task.Delay(2000); // Pretend this is an outgoing http call where we want the "Current" baggage to propagate. In this case, working fine.
});
}
Using the holder, this all works fine. If users set …
Here is that same code using a standard AsyncLocal:
private static readonly AsyncLocal<Baggage> _BaggageWithoutHolder = new();
[Fact]
public async Task BaggageDetachedFlowTest()
{
_BaggageWithoutHolder.Value = new Baggage(new Dictionary<string, string>() { ["key1"] = "value1" });
await Task.Run(async () =>
{
var baggage = _BaggageWithoutHolder.Value; // Sees the 1 key from the outer.
await Task.Yield();
baggage = baggage.SetBaggage("key2", "value2");
_BaggageWithoutHolder.Value = baggage; // Sets 2 keys but this change only flows FORWARD. Outer isn't updated.
}).ConfigureAwait(false);
await Task.Run(async () =>
{
var baggage = _BaggageWithoutHolder.Value; // Only 1 key set in this baggage. The change above was lost!
await Task.Delay(2000); // Pretend this is an outgoing http call where we want the "Current" baggage to propagate. Not working as expected!
});
}
The setup we have currently is geared towards that common case. What you are doing is more advanced; more on that below. What I was thinking is that we could expose an API like this for users who need to set baggage going forward without impacting the root:
public readonly struct Baggage : IEquatable<Baggage>
{
// Use to establish a new root baggage that won't impact the calling scope...
public static void SetRootBaggage(Baggage baggage)
{
RuntimeContextSlot.Set(new BaggageHolder
{
Baggage = baggage,
});
}
}
Would that help you? Just not sure on the name 😄
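To make the difference concrete, here is a hypothetical mini version of the holder pattern (HolderAmbient and HolderDemo are illustrative names, not the OpenTelemetry implementation). Set mutates the inherited holder, roughly like the current Baggage.Current setter described in this thread, while SetRoot stores a fresh holder, roughly like the proposed SetRootBaggage.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical mini holder pattern for illustration; not the OpenTelemetry implementation.
public static class HolderAmbient
{
    private sealed class Holder
    {
        public string Value;
    }

    private static readonly AsyncLocal<Holder> Slot = new AsyncLocal<Holder>();

    public static string Current => Slot.Value?.Value;

    // Reuses the inherited holder when there is one, so every flow sharing that
    // holder (including the caller) observes the change.
    public static void Set(string value)
    {
        if (Slot.Value is { } holder)
        {
            holder.Value = value;
        }
        else
        {
            Slot.Value = new Holder { Value = value };
        }
    }

    // Stores a fresh holder: the change flows to children started afterwards, never back to the caller.
    public static void SetRoot(string value) => Slot.Value = new Holder { Value = value };
}

public static class HolderDemo
{
    public static async Task<(string AfterSet, string AfterSetRoot)> RunAsync()
    {
        HolderAmbient.SetRoot("root");

        await Task.Run(() => HolderAmbient.Set("changed-by-child"));
        string afterSet = HolderAmbient.Current; // the child's in-place change is visible here

        await Task.Run(() => HolderAmbient.SetRoot("new-root-in-child"));
        string afterSetRoot = HolderAmbient.Current; // unchanged: the child's new holder stayed in the child

        return (afterSet, afterSetRoot);
    }
}
```

A child's Set is visible to the caller afterwards; a child's SetRoot only affects that child and anything it starts.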
I do agree with this, that … In the spec there is this concept of links and "scatter/gather": https://github.com/open-telemetry/opentelemetry-specification/blob/f124b21de091f3d668e4ce11b954ec17e3da231f/specification/overview.md#links-between-spans That is kind of what you are doing here. I think what the spec says to do here is to stop the root span, or at least detach from it (suppress flow). When you begin your work, start a new span that is linked to the original span. This is something @alanwest and I have discussed in the past. To put it another way: when you take a message off the queue, you want to start a root span and link it to the original. If you happen to have a span already running in the scope where you receive the message, suppressing flow is the way to go; otherwise that span will be used as the parent. The same goes for baggage. @ejsmith might have also run into this.
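A sketch of the "start a new root span and link it to the original" idea using only System.Diagnostics (assuming .NET 6+). The source name Demo.Receiver, the span names, and the randomly generated original context are placeholders; in a real receiver the original context would come from the propagator's Extract call. Clearing Activity.Current plays the role of detaching from the span that happens to be running where the message is received.

```csharp
using System.Diagnostics;

public static class LinkedRootDemo
{
    private static readonly ActivitySource Source = new ActivitySource("Demo.Receiver");

    public static (Activity Poll, Activity Process, ActivityContext Original) Run()
    {
        // StartActivity returns null unless a listener samples the source.
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Demo.Receiver",
            Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
        };
        ActivitySource.AddActivityListener(listener);

        // A span that happens to be running where messages are received (e.g. a polling loop).
        Activity poll = Source.StartActivity("poll");

        // Placeholder for the context extracted from the message.
        var original = new ActivityContext(
            ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded);

        // Detach from the ambient span so the next span is a root, and link it to the original.
        Activity.Current = null;
        Activity process = Source.StartActivity(
            "process",
            ActivityKind.Consumer,
            parentContext: default,
            links: new[] { new ActivityLink(original) });
        process?.Stop();

        // Re-attach the ambient span and finish it.
        Activity.Current = poll;
        poll?.Stop();
        return (poll, process, original);
    }
}
```

The resulting "process" span has no parent, gets its own trace ID, and carries a single link to the original context.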
Not something I have considered until this moment, but this might work:
var baggage = Baggage.Current; // Capture current baggage
baggage = baggage.SetBaggage("k1", "v1"); // Doesn't change Baggage.Current, returns a new copy
using var scope = logger.BeginScope(baggage.GetBaggage());
logger.LogInformation("Foo");
That will basically expose the baggage you want to the logger as a scope.
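For comparison, a BCL-only sketch (not OpenTelemetry, assuming .NET 6+) of transient, per-task log data held in a plain AsyncLocal<ImmutableDictionary<string, string>> with no holder. Each write replaces the value for the current flow only, so the two tasks from the ProcessMessage/DoProcessing example would not see each other's k1. The Log method is a hypothetical stand-in for a logging provider that reads the ambient items.

```csharp
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;

public static class LogScopeDemo
{
    // Plain AsyncLocal (no holder) with an immutable value: every write creates a new
    // dictionary for the current flow only, so nothing leaks to the caller or siblings.
    private static readonly AsyncLocal<ImmutableDictionary<string, string>> Items =
        new AsyncLocal<ImmutableDictionary<string, string>>();

    private static ImmutableDictionary<string, string> Current =>
        Items.Value ?? ImmutableDictionary<string, string>.Empty;

    // Hypothetical stand-in for a logging provider that enriches messages with ambient items.
    private static string Log(string message) =>
        Current.TryGetValue("k1", out var k1) ? $"{message} k1={k1}" : message;

    public static async Task<string[]> RunAsync()
    {
        Items.Value = Current.SetItem("messageId", "m-1");

        Task<string> t1 = Task.Run(async () =>
        {
            Items.Value = Current.SetItem("k1", "v1");
            await Task.Yield(); // give the sibling a chance to write its own k1
            return Log("Foo");
        });
        Task<string> t2 = Task.Run(async () =>
        {
            Items.Value = Current.SetItem("k1", "v2");
            await Task.Yield();
            return Log("Foo");
        });

        string[] lines = await Task.WhenAll(t1, t2);
        return new[] { lines[0], lines[1], Log("Done") }; // the parent never sees k1
    }
}
```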
I encountered a similar issue. Currently, Baggage is unreliable in parallel cases. We change Baggage in the parent async context after we call a child task, and the change is visible in the child async context. We need a snapshot of the parent's Baggage in the child context; it should be copy-on-write.
// ------------- Parent Async Context -----------------
List<Task> tasks = new List<Task>();
foreach (Job job in jobs)
{
// Setup context for this job processing
Baggage.SetBaggage("JobResourceId", job.ResourceId);
// call async without await, for parallel
tasks.Add(this.TryTransitionToRunnableStateAsync(job, cancellationToken));
}
await Task.WhenAll(tasks);
// ------------- Parent Async Context -----------------
// ------------- Child Async Context -----------------
async Task TryTransitionToRunnableStateAsync(Job job, CancellationToken cancellationToken)
{
// the change of Baggage.Current in parent async context can propagate into the child async context.
// ResourceId in here is unstable, next job in the foreach loop will change Baggage and visible in here
Dictionary<string, string> baggageItems = new Dictionary<string, string>(Baggage.Current.GetBaggage());
}
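A BCL-only sketch (not OpenTelemetry) of the copy-on-write behavior being asked for here: with a plain AsyncLocal<string> and no holder, each child keeps the value that was current when it was called, even though the loop keeps overwriting it. JobResourceId, ProcessAsync, and the delay are illustrative placeholders.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SnapshotPerJobDemo
{
    private static readonly AsyncLocal<string> JobResourceId = new AsyncLocal<string>();

    public static async Task<string[]> RunAsync(IEnumerable<string> resourceIds)
    {
        var tasks = new List<Task<string>>();
        foreach (string id in resourceIds)
        {
            // Replaces the value for this flow only; already-started children keep theirs.
            JobResourceId.Value = id;
            // Called without await so the jobs run concurrently.
            tasks.Add(ProcessAsync());
        }
        return await Task.WhenAll(tasks);
    }

    private static async Task<string> ProcessAsync()
    {
        await Task.Delay(10); // by now the loop has overwritten JobResourceId in the parent
        return JobResourceId.Value; // still the value captured when this job was started
    }
}
```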
I don't think we could change this without breaking someone, and I'm not sure we want to. The current setup is friendly to AspNetCore, where a lot of middleware may execute asynchronously. Here is an example, which really just restates what I said above:
public class Program
{
private static readonly AsyncLocal<Dictionary<string, string>> BaggageAsyncLocal = new();
public static async Task Main()
{
BaggageAsyncLocal.Value = new Dictionary<string, string>() { ["appInstanceId"] = "abcd" };
Baggage.SetBaggage("appInstanceId", "abcd");
await ReadTenantIdFromDatabase();
var otelBaggage = Baggage.GetBaggage(); // 2 keys (appInstanceId & tenantId)
var baggageAsyncLocal = BaggageAsyncLocal.Value; // 1 key (appInstanceId)
async Task ReadTenantIdFromDatabase()
{
// Simulate async call to DB
await Task.Yield();
Baggage.SetBaggage("tenantId", "1234");
BaggageAsyncLocal.Value = new Dictionary<string, string>(BaggageAsyncLocal.Value!) { ["tenantId"] = "1234" };
}
}
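}
Using a pure AsyncLocal … For the rarer parallel case, I think we could add some API to make what you want to do easier. But I think there is a workaround you can use right now: call RuntimeContext.SetValue("otel.baggage", null) to clear the baggage kept in the SDK's AsyncLocal, then set Baggage.Current:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry;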
using OpenTelemetry.Context;
public class Program
{
public static async Task Main()
{
// ------------- Parent Async Context -----------------
Baggage.SetBaggage("tenantId", "1234");
Baggage.SetBaggage("flow_status", "jobs_starting");
Console.WriteLine($"Baggage[Main]: {string.Join(", ", Baggage.GetBaggage().Select(kvp => $"{kvp.Key}:{kvp.Value}"))}");
var jobs = new[] { new Job("1"), new Job("2") };
List<Task> tasks = new List<Task>();
foreach (Job job in jobs)
{
// call async without await, for parallel
tasks.Add(TryTransitionToRunnableStateAsync(job, cancellationToken: default));
}
Baggage.SetBaggage("flow_status", "jobs_started");
Console.WriteLine($"Baggage[Main]: {string.Join(", ", Baggage.GetBaggage().Select(kvp => $"{kvp.Key}:{kvp.Value}"))}");
await Task.WhenAll(tasks);
Baggage.SetBaggage("flow_status", "jobs_completed");
Console.WriteLine($"Baggage[Main]: {string.Join(", ", Baggage.GetBaggage().Select(kvp => $"{kvp.Key}:{kvp.Value}"))}");
// ------------- Parent Async Context -----------------
// ------------- Child Async Context -----------------
async Task TryTransitionToRunnableStateAsync(Job job, CancellationToken cancellationToken)
{
var jobBaggage = Baggage.Current.SetBaggage("JobResourceId", job.ResourceId);
// Simulate something async going on
await Task.Delay(5000);
// Clear the baggage kept in AsyncLocal inside OTel SDK
RuntimeContext.SetValue("otel.baggage", null);
// Establishes a new AsyncLocal which will flow to children but not parent
Baggage.Current = jobBaggage;
Console.WriteLine($"Baggage[{job.ResourceId}]: {string.Join(", ", Baggage.GetBaggage().Select(kvp => $"{kvp.Key}:{kvp.Value}"))}");
}
}
private sealed class Job
{
public string ResourceId { get; }
public Job(string resourceId)
{
this.ResourceId = resourceId;
}
}
}
When I run that, my output is:
I think parallel cases are not rare in Azure services. And the RuntimeContext.SetValue workaround makes the instrumentation more awkward for developers who use Baggage: every user needs to know how to use OT.Baggage in async cases.
My take: the current behavior is buggy, and code relying on such buggy behavior is going to have more issues with context leakage. If we need something with the current behavior, it should be called something other than Baggage, ideally with an explicit name indicating its semantics.
I feel the above code is buggy; it should be written in one of the following ways:
Return the value from the async method:
BaggageAsyncLocal.Value = new Dictionary<string, string>() { ["appInstanceId"] = "abcd" };
Baggage.SetBaggage("appInstanceId", "abcd");
string tenantId = await ReadTenantIdFromDatabase();
Baggage.SetBaggage("tenantId", tenantId);
Get a handle to the current baggage and pass the handle to other methods:
BaggageAsyncLocal.Value = new Dictionary<string, string>() { ["appInstanceId"] = "abcd" };
Baggage.SetBaggage("appInstanceId", "abcd");
var handle = Baggage.Current;
await ReadTenantIdFromDatabaseAndUpdateBaggage(handle);
I don't think the parallel case is rare at all; it's actually the common case, since most services call multiple underlying services for each incoming request.
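A BCL-only sketch of the "return the value" alternative, using a plain AsyncLocal<ImmutableDictionary<string, string>> rather than OpenTelemetry's Baggage (ReturnValueDemo and the ReadTenantIdAsync stand-in for the database call are hypothetical). A write made inside an awaited method does not flow back to the caller, while a returned value can be applied by the caller explicitly.

```csharp
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;

public static class ReturnValueDemo
{
    private static readonly AsyncLocal<ImmutableDictionary<string, string>> Items =
        new AsyncLocal<ImmutableDictionary<string, string>>();

    public static async Task<(int AfterCalleeSet, int AfterReturn)> RunAsync()
    {
        Items.Value = ImmutableDictionary<string, string>.Empty.SetItem("appInstanceId", "abcd");

        // The callee writes the AsyncLocal itself: the write does not flow back here.
        await ReadTenantIdAndSetAsync();
        int afterCalleeSet = Items.Value.Count;

        // The callee returns the value and the caller applies it.
        string tenantId = await ReadTenantIdAsync();
        Items.Value = Items.Value.SetItem("tenantId", tenantId);
        return (afterCalleeSet, Items.Value.Count);
    }

    private static async Task ReadTenantIdAndSetAsync()
    {
        string tenantId = await ReadTenantIdAsync();
        Items.Value = Items.Value.SetItem("tenantId", tenantId);
    }

    // Hypothetical stand-in for the database lookup.
    private static async Task<string> ReadTenantIdAsync()
    {
        await Task.Yield();
        return "1234";
    }
}
```

After the callee-side write the caller still sees one key; after applying the returned value it sees two.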
@CodeBlanch, @gaurav137 and @liutiexing, we are also facing the same issue, so I am trying to understand the lifecycle of a Baggage. As Gaurav noted, BaggageHolder has only one instance. So is this statement true … So how will Baggage work in ASP.NET Core? If there are N parallel client requests, will the Baggage instance be shared across all the requests? If so, then the basic use case of Baggage itself is flawed, as we cannot keep the context information for each request.
Not exactly! It wouldn't be very useful at all if it worked like that 😄
For AspNetCore, that … (see opentelemetry-dotnet/src/OpenTelemetry.Instrumentation.AspNetCore/Implementation/HttpInListener.cs, line 148 at commit 1b4f231)
From that point on, all threads/tasks which flow from the one handling the request will share a … If AspNetCore instrumentation isn't being used, or baggage has been disabled by changing the default propagator, …
Sorry to comment on a closed issue, but I realize the fix was reverted because it contained breaking changes. We are trying to use Baggage exactly as the OP does, only to find out that OTel Baggage doesn't behave the same as Activity.Current / AsyncLocal. To me, the parallel case is actually very common when handling context: imagine you are dealing with a fan-out then fan-in operation where each child is supposed to replay the context (or even initialize brand new context) from somewhere and call downstream. The context it replays should only affect itself (e.g., the telemetry it emits) and its calls downstream (e.g., injected into request headers using a TextMapPropagator), but not its parent (and not the other fan-out operations). The parent may later do aggregation work after all the fan-out operations complete. None of the context created by one of the fan-out tasks (the child) is representative enough (or even meaningful at all) as far as the parent and the other fan-out operations are concerned. Should we just use Activity.Baggage then? @CodeBlanch, any guidance would be greatly appreciated.
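On the Activity.Baggage question, a sketch (assuming .NET 6+; the activity names and baggage keys are placeholders) of how Activity.Current behaves in a fan-out: each child starts its own Activity and adds baggage to it, and that baggage is visible to the child but not to the parent or to sibling tasks. This only illustrates the flow semantics of Activity.Current; it does not show whether OpenTelemetry's propagators read Activity.Baggage.

```csharp
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutActivityBaggageDemo
{
    public static async Task<(string[] ChildResults, string ParentJobId, string ParentTenantId)> RunAsync()
    {
        // Parent context for the fan-out. Activity.Current is AsyncLocal-backed, and
        // starting a new Activity replaces it for the current flow only.
        Activity parent = new Activity("fan-out").AddBaggage("tenantId", "1234").Start();

        string[] childResults = await Task.WhenAll(new[] { "a", "b", "c" }.Select(async job =>
        {
            await Task.Yield();
            // Each child establishes its own context; it inherits the parent's baggage.
            using Activity child = new Activity("work").AddBaggage("jobId", job).Start();
            await Task.Yield();
            return $"{Activity.Current.GetBaggageItem("jobId")}/{Activity.Current.GetBaggageItem("tenantId")}";
        }));

        // Back in the parent flow: Activity.Current is the parent again and has no jobId.
        var result = (childResults, Activity.Current?.GetBaggageItem("jobId"), Activity.Current?.GetBaggageItem("tenantId"));
        parent.Stop();
        return result;
    }
}
```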
My scenario is similar to the RabbitMQ microservice example (https://github.com/open-telemetry/opentelemetry-dotnet/tree/main/examples/MicroserviceExample). We have a backend service/receiver that keeps polling an Azure queue for messages in batches of 10 and starts processing each message in parallel via Parallel.ForEach. These messages are submitted by a frontend service/sender, and each message contains our logging context, which I'm now setting into Baggage.Current via a custom Propagator that injects/extracts our logging context into Baggage.Current. So the pseudo code for the above looks like:
I looked at the Baggage implementation, and it is based on AsyncLocal, but one thing I noticed is that Baggage has an EnsureBaggageHolder method which ensures that the holder is only allocated once, and manipulating Current ends up adding/removing a reference to the Baggage maintained in this holder. So all tasks running in parallel will access the same BaggageHolder instance (which is like a singleton) and thus end up sharing the BaggageHolder.Baggage reference. As a result, Baggage.Current updates from tasks running in parallel can interfere with each other.
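The interference described here can be reproduced with a hypothetical mini holder (SharedHolderDemo is illustrative, not the OpenTelemetry code): when parallel tasks inherit the same holder object and mutate it in place, one task reads the value written by its sibling. The TaskCompletionSource objects only force the interleaving so the result is deterministic.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical mini holder, used only to illustrate the sharing described above.
public static class SharedHolderDemo
{
    private sealed class Holder
    {
        public string Value;
    }

    private static readonly AsyncLocal<Holder> Slot = new AsyncLocal<Holder>();

    public static async Task<string> RunAsync()
    {
        // One holder, inherited by both tasks below through the ExecutionContext.
        Slot.Value = new Holder { Value = "root" };

        var aHasWritten = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var bHasWritten = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        Task<string> a = Task.Run(async () =>
        {
            Slot.Value.Value = "message-A"; // mutates the shared holder in place
            aHasWritten.SetResult(true);
            await bHasWritten.Task;
            return Slot.Value.Value; // observes what task B wrote
        });
        Task b = Task.Run(async () =>
        {
            await aHasWritten.Task;
            Slot.Value.Value = "message-B";
            bHasWritten.SetResult(true);
        });

        await b;
        return await a;
    }
}
```

Having each task assign a new Holder to Slot.Value before writing would isolate them, because the AsyncLocal slot itself would then differ per task.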
Our custom library also uses a static AsyncLocal<CustomContext> context = new AsyncLocal<CustomContext>(), and ProcessMessage() updates context.Value with new instances of CustomContext that come in the message. Each parallel task sees its own CustomContext, because the value in the static AsyncLocal keeps getting replaced (it's used as a value, not as a reference to a shared value).
So could someone explain what is expected here? Have I misunderstood this completely, or could this be a bug? I was expecting AsyncLocal semantics, i.e. outer methods don't see changes made to Current by inner methods, and N parallel tasks processing N messages can modify their Baggage.Current without affecting other tasks' Baggage.Current values. Otherwise I'm at a loss as to how to use Baggage.Current correctly.
FYI, I'm using NuGet package version 1.2.