Skip to content

Commit c1cfac8

Browse files
committed
Clean up
1 parent 2cfd4c0 commit c1cfac8

File tree

3 files changed

+38
-32
lines changed

3 files changed

+38
-32
lines changed

src/Aspire.Hosting/Dashboard/DashboardServiceData.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ static GenericResourceSnapshot CreateResourceSnapshot(IResource resource, string
6262
logger.LogDebug("Updating resource snapshot for {Name}/{DisplayName}: {State}", snapshot.Name, snapshot.DisplayName, snapshot.State);
6363
}
6464

65-
await _resourcePublisher.IntegrateAsync(snapshot, @event.Resource, ResourceSnapshotChangeType.Upsert)
65+
await _resourcePublisher.IntegrateAsync(@event.Resource, snapshot, ResourceSnapshotChangeType.Upsert)
6666
.ConfigureAwait(false);
6767
}
6868
catch (Exception ex) when (ex is not OperationCanceledException)

src/Aspire.Hosting/Dashboard/ResourcePublisher.cs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ namespace Aspire.Hosting.Dashboard;
1616
/// </summary>
1717
internal sealed class ResourcePublisher(CancellationToken cancellationToken)
1818
{
19+
private sealed record SourceAndResourceSnapshot(IResource Source, ResourceSnapshot Snapshot);
20+
1921
private readonly object _syncLock = new();
20-
private readonly Dictionary<string, (ResourceSnapshot, IResource)> _snapshot = [];
22+
private readonly Dictionary<string, SourceAndResourceSnapshot> _snapshot = [];
2123
private ImmutableHashSet<Channel<ResourceSnapshotChange>> _outgoingChannels = [];
2224

2325
// For testing purposes
@@ -29,7 +31,8 @@ internal bool TryGetResource(string resourceName, [NotNullWhen(returnValue: true
2931
{
3032
if (_snapshot.TryGetValue(resourceName, out var r))
3133
{
32-
(snapshot, resource) = r;
34+
snapshot = r.Snapshot;
35+
resource = r.Source;
3336
return true;
3437
}
3538

@@ -49,7 +52,7 @@ public ResourceSnapshotSubscription Subscribe()
4952
ImmutableInterlocked.Update(ref _outgoingChannels, static (set, channel) => set.Add(channel), channel);
5053

5154
return new ResourceSnapshotSubscription(
52-
InitialState: _snapshot.Values.Select(r => r.Item1).ToImmutableArray(),
55+
InitialState: _snapshot.Select(r => r.Value.Snapshot).ToImmutableArray(),
5356
Subscription: StreamUpdates());
5457

5558
async IAsyncEnumerable<IReadOnlyList<ResourceSnapshotChange>> StreamUpdates([EnumeratorCancellation] CancellationToken enumeratorCancellationToken = default)
@@ -74,11 +77,11 @@ async IAsyncEnumerable<IReadOnlyList<ResourceSnapshotChange>> StreamUpdates([Enu
7477
/// <summary>
7578
/// Integrates a changed resource within the cache, and broadcasts the update to any subscribers.
7679
/// </summary>
77-
/// <param name="resource">The resource that was modified.</param>
78-
/// <param name="resourceModel">The resource model.</param>
80+
/// <param name="source">The source resource.</param>
81+
/// <param name="snapshot">The resource snapshot that was modified.</param>
7982
/// <param name="changeType">The change type (Added, Modified, Deleted).</param>
8083
/// <returns>A task that completes when the cache has been updated and all subscribers notified.</returns>
81-
internal async ValueTask IntegrateAsync(ResourceSnapshot resource, IResource resourceModel, ResourceSnapshotChangeType changeType)
84+
internal async ValueTask IntegrateAsync(IResource source, ResourceSnapshot snapshot, ResourceSnapshotChangeType changeType)
8285
{
8386
ImmutableHashSet<Channel<ResourceSnapshotChange>> channels;
8487

@@ -87,11 +90,11 @@ internal async ValueTask IntegrateAsync(ResourceSnapshot resource, IResource res
8790
switch (changeType)
8891
{
8992
case ResourceSnapshotChangeType.Upsert:
90-
_snapshot[resource.Name] = (resource, resourceModel);
93+
_snapshot[snapshot.Name] = new SourceAndResourceSnapshot(source, snapshot);
9194
break;
9295

9396
case ResourceSnapshotChangeType.Delete:
94-
_snapshot.Remove(resource.Name);
97+
_snapshot.Remove(snapshot.Name);
9598
break;
9699
}
97100

@@ -100,7 +103,7 @@ internal async ValueTask IntegrateAsync(ResourceSnapshot resource, IResource res
100103

101104
foreach (var channel in channels)
102105
{
103-
await channel.Writer.WriteAsync(new(changeType, resource), cancellationToken).ConfigureAwait(false);
106+
await channel.Writer.WriteAsync(new(changeType, snapshot), cancellationToken).ConfigureAwait(false);
104107
}
105108
}
106109
}

tests/Aspire.Hosting.Tests/Dashboard/ResourcePublisherTests.cs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace Aspire.Hosting.Tests.Dashboard;
99

1010
public class ResourcePublisherTests
1111
{
12-
[Fact(Skip = "Passes locally but fails in CI. https://github.com/dotnet/aspire/issues/1410")]
12+
[Fact]
1313
public async Task ProducesExpectedSnapshotAndUpdates()
1414
{
1515
CancellationTokenSource cts = new();
@@ -19,8 +19,8 @@ public async Task ProducesExpectedSnapshotAndUpdates()
1919
var b = CreateResourceSnapshot("B");
2020
var c = CreateResourceSnapshot("C");
2121

22-
await publisher.IntegrateAsync(a, new TestResource("A"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
23-
await publisher.IntegrateAsync(b, new TestResource("B"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
22+
await publisher.IntegrateAsync(new TestResource("A"), a, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
23+
await publisher.IntegrateAsync(new TestResource("B"), b, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
2424

2525
Assert.Equal(0, publisher.OutgoingSubscriberCount);
2626

@@ -32,29 +32,32 @@ public async Task ProducesExpectedSnapshotAndUpdates()
3232
Assert.Single(snapshot.Where(s => s.Name == "A"));
3333
Assert.Single(snapshot.Where(s => s.Name == "B"));
3434

35-
using AutoResetEvent sync = new(initialState: false);
36-
List<IReadOnlyList<ResourceSnapshotChange>> changeBatches = [];
35+
var tcs = new TaskCompletionSource<IReadOnlyList<ResourceSnapshotChange>>(TaskCreationOptions.RunContinuationsAsynchronously);
3736

3837
var task = Task.Run(async () =>
3938
{
4039
await foreach (var change in subscription)
4140
{
42-
changeBatches.Add(change);
43-
sync.Set();
41+
tcs.TrySetResult(change);
4442
}
4543
});
4644

47-
await publisher.IntegrateAsync(c, new TestResource("C"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
48-
49-
Assert.True(sync.WaitOne(TimeSpan.FromSeconds(1)));
45+
await publisher.IntegrateAsync(new TestResource("C"), c, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
5046

51-
var change = Assert.Single(changeBatches.SelectMany(o => o));
47+
var change = Assert.Single(await tcs.Task);
5248
Assert.Equal(ResourceSnapshotChangeType.Upsert, change.ChangeType);
5349
Assert.Equal("C", change.Resource.Name);
5450

5551
await cts.CancelAsync();
5652

57-
await Assert.ThrowsAsync<OperationCanceledException>(() => task);
53+
try
54+
{
55+
await task;
56+
}
57+
catch (OperationCanceledException)
58+
{
59+
// Ignore possible cancellation error.
60+
}
5861

5962
Assert.Equal(0, publisher.OutgoingSubscriberCount);
6063
}
@@ -69,8 +72,8 @@ public async Task SupportsMultipleSubscribers()
6972
var b = CreateResourceSnapshot("B");
7073
var c = CreateResourceSnapshot("C");
7174

72-
await publisher.IntegrateAsync(a, new TestResource("A"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
73-
await publisher.IntegrateAsync(b, new TestResource("B"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
75+
await publisher.IntegrateAsync(new TestResource("A"), a, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
76+
await publisher.IntegrateAsync(new TestResource("B"), b, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
7477

7578
Assert.Equal(0, publisher.OutgoingSubscriberCount);
7679

@@ -82,7 +85,7 @@ public async Task SupportsMultipleSubscribers()
8285
Assert.Equal(2, snapshot1.Length);
8386
Assert.Equal(2, snapshot2.Length);
8487

85-
await publisher.IntegrateAsync(c, new TestResource("C"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
88+
await publisher.IntegrateAsync(new TestResource("C"), c, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
8689

8790
var enumerator1 = subscription1.GetAsyncEnumerator(cts.Token);
8891
var enumerator2 = subscription2.GetAsyncEnumerator(cts.Token);
@@ -116,9 +119,9 @@ public async Task MergesResourcesInSnapshot()
116119
var a2 = CreateResourceSnapshot("A");
117120
var a3 = CreateResourceSnapshot("A");
118121

119-
await publisher.IntegrateAsync(a1, new TestResource("A"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
120-
await publisher.IntegrateAsync(a2, new TestResource("A"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
121-
await publisher.IntegrateAsync(a3, new TestResource("A"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
122+
await publisher.IntegrateAsync(new TestResource("A"), a1, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
123+
await publisher.IntegrateAsync(new TestResource("A"), a2, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
124+
await publisher.IntegrateAsync(new TestResource("A"), a3, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
122125

123126
var (snapshot, _) = publisher.Subscribe();
124127

@@ -136,9 +139,9 @@ public async Task DeletesRemoveFromSnapshot()
136139
var a = CreateResourceSnapshot("A");
137140
var b = CreateResourceSnapshot("B");
138141

139-
await publisher.IntegrateAsync(a, new TestResource("A"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
140-
await publisher.IntegrateAsync(b, new TestResource("B"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
141-
await publisher.IntegrateAsync(a, new TestResource("A"), ResourceSnapshotChangeType.Delete).ConfigureAwait(false);
142+
await publisher.IntegrateAsync(new TestResource("A"), a, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
143+
await publisher.IntegrateAsync(new TestResource("B"), b, ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
144+
await publisher.IntegrateAsync(new TestResource("A"), a, ResourceSnapshotChangeType.Delete).ConfigureAwait(false);
142145

143146
var (snapshot, _) = publisher.Subscribe();
144147

@@ -170,7 +173,7 @@ public async Task CancelledSubscriptionIsCleanedUp()
170173
});
171174

172175
// Push through an update.
173-
await publisher.IntegrateAsync(CreateResourceSnapshot("A"), new TestResource("A"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
176+
await publisher.IntegrateAsync(new TestResource("A"), CreateResourceSnapshot("A"), ResourceSnapshotChangeType.Upsert).ConfigureAwait(false);
174177

175178
// Let the subscriber exit.
176179
await task;

0 commit comments

Comments
 (0)