Skip to content

Commit

Permalink
Make MockPayloadSender a subclass of ApmChannel to assert tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz committed Sep 11, 2023
1 parent 1cadb61 commit df0e533
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 89 deletions.
57 changes: 50 additions & 7 deletions src/Elastic.Apm/Ingest/ApmChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Text.Encodings.Web;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Apm.Api;
using Elastic.Apm.Logging;
using Elastic.Apm.Report;
using Elastic.Channels;
using Elastic.Ingest.Transport;
using Elastic.Transport;

namespace Elastic.Apm.Ingest;

#nullable enable
internal static class ApmChannelStatics
{
public static readonly byte[] LineFeed = { (byte)'\n' };
Expand All @@ -40,18 +44,56 @@ internal static class ApmChannelStatics
/// </summary>
public class ApmChannel
: TransportChannelBase<ApmChannelOptions, IIntakeRoot, EventIntakeResponse, IntakeErrorItem>
, IPayloadSender
, IPayloadSender
{
private readonly List<Func<ITransaction, ITransaction?>> _transactionFilters = new();
private readonly List<Func<ISpan, ISpan?>> _spanFilters = new();
private readonly List<Func<IError, IError?>> _errorFilters = new();

/// <inheritdoc cref="ApmChannel"/>
public ApmChannel(ApmChannelOptions options) : base(options) { }
public ApmChannel(ApmChannelOptions options, IApmLogger? logger = null) : base(options) =>
PayloadSenderV2.SetUpFilters(_transactionFilters, _spanFilters, _errorFilters, null, logger ?? new TraceLogger(LogLevel.Trace));

public IError? Filter(IError error) => _errorFilters.Aggregate(error, (current, filter) => filter(current)!);

public ISpan? Filter(ISpan span) => _spanFilters.Aggregate(span, (current, filter) => filter(current)!);

void IPayloadSender.QueueError(IError error) => TryWrite(error);
public ITransaction? Filter(ITransaction span) => _transactionFilters.Aggregate(span, (current, filter) => filter(current)!);

public bool TryFilter(IError error, [NotNullWhen(true)] out IError? filtered)
{
filtered = _errorFilters.Select(f => f(error)).TakeWhile(e => e != null).LastOrDefault();
return filtered != null;
}

void IPayloadSender.QueueMetrics(IMetricSet metrics) => TryWrite(metrics);
public bool TryFilter(ISpan span, [NotNullWhen(true)] out ISpan? filtered)
{
filtered = _spanFilters.Select(f => f(span)).TakeWhile(e => e != null).LastOrDefault();
return filtered != null;
}

void IPayloadSender.QueueSpan(ISpan span) => TryWrite(span);
public bool TryFilter(ITransaction transaction, [NotNullWhen(true)] out ITransaction? filtered)
{
filtered = _transactionFilters.Select(f => f(transaction)).TakeWhile(e => e != null).LastOrDefault();
return filtered != null;
}

void IPayloadSender.QueueTransaction(ITransaction transaction) => TryWrite(transaction);
public virtual void QueueMetrics(IMetricSet metrics) => TryWrite(metrics);

public virtual void QueueError(IError error)
{
if (TryFilter(error, out var e)) TryWrite(e);
}

public virtual void QueueSpan(ISpan span)
{
if (TryFilter(span, out var s)) TryWrite(s);
}

public virtual void QueueTransaction(ITransaction transaction)
{
if (TryFilter(transaction, out var t)) TryWrite(t);
}

//retry if APM server returns 429
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.Retry"/>
Expand All @@ -74,7 +116,8 @@ public ApmChannel(ApmChannelOptions options) : base(options) { }
protected override bool RejectEvent((IIntakeRoot, IntakeErrorItem) @event) => false;

/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/>
protected override Task<EventIntakeResponse> ExportAsync(HttpTransport transport, ArraySegment<IIntakeRoot> page, CancellationToken ctx = default) =>
protected override Task<EventIntakeResponse>
ExportAsync(HttpTransport transport, ArraySegment<IIntakeRoot> page, CancellationToken ctx = default) =>
transport.RequestAsync<EventIntakeResponse>(HttpMethod.POST, "/intake/v2/events",
PostData.StreamHandler(page,
(_, _) =>
Expand Down
9 changes: 8 additions & 1 deletion src/Elastic.Apm/Ingest/ApmChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using Elastic.Apm.Api;
using Elastic.Ingest.Transport;
using Elastic.Transport;
Expand All @@ -14,5 +15,11 @@ namespace Elastic.Apm.Ingest;
public class ApmChannelOptions : TransportChannelOptionsBase<IIntakeRoot, EventIntakeResponse, IntakeErrorItem>
{
/// <inheritdoc cref="ApmChannelOptions"/>
public ApmChannelOptions(HttpTransport transport) : base(transport) { }
private ApmChannelOptions(HttpTransport transport) : base(transport) { }

public ApmChannelOptions(Uri serverEndpoint, TransportClient transportClient = null)
: this(new DefaultHttpTransport(new TransportConfiguration(new SingleNodePool(serverEndpoint), connection: transportClient!)))
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#endregion

#nullable enable
#if !NET6_0_OR_GREATER
namespace System.Diagnostics.CodeAnalysis
{
/// <summary>Specifies that an output will not be null even if the corresponding type allows it.</summary>
Expand Down Expand Up @@ -78,3 +79,4 @@ internal class DoesNotReturnIfAttribute : Attribute
public bool ParameterValue { get; }
}
}
#endif
145 changes: 77 additions & 68 deletions test/Elastic.Apm.Tests.Utilities/MockPayloadSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,34 @@
using System.Linq;
using System.Threading;
using Elastic.Apm.Api;
using Elastic.Apm.Ingest;
using Elastic.Apm.Libraries.Newtonsoft.Json.Linq;
using Elastic.Apm.Logging;
using Elastic.Apm.Metrics;
using Elastic.Apm.Model;
using Elastic.Apm.Report;
using Elastic.Transport;
using FluentAssertions;

#nullable enable
namespace Elastic.Apm.Tests.Utilities
{
internal class MockPayloadSender : IPayloadSender
internal class MockPayloadSender : ApmChannel
{
private static readonly JObject JsonSpanTypesData =
JObject.Parse(File.ReadAllText("./TestResources/json-specs/span_types.json"));

private readonly List<IError> _errors = new List<IError>();
private readonly List<Func<IError, IError>> _errorFilters = new List<Func<IError, IError>>();
private readonly object _spanLock = new object();
private readonly object _transactionLock = new object();
private readonly object _metricsLock = new object();
private readonly object _errorLock = new object();
private readonly List<IMetricSet> _metrics = new List<IMetricSet>();
private readonly List<Func<ISpan, ISpan>> _spanFilters = new List<Func<ISpan, ISpan>>();
private readonly List<ISpan> _spans = new List<ISpan>();
private readonly List<Func<ITransaction, ITransaction>> _transactionFilters = new List<Func<ITransaction, ITransaction>>();
private readonly List<ITransaction> _transactions = new List<ITransaction>();

public MockPayloadSender(IApmLogger logger = null)
private readonly object _spanLock = new();
private readonly object _transactionLock = new();
private readonly object _metricsLock = new();
private readonly object _errorLock = new();
private readonly List<IMetricSet> _metrics = new();
private readonly List<IError> _errors = new();
private readonly List<ISpan> _spans = new();
private readonly List<ITransaction> _transactions = new();

public MockPayloadSender(IApmLogger? logger = null)
: base(new ApmChannelOptions(new Uri("http://localhost:8080"), transportClient: new InMemoryConnection()), logger)
{
_waitHandles = new[] { new AutoResetEvent(false), new AutoResetEvent(false), new AutoResetEvent(false), new AutoResetEvent(false) };

Expand All @@ -45,7 +46,6 @@ public MockPayloadSender(IApmLogger logger = null)
_errorWaitHandle = _waitHandles[2];
_metricSetWaitHandle = _waitHandles[3];

PayloadSenderV2.SetUpFilters(_transactionFilters, _spanFilters, _errorFilters, MockApmServerInfo.Version710, logger ?? new NoopLogger());
}

/// <summary>
Expand All @@ -61,6 +61,54 @@ public MockPayloadSender(IApmLogger logger = null)
private readonly AutoResetEvent[] _waitHandles;
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(1);

public override bool TryWrite(IIntakeRoot item)
{
var written = base.TryWrite(item);
switch (item)
{
case IError error:
_errors.Add(error);
_errorWaitHandle.Set();
break;
case ITransaction transaction:
_transactions.Add(transaction);
_transactionWaitHandle.Set();
break;
case ISpan span:
_spans.Add(span);
_spanWaitHandle.Set();
break;
case IMetricSet metricSet:
_metrics.Add(metricSet);
_metricSetWaitHandle.Set();
break;
}
return written;
}

public override void QueueError(IError error)
{
lock (_errorLock) base.QueueError(error);
}

public override void QueueTransaction(ITransaction transaction)
{
lock (_transactionLock) base.QueueTransaction(transaction);
}

public override void QueueSpan(ISpan span)
{
VerifySpan(span);
lock (_spanLock) base.QueueSpan(span);
}

public override void QueueMetrics(IMetricSet metricSet)
{
lock (_metricsLock) base.QueueMetrics(metricSet);
}



/// <summary>
/// Waits for any events to be queued
/// </summary>
Expand Down Expand Up @@ -191,27 +239,27 @@ public IReadOnlyList<IError> Errors
get
{
lock (_errorLock)
return CreateImmutableSnapshot<IError>(_errors);
return CreateImmutableSnapshot(_errors);
}
}

public Error FirstError => Errors.FirstOrDefault() as Error;
public MetricSet FirstMetric => Metrics.FirstOrDefault() as MetricSet;
public Error? FirstError => Errors.FirstOrDefault() as Error;
public MetricSet? FirstMetric => Metrics.FirstOrDefault() as MetricSet;

/// <summary>
/// The 1. Span on the 1. Transaction
/// </summary>
public Span FirstSpan => Spans.FirstOrDefault() as Span;
public Span? FirstSpan => Spans.FirstOrDefault() as Span;

public Transaction FirstTransaction =>
public Transaction? FirstTransaction =>
Transactions.FirstOrDefault() as Transaction;

public IReadOnlyList<IMetricSet> Metrics
{
get
{
lock (_metricsLock)
return CreateImmutableSnapshot<IMetricSet>(_metrics);
return CreateImmutableSnapshot(_metrics);
}
}

Expand All @@ -220,7 +268,7 @@ public IReadOnlyList<ISpan> Spans
get
{
lock (_spanLock)
return CreateImmutableSnapshot<ISpan>(_spans);
return CreateImmutableSnapshot(_spans);
}
}

Expand All @@ -229,45 +277,15 @@ public IReadOnlyList<ITransaction> Transactions
get
{
lock (_transactionLock)
return CreateImmutableSnapshot<ITransaction>(_transactions);
return CreateImmutableSnapshot(_transactions);
}
}

public Span[] SpansOnFirstTransaction =>
Spans.Where(n => n.TransactionId == Transactions.First().Id).Select(n => n as Span).ToArray();

public void QueueError(IError error)
{
lock (_errorLock)
{
error = _errorFilters.Aggregate(error,
(current, filter) => filter(current));
_errors.Add(error);
_errorWaitHandle.Set();
}
}

public virtual void QueueTransaction(ITransaction transaction)
{
lock (_transactionLock)
{
transaction = _transactionFilters.Aggregate(transaction,
(current, filter) => filter(current));
_transactions.Add(transaction);
_transactionWaitHandle.Set();
}
}

public void QueueSpan(ISpan span)
{
VerifySpan(span);
lock (_spanLock)
{
span = _spanFilters.Aggregate(span, (current, filter) => filter(current));
_spans.Add(span);
_spanWaitHandle.Set();
}
}
Spans
.Where(n => n.TransactionId == Transactions.First().Id)
.Select(n => (Span)n)
.ToArray();

private void VerifySpan(ISpan span)
{
Expand All @@ -279,7 +297,7 @@ private void VerifySpan(ISpan span)
var spanTypeInfo = JsonSpanTypesData[type] as JObject;
spanTypeInfo.Should().NotBeNull($"span type '{type}' is not allowed by the spec");

var allowNullSubtype = spanTypeInfo["allow_null_subtype"]?.Value<bool>();
var allowNullSubtype = spanTypeInfo!["allow_null_subtype"]?.Value<bool>();
var allowUnlistedSubtype = spanTypeInfo["allow_unlisted_subtype"]?.Value<bool>();
var subTypes = spanTypeInfo["subtypes"];
var hasSubtypes = subTypes != null && subTypes.Any();
Expand All @@ -289,7 +307,7 @@ private void VerifySpan(ISpan span)
{
if (!allowUnlistedSubtype.GetValueOrDefault() && hasSubtypes)
{
var subTypeInfo = subTypes[subType];
var subTypeInfo = subTypes![subType];
subTypeInfo.Should()
.NotBeNull($"span subtype '{subType}' is not allowed by the spec for type '{type}'");
}
Expand All @@ -305,15 +323,6 @@ private void VerifySpan(ISpan span)
}
}

public void QueueMetrics(IMetricSet metricSet)
{
lock (_metricsLock)
{
_metrics.Add(metricSet);
_metricSetWaitHandle.Set();
}
}

public void Clear()
{
lock (_spanLock)
Expand Down
Loading

0 comments on commit df0e533

Please sign in to comment.