From f8e504a9310bf5acfa10a3e0c27a2d801d404adb Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sun, 17 Dec 2023 19:22:03 -0800 Subject: [PATCH 1/4] Stress Tests for the Cache-Cache Version --- .../Cache/MergeManyChangeSetsCacheFixture.cs | 137 +++++++++++++++++- src/DynamicData.Tests/Domain/Market.cs | 4 + src/DynamicData.Tests/Domain/MarketPrice.cs | 3 + .../Internal/MergeManyCacheChangeSets.cs | 21 ++- 4 files changed, 150 insertions(+), 15 deletions(-) diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs index ab96b0688..49131a80b 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs @@ -1,7 +1,13 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Threading; +using System.Threading.Tasks; +using Bogus; using DynamicData.Kernel; using DynamicData.Tests.Domain; using DynamicData.Tests.Utilities; @@ -29,16 +35,85 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable const decimal HighestPrice = BasePrice + PriceOffset + 1.0m; const decimal LowestPrice = BasePrice - 1.0m; - private static readonly Random Random = new (0x21123737); - - private static decimal GetRandomPrice() => MarketPrice.RandomPrice(Random, BasePrice, PriceOffset); + private static readonly TimeSpan s_MaxAddTime = TimeSpan.FromSeconds(1.0); + private static readonly TimeSpan s_MaxRemoveTime = TimeSpan.FromSeconds(2.0); private readonly ISourceCache _marketCache = new SourceCache(p => p.Id); private readonly ChangeSetAggregator _marketCacheResults; + private readonly Randomizer _randomizer = new (0x21123737); + + private int _uniquePriceId; + public MergeManyChangeSetsCacheFixture() => _marketCacheResults = _marketCache.Connect().AsAggregator(); + [Theory] + [InlineData(5, 7)] + [InlineData(10, 50)] + [InlineData(10, 1_000)] + [InlineData(200, 500)] + [InlineData(1_000, 10)] + public async Task MultiThreadedStressTest(int marketCount, int priceCount) + { + using var priceResults = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices).AsAggregator(); + + _ = await AddRemoveStress(marketCount, priceCount, TaskPoolScheduler.Default) + .Finally(() => CheckResultContents(_marketCacheResults, priceResults)); + } + + [Theory] + [InlineData(5, 7)] + [InlineData(5, 200)] + [InlineData(10, 100)] + [InlineData(20, 50)] + [InlineData(100, 10)] + public void NoDeadlockOrExceptionIfSubscribeDuringModify(int marketCount, int priceCount) + { + // Arrange + Func CreateTest(IScheduler sch, int markets, int prices) => + async () => + { + var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices); + + var adding = true; + + using var addMarkets = GenerateMarkets(sch) + .Take(markets) + .StressAddRemove(_marketCache, _ => GetRemoveTime(), sch) + .Finally(() => _marketCache.Dispose()) + .Subscribe(); + + using var addPrices = _marketCache.Connect() + .MergeMany(market => AddRemove((Market)market, sch, prices)) + .Finally(() => adding = false) + .Subscribe(); + + do + { + // Ensure items are being added asynchronously before subscribing to the animal changes + await Task.Yield(); + + { + // Subscribe + var mergedSub = merged.Subscribe(); + + // Let other threads run + await Task.Yield(); + + // Unsubscribe + mergedSub.Dispose(); + } + } + while (adding); + }; + + // Act + + // Assert + CreateTest(TaskPoolScheduler.Default, marketCount, priceCount).Should().NotThrowAsync(); + } + [Fact] public void NullChecks() { @@ -691,10 +766,66 @@ public void Dispose() DisposeMarkets(); } + private IObservable AddRemoveStress(int marketCount, int priceCount, IScheduler scheduler) => + Observable.Create(observer => new CompositeDisposable + { + GenerateMarkets(scheduler) + .Take(marketCount) + .StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler) + .Finally(() => _marketCache.Dispose()) + .Subscribe( + onNext: _ => { }, + onError: ex => observer.OnError(ex)), + + _marketCache.Connect() + .MergeMany(market => AddRemove((Market)market, scheduler, priceCount)) + .Subscribe( + onNext: _ => { }, + onError: ex => observer.OnError(ex), + onCompleted: () => + { + observer.OnNext(Unit.Default); + observer.OnCompleted(); + }) + }); + + private IObservable AddRemove(Market market, IScheduler sch, int addCount) => + GeneratePrices(market, sch) + .Take(addCount) + .StressAddRemove(market.PricesCache, _ => GetRemoveTime(), sch) + .Finally(market.PricesCache.Dispose); + + private IObservable GenerateMarkets(IScheduler scheduler) => + _randomizer.Interval(s_MaxAddTime, scheduler).Select(_ => new Market(_randomizer.Utf16String(5, 10, true))); + + private IObservable GeneratePrices(Market market, IScheduler scheduler) => + _randomizer.Interval(s_MaxAddTime, scheduler).Select(_ => market.CreatePrice(Interlocked.Increment(ref _uniquePriceId), GetRandomPrice())); + + private void CheckResultContents(ChangeSetAggregator marketResults, ChangeSetAggregator priceResults) + { + var expectedMarkets = _marketCache.Items.ToList(); + var expectedPrices = expectedMarkets.SelectMany(market => ((Market)market).PricesCache.Items).ToList(); + + // These should be subsets of each other + expectedMarkets.Should().BeSubsetOf(marketResults.Data.Items); + marketResults.Data.Items.Should().BeSubsetOf(expectedMarkets); + marketResults.Data.Items.Count().Should().Be(expectedMarkets.Count); + + // These should be subsets of each other + expectedPrices.Should().BeSubsetOf(priceResults.Data.Items); + priceResults.Data.Items.Should().BeSubsetOf(expectedPrices); + priceResults.Data.Items.Count().Should().Be(expectedPrices.Count); + } + private void DisposeMarkets() { _marketCache.Items.ForEach(m => (m as IDisposable)?.Dispose()); _marketCache.Dispose(); _marketCache.Clear(); } + + private decimal GetRandomPrice() => MarketPrice.RandomPrice(_randomizer, BasePrice, PriceOffset); + + private TimeSpan? GetRemoveTime() => _randomizer.Bool() ? NextRemoveTime() : null; + private TimeSpan NextRemoveTime() => _randomizer.TimeSpan(s_MaxRemoveTime); } diff --git a/src/DynamicData.Tests/Domain/Market.cs b/src/DynamicData.Tests/Domain/Market.cs index b7c23b76a..84d4733cf 100644 --- a/src/DynamicData.Tests/Domain/Market.cs +++ b/src/DynamicData.Tests/Domain/Market.cs @@ -39,6 +39,10 @@ public Market(int name) : this($"Market #{name}", Guid.NewGuid()) { } + public Market(string name) : this(name, Guid.NewGuid()) + { + } + public string Name { get; } public Guid Id { get; } diff --git a/src/DynamicData.Tests/Domain/MarketPrice.cs b/src/DynamicData.Tests/Domain/MarketPrice.cs index 05dfaa97b..1ac0cc823 100644 --- a/src/DynamicData.Tests/Domain/MarketPrice.cs +++ b/src/DynamicData.Tests/Domain/MarketPrice.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using Bogus; namespace DynamicData.Tests.Domain; @@ -42,6 +43,8 @@ public decimal Price public static decimal RandomPrice(Random r, decimal basePrice, decimal offset) => basePrice + (decimal)r.NextDouble() * offset; + public static decimal RandomPrice(Randomizer r, decimal basePrice, decimal offset) => r.Decimal(basePrice, basePrice + offset); + private class CurrentPriceEqualityComparer : IEqualityComparer { public virtual bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => x.MarketId.Equals(x.MarketId) && x.ItemId == y.ItemId && x.Price == y.Price; diff --git a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs index 96df9dd47..957b2962e 100644 --- a/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs +++ b/src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs @@ -21,29 +21,26 @@ public IObservable> Run() => Observabl { var locker = new object(); - // Transform to an observable cache of merge containers. + // Transform to an observable changeset of cached changesets var sourceCacheOfCaches = source - .IgnoreSameReferenceUpdate() - .WhereReasonsAre(ChangeReason.Add, ChangeReason.Remove, ChangeReason.Update) - .Transform((obj, key) => new ChangeSetCache(selector(obj, key))) - .Synchronize(locker) - .AsObservableCache(); + .Transform((obj, key) => new ChangeSetCache(selector(obj, key).Synchronize(locker))) + .AsObservableCache(); - var shared = sourceCacheOfCaches.Connect().Publish(); - - // this is manages all of the changes + // This is manages all of the changes var changeTracker = new ChangeSetMergeTracker(() => sourceCacheOfCaches.Items, comparer, equalityComparer); - // merge the items back together + var shared = sourceCacheOfCaches.Connect().Publish(); + + // Merge the child changeset changes together and apply to the tracker var allChanges = shared.MergeMany(mc => mc.Source) - .Synchronize(locker) .Subscribe( changes => changeTracker.ProcessChangeSet(changes, observer), observer.OnError, observer.OnCompleted); - // when a source item is removed, all of its sub-items need to be removed + // When a source item is removed, all of its sub-items need to be removed var removedItems = shared + .Synchronize(locker) .OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer)) .OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer)) .Subscribe(); From 98db1a71eeb55217914a9bd59e2077fbc731c017 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sun, 17 Dec 2023 19:35:07 -0800 Subject: [PATCH 2/4] Test tuning --- .../Cache/MergeManyChangeSetsCacheFixture.cs | 6 ++---- src/DynamicData.Tests/Domain/MarketPrice.cs | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs index 49131a80b..de5dab9bd 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs @@ -51,9 +51,9 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable [Theory] [InlineData(5, 7)] [InlineData(10, 50)] - [InlineData(10, 1_000)] + [InlineData(10, 100)] [InlineData(200, 500)] - [InlineData(1_000, 10)] + [InlineData(100, 10)] public async Task MultiThreadedStressTest(int marketCount, int priceCount) { using var priceResults = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices).AsAggregator(); @@ -808,12 +808,10 @@ private void CheckResultContents(ChangeSetAggregator marketResult // These should be subsets of each other expectedMarkets.Should().BeSubsetOf(marketResults.Data.Items); - marketResults.Data.Items.Should().BeSubsetOf(expectedMarkets); marketResults.Data.Items.Count().Should().Be(expectedMarkets.Count); // These should be subsets of each other expectedPrices.Should().BeSubsetOf(priceResults.Data.Items); - priceResults.Data.Items.Should().BeSubsetOf(expectedPrices); priceResults.Data.Items.Count().Should().Be(expectedPrices.Count); } diff --git a/src/DynamicData.Tests/Domain/MarketPrice.cs b/src/DynamicData.Tests/Domain/MarketPrice.cs index 1ac0cc823..c02d8c3f3 100644 --- a/src/DynamicData.Tests/Domain/MarketPrice.cs +++ b/src/DynamicData.Tests/Domain/MarketPrice.cs @@ -45,18 +45,26 @@ public decimal Price public static decimal RandomPrice(Randomizer r, decimal basePrice, decimal offset) => r.Decimal(basePrice, basePrice + offset); + public override bool Equals(object? obj) => obj is MarketPrice price && Price == price.Price && TimeStamp.Equals(price.TimeStamp) && MarketId.Equals(price.MarketId) && ItemId == price.ItemId; + + public override int GetHashCode() => HashCode.Combine(Price, TimeStamp, MarketId, ItemId); + + public static bool operator ==(MarketPrice? left, MarketPrice? right) => EqualityComparer.Default.Equals(left, right); + + public static bool operator !=(MarketPrice? left, MarketPrice? right) => !(left == right); + private class CurrentPriceEqualityComparer : IEqualityComparer { public virtual bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => x.MarketId.Equals(x.MarketId) && x.ItemId == y.ItemId && x.Price == y.Price; public int GetHashCode([DisallowNull] MarketPrice obj) => throw new NotImplementedException(); } - private class TimeStampPriceEqualityComparer : CurrentPriceEqualityComparer, IEqualityComparer + private sealed class TimeStampPriceEqualityComparer : CurrentPriceEqualityComparer, IEqualityComparer { public override bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => base.Equals(x, y) && x.TimeStamp == y.TimeStamp; } - private class LowestPriceComparer : IComparer + private sealed class LowestPriceComparer : IComparer { public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) { @@ -65,7 +73,7 @@ public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) } } - private class HighestPriceComparer : IComparer + private sealed class HighestPriceComparer : IComparer { public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) { @@ -74,7 +82,7 @@ public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) } } - private class LatestPriceComparer : IComparer + private sealed class LatestPriceComparer : IComparer { public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) { From 656576771bcaa9ba437a76ad4d081fe2b03cb3c7 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 18 Dec 2023 15:08:46 -0800 Subject: [PATCH 3/4] Improve Stress for Cache-Cache version --- .../Cache/MergeManyChangeSetsCacheFixture.cs | 168 ++++++++---------- src/DynamicData.Tests/Domain/Market.cs | 2 + 2 files changed, 74 insertions(+), 96 deletions(-) diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs index de5dab9bd..3d28dfc55 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs @@ -35,9 +35,6 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable const decimal HighestPrice = BasePrice + PriceOffset + 1.0m; const decimal LowestPrice = BasePrice - 1.0m; - private static readonly TimeSpan s_MaxAddTime = TimeSpan.FromSeconds(1.0); - private static readonly TimeSpan s_MaxRemoveTime = TimeSpan.FromSeconds(2.0); - private readonly ISourceCache _marketCache = new SourceCache(p => p.Id); private readonly ChangeSetAggregator _marketCacheResults; @@ -51,67 +48,81 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable [Theory] [InlineData(5, 7)] [InlineData(10, 50)] - [InlineData(10, 100)] + [InlineData(5, 100)] [InlineData(200, 500)] - [InlineData(100, 10)] + [InlineData(100, 5)] public async Task MultiThreadedStressTest(int marketCount, int priceCount) { - using var priceResults = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices).AsAggregator(); - - _ = await AddRemoveStress(marketCount, priceCount, TaskPoolScheduler.Default) - .Finally(() => CheckResultContents(_marketCacheResults, priceResults)); - } - - [Theory] - [InlineData(5, 7)] - [InlineData(5, 200)] - [InlineData(10, 100)] - [InlineData(20, 50)] - [InlineData(100, 10)] - public void NoDeadlockOrExceptionIfSubscribeDuringModify(int marketCount, int priceCount) - { - // Arrange - Func CreateTest(IScheduler sch, int markets, int prices) => - async () => - { - var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices); + var MaxAddTime = TimeSpan.FromSeconds(0.250); + var MaxRemoveTime = TimeSpan.FromSeconds(0.100); - var adding = true; + TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null; - using var addMarkets = GenerateMarkets(sch) - .Take(markets) - .StressAddRemove(_marketCache, _ => GetRemoveTime(), sch) - .Finally(() => _marketCache.Dispose()) - .Subscribe(); - - using var addPrices = _marketCache.Connect() - .MergeMany(market => AddRemove((Market)market, sch, prices)) - .Finally(() => adding = false) - .Subscribe(); - - do + IObservable AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) => + Observable.Create(observer => new CompositeDisposable { - // Ensure items are being added asynchronously before subscribing to the animal changes - await Task.Yield(); - - { - // Subscribe - var mergedSub = merged.Subscribe(); + AddRemoveMarkets(marketCount, parallel, scheduler) + .Subscribe( + onNext: _ => { }, + onError: ex => observer.OnError(ex)), + + _marketCache.Connect() + .MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler)) + .Subscribe( + onNext: _ => { }, + onError: ex => observer.OnError(ex), + onCompleted: () => + { + observer.OnNext(Unit.Default); + observer.OnCompleted(); + }) + }); + + IObservable AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) => + _randomizer.Interval(MaxAddTime, scheduler).Select(_ => new Market(_randomizer.Utf16String(5, 10, true))) + //.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler)) + .Take(ownerCount) + .StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler) + .Finally(_marketCache.Dispose); + + IObservable AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) => + _randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreatePrice(Interlocked.Increment(ref _uniquePriceId), GetRandomPrice())) + //.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler)) + .Take(priceCount) + .StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler) + .Finally(market.PricesCache.Dispose); + + var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices); + using var priceResults = merged.AsAggregator(); + + var adding = true; + + // Start asynchrononously modifying the parent list and the child lists + using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default) + .Finally(() => adding = false) + .Subscribe(); + + // Subscribe / unsubscribe over and over while the collections are being modified + do + { + // Ensure items are being added asynchronously before subscribing to changes + await Task.Yield(); - // Let other threads run - await Task.Yield(); + { + // Subscribe + var mergedSub = merged.Subscribe(); - // Unsubscribe - mergedSub.Dispose(); - } - } - while (adding); - }; + // Let other threads run + await Task.Yield(); - // Act + // Unsubscribe + mergedSub.Dispose(); + } + } + while (adding); - // Assert - CreateTest(TaskPoolScheduler.Default, marketCount, priceCount).Should().NotThrowAsync(); + // Verify the results + CheckResultContents(_marketCacheResults, priceResults); } [Fact] @@ -211,7 +222,7 @@ public void AllExistingSubItemsPresentInResult() // having var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); - markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice)); + AddUniquePrices(markets); // when _marketCache.AddOrUpdate(markets); @@ -235,7 +246,7 @@ public void AllNewSubItemsPresentInResult() _marketCache.AddOrUpdate(markets); // when - markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice)); + AddUniquePrices(markets); // then _marketCacheResults.Data.Count.Should().Be(MarketCount); @@ -254,7 +265,7 @@ public void AllRefreshedSubItemsAreRefreshed() var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); _marketCache.AddOrUpdate(markets); - markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice)); + AddUniquePrices(markets); // when markets.ForEach(m => m.RefreshAllPrices(GetRandomPrice)); @@ -360,7 +371,7 @@ public void AnyRemovedSubItemIsRemoved() var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); _marketCache.AddOrUpdate(markets); - markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice)); + AddUniquePrices(markets); // when markets.ForEach(m => m.PricesCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount)))); @@ -381,7 +392,7 @@ public void AnySourceItemRemovedRemovesAllSourceValues() var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray(); using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator(); _marketCache.AddOrUpdate(markets); - markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice)); + AddUniquePrices(markets); // when _marketCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount))); @@ -766,40 +777,8 @@ public void Dispose() DisposeMarkets(); } - private IObservable AddRemoveStress(int marketCount, int priceCount, IScheduler scheduler) => - Observable.Create(observer => new CompositeDisposable - { - GenerateMarkets(scheduler) - .Take(marketCount) - .StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler) - .Finally(() => _marketCache.Dispose()) - .Subscribe( - onNext: _ => { }, - onError: ex => observer.OnError(ex)), - - _marketCache.Connect() - .MergeMany(market => AddRemove((Market)market, scheduler, priceCount)) - .Subscribe( - onNext: _ => { }, - onError: ex => observer.OnError(ex), - onCompleted: () => - { - observer.OnNext(Unit.Default); - observer.OnCompleted(); - }) - }); - - private IObservable AddRemove(Market market, IScheduler sch, int addCount) => - GeneratePrices(market, sch) - .Take(addCount) - .StressAddRemove(market.PricesCache, _ => GetRemoveTime(), sch) - .Finally(market.PricesCache.Dispose); - - private IObservable GenerateMarkets(IScheduler scheduler) => - _randomizer.Interval(s_MaxAddTime, scheduler).Select(_ => new Market(_randomizer.Utf16String(5, 10, true))); - - private IObservable GeneratePrices(Market market, IScheduler scheduler) => - _randomizer.Interval(s_MaxAddTime, scheduler).Select(_ => market.CreatePrice(Interlocked.Increment(ref _uniquePriceId), GetRandomPrice())); + private void AddUniquePrices(Market[] markets) => + markets.ForEach(m => Enumerable.Range(0, PricesPerMarket).ForEach(_ => m.SetPrice(Interlocked.Increment(ref _uniquePriceId), GetRandomPrice))); private void CheckResultContents(ChangeSetAggregator marketResults, ChangeSetAggregator priceResults) { @@ -823,7 +802,4 @@ private void DisposeMarkets() } private decimal GetRandomPrice() => MarketPrice.RandomPrice(_randomizer, BasePrice, PriceOffset); - - private TimeSpan? GetRemoveTime() => _randomizer.Bool() ? NextRemoveTime() : null; - private TimeSpan NextRemoveTime() => _randomizer.TimeSpan(s_MaxRemoveTime); } diff --git a/src/DynamicData.Tests/Domain/Market.cs b/src/DynamicData.Tests/Domain/Market.cs index 7a9858e5f..516e90c86 100644 --- a/src/DynamicData.Tests/Domain/Market.cs +++ b/src/DynamicData.Tests/Domain/Market.cs @@ -99,6 +99,8 @@ public Market SetPrices(int minId, int maxId, Func getPrice) => th public Market SetPrices(int minId, int maxId, decimal newPrice) => SetPrices(minId, maxId, _ => newPrice); + public Market SetPrice(int id, Func getPrice) => this.With(_ => _latestPrices.AddOrUpdate(CreatePrice(id, getPrice()))); + public void Dispose() => _latestPrices.Dispose(); public override string ToString() => $"Market '{Name}' [{Id}] (Rating: {Rating})"; From 6e833c3c1281b51e8162f2c3ecab436e8933d4d7 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 18 Dec 2023 15:35:33 -0800 Subject: [PATCH 4/4] Cache-to-Cache Improvements --- .../Cache/MergeManyChangeSetsCacheFixture.cs | 7 ++----- src/DynamicData.Tests/Domain/Fakers.cs | 2 ++ src/DynamicData.Tests/Domain/Market.cs | 15 +++++++++++++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs index 3d28dfc55..896656840 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs @@ -41,8 +41,6 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable private readonly Randomizer _randomizer = new (0x21123737); - private int _uniquePriceId; - public MergeManyChangeSetsCacheFixture() => _marketCacheResults = _marketCache.Connect().AsAggregator(); [Theory] @@ -86,7 +84,7 @@ IObservable AddRemoveMarkets(int ownerCount, int parallel, IScheduler s .Finally(_marketCache.Dispose); IObservable AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) => - _randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreatePrice(Interlocked.Increment(ref _uniquePriceId), GetRandomPrice())) + _randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreateUniquePrice(_ => GetRandomPrice())) //.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler)) .Take(priceCount) .StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler) @@ -777,8 +775,7 @@ public void Dispose() DisposeMarkets(); } - private void AddUniquePrices(Market[] markets) => - markets.ForEach(m => Enumerable.Range(0, PricesPerMarket).ForEach(_ => m.SetPrice(Interlocked.Increment(ref _uniquePriceId), GetRandomPrice))); + private void AddUniquePrices(Market[] markets) => markets.ForEach(m => m.AddUniquePrices(PricesPerMarket, _ => GetRandomPrice())); private void CheckResultContents(ChangeSetAggregator marketResults, ChangeSetAggregator priceResults) { diff --git a/src/DynamicData.Tests/Domain/Fakers.cs b/src/DynamicData.Tests/Domain/Fakers.cs index a71cb37a7..069b0fcf6 100644 --- a/src/DynamicData.Tests/Domain/Fakers.cs +++ b/src/DynamicData.Tests/Domain/Fakers.cs @@ -45,6 +45,8 @@ internal static class Fakers public static Faker AnimalOwnerWithAnimals { get; } = AnimalOwner.Clone().WithInitialAnimals(Animal); + public static Faker Market { get; } = new Faker().CustomInstantiator(faker => new Market($"{faker.Commerce.ProductName()} Market Id#{faker.Random.AlphaNumeric(5)}")); + public static Faker WithInitialAnimals(this Faker existing, Faker animalFaker, int minCount, int maxCount) => existing.FinishWith((faker, owner) => owner.Animals.AddRange(animalFaker.GenerateLazy(faker.Random.Number(minCount, maxCount)))); diff --git a/src/DynamicData.Tests/Domain/Market.cs b/src/DynamicData.Tests/Domain/Market.cs index 516e90c86..a100e6df3 100644 --- a/src/DynamicData.Tests/Domain/Market.cs +++ b/src/DynamicData.Tests/Domain/Market.cs @@ -3,6 +3,7 @@ using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Reactive.Linq; +using System.Threading; using DynamicData.Kernel; using DynamicData.Tests.Utilities; @@ -21,6 +22,8 @@ internal interface IMarket internal sealed class Market : IMarket, IDisposable { + private static int s_UniquePriceId; + private readonly ISourceCache _latestPrices = new SourceCache(p => p.ItemId); public static IComparer RatingCompare { get; } = new RatingComparer(); @@ -55,6 +58,12 @@ public Market(string name) : this(name, Guid.NewGuid()) public MarketPrice CreatePrice(int itemId, decimal price) => new(itemId, price, Id); + public MarketPrice CreateUniquePrice(Func getPrice) + { + var id = Interlocked.Increment(ref s_UniquePriceId); + return CreatePrice(id, getPrice(id)); + } + public Market AddRandomIdPrices(Random r, int count, int minId, int maxId, Func randPrices) { _latestPrices.AddOrUpdate(Enumerable.Range(0, int.MaxValue).Select(_ => r.Next(minId, maxId)).Distinct().Take(count).Select(id => CreatePrice(id, randPrices()))); @@ -101,10 +110,16 @@ public Market SetPrices(int minId, int maxId, Func getPrice) => th public Market SetPrice(int id, Func getPrice) => this.With(_ => _latestPrices.AddOrUpdate(CreatePrice(id, getPrice()))); + public Market AddUniquePrices(int count, Func getPrice) => + this.With(_ => _latestPrices.AddOrUpdate(CreateUniquePrices(count, getPrice))); + public void Dispose() => _latestPrices.Dispose(); public override string ToString() => $"Market '{Name}' [{Id}] (Rating: {Rating})"; + private IEnumerable CreateUniquePrices(int count, Func getPrice) => + Enumerable.Range(0, count).Select(_ => CreateUniquePrice(getPrice)); + private class RatingComparer : IComparer { public int Compare([DisallowNull] IMarket x, [DisallowNull] IMarket y) =>