Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: TransformOnObservable Operator for SourceCache #841

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,14 @@ namespace DynamicData
where TSource : notnull
where TSourceKey : notnull
where TCollection : System.Collections.Specialized.INotifyCollectionChanged, System.Collections.Generic.IEnumerable<TDestination> { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformOnObservable<TSource, TKey, TDestination>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, System.IObservable<TDestination>> transformFactory)
where TSource : notnull
where TKey : notnull
where TDestination : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformOnObservable<TSource, TKey, TDestination>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TKey, System.IObservable<TDestination>> transformFactory)
where TSource : notnull
where TKey : notnull
where TDestination : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformSafe<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TDestination> transformFactory, System.Action<DynamicData.Kernel.Error<TSource, TKey>> errorHandler, System.IObservable<System.Reactive.Unit> forceTransform)
where TDestination : notnull
where TSource : notnull
Expand Down Expand Up @@ -2869,4 +2877,4 @@ namespace DynamicData.Tests
public void Dispose() { }
protected virtual void Dispose(bool isDisposing) { }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,14 @@ namespace DynamicData
where TSource : notnull
where TSourceKey : notnull
where TCollection : System.Collections.Specialized.INotifyCollectionChanged, System.Collections.Generic.IEnumerable<TDestination> { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformOnObservable<TSource, TKey, TDestination>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, System.IObservable<TDestination>> transformFactory)
where TSource : notnull
where TKey : notnull
where TDestination : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformOnObservable<TSource, TKey, TDestination>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TKey, System.IObservable<TDestination>> transformFactory)
where TSource : notnull
where TKey : notnull
where TDestination : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformSafe<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TDestination> transformFactory, System.Action<DynamicData.Kernel.Error<TSource, TKey>> errorHandler, System.IObservable<System.Reactive.Unit> forceTransform)
where TDestination : notnull
where TSource : notnull
Expand Down Expand Up @@ -2869,4 +2877,4 @@ namespace DynamicData.Tests
public void Dispose() { }
protected virtual void Dispose(bool isDisposing) { }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1931,6 +1931,14 @@ namespace DynamicData
where TSource : notnull
where TSourceKey : notnull
where TCollection : System.Collections.Specialized.INotifyCollectionChanged, System.Collections.Generic.IEnumerable<TDestination> { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformOnObservable<TSource, TKey, TDestination>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, System.IObservable<TDestination>> transformFactory)
where TSource : notnull
where TKey : notnull
where TDestination : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformOnObservable<TSource, TKey, TDestination>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TKey, System.IObservable<TDestination>> transformFactory)
where TSource : notnull
where TKey : notnull
where TDestination : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformSafe<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TDestination> transformFactory, System.Action<DynamicData.Kernel.Error<TSource, TKey>> errorHandler, System.IObservable<System.Reactive.Unit> forceTransform)
where TDestination : notnull
where TSource : notnull
Expand Down
156 changes: 156 additions & 0 deletions src/DynamicData.Tests/Cache/TransformOnObservableFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Bogus;
using DynamicData.Kernel;
using DynamicData.Tests.Domain;
using DynamicData.Tests.Utilities;

using FluentAssertions;

using Xunit;

namespace DynamicData.Tests.Cache;

public class TransformOnObservableFixture : IDisposable
{
#if DEBUG
const int InitialCount = 7;
const int AddCount = 5;
const int RemoveCount = 3;
#else
const int InitialCount = 103;
const int AddCount = 53;
const int RemoveCount = 37;
#endif

private readonly ISourceCache<Animal, int> _animalCache = new SourceCache<Animal, int>(a => a.Id);
private readonly ChangeSetAggregator<Animal, int> _animalResults;
private readonly Faker<Animal> _animalFaker;
private readonly Randomizer _randomizer = new (0x2112_2112);

public TransformOnObservableFixture()
{
_animalFaker = Fakers.Animal.Clone().WithSeed(_randomizer);
_animalCache.AddOrUpdate(_animalFaker.Generate(InitialCount));
_animalResults = _animalCache.Connect().AsAggregator();
}

[Fact]
public void ResultContainsAllInitialChildren()
{
// Arrange

// Act
using var results = _animalCache.Connect().TransformOnObservable((ani, id) => Observable.Return(ani.Name)).AsAggregator();

// Assert
_animalResults.Data.Count.Should().Be(InitialCount);
results.Data.Count.Should().Be(InitialCount);
results.Messages.Count.Should().Be(1);
}

[Fact]
public void ResultContainsAddedValues()
{
// Arrange
using var results = _animalCache.Connect().TransformOnObservable((ani, id) => Observable.Return(ani.Name)).AsAggregator();

// Act
_animalCache.AddOrUpdate(_animalFaker.Generate(AddCount));

// Assert
_animalResults.Data.Count.Should().Be(InitialCount + AddCount);
results.Data.Count.Should().Be(_animalResults.Data.Count);
results.Messages.Count.Should().Be(2);
}

[Fact]
public void ResultDoesNotContainRemovedValues()
{
// Arrange
using var results = _animalCache.Connect().TransformOnObservable((ani, id) => Observable.Return(ani.Name)).AsAggregator();

// Act
_animalCache.RemoveKeys(_randomizer.ListItems(_animalCache.Items.ToList(), RemoveCount).Select(a => a.Id));

// Assert
_animalResults.Data.Count.Should().Be(InitialCount - RemoveCount);
results.Data.Count.Should().Be(_animalResults.Data.Count);
results.Messages.Count.Should().Be(2);
}

[Fact]
public async Task ResultUpdatesOnFutureValues()
{
IObservable<string> CreateChildObs(Animal a, int id) =>
Observable.Return(string.Empty).Concat(Observable.Timer(TimeSpan.FromMilliseconds(100)).Select(_ => a.Name));

// Arrange
var shared = _animalCache.Connect().TransformOnObservable(CreateChildObs).Publish();
using var results = shared.AsAggregator();
var task = Task.Run(async () => await shared);
using var cleanup = shared.Connect();
_animalCache.Dispose();

// Act
await task;

// Assert
_animalResults.Data.Count.Should().Be(InitialCount);
results.Data.Count.Should().Be(_animalResults.Data.Count);
results.Summary.Overall.Adds.Should().Be(InitialCount);
results.Summary.Overall.Updates.Should().Be(InitialCount);
results.Messages.Count.Should().BeGreaterThan(1);
_animalCache.Items.ForEach(animal => results.Data.Lookup(animal.Id).Should().Be(animal.Name));
}

[Theory]
[InlineData(false, false)]
[InlineData(false, true)]
[InlineData(true, false)]
[InlineData(true, true)]
public void ResultCompletesOnlyWhenSourceAndAllChildrenComplete(bool completeSource, bool completeChildren)
{
IObservable<string> CreateChildObs(Animal a, int id) =>
completeChildren
? Observable.Return(a.Name)
: Observable.Return(a.Name).Concat(Observable.Never<string>());

// Arrange
using var results = _animalCache.Connect().TransformOnObservable(CreateChildObs).AsAggregator();

// Act
if (completeSource)
{
_animalCache.Dispose();
}

// Assert
_animalResults.IsCompleted.Should().Be(completeSource);
results.IsCompleted.Should().Be(completeSource && completeChildren);
}

[Fact]
public void ResultFailsIfSourceFails()
{
// Arrange
var expectedError = new Exception("Expected");
var throwObservable = Observable.Throw<IChangeSet<Animal, int>>(expectedError);
using var results = _animalCache.Connect().Concat(throwObservable).TransformOnObservable(animal => Observable.Return(animal)).AsAggregator();

// Act
_animalCache.Dispose();

// Assert
results.Error.Should().Be(expectedError);
}

public void Dispose()
{
_animalCache.Dispose();
_animalResults.Dispose();
}
}
61 changes: 61 additions & 0 deletions src/DynamicData/Cache/Internal/TransformOnObservable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved.
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;
using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;

internal sealed class TransformOnObservable<TSource, TKey, TDestination>(IObservable<IChangeSet<TSource, TKey>> source, Func<TSource, TKey, IObservable<TDestination>> transform)
where TSource : notnull
where TKey : notnull
where TDestination : notnull
{
public IObservable<IChangeSet<TDestination, TKey>> Run() => Observable.Create<IChangeSet<TDestination, TKey>>(observer =>
{
var locker = new object();
var cache = new ChangeAwareCache<TDestination, TKey>();
var updateCounter = 0;

// Helper to emit any pending changes when all the updates have been handled
void EmitChanges()
{
if (Interlocked.Decrement(ref updateCounter) == 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the concept of an update counter to batch results is inspired. The same concept would probably benefit several other operations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the concept of an update counter to batch results is inspired. The same concept would probably benefit several other operations.

Thank you! 😊
I used it for TransformManyAsync to get behavior similar to TransformMany after you stressed the importance of minimizing the number of ChangeSets. I was also thinking that it might be beneficial to use in other places, so I'll keep a lookout for those.

There is some nuance to using it. For example, you have to increment the counter before the Synchronize so that updates coming in on other threads are counted and the emission downstream is deferred until it can be handled.

There are also some downsides:

  • More difficult (sometimes impossible) to deterministically know how many changesets you'll get
  • Possible to never get a downstream changeset (if updates happen constantly)

{
var changes = cache!.CaptureChanges();
if (changes.Count > 0)
{
observer.OnNext(changes);
}
}
}

IObservable<TDestination> CreateSubObservable(TSource obj, TKey key) =>
transform(obj, key)
.DistinctUntilChanged()
.Do(_ => Interlocked.Increment(ref updateCounter))
.Synchronize(locker!)
.Do(val => cache!.AddOrUpdate(val, key));

var shared = source
.Do(_ => Interlocked.Increment(ref updateCounter))
.Synchronize(locker!)
.Publish();

// Use MergeMany because it automatically handles OnCompleted/OnError correctly
var subMerged = shared
.MergeMany(CreateSubObservable)
.SubscribeSafe(_ => EmitChanges(), observer.OnError, observer.OnCompleted);

// Subscribe to the shared Observable to handle Remove events. MergeMany will unsubscribe from the sub-observable,
// but the corresponding key value needs to be removed from the Cache so the remove is observed downstream.
var subRemove = shared
.Do(changes => changes.Where(c => c.Reason == ChangeReason.Remove).ForEach(c => cache!.Remove(c.Key)))
.SubscribeSafe(_ => EmitChanges());

return new CompositeDisposable(shared.Connect(), subMerged, subRemove);
});
}
47 changes: 47 additions & 0 deletions src/DynamicData/Cache/ObservableCacheEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5222,6 +5222,53 @@ public static IObservable<IChangeSet<TDestination, TDestinationKey>> TransformMa
where TSource : notnull
where TSourceKey : notnull => source.TransformManySafeAsync((val, _) => manySelector(val), errorHandler, equalityComparer, comparer);

/// <summary>
/// Transforms each item in the ChangeSet into an Observable that provides the value for the Resulting ChangeSet.
/// </summary>
/// <typeparam name="TSource">The type of the source changeset.</typeparam>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <typeparam name="TDestination">The type of the destination changeset.</typeparam>
/// <param name="source">The source changeset observable.</param>
/// <param name="transformFactory">Factory function to create the Observable that will provide the values in the result changeset from the given object in the source changeset.</param>
/// <returns>
/// A changeset whose value for a given key is the latest value emitted from the transformed Observable and will update to future values from that observable.
/// </returns>
/// <exception cref="ArgumentNullException">source
/// or
/// transformFactory.</exception>
public static IObservable<IChangeSet<TDestination, TKey>> TransformOnObservable<TSource, TKey, TDestination>(this IObservable<IChangeSet<TSource, TKey>> source, Func<TSource, TKey, IObservable<TDestination>> transformFactory)
where TSource : notnull
where TKey : notnull
where TDestination : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));
transformFactory.ThrowArgumentNullExceptionIfNull(nameof(transformFactory));

return new TransformOnObservable<TSource, TKey, TDestination>(source, transformFactory).Run();
}

/// <summary>
/// Transforms each item in the ChangeSet into an Observable that provides the value for the Resulting ChangeSet.
/// </summary>
/// <typeparam name="TSource">The type of the source changeset.</typeparam>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <typeparam name="TDestination">The type of the destination changeset.</typeparam>
/// <param name="source">The source changeset observable.</param>
/// <param name="transformFactory">Factory function to create the Observable that will provide the values in the result changeset from the given object in the source changeset.</param>
/// <returns>
/// A changeset whose value for a given key is the latest value emitted from the transformed Observable and will update to future values from that observable.
/// </returns>
/// <exception cref="ArgumentNullException">source or transformFactory.</exception>
public static IObservable<IChangeSet<TDestination, TKey>> TransformOnObservable<TSource, TKey, TDestination>(this IObservable<IChangeSet<TSource, TKey>> source, Func<TSource, IObservable<TDestination>> transformFactory)
where TSource : notnull
where TKey : notnull
where TDestination : notnull
{
transformFactory.ThrowArgumentNullExceptionIfNull(nameof(transformFactory));

return source.TransformOnObservable((obj, _) => transformFactory(obj));
}

/// <summary>
/// Projects each update item to a new form using the specified transform function,
/// providing an error handling action to safely handle transform errors without killing the stream.
Expand Down
Loading