diff --git a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt index 63a91b115..be978158d 100644 --- a/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt +++ b/src/DynamicData.Tests/API/ApiApprovalTests.DynamicDataTests.DotNet8_0.verified.txt @@ -1585,15 +1585,27 @@ namespace DynamicData public static System.IObservable> OnItemAdded(this System.IObservable> source, System.Action addAction) where TObject : notnull where TKey : notnull { } + public static System.IObservable> OnItemAdded(this System.IObservable> source, System.Action addAction) + where TObject : notnull + where TKey : notnull { } public static System.IObservable> OnItemRefreshed(this System.IObservable> source, System.Action refreshAction) where TObject : notnull where TKey : notnull { } + public static System.IObservable> OnItemRefreshed(this System.IObservable> source, System.Action refreshAction) + where TObject : notnull + where TKey : notnull { } public static System.IObservable> OnItemRemoved(this System.IObservable> source, System.Action removeAction, bool invokeOnUnsubscribe = true) where TObject : notnull where TKey : notnull { } + public static System.IObservable> OnItemRemoved(this System.IObservable> source, System.Action removeAction, bool invokeOnUnsubscribe = true) + where TObject : notnull + where TKey : notnull { } public static System.IObservable> OnItemUpdated(this System.IObservable> source, System.Action updateAction) where TObject : notnull where TKey : notnull { } + public static System.IObservable> OnItemUpdated(this System.IObservable> source, System.Action updateAction) + where TObject : notnull + where TKey : notnull { } public static System.IObservable> Or(this DynamicData.IObservableList> sources) where TObject : notnull where TKey : notnull { } @@ -1931,6 +1943,14 @@ namespace DynamicData where TSource : notnull where TSourceKey : notnull where TCollection : System.Collections.Specialized.INotifyCollectionChanged, System.Collections.Generic.IEnumerable { } + public static System.IObservable> TransformOnObservable(this System.IObservable> source, System.Func> transformFactory) + where TSource : notnull + where TKey : notnull + where TDestination : notnull { } + public static System.IObservable> TransformOnObservable(this System.IObservable> source, System.Func> transformFactory) + where TSource : notnull + where TKey : notnull + where TDestination : notnull { } public static System.IObservable> TransformSafe(this System.IObservable> source, System.Func transformFactory, System.Action> errorHandler, System.IObservable forceTransform) where TDestination : notnull where TSource : notnull diff --git a/src/DynamicData.Tests/Cache/TransformOnObservableFixture.cs b/src/DynamicData.Tests/Cache/TransformOnObservableFixture.cs new file mode 100644 index 000000000..8b4dc16ff --- /dev/null +++ b/src/DynamicData.Tests/Cache/TransformOnObservableFixture.cs @@ -0,0 +1,164 @@ +using System; +using System.Linq; +using System.Reactive; +using System.Reactive.Linq; +using System.Threading.Tasks; +using Bogus; +using DynamicData.Kernel; +using DynamicData.Tests.Domain; +using DynamicData.Tests.Utilities; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Cache; + +public class TransformOnObservableFixture : IDisposable +{ +#if DEBUG + private const int InitialCount = 7; + private const int AddCount = 5; + private const int RemoveCount = 3; + private const int UpdateCount = 2; +#else + private const int InitialCount = 103; + private const int AddCount = 53; + private const int RemoveCount = 37; + private const int UpdateCount = 31; +#endif + private static readonly TimeSpan UpdateTime = TimeSpan.FromMilliseconds(50); + + private readonly ISourceCache _animalCache = new SourceCache(a => a.Id); + private readonly ChangeSetAggregator _animalResults; + private readonly Faker _animalFaker; + private readonly Randomizer _randomizer = new (0x2112_2112); + + public TransformOnObservableFixture() + { + _animalFaker = Fakers.Animal.Clone().WithSeed(_randomizer); + _animalCache.AddOrUpdate(_animalFaker.Generate(InitialCount)); + _animalResults = _animalCache.Connect().AsAggregator(); + } + + [Fact] + public void ResultContainsAllInitialChildren() + { + // Arrange + + // Act + using var results = _animalCache.Connect().TransformOnObservable((ani, id) => Observable.Return(ani.Name)).AsAggregator(); + + // Assert + _animalResults.Data.Count.Should().Be(InitialCount); + results.Data.Count.Should().Be(InitialCount); + results.Messages.Count.Should().Be(1, "The child observables fire on subscription so everything should appear as a single changeset"); + } + + [Fact] + public void ResultContainsAddedValues() + { + // Arrange + using var results = _animalCache.Connect().TransformOnObservable((ani, id) => Observable.Return(ani.Name)).AsAggregator(); + + // Act + _animalCache.AddOrUpdate(_animalFaker.Generate(AddCount)); + + // Assert + _animalResults.Data.Count.Should().Be(InitialCount + AddCount); + results.Data.Count.Should().Be(_animalResults.Data.Count); + results.Messages.Count.Should().Be(2, "Initial Adds and then the subsequent Additions should each be a single message"); + } + + [Fact] + public void ResultDoesNotContainRemovedValues() + { + // Arrange + using var results = _animalCache.Connect().TransformOnObservable((ani, id) => Observable.Return(ani.Name)).AsAggregator(); + + // Act + _animalCache.RemoveKeys(_randomizer.ListItems(_animalCache.Items.ToList(), RemoveCount).Select(a => a.Id)); + + // Assert + _animalResults.Data.Count.Should().Be(InitialCount - RemoveCount); + results.Data.Count.Should().Be(_animalResults.Data.Count); + results.Messages.Count.Should().Be(2, "1 for Adds and 1 for Removes"); + } + + [Fact] + public async Task ResultUpdatesOnFutureValues() + { + // Create an observable that fires a wrong value on an interval a fixed number of times + // then fires the expected value before completing + IObservable CreateChildObs(Animal a, int id) => + Observable.Interval(UpdateTime) + .Select(n => $"{a.Name}-{id}-{n}") + .Take(UpdateCount) + .Concat(Observable.Return(a.Name)); + + // Arrange + var shared = _animalCache.Connect().TransformOnObservable(CreateChildObs).Publish(); + using var results = shared.AsAggregator(); + var task = Task.Run(async () => await shared); + using var cleanup = shared.Connect(); + _animalCache.Dispose(); + + // Act + await task; + + // Assert + _animalResults.Data.Count.Should().Be(InitialCount); + results.Data.Count.Should().Be(_animalResults.Data.Count); + results.Summary.Overall.Adds.Should().Be(InitialCount); + results.Summary.Overall.Updates.Should().Be(InitialCount * UpdateCount, $"Each item should update {UpdateCount} times"); + results.Messages.Count.Should().BeGreaterThanOrEqualTo(1, "The delay may cause the messages to appear as multiple changesets"); + _animalCache.Items.ForEach(animal => results.Data.Lookup(animal.Id).Should().Be(Optional.Some(animal.Name))); + } + + [Theory] + [InlineData(false, false)] + [InlineData(false, true)] + [InlineData(true, false)] + [InlineData(true, true)] + public void ResultCompletesOnlyWhenSourceAndAllChildrenComplete(bool completeSource, bool completeChildren) + { + IObservable CreateChildObs(Animal a, int id) => + completeChildren + ? Observable.Return(a.Name) + : Observable.Return(a.Name).Concat(Observable.Never()); + + // Arrange + using var results = _animalCache.Connect().TransformOnObservable(CreateChildObs).AsAggregator(); + + // Act + if (completeSource) + { + _animalCache.Dispose(); + } + + // Assert + _animalResults.IsCompleted.Should().Be(completeSource); + results.IsCompleted.Should().Be(completeSource && completeChildren); + } + + [Fact] + public void ResultFailsIfSourceFails() + { + // Arrange + var expectedError = new Exception("Expected"); + var throwObservable = Observable.Throw>(expectedError); + using var results = _animalCache.Connect().Concat(throwObservable).TransformOnObservable(animal => Observable.Return(animal)).AsAggregator(); + + // Act + _animalCache.Dispose(); + + // Assert + results.Error.Should().Be(expectedError); + } + + public void Dispose() + { + _animalCache.Dispose(); + _animalResults.Dispose(); + } +} diff --git a/src/DynamicData/Cache/Internal/OnBeingRemoved.cs b/src/DynamicData/Cache/Internal/OnBeingRemoved.cs index cc695a20e..3e1a24ecc 100644 --- a/src/DynamicData/Cache/Internal/OnBeingRemoved.cs +++ b/src/DynamicData/Cache/Internal/OnBeingRemoved.cs @@ -9,11 +9,11 @@ namespace DynamicData.Cache.Internal; -internal sealed class OnBeingRemoved(IObservable> source, Action removeAction, bool invokeOnUnsubscribe) +internal sealed class OnBeingRemoved(IObservable> source, Action removeAction) where TObject : notnull where TKey : notnull { - private readonly Action _removeAction = removeAction ?? throw new ArgumentNullException(nameof(removeAction)); + private readonly Action _removeAction = removeAction ?? throw new ArgumentNullException(nameof(removeAction)); private readonly IObservable> _source = source ?? throw new ArgumentNullException(nameof(source)); public IObservable> Run() => Observable.Create>( @@ -30,11 +30,7 @@ public IObservable> Run() => Observable.Create _removeAction(t)); - } - + cache.KeyValues.ForEach(kvp => _removeAction(kvp.Value, kvp.Key)); cache.Clear(); } }); @@ -49,7 +45,7 @@ private void RegisterForRemoval(IChangeSet changes, Cache(IObservable> source, Func> transform) + where TSource : notnull + where TKey : notnull + where TDestination : notnull +{ + public IObservable> Run() => Observable.Create>(observer => + { + var cache = new ChangeAwareCache(); + var locker = new object(); + var pendingUpdates = 0; + + // Helper to emit any pending changes when all the updates have been handled + void EmitChanges() + { + if (Interlocked.Decrement(ref pendingUpdates) == 0) + { + var changes = cache!.CaptureChanges(); + if (changes.Count > 0) + { + observer.OnNext(changes); + } + } + } + + // Create the sub-observable that takes the result of the transformation, + // filters out unchanged values, and then updates the cache + IObservable CreateSubObservable(TSource obj, TKey key) => + transform(obj, key) + .DistinctUntilChanged() + .Do(_ => Interlocked.Increment(ref pendingUpdates)) + .Synchronize(locker!) + .Do(val => cache!.AddOrUpdate(val, key)); + + // Always increment the counter OUTSIDE of the lock to signal any thread currently holding the lock + // to not emit the changeset because more changes are incoming. + var shared = source + .Do(_ => Interlocked.Increment(ref pendingUpdates)) + .Synchronize(locker!) + .Publish(); + + // Use MergeMany because it automatically handles Add/Update/Remove and OnCompleted/OnError correctly + var subMerged = shared + .MergeMany(CreateSubObservable) + .SubscribeSafe(_ => EmitChanges(), observer.OnError, observer.OnCompleted); + + // Subscribe to the shared Observable to handle Remove events. MergeMany will unsubscribe from the sub-observable, + // but the corresponding key value needs to be removed from the Cache so the remove is observed downstream. + var subRemove = shared + .OnItemRemoved((_, key) => cache!.Remove(key), invokeOnUnsubscribe: false) + .SubscribeSafe(_ => EmitChanges()); + + return new CompositeDisposable(shared.Connect(), subMerged, subRemove); + }); +} diff --git a/src/DynamicData/Cache/ObservableCacheEx.cs b/src/DynamicData/Cache/ObservableCacheEx.cs index 9244fe607..ac9006cef 100644 --- a/src/DynamicData/Cache/ObservableCacheEx.cs +++ b/src/DynamicData/Cache/ObservableCacheEx.cs @@ -3189,55 +3189,71 @@ public static IObservable> NotEmpty(thi /// The type of the object. /// The type of the key. /// The source. - /// The add action. + /// The add action that takes the new value and the associated key. /// An observable which emits a change set with items being added. - public static IObservable> OnItemAdded(this IObservable> source, Action addAction) + public static IObservable> OnItemAdded(this IObservable> source, Action addAction) where TObject : notnull where TKey : notnull { source.ThrowArgumentNullExceptionIfNull(nameof(source)); addAction.ThrowArgumentNullExceptionIfNull(nameof(addAction)); - return source.Do(changes => changes.Where(c => c.Reason == ChangeReason.Add).ForEach(c => addAction(c.Current))); + return source.OnChangeAction(ChangeReason.Add, addAction); } + /// + /// Callback for each item as and when it is being added to the stream. + /// + /// The type of the object. + /// The type of the key. + /// The source. + /// The add action that takes the new value. + /// An observable which emits a change set with items being added. + /// Overload for with a callback that doesn't use a key. + public static IObservable> OnItemAdded(this IObservable> source, Action addAction) + where TObject : notnull + where TKey : notnull + => source.OnItemAdded((obj, _) => addAction(obj)); + /// /// Callback for each item as and when it is being refreshed in the stream. /// /// The type of the object. /// The type of the key. /// The source. - /// The refresh action. + /// The refresh action that takes the refreshed value and the key. /// An observable which emits a change set with items being added. - public static IObservable> OnItemRefreshed(this IObservable> source, Action refreshAction) + public static IObservable> OnItemRefreshed(this IObservable> source, Action refreshAction) where TObject : notnull where TKey : notnull { - var refreshAction2 = refreshAction; source.ThrowArgumentNullExceptionIfNull(nameof(source)); - refreshAction2.ThrowArgumentNullExceptionIfNull(nameof(refreshAction)); + refreshAction.ThrowArgumentNullExceptionIfNull(nameof(refreshAction)); - return source.Do(changes => - { - foreach (var change in changes.ToConcreteType()) - { - if (change.Reason != ChangeReason.Refresh) - { - continue; - } - - refreshAction2(change.Current); - } - }); + return source.OnChangeAction(ChangeReason.Refresh, refreshAction); } /// - /// Callback for each item as and when it is being removed from the stream. + /// Callback for each item as and when it is being refreshed in the stream. + /// + /// The type of the object. + /// The type of the key. + /// The source. + /// The refresh action that takes the refreshed value. + /// An observable which emits a change set with items being added. + /// Overload for with a callback that doesn't use a key. + public static IObservable> OnItemRefreshed(this IObservable> source, Action refreshAction) + where TObject : notnull + where TKey : notnull + => source.OnItemRefreshed((obj, _) => refreshAction(obj)); + + /// + /// Callback for each item/key as and when it is being removed from the stream. /// /// The type of the object. /// The type of the key. /// The source. - /// The remove action. + /// The remove action that takes the removed value and the key. /// Should the remove action be invoked when the subscription is disposed. /// An observable which emits a change set with items being removed. /// @@ -3245,34 +3261,73 @@ public static IObservable> OnItemRefreshed - public static IObservable> OnItemRemoved(this IObservable> source, Action removeAction, bool invokeOnUnsubscribe = true) + public static IObservable> OnItemRemoved(this IObservable> source, Action removeAction, bool invokeOnUnsubscribe = true) where TObject : notnull where TKey : notnull { source.ThrowArgumentNullExceptionIfNull(nameof(source)); removeAction.ThrowArgumentNullExceptionIfNull(nameof(removeAction)); - return new OnBeingRemoved(source, removeAction, invokeOnUnsubscribe).Run(); + if (invokeOnUnsubscribe) + { + return new OnBeingRemoved(source, removeAction).Run(); + } + + return source.OnChangeAction(ChangeReason.Remove, removeAction); } + /// + /// Callback for each item as and when it is being removed from the stream. + /// + /// The type of the object. + /// The type of the key. + /// The source. + /// The remove action that takes the removed value. + /// Should the remove action be invoked when the subscription is disposed. + /// An observable which emits a change set with items being removed. + /// + /// source + /// or + /// removeAction. + /// + /// Overload for with a callback that doesn't use the key. + public static IObservable> OnItemRemoved(this IObservable> source, Action removeAction, bool invokeOnUnsubscribe = true) + where TObject : notnull + where TKey : notnull + => source.OnItemRemoved((obj, _) => removeAction(obj), invokeOnUnsubscribe); + /// /// Callback when an item has been updated eg. (current, previous)=>{}. /// /// The type of the object. /// The type of the key. /// The source. - /// The update action. + /// The update action that takes current value, previous value, and the key. /// An observable which emits a change set with items being updated. - public static IObservable> OnItemUpdated(this IObservable> source, Action updateAction) + public static IObservable> OnItemUpdated(this IObservable> source, Action updateAction) where TObject : notnull where TKey : notnull { source.ThrowArgumentNullExceptionIfNull(nameof(source)); updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction)); - return source.Do(changes => changes.Where(c => c.Reason == ChangeReason.Update).ForEach(c => updateAction(c.Current, c.Previous.Value))); + return source.OnChangeAction(static change => change.Reason == ChangeReason.Update, change => updateAction(change.Current, change.Previous.Value, change.Key)); } + /// + /// Callback when an item has been updated eg. (current, previous)=>{}. + /// + /// The type of the object. + /// The type of the key. + /// The source. + /// The update action that takes the current value and previous value. + /// An observable which emits a change set with items being updated. + /// Overload for with a callback that doesn't use the key. + public static IObservable> OnItemUpdated(this IObservable> source, Action updateAction) + where TObject : notnull + where TKey : notnull + => source.OnItemUpdated((cur, prev, _) => updateAction(cur, prev)); + /// /// Apply a logical Or operator between the collections i.e items which are in any of the sources are included. /// @@ -5222,6 +5277,53 @@ public static IObservable> TransformMa where TSource : notnull where TSourceKey : notnull => source.TransformManySafeAsync((val, _) => manySelector(val), errorHandler, equalityComparer, comparer); + /// + /// Transforms each item in the ChangeSet into an Observable that provides the value for the Resulting ChangeSet. + /// + /// The type of the source changeset. + /// The type of the key. + /// The type of the destination changeset. + /// The source changeset observable. + /// Factory function to create the Observable that will provide the values in the result changeset from the given object in the source changeset. + /// + /// A changeset whose value for a given key is the latest value emitted from the transformed Observable and will update to future values from that observable. + /// + /// source + /// or + /// transformFactory. + public static IObservable> TransformOnObservable(this IObservable> source, Func> transformFactory) + where TSource : notnull + where TKey : notnull + where TDestination : notnull + { + source.ThrowArgumentNullExceptionIfNull(nameof(source)); + transformFactory.ThrowArgumentNullExceptionIfNull(nameof(transformFactory)); + + return new TransformOnObservable(source, transformFactory).Run(); + } + + /// + /// Transforms each item in the ChangeSet into an Observable that provides the value for the Resulting ChangeSet. + /// + /// The type of the source changeset. + /// The type of the key. + /// The type of the destination changeset. + /// The source changeset observable. + /// Factory function to create the Observable that will provide the values in the result changeset from the given object in the source changeset. + /// + /// A changeset whose value for a given key is the latest value emitted from the transformed Observable and will update to future values from that observable. + /// + /// source or transformFactory. + public static IObservable> TransformOnObservable(this IObservable> source, Func> transformFactory) + where TSource : notnull + where TKey : notnull + where TDestination : notnull + { + transformFactory.ThrowArgumentNullExceptionIfNull(nameof(transformFactory)); + + return source.TransformOnObservable((obj, _) => transformFactory(obj)); + } + /// /// Projects each update item to a new form using the specified transform function, /// providing an error handling action to safely handle transform errors without killing the stream. @@ -6225,6 +6327,29 @@ void UpdateAction(IChangeSet updates) return (Func)Transformer; }); + private static IObservable> OnChangeAction(this IObservable> source, Predicate> predicate, Action> changeAction) + where TObject : notnull + where TKey : notnull + { + return source.Do(changes => + { + foreach (var change in changes.ToConcreteType()) + { + if (!predicate(change)) + { + continue; + } + + changeAction(change); + } + }); + } + + private static IObservable> OnChangeAction(this IObservable> source, ChangeReason reason, Action action) + where TObject : notnull + where TKey : notnull + => source.OnChangeAction(change => change.Reason == reason, change => action(change.Current, change.Key)); + private static IObservable TrueFor(this IObservable> source, Func> observableSelector, Func>, bool> collectionMatcher) where TObject : notnull where TKey : notnull