diff --git a/src/OpenTelemetry.Contrib.Instrumentation.GrpcCore/CHANGELOG.md b/src/OpenTelemetry.Contrib.Instrumentation.GrpcCore/CHANGELOG.md index 248b02e51fc..60b95e6ce08 100644 --- a/src/OpenTelemetry.Contrib.Instrumentation.GrpcCore/CHANGELOG.md +++ b/src/OpenTelemetry.Contrib.Instrumentation.GrpcCore/CHANGELOG.md @@ -4,3 +4,6 @@ * Updated OTel SDK package version to 1.1.0-beta1 ([#100](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/100)) + +* Do NOT mutate incoming call headers, copy them before propagation + ([#143](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/143)) diff --git a/src/OpenTelemetry.Contrib.Instrumentation.GrpcCore/ClientTracingInterceptor.cs b/src/OpenTelemetry.Contrib.Instrumentation.GrpcCore/ClientTracingInterceptor.cs index 5879628055a..89a9dac2b90 100644 --- a/src/OpenTelemetry.Contrib.Instrumentation.GrpcCore/ClientTracingInterceptor.cs +++ b/src/OpenTelemetry.Contrib.Instrumentation.GrpcCore/ClientTracingInterceptor.cs @@ -324,11 +324,20 @@ public ClientRpcScope(ClientInterceptorContext context, Cli var callOptions = context.Options; - if (callOptions.Headers == null) + // Do NOT mutate incoming call headers, make a new copy. + // Retry mechanisms that may sit above this interceptor rely on an original set of call headers. + var metadata = new Metadata(); + if (callOptions.Headers != null) { - callOptions = callOptions.WithHeaders(new Metadata()); + for (var i = 0; i < callOptions.Headers.Count; i++) + { + metadata.Add(callOptions.Headers[i]); + } } + // replace the CallOptions + callOptions = callOptions.WithHeaders(metadata); + this.SetActivity(rpcActivity); options.Propagator.Inject(new PropagationContext(rpcActivity.Context, Baggage.Current), callOptions.Headers, MetadataSetter); this.context = new ClientInterceptorContext(context.Method, context.Host, callOptions); diff --git a/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/FoobarService.cs b/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/FoobarService.cs index 00fce38ca20..c07fbe47458 100644 --- a/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/FoobarService.cs +++ b/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/FoobarService.cs @@ -140,10 +140,11 @@ public static Foobar.FoobarClient ConstructRpcClient( /// Makes a unary asynchronous request. /// /// The client. + /// The additional metadata. /// A Task. - public static async Task MakeUnaryAsyncRequest(Foobar.FoobarClient client) + public static async Task MakeUnaryAsyncRequest(Foobar.FoobarClient client, Metadata additionalMetadata) { - using var call = client.UnaryAsync(DefaultRequestMessage); + using var call = client.UnaryAsync(DefaultRequestMessage, headers: additionalMetadata); _ = await call.ResponseAsync.ConfigureAwait(false); } @@ -151,10 +152,11 @@ public static async Task MakeUnaryAsyncRequest(Foobar.FoobarClient client) /// Makes a client streaming request. /// /// The client. + /// The additional metadata. /// A Task. - public static async Task MakeClientStreamingRequest(Foobar.FoobarClient client) + public static async Task MakeClientStreamingRequest(Foobar.FoobarClient client, Metadata additionalMetadata) { - using var call = client.ClientStreaming(); + using var call = client.ClientStreaming(headers: additionalMetadata); await call.RequestStream.WriteAsync(DefaultRequestMessage).ConfigureAwait(false); await call.RequestStream.CompleteAsync().ConfigureAwait(false); _ = await call.ResponseAsync.ConfigureAwait(false); @@ -164,10 +166,11 @@ public static async Task MakeClientStreamingRequest(Foobar.FoobarClient client) /// Makes a server streaming request. /// /// The client. + /// The additional metadata. /// A Task. - public static async Task MakeServerStreamingRequest(Foobar.FoobarClient client) + public static async Task MakeServerStreamingRequest(Foobar.FoobarClient client, Metadata additionalMetadata) { - using var call = client.ServerStreaming(DefaultRequestMessage); + using var call = client.ServerStreaming(DefaultRequestMessage, headers: additionalMetadata); while (await call.ResponseStream.MoveNext().ConfigureAwait(false)) { } @@ -177,10 +180,11 @@ public static async Task MakeServerStreamingRequest(Foobar.FoobarClient client) /// Makes a duplex streaming request. /// /// The client. + /// The additional metadata. /// A Task. - public static async Task MakeDuplexStreamingRequest(Foobar.FoobarClient client) + public static async Task MakeDuplexStreamingRequest(Foobar.FoobarClient client, Metadata additionalMetadata) { - using var call = client.DuplexStreaming(); + using var call = client.DuplexStreaming(headers: additionalMetadata); await call.RequestStream.WriteAsync(DefaultRequestMessage).ConfigureAwait(false); await call.RequestStream.CompleteAsync().ConfigureAwait(false); diff --git a/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/GrpcCoreClientInterceptorTests.cs b/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/GrpcCoreClientInterceptorTests.cs index 16549da4209..06fc7d51a2e 100644 --- a/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/GrpcCoreClientInterceptorTests.cs +++ b/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/GrpcCoreClientInterceptorTests.cs @@ -37,6 +37,11 @@ public class GrpcCoreClientInterceptorTests /// private static readonly string BogusServerUri = "dns:i.dont.exist:77923"; + /// + /// The default metadata func. + /// + private static readonly Func DefaultMetadataFunc = () => new Metadata { new Metadata.Entry("foo", "bar") }; + /// /// Validates a successful AsyncUnary call. /// @@ -44,7 +49,7 @@ public class GrpcCoreClientInterceptorTests [Fact] public async Task AsyncUnarySuccess() { - await this.TestHandlerSuccess(FoobarService.MakeUnaryAsyncRequest).ConfigureAwait(false); + await this.TestHandlerSuccess(FoobarService.MakeUnaryAsyncRequest, DefaultMetadataFunc()).ConfigureAwait(false); } /// @@ -92,7 +97,7 @@ static void MakeRequest(Foobar.FoobarClient client) [Fact] public async Task ClientStreamingSuccess() { - await this.TestHandlerSuccess(FoobarService.MakeClientStreamingRequest).ConfigureAwait(false); + await this.TestHandlerSuccess(FoobarService.MakeClientStreamingRequest, DefaultMetadataFunc()).ConfigureAwait(false); } /// @@ -140,7 +145,7 @@ static void MakeRequest(Foobar.FoobarClient client) [Fact] public async Task ServerStreamingSuccess() { - await this.TestHandlerSuccess(FoobarService.MakeServerStreamingRequest).ConfigureAwait(false); + await this.TestHandlerSuccess(FoobarService.MakeServerStreamingRequest, DefaultMetadataFunc()).ConfigureAwait(false); } /// @@ -174,7 +179,7 @@ static void MakeRequest(Foobar.FoobarClient client) [Fact] public async Task DuplexStreamingSuccess() { - await this.TestHandlerSuccess(FoobarService.MakeDuplexStreamingRequest).ConfigureAwait(false); + await this.TestHandlerSuccess(FoobarService.MakeDuplexStreamingRequest, DefaultMetadataFunc()).ConfigureAwait(false); } /// @@ -369,13 +374,15 @@ static void ValidateCommonEventAttributes(ActivityEvent activityEvent) /// Tests basic handler success. /// /// The client request function. + /// The additional metadata, if any. /// A Task. - private async Task TestHandlerSuccess(Func clientRequestFunc) + private async Task TestHandlerSuccess(Func clientRequestFunc, Metadata additionalMetadata) { var mockPropagator = new Mock(); PropagationContext capturedPropagationContext = default; Metadata capturedCarrier = null; var propagatorCalled = 0; + var originalMetadataCount = additionalMetadata.Count; mockPropagator .Setup( @@ -390,9 +397,15 @@ private async Task TestHandlerSuccess(Func clientRequ capturedPropagationContext = propagation; capturedCarrier = carrier; + // Make sure the original metadata make it through + if (additionalMetadata != null) + { + Assert.Equal(capturedCarrier, additionalMetadata); + } + // Call the actual setter to ensure it updates the carrier. // It doesn't matter what we put in - setter(capturedCarrier, "foo", "bar"); + setter(capturedCarrier, "bar", "baz"); }); using var server = FoobarService.Start(); @@ -407,7 +420,7 @@ private async Task TestHandlerSuccess(Func clientRequ using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue)) { var client = FoobarService.ConstructRpcClient(server.UriString, new ClientTracingInterceptor(interceptorOptions)); - await clientRequestFunc(client).ConfigureAwait(false); + await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false); Assert.Equal(default, Activity.Current); @@ -416,6 +429,11 @@ private async Task TestHandlerSuccess(Func clientRequ // Propagator was called exactly once Assert.Equal(1, propagatorCalled); + // The client tracing interceptor should create a copy of the original call headers before passing to the propagator. + // Retries that sit above this interceptor rely on the original call metadata. + // The propagator should not have mutated the original CallOption headers. + Assert.Equal(originalMetadataCount, additionalMetadata.Count); + // There was no parent activity, so these will be default Assert.Equal(default, capturedPropagationContext.ActivityContext.TraceId); Assert.Equal(default, capturedPropagationContext.ActivityContext.SpanId); @@ -437,7 +455,7 @@ private async Task TestHandlerSuccess(Func clientRequ parentActivity.SetIdFormat(ActivityIdFormat.W3C); parentActivity.Start(); var client = FoobarService.ConstructRpcClient(server.UriString, new ClientTracingInterceptor(interceptorOptions)); - await clientRequestFunc(client).ConfigureAwait(false); + await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false); Assert.Equal(parentActivity, Activity.Current); @@ -465,7 +483,7 @@ private async Task TestHandlerSuccess(Func clientRequ /// A Task. /// private async Task TestHandlerFailure( - Func clientRequestFunc, + Func clientRequestFunc, StatusCode statusCode = StatusCode.ResourceExhausted, bool validateErrorDescription = true, string serverUriString = null) @@ -482,7 +500,7 @@ private async Task TestHandlerFailure( }); using var activityListener = new InterceptorActivityListener(clientInterceptorOptions.ActivityIdentifierValue); - await Assert.ThrowsAsync(async () => await clientRequestFunc(client).ConfigureAwait(false)); + await Assert.ThrowsAsync(async () => await clientRequestFunc(client, null).ConfigureAwait(false)); var activity = activityListener.Activity; ValidateCommonActivityTags(activity, statusCode, false); diff --git a/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/GrpcCoreServerInterceptorTests.cs b/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/GrpcCoreServerInterceptorTests.cs index aef179a74db..9434231ae35 100644 --- a/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/GrpcCoreServerInterceptorTests.cs +++ b/test/OpenTelemetry.Contrib.Instrumentation.GrpcCore.Tests/GrpcCoreServerInterceptorTests.cs @@ -112,8 +112,9 @@ public async Task DuplexStreamingServerHandlerFail() /// A common method to test server interceptor handler success. /// /// The specific client request function. + /// The additional metadata, if any. /// A Task. - private async Task TestHandlerSuccess(Func clientRequestFunc) + private async Task TestHandlerSuccess(Func clientRequestFunc, Metadata additionalMetadata = null) { // starts the server with the server interceptor var interceptorOptions = new ServerTracingInterceptorOptions { Propagator = new TraceContextPropagator(), RecordMessageEvents = true, ActivityIdentifierValue = Guid.NewGuid() }; @@ -123,7 +124,7 @@ private async Task TestHandlerSuccess(Func clientRequ using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue)) { var client = FoobarService.ConstructRpcClient(server.UriString); - await clientRequestFunc(client).ConfigureAwait(false); + await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false); var activity = activityListener.Activity; GrpcCoreClientInterceptorTests.ValidateCommonActivityTags(activity, StatusCode.OK, interceptorOptions.RecordMessageEvents); @@ -140,7 +141,7 @@ private async Task TestHandlerSuccess(Func clientRequ new Metadata.Entry("traceparent", FoobarService.DefaultTraceparentWithSampling), }); - await clientRequestFunc(client).ConfigureAwait(false); + await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false); var activity = activityListener.Activity; GrpcCoreClientInterceptorTests.ValidateCommonActivityTags(activity, StatusCode.OK, interceptorOptions.RecordMessageEvents); @@ -152,8 +153,9 @@ private async Task TestHandlerSuccess(Func clientRequ /// A common method to test server interceptor handler failure. /// /// The specific client request function. + /// The additional metadata, if any. /// A Task. - private async Task TestHandlerFailure(Func clientRequestFunc) + private async Task TestHandlerFailure(Func clientRequestFunc, Metadata additionalMetadata = null) { // starts the server with the server interceptor var interceptorOptions = new ServerTracingInterceptorOptions { Propagator = new TraceContextPropagator(), ActivityIdentifierValue = Guid.NewGuid() }; @@ -169,7 +171,7 @@ private async Task TestHandlerFailure(Func clientRequ new Metadata.Entry(FoobarService.RequestHeaderErrorDescription, "fubar"), }); - await Assert.ThrowsAsync(async () => await clientRequestFunc(client).ConfigureAwait(false)); + await Assert.ThrowsAsync(async () => await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false)); var activity = activityListener.Activity; GrpcCoreClientInterceptorTests.ValidateCommonActivityTags(activity, StatusCode.ResourceExhausted, interceptorOptions.RecordMessageEvents);