diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs index 6edbf67ba..9a1416bd8 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs @@ -1,7 +1,12 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive; using System.Reactive.Linq; +using System.Threading.Tasks; +using Bogus; using DynamicData.Kernel; using DynamicData.Tests.Domain; using DynamicData.Tests.Utilities; @@ -29,15 +34,94 @@ public sealed class MergeManyChangeSetsCacheSourceCompareFixture : IDisposable const decimal HighestPrice = BasePrice + PriceOffset + 1.0m; const decimal LowestPrice = BasePrice - 1.0m; - private static readonly Random Random = new (0x10012022); - - private static decimal GetRandomPrice() => MarketPrice.RandomPrice(Random, BasePrice, PriceOffset); - private readonly ISourceCache _marketCache = new SourceCache(p => p.Id); private readonly ChangeSetAggregator _marketCacheResults; - public MergeManyChangeSetsCacheSourceCompareFixture() => _marketCacheResults = _marketCache.Connect().AsAggregator(); + private readonly Faker _marketFaker; + + private readonly Randomizer _randomizer; + + public MergeManyChangeSetsCacheSourceCompareFixture() + { + _randomizer = new(0x10012022); + _marketFaker = Fakers.Market.RuleFor(m => m.Rating, faker => faker.Random.Double(0, 5)).WithSeed(_randomizer); + _marketCacheResults = _marketCache.Connect().AsAggregator(); + } + + [Theory] +#if DEBUG + [InlineData(5, 7)] + [InlineData(10, 50)] +#else + [InlineData(10, 1_000)] + [InlineData(100, 100)] + [InlineData(1_000, 10)] +#endif + public async Task MultiThreadedStressTest(int marketCount, int priceCount) + { + const int MaxItemId = 50; + var MaxAddTime = TimeSpan.FromSeconds(0.250); + var MaxRemoveTime = TimeSpan.FromSeconds(0.100); + + TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null; + + IObservable AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) => + Observable.Create(observer => new CompositeDisposable + { + AddRemoveMarkets(marketCount, parallel, scheduler) + .Subscribe( + onNext: static _ => { }, + onError: observer.OnError), + _marketCache.Connect() + .MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler)) + .Subscribe( + onNext: static _ => { }, + onError: observer.OnError, + onCompleted: observer.OnCompleted) + }); + + IObservable AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) => + _marketFaker.IntervalGenerate(MaxAddTime, scheduler) + .Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler)) + .Finally(_marketCache.Dispose); + + IObservable AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) => + _randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreatePrice(_randomizer.Number(MaxItemId), GetRandomPrice())) + .Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)) + .Finally(market.PricesCache.Dispose); + + var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare); + var adding = true; + using var priceResults = merged.AsAggregator(); + + // Start asynchrononously modifying the parent list and the child lists + using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default) + .Finally(() => adding = false) + .Subscribe(); + + // Subscribe / unsubscribe over and over while the collections are being modified + do + { + // Ensure items are being added asynchronously before subscribing to changes + await Task.Yield(); + + { + // Subscribe + var mergedSub = merged.Subscribe(); + + // Let other threads run + await Task.Yield(); + + // Unsubscribe + mergedSub.Dispose(); + } + } + while (adding); + + // Verify the results + CheckResultContents(_marketCacheResults, priceResults, Market.RatingCompare); + } [Fact] public void NullChecks() @@ -985,10 +1069,32 @@ public void Dispose() DisposeMarkets(); } + private void CheckResultContents(ChangeSetAggregator marketResults, ChangeSetAggregator priceResults, IComparer comparer) + { + var expectedMarkets = _marketCache.Items.ToList(); + + // These should be subsets of each other + expectedMarkets.Should().BeSubsetOf(marketResults.Data.Items); + marketResults.Data.Items.Count().Should().Be(expectedMarkets.Count); + + // Pair up all the Markets/Prices, Group them by ItemId, and sort each Group by the Market comparer + // Then pull out the first value from each group, which should be the price from the best market for each ItemId + var expectedPrices = expectedMarkets.Select(m => (Market)m).SelectMany(m => m.PricesCache.Items.Select(mp => (Market: m, MarketPrice: mp))) + .GroupBy(tuple => tuple.MarketPrice.ItemId) + .Select(group => group.OrderBy(tuple => tuple.Market, comparer).Select(tuple => tuple.MarketPrice).First()) + .ToList(); + + // These should be subsets of each other + expectedPrices.Should().BeSubsetOf(priceResults.Data.Items); + priceResults.Data.Items.Count().Should().Be(expectedPrices.Count); + } + private void DisposeMarkets() { _marketCache.Items.ForEach(m => (m as IDisposable)?.Dispose()); _marketCache.Dispose(); _marketCache.Clear(); } + + private decimal GetRandomPrice() => MarketPrice.RandomPrice(_randomizer, BasePrice, PriceOffset); } diff --git a/src/DynamicData.Tests/Domain/Fakers.cs b/src/DynamicData.Tests/Domain/Fakers.cs index 069b0fcf6..214b7f703 100644 --- a/src/DynamicData.Tests/Domain/Fakers.cs +++ b/src/DynamicData.Tests/Domain/Fakers.cs @@ -45,7 +45,7 @@ internal static class Fakers public static Faker AnimalOwnerWithAnimals { get; } = AnimalOwner.Clone().WithInitialAnimals(Animal); - public static Faker Market { get; } = new Faker().CustomInstantiator(faker => new Market($"{faker.Commerce.ProductName()} Market Id#{faker.Random.AlphaNumeric(5)}")); + public static Faker Market { get; } = new Faker().CustomInstantiator(faker => new Market($"{faker.Commerce.ProductName()} Id#{faker.Random.AlphaNumeric(5)}")); public static Faker WithInitialAnimals(this Faker existing, Faker animalFaker, int minCount, int maxCount) => existing.FinishWith((faker, owner) => owner.Animals.AddRange(animalFaker.GenerateLazy(faker.Random.Number(minCount, maxCount)))); diff --git a/src/DynamicData/Cache/CacheChangeSetEx.cs b/src/DynamicData/Cache/CacheChangeSetEx.cs index 18640b395..382857b45 100644 --- a/src/DynamicData/Cache/CacheChangeSetEx.cs +++ b/src/DynamicData/Cache/CacheChangeSetEx.cs @@ -2,6 +2,8 @@ // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. +using DynamicData.Kernel; + namespace DynamicData.Cache; internal static class CacheChangeSetEx @@ -24,4 +26,32 @@ public static ChangeSet ToConcreteType(this IChang where TObject : notnull where TKey : notnull => changeSet as ChangeSet ?? throw new NotSupportedException("Dynamic Data does not support a custom implementation of IChangeSet"); + + /// + /// Transforms the change set into a different type using the specified transform function. + /// + /// The type of the source. + /// The type of the destination. + /// The type of the Key. + /// The source. + /// The transformer. + /// The change set. + /// + /// source + /// or + /// transformer. + /// + public static IChangeSet Transform(this IChangeSet source, Func transformer) + where TSource : notnull + where TDestination : notnull + where TKey : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + transformer.ThrowArgumentNullExceptionIfNull(nameof(transformer)); + + var changes = source.Select(change => + new Change(change.Reason, change.Key, transformer(change.Current), change.Previous.Convert(transformer), change.CurrentIndex, change.PreviousIndex)); + + return new ChangeSet(changes); + } } diff --git a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs index aa24816f2..f71552727 100644 --- a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs +++ b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSetsSourceCompare.cs @@ -19,7 +19,7 @@ internal sealed class MergeManyCacheChangeSetsSourceCompare>> _changeSetSelector = (obj, key) => selector(obj, key).Transform(dest => new ParentChildEntry(obj, dest)); - private readonly IComparer? _comparer = (childCompare is null) ? new ParentOnlyCompare(parentCompare) : new ParentChildCompare(parentCompare, childCompare); + private readonly IComparer _comparer = (childCompare is null) ? new ParentOnlyCompare(parentCompare) : new ParentChildCompare(parentCompare, childCompare); private readonly IEqualityComparer? _equalityComparer = (equalityComparer != null) ? new ParentChildEqualityCompare(equalityComparer) : null; @@ -30,25 +30,25 @@ public IObservable> Run() => Observabl // Transform to an observable cache of merge containers. var sourceCacheOfCaches = source - .Transform((obj, key) => new ChangeSetCache(_changeSetSelector(obj, key))) - .Synchronize(locker) + .Transform((obj, key) => new ChangeSetCache(_changeSetSelector(obj, key).Synchronize(locker))) .AsObservableCache(); + // Share a single connection to the cache var shared = sourceCacheOfCaches.Connect().Publish(); - // this is manages all of the changes + // This is manages all of the changes var changeTracker = new ChangeSetMergeTracker(() => sourceCacheOfCaches.Items, _comparer, _equalityComparer); - // merge the items back together + // Merge the child changeset changes together and apply to the tracker var allChanges = shared.MergeMany(mc => mc.Source) - .Synchronize(locker) .Subscribe( changes => changeTracker.ProcessChangeSet(changes, observer), observer.OnError, observer.OnCompleted); - // when a source item is removed, all of its sub-items need to be removed + // When a source item is removed, all of its sub-items need to be removed var removedItems = shared + .Synchronize(locker) .OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer)) .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer)) .Subscribe(); @@ -56,11 +56,14 @@ public IObservable> Run() => Observabl // If requested, when the source sees a refresh event, re-evaluate all the keys associated with that source because the priority may have changed // Because the comparison is based on the parent, which has just been refreshed. var refreshItems = reevalOnRefresh - ? shared.OnItemRefreshed(mc => changeTracker.RefreshItems(mc.Cache.Keys, observer)).Subscribe() + ? shared + .Synchronize(locker) + .OnItemRefreshed(mc => changeTracker.RefreshItems(mc.Cache.Keys, observer)) + .Subscribe() : Disposable.Empty; return new CompositeDisposable(sourceCacheOfCaches, allChanges, removedItems, refreshItems, shared.Connect()); - }).Transform(entry => entry.Child); + }).Select(changes => changes.Transform(entry => entry.Child)); private sealed class ParentChildEntry(TObject parent, TDestination child) {