Skip to content

Commit

Permalink
[gRPC Core client interceptor] Don't mutate incoming call headers (op…
Browse files Browse the repository at this point in the history
…en-telemetry#143)

* [gRPC Core Interceptor] Don't mutate incoming call headers

Retry interceptors that sit above this interceptor would expect the original set of call headers, not mutated ones.

* Adjust the changelog.
  • Loading branch information
pcwiese authored Jul 16, 2021
1 parent 141007d commit 1cdfc7f
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@

* Updated OTel SDK package version to 1.1.0-beta1
([#100](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/100))

* Do NOT mutate incoming call headers, copy them before propagation
([#143](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/143))
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,20 @@ public ClientRpcScope(ClientInterceptorContext<TRequest, TResponse> context, Cli

var callOptions = context.Options;

if (callOptions.Headers == null)
// Do NOT mutate incoming call headers, make a new copy.
// Retry mechanisms that may sit above this interceptor rely on an original set of call headers.
var metadata = new Metadata();
if (callOptions.Headers != null)
{
callOptions = callOptions.WithHeaders(new Metadata());
for (var i = 0; i < callOptions.Headers.Count; i++)
{
metadata.Add(callOptions.Headers[i]);
}
}

// replace the CallOptions
callOptions = callOptions.WithHeaders(metadata);

this.SetActivity(rpcActivity);
options.Propagator.Inject(new PropagationContext(rpcActivity.Context, Baggage.Current), callOptions.Headers, MetadataSetter);
this.context = new ClientInterceptorContext<TRequest, TResponse>(context.Method, context.Host, callOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,23 @@ public static Foobar.FoobarClient ConstructRpcClient(
/// Makes a unary asynchronous request.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="additionalMetadata">The additional metadata.</param>
/// <returns>A Task.</returns>
public static async Task MakeUnaryAsyncRequest(Foobar.FoobarClient client)
public static async Task MakeUnaryAsyncRequest(Foobar.FoobarClient client, Metadata additionalMetadata)
{
using var call = client.UnaryAsync(DefaultRequestMessage);
using var call = client.UnaryAsync(DefaultRequestMessage, headers: additionalMetadata);
_ = await call.ResponseAsync.ConfigureAwait(false);
}

/// <summary>
/// Makes a client streaming request.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="additionalMetadata">The additional metadata.</param>
/// <returns>A Task.</returns>
public static async Task MakeClientStreamingRequest(Foobar.FoobarClient client)
public static async Task MakeClientStreamingRequest(Foobar.FoobarClient client, Metadata additionalMetadata)
{
using var call = client.ClientStreaming();
using var call = client.ClientStreaming(headers: additionalMetadata);
await call.RequestStream.WriteAsync(DefaultRequestMessage).ConfigureAwait(false);
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
_ = await call.ResponseAsync.ConfigureAwait(false);
Expand All @@ -164,10 +166,11 @@ public static async Task MakeClientStreamingRequest(Foobar.FoobarClient client)
/// Makes a server streaming request.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="additionalMetadata">The additional metadata.</param>
/// <returns>A Task.</returns>
public static async Task MakeServerStreamingRequest(Foobar.FoobarClient client)
public static async Task MakeServerStreamingRequest(Foobar.FoobarClient client, Metadata additionalMetadata)
{
using var call = client.ServerStreaming(DefaultRequestMessage);
using var call = client.ServerStreaming(DefaultRequestMessage, headers: additionalMetadata);
while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
{
}
Expand All @@ -177,10 +180,11 @@ public static async Task MakeServerStreamingRequest(Foobar.FoobarClient client)
/// Makes a duplex streaming request.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="additionalMetadata">The additional metadata.</param>
/// <returns>A Task.</returns>
public static async Task MakeDuplexStreamingRequest(Foobar.FoobarClient client)
public static async Task MakeDuplexStreamingRequest(Foobar.FoobarClient client, Metadata additionalMetadata)
{
using var call = client.DuplexStreaming();
using var call = client.DuplexStreaming(headers: additionalMetadata);
await call.RequestStream.WriteAsync(DefaultRequestMessage).ConfigureAwait(false);
await call.RequestStream.CompleteAsync().ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@ public class GrpcCoreClientInterceptorTests
/// </summary>
private static readonly string BogusServerUri = "dns:i.dont.exist:77923";

/// <summary>
/// The default metadata func.
/// </summary>
private static readonly Func<Metadata> DefaultMetadataFunc = () => new Metadata { new Metadata.Entry("foo", "bar") };

/// <summary>
/// Validates a successful AsyncUnary call.
/// </summary>
/// <returns>A task.</returns>
[Fact]
public async Task AsyncUnarySuccess()
{
await this.TestHandlerSuccess(FoobarService.MakeUnaryAsyncRequest).ConfigureAwait(false);
await this.TestHandlerSuccess(FoobarService.MakeUnaryAsyncRequest, DefaultMetadataFunc()).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -92,7 +97,7 @@ static void MakeRequest(Foobar.FoobarClient client)
[Fact]
public async Task ClientStreamingSuccess()
{
await this.TestHandlerSuccess(FoobarService.MakeClientStreamingRequest).ConfigureAwait(false);
await this.TestHandlerSuccess(FoobarService.MakeClientStreamingRequest, DefaultMetadataFunc()).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -140,7 +145,7 @@ static void MakeRequest(Foobar.FoobarClient client)
[Fact]
public async Task ServerStreamingSuccess()
{
await this.TestHandlerSuccess(FoobarService.MakeServerStreamingRequest).ConfigureAwait(false);
await this.TestHandlerSuccess(FoobarService.MakeServerStreamingRequest, DefaultMetadataFunc()).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -174,7 +179,7 @@ static void MakeRequest(Foobar.FoobarClient client)
[Fact]
public async Task DuplexStreamingSuccess()
{
await this.TestHandlerSuccess(FoobarService.MakeDuplexStreamingRequest).ConfigureAwait(false);
await this.TestHandlerSuccess(FoobarService.MakeDuplexStreamingRequest, DefaultMetadataFunc()).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -369,13 +374,15 @@ static void ValidateCommonEventAttributes(ActivityEvent activityEvent)
/// Tests basic handler success.
/// </summary>
/// <param name="clientRequestFunc">The client request function.</param>
/// <param name="additionalMetadata">The additional metadata, if any.</param>
/// <returns>A Task.</returns>
private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Task> clientRequestFunc)
private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Metadata, Task> clientRequestFunc, Metadata additionalMetadata)
{
var mockPropagator = new Mock<TextMapPropagator>();
PropagationContext capturedPropagationContext = default;
Metadata capturedCarrier = null;
var propagatorCalled = 0;
var originalMetadataCount = additionalMetadata.Count;

mockPropagator
.Setup(
Expand All @@ -390,9 +397,15 @@ private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Task> clientRequ
capturedPropagationContext = propagation;
capturedCarrier = carrier;
// Make sure the original metadata make it through
if (additionalMetadata != null)
{
Assert.Equal(capturedCarrier, additionalMetadata);
}
// Call the actual setter to ensure it updates the carrier.
// It doesn't matter what we put in
setter(capturedCarrier, "foo", "bar");
setter(capturedCarrier, "bar", "baz");
});

using var server = FoobarService.Start();
Expand All @@ -407,7 +420,7 @@ private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Task> clientRequ
using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue))
{
var client = FoobarService.ConstructRpcClient(server.UriString, new ClientTracingInterceptor(interceptorOptions));
await clientRequestFunc(client).ConfigureAwait(false);
await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false);

Assert.Equal(default, Activity.Current);

Expand All @@ -416,6 +429,11 @@ private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Task> clientRequ
// Propagator was called exactly once
Assert.Equal(1, propagatorCalled);

// The client tracing interceptor should create a copy of the original call headers before passing to the propagator.
// Retries that sit above this interceptor rely on the original call metadata.
// The propagator should not have mutated the original CallOption headers.
Assert.Equal(originalMetadataCount, additionalMetadata.Count);

// There was no parent activity, so these will be default
Assert.Equal(default, capturedPropagationContext.ActivityContext.TraceId);
Assert.Equal(default, capturedPropagationContext.ActivityContext.SpanId);
Expand All @@ -437,7 +455,7 @@ private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Task> clientRequ
parentActivity.SetIdFormat(ActivityIdFormat.W3C);
parentActivity.Start();
var client = FoobarService.ConstructRpcClient(server.UriString, new ClientTracingInterceptor(interceptorOptions));
await clientRequestFunc(client).ConfigureAwait(false);
await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false);

Assert.Equal(parentActivity, Activity.Current);

Expand Down Expand Up @@ -465,7 +483,7 @@ private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Task> clientRequ
/// A Task.
/// </returns>
private async Task TestHandlerFailure(
Func<Foobar.FoobarClient, Task> clientRequestFunc,
Func<Foobar.FoobarClient, Metadata, Task> clientRequestFunc,
StatusCode statusCode = StatusCode.ResourceExhausted,
bool validateErrorDescription = true,
string serverUriString = null)
Expand All @@ -482,7 +500,7 @@ private async Task TestHandlerFailure(
});

using var activityListener = new InterceptorActivityListener(clientInterceptorOptions.ActivityIdentifierValue);
await Assert.ThrowsAsync<RpcException>(async () => await clientRequestFunc(client).ConfigureAwait(false));
await Assert.ThrowsAsync<RpcException>(async () => await clientRequestFunc(client, null).ConfigureAwait(false));

var activity = activityListener.Activity;
ValidateCommonActivityTags(activity, statusCode, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ public async Task DuplexStreamingServerHandlerFail()
/// A common method to test server interceptor handler success.
/// </summary>
/// <param name="clientRequestFunc">The specific client request function.</param>
/// <param name="additionalMetadata">The additional metadata, if any.</param>
/// <returns>A Task.</returns>
private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Task> clientRequestFunc)
private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Metadata, Task> clientRequestFunc, Metadata additionalMetadata = null)
{
// starts the server with the server interceptor
var interceptorOptions = new ServerTracingInterceptorOptions { Propagator = new TraceContextPropagator(), RecordMessageEvents = true, ActivityIdentifierValue = Guid.NewGuid() };
Expand All @@ -123,7 +124,7 @@ private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Task> clientRequ
using (var activityListener = new InterceptorActivityListener(interceptorOptions.ActivityIdentifierValue))
{
var client = FoobarService.ConstructRpcClient(server.UriString);
await clientRequestFunc(client).ConfigureAwait(false);
await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false);

var activity = activityListener.Activity;
GrpcCoreClientInterceptorTests.ValidateCommonActivityTags(activity, StatusCode.OK, interceptorOptions.RecordMessageEvents);
Expand All @@ -140,7 +141,7 @@ private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Task> clientRequ
new Metadata.Entry("traceparent", FoobarService.DefaultTraceparentWithSampling),
});

await clientRequestFunc(client).ConfigureAwait(false);
await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false);

var activity = activityListener.Activity;
GrpcCoreClientInterceptorTests.ValidateCommonActivityTags(activity, StatusCode.OK, interceptorOptions.RecordMessageEvents);
Expand All @@ -152,8 +153,9 @@ private async Task TestHandlerSuccess(Func<Foobar.FoobarClient, Task> clientRequ
/// A common method to test server interceptor handler failure.
/// </summary>
/// <param name="clientRequestFunc">The specific client request function.</param>
/// <param name="additionalMetadata">The additional metadata, if any.</param>
/// <returns>A Task.</returns>
private async Task TestHandlerFailure(Func<Foobar.FoobarClient, Task> clientRequestFunc)
private async Task TestHandlerFailure(Func<Foobar.FoobarClient, Metadata, Task> clientRequestFunc, Metadata additionalMetadata = null)
{
// starts the server with the server interceptor
var interceptorOptions = new ServerTracingInterceptorOptions { Propagator = new TraceContextPropagator(), ActivityIdentifierValue = Guid.NewGuid() };
Expand All @@ -169,7 +171,7 @@ private async Task TestHandlerFailure(Func<Foobar.FoobarClient, Task> clientRequ
new Metadata.Entry(FoobarService.RequestHeaderErrorDescription, "fubar"),
});

await Assert.ThrowsAsync<RpcException>(async () => await clientRequestFunc(client).ConfigureAwait(false));
await Assert.ThrowsAsync<RpcException>(async () => await clientRequestFunc(client, additionalMetadata).ConfigureAwait(false));

var activity = activityListener.Activity;
GrpcCoreClientInterceptorTests.ValidateCommonActivityTags(activity, StatusCode.ResourceExhausted, interceptorOptions.RecordMessageEvents);
Expand Down

0 comments on commit 1cdfc7f

Please sign in to comment.