Skip to content

Commit

Permalink
tests: Add WaitForSpansAsync mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
ruiqbarbosa committed Oct 15, 2023
1 parent e80ebdc commit c103860
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 36 deletions.
18 changes: 4 additions & 14 deletions src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using AutoFixture;
using KafkaFlow.Configuration;
using KafkaFlow.IntegrationTests.Core;
using KafkaFlow.IntegrationTests.Core.Exceptions;
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Messages;
using KafkaFlow.IntegrationTests.Core.Middlewares;
Expand Down Expand Up @@ -240,19 +239,10 @@ private async Task<IServiceProvider> GetServiceProviderAsync(

private async Task WaitForPartitionAssignmentAsync()
{
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2))));

await retryPolicy.ExecuteAsync(() =>
{
if (!this.isPartitionAssigned)
{
throw new PartitionAssignmentException();
}

return Task.CompletedTask;
});
await Policy
.HandleResult<bool>(isAvailable => !isAvailable)
.WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2))))
.ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned));
}
}
}
44 changes: 22 additions & 22 deletions src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using KafkaFlow.Compressor.Gzip;
using KafkaFlow.Configuration;
using KafkaFlow.IntegrationTests.Core;
using KafkaFlow.IntegrationTests.Core.Exceptions;
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Middlewares;
using KafkaFlow.IntegrationTests.Core.Producers;
Expand Down Expand Up @@ -57,13 +56,9 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
await producer.ProduceAsync(null, message);

// Assert
await Task.Delay(8000).ConfigureAwait(false);
var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);

var producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
var consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);

Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
Expand Down Expand Up @@ -102,13 +97,9 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
await producer.ProduceAsync(null, message);

// Assert
await Task.Delay(10000).ConfigureAwait(false);
var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);

var producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
var consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);

Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
Assert.AreEqual(producerSpan.GetBaggageItem(baggageName1), baggageValue1);
Expand All @@ -119,7 +110,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp

private async Task<IServiceProvider> GetServiceProvider()
{
string topicName = "Otel";
var topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}";

this.isPartitionAssigned = false;

Expand Down Expand Up @@ -184,19 +175,28 @@ private async Task<IServiceProvider> GetServiceProvider()

private async Task WaitForPartitionAssignmentAsync()
{
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2))));
await Policy
.HandleResult<bool>(isAvailable => !isAvailable)
.WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2))))
.ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned));
}

private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync()
{
Activity producerSpan = null, consumerSpan = null;

await retryPolicy.ExecuteAsync(() =>
{
if (!this.isPartitionAssigned)
await Policy
.HandleResult<bool>(isAvailable => !isAvailable)
.WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2))))
.ExecuteAsync(() =>
{
throw new PartitionAssignmentException();
}
producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);

return Task.FromResult(producerSpan != null && consumerSpan != null);
});

return Task.CompletedTask;
});
return (producerSpan, consumerSpan);
}
}
}

0 comments on commit c103860

Please sign in to comment.