Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Stress Tests for Cache-to-Cache MergeManyChangeSets #802

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 111 additions & 9 deletions src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Bogus;
using DynamicData.Kernel;
using DynamicData.Tests.Domain;
using DynamicData.Tests.Utilities;
Expand Down Expand Up @@ -29,16 +35,94 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable
const decimal HighestPrice = BasePrice + PriceOffset + 1.0m;
const decimal LowestPrice = BasePrice - 1.0m;

private static readonly Random Random = new (0x21123737);

private static decimal GetRandomPrice() => MarketPrice.RandomPrice(Random, BasePrice, PriceOffset);

private readonly ISourceCache<IMarket, Guid> _marketCache = new SourceCache<IMarket, Guid>(p => p.Id);

private readonly ChangeSetAggregator<IMarket, Guid> _marketCacheResults;

private readonly Randomizer _randomizer = new (0x21123737);

public MergeManyChangeSetsCacheFixture() => _marketCacheResults = _marketCache.Connect().AsAggregator();

[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
[InlineData(5, 100)]
[InlineData(200, 500)]
[InlineData(100, 5)]
public async Task MultiThreadedStressTest(int marketCount, int priceCount)
{
var MaxAddTime = TimeSpan.FromSeconds(0.250);
var MaxRemoveTime = TimeSpan.FromSeconds(0.100);

TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null;

IObservable<Unit> AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
AddRemoveMarkets(marketCount, parallel, scheduler)
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex)),

_marketCache.Connect()
.MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler))
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});

IObservable<IMarket> AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) =>
_randomizer.Interval(MaxAddTime, scheduler).Select(_ => new Market(_randomizer.Utf16String(5, 10, true)))
//.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler))
.Take(ownerCount)
.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler)
.Finally(_marketCache.Dispose);

IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) =>
_randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreateUniquePrice(_ => GetRandomPrice()))
//.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler))
.Take(priceCount)
.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)
.Finally(market.PricesCache.Dispose);

var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices);
using var priceResults = merged.AsAggregator();

var adding = true;

// Start asynchrononously modifying the parent list and the child lists
using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
.Finally(() => adding = false)
.Subscribe();

// Subscribe / unsubscribe over and over while the collections are being modified
do
{
// Ensure items are being added asynchronously before subscribing to changes
await Task.Yield();

{
// Subscribe
var mergedSub = merged.Subscribe();

// Let other threads run
await Task.Yield();

// Unsubscribe
mergedSub.Dispose();
}
}
while (adding);

// Verify the results
CheckResultContents(_marketCacheResults, priceResults);
}

[Fact]
public void NullChecks()
{
Expand Down Expand Up @@ -136,7 +220,7 @@ public void AllExistingSubItemsPresentInResult()
// having
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator();
markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice));
AddUniquePrices(markets);

// when
_marketCache.AddOrUpdate(markets);
Expand All @@ -160,7 +244,7 @@ public void AllNewSubItemsPresentInResult()
_marketCache.AddOrUpdate(markets);

// when
markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice));
AddUniquePrices(markets);

// then
_marketCacheResults.Data.Count.Should().Be(MarketCount);
Expand All @@ -179,7 +263,7 @@ public void AllRefreshedSubItemsAreRefreshed()
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator();
_marketCache.AddOrUpdate(markets);
markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice));
AddUniquePrices(markets);

// when
markets.ForEach(m => m.RefreshAllPrices(GetRandomPrice));
Expand Down Expand Up @@ -285,7 +369,7 @@ public void AnyRemovedSubItemIsRemoved()
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator();
_marketCache.AddOrUpdate(markets);
markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice));
AddUniquePrices(markets);

// when
markets.ForEach(m => m.PricesCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount))));
Expand All @@ -306,7 +390,7 @@ public void AnySourceItemRemovedRemovesAllSourceValues()
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator();
_marketCache.AddOrUpdate(markets);
markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice));
AddUniquePrices(markets);

// when
_marketCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount)));
Expand Down Expand Up @@ -691,10 +775,28 @@ public void Dispose()
DisposeMarkets();
}

private void AddUniquePrices(Market[] markets) => markets.ForEach(m => m.AddUniquePrices(PricesPerMarket, _ => GetRandomPrice()));

private void CheckResultContents(ChangeSetAggregator<IMarket, Guid> marketResults, ChangeSetAggregator<MarketPrice, int> priceResults)
{
var expectedMarkets = _marketCache.Items.ToList();
var expectedPrices = expectedMarkets.SelectMany(market => ((Market)market).PricesCache.Items).ToList();

// These should be subsets of each other
expectedMarkets.Should().BeSubsetOf(marketResults.Data.Items);
marketResults.Data.Items.Count().Should().Be(expectedMarkets.Count);

// These should be subsets of each other
expectedPrices.Should().BeSubsetOf(priceResults.Data.Items);
priceResults.Data.Items.Count().Should().Be(expectedPrices.Count);
}

private void DisposeMarkets()
{
_marketCache.Items.ForEach(m => (m as IDisposable)?.Dispose());
_marketCache.Dispose();
_marketCache.Clear();
}

private decimal GetRandomPrice() => MarketPrice.RandomPrice(_randomizer, BasePrice, PriceOffset);
}
2 changes: 2 additions & 0 deletions src/DynamicData.Tests/Domain/Fakers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ internal static class Fakers

public static Faker<AnimalOwner> AnimalOwnerWithAnimals { get; } = AnimalOwner.Clone().WithInitialAnimals(Animal);

public static Faker<Market> Market { get; } = new Faker<Market>().CustomInstantiator(faker => new Market($"{faker.Commerce.ProductName()} Market Id#{faker.Random.AlphaNumeric(5)}"));

public static Faker<AnimalOwner> WithInitialAnimals(this Faker<AnimalOwner> existing, Faker<Animal> animalFaker, int minCount, int maxCount) =>
existing.FinishWith((faker, owner) => owner.Animals.AddRange(animalFaker.GenerateLazy(faker.Random.Number(minCount, maxCount))));

Expand Down
21 changes: 21 additions & 0 deletions src/DynamicData.Tests/Domain/Market.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using DynamicData.Kernel;
using DynamicData.Tests.Utilities;

Expand All @@ -21,6 +22,8 @@ internal interface IMarket

internal sealed class Market : IMarket, IDisposable
{
private static int s_UniquePriceId;

private readonly ISourceCache<MarketPrice, int> _latestPrices = new SourceCache<MarketPrice, int>(p => p.ItemId);

public static IComparer<IMarket> RatingCompare { get; } = new RatingComparer();
Expand All @@ -39,6 +42,10 @@ public Market(int name) : this($"Market #{name}", Guid.NewGuid())
{
}

public Market(string name) : this(name, Guid.NewGuid())
{
}

public string Name { get; }

public Guid Id { get; }
Expand All @@ -51,6 +58,12 @@ public Market(int name) : this($"Market #{name}", Guid.NewGuid())

public MarketPrice CreatePrice(int itemId, decimal price) => new(itemId, price, Id);

public MarketPrice CreateUniquePrice(Func<int, decimal> getPrice)
{
var id = Interlocked.Increment(ref s_UniquePriceId);
return CreatePrice(id, getPrice(id));
}

public Market AddRandomIdPrices(Random r, int count, int minId, int maxId, Func<decimal> randPrices)
{
_latestPrices.AddOrUpdate(Enumerable.Range(0, int.MaxValue).Select(_ => r.Next(minId, maxId)).Distinct().Take(count).Select(id => CreatePrice(id, randPrices())));
Expand Down Expand Up @@ -95,10 +108,18 @@ public Market SetPrices(int minId, int maxId, Func<int, decimal> getPrice) => th

public Market SetPrices(int minId, int maxId, decimal newPrice) => SetPrices(minId, maxId, _ => newPrice);

public Market SetPrice(int id, Func<decimal> getPrice) => this.With(_ => _latestPrices.AddOrUpdate(CreatePrice(id, getPrice())));

public Market AddUniquePrices(int count, Func<int, decimal> getPrice) =>
this.With(_ => _latestPrices.AddOrUpdate(CreateUniquePrices(count, getPrice)));

public void Dispose() => _latestPrices.Dispose();

public override string ToString() => $"Market '{Name}' [{Id}] (Rating: {Rating})";

private IEnumerable<MarketPrice> CreateUniquePrices(int count, Func<int, decimal> getPrice) =>
Enumerable.Range(0, count).Select(_ => CreateUniquePrice(getPrice));

private class RatingComparer : IComparer<IMarket>
{
public int Compare([DisallowNull] IMarket x, [DisallowNull] IMarket y) =>
Expand Down
19 changes: 15 additions & 4 deletions src/DynamicData.Tests/Domain/MarketPrice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Bogus;

namespace DynamicData.Tests.Domain;

Expand Down Expand Up @@ -42,18 +43,28 @@ public decimal Price

public static decimal RandomPrice(Random r, decimal basePrice, decimal offset) => basePrice + (decimal)r.NextDouble() * offset;

public static decimal RandomPrice(Randomizer r, decimal basePrice, decimal offset) => r.Decimal(basePrice, basePrice + offset);

public override bool Equals(object? obj) => obj is MarketPrice price && Price == price.Price && TimeStamp.Equals(price.TimeStamp) && MarketId.Equals(price.MarketId) && ItemId == price.ItemId;

public override int GetHashCode() => HashCode.Combine(Price, TimeStamp, MarketId, ItemId);

public static bool operator ==(MarketPrice? left, MarketPrice? right) => EqualityComparer<MarketPrice>.Default.Equals(left, right);

public static bool operator !=(MarketPrice? left, MarketPrice? right) => !(left == right);

private class CurrentPriceEqualityComparer : IEqualityComparer<MarketPrice>
{
public virtual bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => x.MarketId.Equals(x.MarketId) && x.ItemId == y.ItemId && x.Price == y.Price;
public int GetHashCode([DisallowNull] MarketPrice obj) => throw new NotImplementedException();
}

private class TimeStampPriceEqualityComparer : CurrentPriceEqualityComparer, IEqualityComparer<MarketPrice>
private sealed class TimeStampPriceEqualityComparer : CurrentPriceEqualityComparer, IEqualityComparer<MarketPrice>
{
public override bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => base.Equals(x, y) && x.TimeStamp == y.TimeStamp;
}

private class LowestPriceComparer : IComparer<MarketPrice>
private sealed class LowestPriceComparer : IComparer<MarketPrice>
{
public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
{
Expand All @@ -62,7 +73,7 @@ public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
}
}

private class HighestPriceComparer : IComparer<MarketPrice>
private sealed class HighestPriceComparer : IComparer<MarketPrice>
{
public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
{
Expand All @@ -71,7 +82,7 @@ public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
}
}

private class LatestPriceComparer : IComparer<MarketPrice>
private sealed class LatestPriceComparer : IComparer<MarketPrice>
{
public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
{
Expand Down
21 changes: 9 additions & 12 deletions src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,26 @@ public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observabl
{
var locker = new object();

// Transform to an observable cache of merge containers.
// Transform to an observable changeset of cached changesets
var sourceCacheOfCaches = source
.IgnoreSameReferenceUpdate()
.WhereReasonsAre(ChangeReason.Add, ChangeReason.Remove, ChangeReason.Update)
.Transform((obj, key) => new ChangeSetCache<TDestination, TDestinationKey>(selector(obj, key)))
.Synchronize(locker)
.AsObservableCache();
.Transform((obj, key) => new ChangeSetCache<TDestination, TDestinationKey>(selector(obj, key).Synchronize(locker)))
.AsObservableCache();

var shared = sourceCacheOfCaches.Connect().Publish();

// this is manages all of the changes
// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker<TDestination, TDestinationKey>(() => sourceCacheOfCaches.Items, comparer, equalityComparer);

// merge the items back together
var shared = sourceCacheOfCaches.Connect().Publish();

// Merge the child changeset changes together and apply to the tracker
var allChanges = shared.MergeMany(mc => mc.Source)
.Synchronize(locker)
.Subscribe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);

// when a source item is removed, all of its sub-items need to be removed
// When a source item is removed, all of its sub-items need to be removed
var removedItems = shared
.Synchronize(locker)
.OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer))
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer))
.Subscribe();
Expand Down