Skip to content

Commit

Permalink
Ensure that WithSenderMiddleware actually chains middleware calls (#…
Browse files Browse the repository at this point in the history
…1832)

I also removed `WithInnerContext` since it seems a little silly to have to allocate another decorator object when `WithSenderMiddleware` could just set the inner context. I'm happy to change this back if you feel it's necessary.
  • Loading branch information
jstnlef authored Oct 23, 2022
1 parent d611307 commit 8f5c237
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 18 deletions.
4 changes: 1 addition & 3 deletions examples/ContextDecorators/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ public LoggingRootDecorator(IRootContext context) : base(context)
{
}

protected override IRootContext WithInnerContext(IRootContext context) => new LoggingRootDecorator(context);

public override async Task<T> RequestAsync<T>(PID target, object message, CancellationToken ct)
{
Console.WriteLine("Enter RequestAsync");
Expand Down Expand Up @@ -73,4 +71,4 @@ private static void Main(string[] args)
Console.WriteLine("Got result " + res);
Console.ReadLine();
}
}
}
15 changes: 10 additions & 5 deletions src/Proto.Actor/Context/RootContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,20 @@ public RootContext(ActorSystem system, MessageHeader? messageHeader, params Func
{
System = system;

SenderMiddleware = middleware.Reverse()
.Aggregate((Sender)DefaultSender, (inner, outer) => outer(inner));
SenderMiddleware = AggregateMiddleware(middleware);

Headers = messageHeader ?? MessageHeader.Empty;
}

private Sender? SenderMiddleware { get; init; }

private Sender AggregateMiddleware(params Func<Sender, Sender>[] middleware)
{
return middleware
.Reverse()
.Aggregate(SenderMiddleware ?? (Sender)DefaultSender, (inner, outer) => outer(inner));
}

private TypeDictionary<object, RootContext> Store { get; } = new(0, 1);
public ActorSystem System { get; }

Expand Down Expand Up @@ -108,8 +114,7 @@ public Task<T> RequestAsync<T>(PID target, object message, CancellationToken can
public IRootContext WithSenderMiddleware(params Func<Sender, Sender>[] middleware) =>
this with
{
SenderMiddleware = middleware.Reverse()
.Aggregate((Sender)DefaultSender, (inner, outer) => outer(inner))
SenderMiddleware = AggregateMiddleware(middleware)
};

public IFuture GetFuture() => System.Future.Get();
Expand Down Expand Up @@ -172,4 +177,4 @@ private void SendUserMessage(PID target, object message)
target.SendUserMessage(System, message);
}
}
}
}
8 changes: 3 additions & 5 deletions src/Proto.Actor/Context/RootContextDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace Proto;
[PublicAPI]
public abstract class RootContextDecorator : IRootContext
{
private readonly IRootContext _context;
private IRootContext _context;

protected RootContextDecorator(IRootContext context)
{
Expand Down Expand Up @@ -49,7 +49,7 @@ public virtual Task<T> RequestAsync<T>(PID target, object message, CancellationT
public virtual Task PoisonAsync(PID pid) => _context.PoisonAsync(pid);

public IRootContext WithSenderMiddleware(params Func<Sender, Sender>[] middleware) =>
WithInnerContext(_context.WithSenderMiddleware(middleware));
_context = _context.WithSenderMiddleware(middleware);

public virtual PID? Parent => null;
public virtual PID Self => null!;
Expand All @@ -65,7 +65,5 @@ public IRootContext WithSenderMiddleware(params Func<Sender, Sender>[] middlewar

public IFuture GetFuture() => _context.GetFuture();

protected abstract IRootContext WithInnerContext(IRootContext context);

public virtual void Request(PID target, object message) => _context.Request(target, message);
}
}
5 changes: 1 addition & 4 deletions src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ public OpenTelemetryRootContextDecorator(IRootContext context, ActivitySetup sen

private static string Source => "Root";

protected override IRootContext WithInnerContext(IRootContext context) =>
new OpenTelemetryRootContextDecorator(context, _sendActivitySetup);

public override void Send(PID target, object message) =>
OpenTelemetryMethodsDecorators.Send(Source, target, message, _sendActivitySetup,
() => base.Send(target, message));
Expand Down Expand Up @@ -271,4 +268,4 @@ internal static async Task Receive(string source, MessageEnvelope envelope, Acti
throw;
}
}
}
}
18 changes: 17 additions & 1 deletion tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ await VerifyTrace(async (rootContext, target) =>
}
);

[Fact]
public async Task TracesPropagateCorrectlyForRequestWithSenderWithAdditionalMiddleware() =>
await VerifyTrace(async (tracedRoot, target) =>
{
var middleContext = tracedRoot.WithSenderMiddleware(next => async (context, _, envelope) =>
{
var updatedEnvelope = envelope.WithHeader("test", "value");
await next(context, target, updatedEnvelope);
});
var future = new FutureProcess(middleContext.System);
middleContext.Request(target, new TraceMe(SendAs.Request), future.Pid);
var response = (MessageEnvelope)await future.Task;
response.Message.Should().Be(new TraceResponse());
}
);

/// <summary>
/// Checks that we have both the outer and innermost trace present, meaning that the trace has propagated
/// across the context boundaries
Expand Down Expand Up @@ -210,4 +226,4 @@ private static void ConditionalRespond(IContext context, object remainder)

private PID GetChild(IContext context) => _child ??= context.Spawn(InnerTraceActorProps);
}
}
}

0 comments on commit 8f5c237

Please sign in to comment.