Skip to content

Commit

Permalink
Make EventHub tests more reliable (#8889)
Browse files Browse the repository at this point in the history
* Make sure the pulling agents are started before starting tests

* Reactivate some tests
  • Loading branch information
benjaminpetit authored Mar 7, 2024
1 parent 434adbf commit 85b011a
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 34 deletions.
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyModel" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Core/Diagnostics/Metrics/Instruments.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Orleans.Runtime;

internal static class Instruments
public static class Instruments
{
internal static readonly Meter Meter = new("Microsoft.Orleans");
public static readonly Meter Meter = new("Microsoft.Orleans");
}
12 changes: 11 additions & 1 deletion test/Extensions/ServiceBus.Tests/CollectionFixtures.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Tester;
using Orleans.Runtime;
using Tester;
using TestExtensions;
using Xunit;

Expand All @@ -19,5 +20,14 @@ protected override void CheckPreconditionsOrThrow()
base.CheckPreconditionsOrThrow();
TestUtils.CheckForEventHub();
}

public override async Task InitializeAsync()
{
await base.InitializeAsync();
var collector = new Microsoft.Extensions.Diagnostics.Metrics.Testing.MetricCollector<long>(Instruments.Meter, "orleans-streams-queue-read-duration");
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
// Wait for 10 queue read
await collector.WaitForMeasurementsAsync(10, cts.Token);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Diagnostics.Metrics;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.TestingHost;
using TestExtensions;
using UnitTests.StreamingTests;
Expand Down Expand Up @@ -69,7 +71,7 @@ public async Task EHMultipleLinearSubscriptionTest()
await runner.MultipleLinearSubscriptionTest(Guid.NewGuid(), StreamNamespace);
}

[SkippableFact(Skip="https://github.com/dotnet/orleans/issues/5647"), TestCategory("EventHub"), TestCategory("Streaming")]
[SkippableFact, TestCategory("EventHub"), TestCategory("Streaming")]
public async Task EHMultipleSubscriptionTest_AddRemove()
{
this.fixture.Logger.LogInformation("************************ EHMultipleSubscriptionTest_AddRemove *********************************");
Expand Down Expand Up @@ -97,7 +99,7 @@ public async Task EHActiveSubscriptionTest()
await runner.ActiveSubscriptionTest(Guid.NewGuid(), StreamNamespace);
}

[SkippableFact(Skip="https://github.com/dotnet/orleans/issues/5653"), TestCategory("EventHub"), TestCategory("Streaming")]
[SkippableFact, TestCategory("EventHub"), TestCategory("Streaming")]
public async Task EHTwoIntermitentStreamTest()
{
this.fixture.Logger.LogInformation("************************ EHTwoIntermitentStreamTest *********************************");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ public async Task StreamingTests_Consumer_Producer_Subscribe()
var producer = this.fixture.HostedCluster.GrainFactory.GetGrain<ITypedProducerGrainProducingApple>(Guid.NewGuid());
await producer.BecomeProducer(streamId.Guid, streamId.Namespace, streamId.ProviderName);

await producer.StartPeriodicProducing();
for (var i = 0; i< 10; i++)
{
await producer.Produce();
}

int numProduced = 0;
await TestingUtils.WaitUntilAsync(lastTry => ProgrammaticSubcribeTestsRunner.ProducerHasProducedSinceLastCheck(numProduced, producer, lastTry), _timeout);
await producer.StopPeriodicProducing();
var implicitConsumer = this.fixture.HostedCluster.GrainFactory.GetGrain<IImplicitSubscribeGrain>(streamId.Guid);

var implicitConsumer =
this.fixture.HostedCluster.GrainFactory.GetGrain<IImplicitSubscribeGrain>(streamId.Guid);
await TestingUtils.WaitUntilAsync(lastTry => ProgrammaticSubcribeTestsRunner.CheckCounters(new List<ITypedProducerGrain> { producer },
await TestingUtils.WaitUntilAsync(lastTry => ProgrammaticSubcribeTestsRunner.CheckCounters(new List<ITypedProducerGrain> { producer },
implicitConsumer, lastTry, this.fixture.Logger), _timeout);

//clean up test
Expand All @@ -52,25 +51,25 @@ await TestingUtils.WaitUntilAsync(lastTry => ProgrammaticSubcribeTestsRunner.Che
public async Task StreamingTests_Consumer_Producer_SubscribeToTwoStream_MessageWithPolymorphism()
{
var streamId = new FullStreamIdentity(Guid.NewGuid(), ImplicitSubscribeGrain.StreamNameSpace, StreamProviderName);

var producer = this.fixture.GrainFactory.GetGrain<ITypedProducerGrainProducingApple>(Guid.NewGuid());
await producer.BecomeProducer(streamId.Guid, streamId.Namespace, streamId.ProviderName);
await producer.StartPeriodicProducing();
int numProduced = 0;
await TestingUtils.WaitUntilAsync(lastTry => ProgrammaticSubcribeTestsRunner.ProducerHasProducedSinceLastCheck(numProduced, producer, lastTry), _timeout);

// set up the new stream with the same guid, but different namespace, so it would invoke the same consumer grain
var streamId2 = new FullStreamIdentity(streamId.Guid, ImplicitSubscribeGrain.StreamNameSpace2, StreamProviderName);
var producer2 = this.fixture.GrainFactory.GetGrain<ITypedProducerGrainProducingApple>(Guid.NewGuid());
await producer2.BecomeProducer(streamId2.Guid, streamId2.Namespace, streamId2.ProviderName);
await producer2.StartPeriodicProducing();
await TestingUtils.WaitUntilAsync(lastTry => ProgrammaticSubcribeTestsRunner.ProducerHasProducedSinceLastCheck(numProduced, producer2, lastTry), _timeout);

await producer.StopPeriodicProducing();
await producer2.StopPeriodicProducing();

var implicitConsumer =
this.fixture.HostedCluster.GrainFactory.GetGrain<IImplicitSubscribeGrain>(streamId.Guid);
// Produce 10 events in streamId, 8 on streamId2
for (var i = 0; i < 10; i++)
{
await producer.Produce();
if (i < 8)
{
await producer2.Produce();
}
}

var implicitConsumer = this.fixture.HostedCluster.GrainFactory.GetGrain<IImplicitSubscribeGrain>(streamId.Guid);
await TestingUtils.WaitUntilAsync(lastTry => ProgrammaticSubcribeTestsRunner.CheckCounters(new List<ITypedProducerGrain> { producer, producer2 },
implicitConsumer, lastTry, this.fixture.Logger), _timeout);

Expand All @@ -85,19 +84,22 @@ public async Task StreamingTests_Consumer_Producer_SubscribeToStreamsHandledByDi

var producer = this.fixture.GrainFactory.GetGrain<ITypedProducerGrainProducingApple>(Guid.NewGuid());
await producer.BecomeProducer(streamId.Guid, streamId.Namespace, streamId.ProviderName);
await producer.StartPeriodicProducing();
int numProduced = 0;
await TestingUtils.WaitUntilAsync(lastTry => ProgrammaticSubcribeTestsRunner.ProducerHasProducedSinceLastCheck(numProduced, producer, lastTry), _timeout);

// set up the new stream with the same guid, but different namespace, so it would invoke the same consumer grain
var streamId2 = new FullStreamIdentity(streamId.Guid, ImplicitSubscribeGrain.StreamNameSpace2, StreamProviderName2);
var producer2 = this.fixture.GrainFactory.GetGrain<ITypedProducerGrainProducingApple>(Guid.NewGuid());
await producer2.BecomeProducer(streamId2.Guid, streamId2.Namespace, streamId2.ProviderName);
await producer2.StartPeriodicProducing();
await TestingUtils.WaitUntilAsync(lastTry => ProgrammaticSubcribeTestsRunner.ProducerHasProducedSinceLastCheck(numProduced, producer2, lastTry), _timeout);
await producer.StopPeriodicProducing();
await producer2.StopPeriodicProducing();


// Produce 10 events in streamId, 8 on streamId2
for (var i = 0; i < 10; i++)
{
await producer.Produce();
if (i < 8)
{
await producer2.Produce();
}
}

var implicitConsumer =
this.fixture.HostedCluster.GrainFactory.GetGrain<IImplicitSubscribeGrain>(streamId.Guid);
await TestingUtils.WaitUntilAsync(lastTry => ProgrammaticSubcribeTestsRunner.CheckCounters(new List<ITypedProducerGrain> { producer, producer2 },
Expand Down
16 changes: 13 additions & 3 deletions test/Tester/StreamingTests/SubscriptionMultiplicityTestRunner.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
Expand Down Expand Up @@ -40,9 +42,7 @@ public async Task MultipleParallelSubscriptionTest(Guid streamGuid, string strea
// produce some messages
await producer.BecomeProducer(streamGuid, streamNamespace, streamProviderName);

await producer.StartPeriodicProducing();
await Task.Delay(TimeSpan.FromMilliseconds(1000));
await producer.StopPeriodicProducing();
await RunFor(() => producer.Produce(), TimeSpan.FromSeconds(1));

// check
await TestingUtils.WaitUntilAsync(lastTry => CheckCounters(producer, consumer, 2, lastTry), Timeout);
Expand Down Expand Up @@ -413,5 +413,15 @@ private async Task<bool> CheckCounters(ISampleStreaming_ProducerGrain producer,
numConsumed);
return true;
}

private async Task RunFor(Func<Task> func, TimeSpan duration)
{
var sw = Stopwatch.StartNew();
do
{
await func();
}
while (sw.Elapsed < duration);
}
}
}
1 change: 1 addition & 0 deletions test/Tester/Tester.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.Testing" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit 85b011a

Please sign in to comment.