Skip to content

Commit

Permalink
Fix for ChangeSetMergeTracker so that it correctly works with Value T…
Browse files Browse the repository at this point in the history
…ypes (#940)

* Fixes #931 with unit tests

* Minor tweaks

* Remove an extra equality check
  • Loading branch information
dwcullop authored Sep 16, 2024
1 parent 741cf6b commit 9f934a2
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 26 deletions.
4 changes: 2 additions & 2 deletions src/DynamicData.Tests/Cache/MergeChangeSetsFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -651,11 +651,11 @@ public void EqualityComparerAndComparerWorkTogetherForRefreshes()
// then
_marketList.Count.Should().Be(2);
results1.Data.Count.Should().Be(PricesPerMarket);
results1.Messages.Count.Should().Be(2);
results1.Messages.Count.Should().Be(3);
results1.Summary.Overall.Adds.Should().Be(PricesPerMarket);
results1.Summary.Overall.Removes.Should().Be(0);
results1.Summary.Overall.Updates.Should().Be(PricesPerMarket);
results1.Summary.Overall.Refreshes.Should().Be(0);
results1.Summary.Overall.Refreshes.Should().Be(PricesPerMarket);
results2.Messages.Count.Should().Be(4);
results2.Summary.Overall.Adds.Should().Be(PricesPerMarket);
results2.Summary.Overall.Removes.Should().Be(0);
Expand Down
20 changes: 20 additions & 0 deletions src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,26 @@ public void MergedObservableWillFailIfSourceFails()
receivedError.Should().Be(expectedError);
}

[Fact]
public void MergeManyChangeSetsWorksCorrectlyWithValueTypes()
{
// having
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
_marketCache.AddOrUpdate(markets);
markets.ForEach(m => m.SetPrices(0, PricesPerMarket, GetRandomPrice));
using var results = _marketCache.Connect()
.MergeManyChangeSets(m => m.LatestPrices.Transform(p => p.Price))
.AsAggregator();

// when
markets.ForEach(m => m.RemoveAllPrices());

// then
results.Data.Count.Should().Be(0);
results.Summary.Overall.Adds.Should().Be(PricesPerMarket);
results.Summary.Overall.Removes.Should().Be(PricesPerMarket);
}

public void Dispose()
{
_marketCacheResults.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,11 +921,11 @@ public void EqualityComparerAndChildComparerWorkTogetherForRefreshes()
resultsLow.Summary.Overall.Removes.Should().Be(0);
resultsLow.Summary.Overall.Updates.Should().Be(0);
resultsLow.Summary.Overall.Refreshes.Should().Be(0);
resultsRecent.Messages.Count.Should().Be(3);
resultsRecent.Messages.Count.Should().Be(4);
resultsRecent.Summary.Overall.Adds.Should().Be(PricesPerMarket);
resultsRecent.Summary.Overall.Removes.Should().Be(0);
resultsRecent.Summary.Overall.Updates.Should().Be(PricesPerMarket * 2);
resultsRecent.Summary.Overall.Refreshes.Should().Be(0);
resultsRecent.Summary.Overall.Refreshes.Should().Be(PricesPerMarket);
resultsTimeStamp.Messages.Count.Should().Be(5);
resultsTimeStamp.Summary.Overall.Adds.Should().Be(PricesPerMarket);
resultsTimeStamp.Summary.Overall.Removes.Should().Be(0);
Expand Down
7 changes: 4 additions & 3 deletions src/DynamicData/Cache/Internal/ChangeSetMergeTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal sealed class ChangeSetMergeTracker<TObject, TKey>(Func<IEnumerable<Chan
where TKey : notnull
{
private readonly ChangeAwareCache<TObject, TKey> _resultCache = new();
private readonly IEqualityComparer<TObject> _equalityComparer = equalityComparer ?? EqualityComparer<TObject>.Default;
private bool _hasCompleted;

public void MarkComplete() => _hasCompleted = true;
Expand Down Expand Up @@ -201,7 +202,7 @@ private void OnItemRefreshed(ChangeSetCache<TObject, TKey>[] sources, TObject it
// In the sorting case, a refresh requires doing a full update because any change could alter what the best value is
// If we don't care about sorting OR if we do care, but re-selecting the best value didn't change anything
// AND the current value is the exact one being refreshed, then emit the refresh downstream
if (((comparer is null) || !UpdateToBestValue(sources, key, cached)) && ReferenceEquals(cached.Value, item))
if (((comparer is null) || !UpdateToBestValue(sources, key, cached)) && CheckEquality(cached.Value, item))
{
_resultCache.Refresh(key);
}
Expand Down Expand Up @@ -268,9 +269,9 @@ private Optional<TObject> LookupBestValue(ChangeSetCache<TObject, TKey>[] source
}

private bool CheckEquality(TObject left, TObject right) =>
ReferenceEquals(left, right) || (equalityComparer?.Equals(left, right) ?? false);
_equalityComparer.Equals(left, right);

// Return true if candidate should replace current as the observed downstream value
private bool ShouldReplace(TObject candidate, TObject current) =>
!ReferenceEquals(candidate, current) && (comparer?.Compare(candidate, current) < 0);
comparer?.Compare(candidate, current) < 0;
}
55 changes: 36 additions & 19 deletions src/DynamicData/Cache/Internal/ToObservableOptional.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,45 @@ internal sealed class ToObservableOptional<TObject, TKey>(IObservable<IChangeSet
where TObject : notnull
where TKey : notnull
{
private readonly IEqualityComparer<TObject> _equalityComparer = equalityComparer ?? EqualityComparer<TObject>.Default;
private readonly IObservable<IChangeSet<TObject, TKey>> _source = source ?? throw new ArgumentNullException(nameof(source));
private readonly TKey _key = key;

public IObservable<Optional<TObject>> Run() => Observable.Create<Optional<TObject>>(observer =>
_source.Subscribe(
changes =>
changes.Where(ShouldEmitChange).ForEach(change => observer.OnNext(change switch
{
{ Reason: ChangeReason.Remove } => Optional.None<TObject>(),
_ => Optional.Some(change.Current),
})),
observer.OnError,
observer.OnCompleted));

private bool ShouldEmitChange(Change<TObject, TKey> change) => change switch
{
{ Key: { } thekey } when !thekey.Equals(_key) => false,
{ Reason: ChangeReason.Add } => true,
{ Reason: ChangeReason.Remove } => true,
{ Reason: ChangeReason.Update, Previous.HasValue: false } => true,
{ Reason: ChangeReason.Update } when equalityComparer is not null => !equalityComparer.Equals(change.Current, change.Previous.Value),
{ Reason: ChangeReason.Update } => !ReferenceEquals(change.Current, change.Previous.Value),
_ => false,
};
var lastValue = Optional.None<TObject>();

return _source.Subscribe(
changes => lastValue = EmitChanges(changes, observer, lastValue),
observer.OnError,
observer.OnCompleted);
});

private Optional<TObject> EmitChanges(IChangeSet<TObject, TKey> changes, IObserver<Optional<TObject>> observer, Optional<TObject> lastValue)
{
foreach (var change in changes.ToConcreteType())
{
// Ignore changes for different keys
if (!change.Key.Equals(_key))
{
continue;
}

// Remove is None, everything else is the current value
var emitValue = change switch
{
{ Reason: ChangeReason.Remove } => Optional.None<TObject>(),
_ => Optional.Some(change.Current),
};

// Emit the value if it has changed
if ((emitValue.HasValue != lastValue.HasValue) || (emitValue.HasValue && !_equalityComparer.Equals(lastValue.Value, emitValue.Value)))
{
observer.OnNext(emitValue);
lastValue = emitValue;
}
}

return lastValue;
}
}

0 comments on commit 9f934a2

Please sign in to comment.